Files
karakeep-mirror/packages/shared/queueing.ts
Mohamed Bassem ddd4b578cd fix: preserve failure count when rescheduling rate limited domains (#2303)
* fix: preserve retry count when rate-limited jobs are rescheduled

Previously, when a domain was rate-limited in the crawler worker,
the job would be re-enqueued as a new job, which reset the failure
count. This meant rate-limited jobs could retry indefinitely without
respecting the max retry limit.

This commit introduces a RateLimitRetryError exception that signals
the queue system to retry the job after a delay without counting it
as a failed attempt. The job is retried within the same invocation,
preserving the original retry count.

Changes:
- Add RateLimitRetryError class to shared/queueing.ts
- Update crawler worker to throw RateLimitRetryError instead of re-enqueuing
- Update Restate queue service to handle RateLimitRetryError with delay
- Update Liteque queue wrapper to handle RateLimitRetryError with delay

This ensures that rate-limited jobs respect the configured retry limits
while still allowing for delayed retries when domains are rate-limited.

* refactor: use liteque's native RetryAfterError for rate limiting

Instead of manually handling retries in a while loop, translate
RateLimitRetryError to liteque's native RetryAfterError. This is
cleaner and lets liteque handle the retry logic using its built-in
mechanism.

* test: add tests for RateLimitRetryError handling in restate queue

Added comprehensive tests to verify that:
1. RateLimitRetryError delays retry appropriately
2. Rate-limited retries don't count against the retry limit
3. Jobs can be rate-limited more times than the retry limit
4. Regular errors still respect the retry limit

These tests ensure the queue correctly handles rate limiting
without exhausting retry attempts.

* lint & format

* fix: prevent onError callback for RateLimitRetryError

Fixed two issues with RateLimitRetryError handling in restate queue:

1. RateLimitRetryError now doesn't trigger the onError callback since
   it's not a real error - it's an expected rate limiting behavior

2. Check for RateLimitRetryError in runWorkerLogic before calling onError,
   ensuring the instanceof check works correctly before the error gets
   further wrapped by restate

Updated tests to verify onError is not called for rate limit retries.

* fix: catch RateLimitRetryError before ctx.run wraps it

Changed approach to use a discriminated union instead of throwing
and catching RateLimitRetryError. Now we catch the error inside the
ctx.run callback before it gets wrapped by restate's TerminalError,
and return a RunResult type that indicates success, rate limit, or error.

This fixes the issue where instanceof checks would fail because
ctx.run wraps all errors in TerminalError.

* more fixes

* rename error name

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-12-25 12:46:45 +00:00

102 lines
2.2 KiB
TypeScript

import { ZodType } from "zod";
import { PluginManager, PluginType } from "./plugins";
/**
* Special error that indicates a job should be retried after a delay
* without counting against the retry attempts limit.
* Useful for handling rate limiting scenarios.
*/
export class QueueRetryAfterError extends Error {
constructor(
message: string,
public readonly delayMs: number,
) {
super(message);
this.name = "QueueRetryAfterError";
}
}
export interface EnqueueOptions {
idempotencyKey?: string;
priority?: number;
delayMs?: number;
groupId?: string;
}
export interface QueueOptions {
defaultJobArgs: {
numRetries: number;
};
keepFailedJobs: boolean;
}
export interface DequeuedJob<T> {
id: string;
data: T;
priority: number;
runNumber: number;
abortSignal: AbortSignal;
}
export interface DequeuedJobError<T> {
id: string;
data?: T;
priority: number;
error: Error;
runNumber: number;
numRetriesLeft: number;
}
export interface RunnerFuncs<T, R = void> {
run: (job: DequeuedJob<T>) => Promise<R>;
onComplete?: (job: DequeuedJob<T>, result: R) => Promise<void>;
onError?: (job: DequeuedJobError<T>) => Promise<void>;
}
export interface RunnerOptions<T> {
pollIntervalMs?: number;
timeoutSecs: number;
concurrency: number;
validator?: ZodType<T>;
}
export interface Queue<T> {
opts: QueueOptions;
name(): string;
enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>;
stats(): Promise<{
pending: number;
pending_retry: number;
running: number;
failed: number;
}>;
cancelAllNonRunning?(): Promise<number>;
}
export interface Runner<_T> {
run(): Promise<void>;
stop(): void;
runUntilEmpty?(): Promise<void>;
}
export interface QueueClient {
prepare(): Promise<void>;
start(): Promise<void>;
createQueue<T>(name: string, options: QueueOptions): Queue<T>;
createRunner<T, R = void>(
queue: Queue<T>,
funcs: RunnerFuncs<T, R>,
opts: RunnerOptions<T>,
): Runner<T>;
shutdown?(): Promise<void>;
}
export async function getQueueClient(): Promise<QueueClient> {
const client = await PluginManager.getClient(PluginType.Queue);
if (!client) {
throw new Error("Failed to get queue client");
}
return client;
}