diff --git a/apps/server/src/api/groups/root.ts b/apps/server/src/api/groups/root.ts index 391a6d3..f4d51af 100644 --- a/apps/server/src/api/groups/root.ts +++ b/apps/server/src/api/groups/root.ts @@ -6,6 +6,8 @@ import {TaskNotFoundError} from "@ecrawler/api/schemas/Task.ts" import {HttpApiBuilder} from "@effect/platform" import {Array, Effect, Layer, Schedule, Duration, pipe} from "effect" import {and, arrayContains, asc, eq, gte, lt, SQL} from "drizzle-orm" + +const LEASE_DURATION_MS = Duration.toMillis(Duration.minutes(5)) import * as schema from "../../database/schemas" import {Database} from "../../database/index.ts" @@ -190,7 +192,7 @@ const dispatcherGroup = Layer.unwrapEffect( UnknownError.mapError ) ) - .handle("nextTask", ({urlParams}) => + .handle("nextTask", ({urlParams, payload}) => pipe( db.transaction(tx => Effect.gen(function* () { @@ -224,7 +226,11 @@ const dispatcherGroup = Layer.unwrapEffect( const [updated] = yield* tx .update(schema.tasks) - .set({status: "processing"}) + .set({ + status: "processing", + worker_id: payload.workerId, + lease_expires_at: new Date(Date.now() + LEASE_DURATION_MS) + }) .where(eq(schema.tasks.id, task.id)) .returning() @@ -242,6 +248,28 @@ const dispatcherGroup = Layer.unwrapEffect( }) ) ) + .handle("renewLease", ({path, payload}) => + pipe( + db + .update(schema.tasks) + .set({lease_expires_at: new Date(Date.now() + LEASE_DURATION_MS)}) + .where( + and( + eq(schema.tasks.id, path.id), + eq(schema.tasks.worker_id, payload.workerId), + eq(schema.tasks.status, "processing") + ) + ) + .returning({id: schema.tasks.id}), + UnknownError.mapError, + Effect.flatMap(rows => + Array.head(rows).pipe( + Effect.mapError(() => new TaskNotFoundError()), + Effect.asVoid + ) + ) + ) + ) ) }) ) diff --git a/apps/server/src/database/migrations/20260323000000_task_lease/migration.sql b/apps/server/src/database/migrations/20260323000000_task_lease/migration.sql new file mode 100644 index 0000000..789be24 --- /dev/null +++ b/apps/server/src/database/migrations/20260323000000_task_lease/migration.sql @@ -0,0 +1,2 @@ +ALTER TABLE "tasks" ADD COLUMN "worker_id" text;--> statement-breakpoint +ALTER TABLE "tasks" ADD COLUMN "lease_expires_at" timestamp; diff --git a/apps/server/src/database/schemas/tasks.ts b/apps/server/src/database/schemas/tasks.ts index 7a7b527..12fa1a4 100644 --- a/apps/server/src/database/schemas/tasks.ts +++ b/apps/server/src/database/schemas/tasks.ts @@ -23,6 +23,9 @@ export const tasks = pgTable("tasks", { link: text("link").notNull(), meta: jsonb("meta"), + worker_id: text("worker_id"), + lease_expires_at: timestamp("lease_expires_at"), + created_at: timestamp("created_at").notNull().defaultNow(), updated_at: timestamp("updated_at").notNull().defaultNow() }) diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 37e1b4c..e3d5ecd 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -2,9 +2,10 @@ import "dotenv/config" import {Effect, Layer} from "effect" import Api from "./api/index.ts" import {Database} from "./database/index.ts" +import {LeaseReaper} from "./services/LeaseReaper.ts" import {NodeContext, NodeRuntime} from "@effect/platform-node" -const ApiLayer = Api.pipe( +const ApiLayer = Layer.mergeAll(Api, LeaseReaper).pipe( Layer.provide(Database.layer), Layer.provide(NodeContext.layer) ) diff --git a/apps/server/src/services/LeaseReaper.ts b/apps/server/src/services/LeaseReaper.ts new file mode 100644 index 0000000..1dc1c6d --- /dev/null +++ b/apps/server/src/services/LeaseReaper.ts @@ -0,0 +1,37 @@ +import {Config, Duration, Effect, Layer, Schedule} from "effect" +import {and, eq, isNotNull, lt} from "drizzle-orm" +import {Database} from "../database/index.ts" +import * as schema from "../database/schemas/index.ts" + +const reapExpiredLeases = Effect.gen(function* () { + const db = yield* Database + const rows = yield* db + .update(schema.tasks) + .set({status: "pending", worker_id: null, lease_expires_at: null}) + .where( + and( + eq(schema.tasks.status, "processing"), + isNotNull(schema.tasks.lease_expires_at), + lt(schema.tasks.lease_expires_at, new Date()) + ) + ) + .returning({id: schema.tasks.id}) + + if (rows.length > 0) { + yield* Effect.log(`LeaseReaper: reclaimed ${rows.length} expired task(s)`) + } +}) + +export const LeaseReaper = Layer.scopedDiscard( + Effect.gen(function* () { + const intervalSeconds = yield* Config.integer( + "LEASE_REAPER_INTERVAL_SECONDS" + ).pipe(Config.withDefault(60)) + yield* Effect.forkScoped( + Effect.repeat( + reapExpiredLeases, + Schedule.spaced(Duration.seconds(intervalSeconds)) + ) + ) + }) +) diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index f255b1a..70d84d9 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -4,7 +4,7 @@ import {Scaler} from "./services/Scaler.ts" import {Client} from "./services/Client.ts" export const program = Effect.gen(function* () { - const {queue, submit} = yield* Client + const {queue, submit, renewLease} = yield* Client const {init, next} = yield* Scaler const extract = yield* Extractor @@ -15,14 +15,19 @@ export const program = Effect.gen(function* () { const results = yield* Queue.takeUpTo(queue, _concurrency).pipe( Effect.map( Chunk.map(task => - extract.extract(task).pipe( - Effect.timed, - Effect.tap(([duration, result]) => - next({task, result, duration}).pipe( - Effect.flatMap(target => Ref.set(concurrency, target)) + Effect.scoped( + Effect.gen(function* () { + yield* renewLease(task.id).pipe(Effect.forkScoped) + return yield* extract.extract(task).pipe( + Effect.timed, + Effect.tap(([duration, result]) => + next({task, result, duration}).pipe( + Effect.flatMap(target => Ref.set(concurrency, target)) + ) + ), + Effect.map(Tuple.getSecond) ) - ), - Effect.map(Tuple.getSecond) + }) ) ) ), diff --git a/apps/worker/src/services/Client.ts b/apps/worker/src/services/Client.ts index 15c41e7..631ba6d 100644 --- a/apps/worker/src/services/Client.ts +++ b/apps/worker/src/services/Client.ts @@ -13,6 +13,7 @@ export class Client extends Effect.Tag("Client")< readonly submit: ( input: ExtractorResult ) => Effect.Effect + readonly renewLease: (taskId: string) => Effect.Effect } >() { static readonly Default = Layer.scoped( @@ -42,12 +43,13 @@ export class Client extends Effect.Tag("Client")< const taskQueue = yield* Queue.unbounded() - const pollTimeout = 30 + const pollTimeout = config.pollTimeout const tags = config.tags + const workerId = config.id yield* Effect.gen(function* () { const task = yield* dispatcherClient.dispatcher.nextTask({ - payload: {}, + payload: {workerId}, urlParams: {tags, timeout: pollTimeout} }) yield* Queue.offer(taskQueue, task) @@ -61,6 +63,16 @@ export class Client extends Effect.Tag("Client")< return Client.of({ queue: taskQueue, + renewLease: taskId => + Effect.schedule( + dispatcherClient.dispatcher + .renewLease({ + path: {id: taskId}, + payload: {workerId} + }) + .pipe(Effect.ignore), + Schedule.spaced(config.renewInterval) + ), submit: result => Effect.gen(function* () { const firstLink = yield* Iterable.head(result.links) diff --git a/apps/worker/src/services/Scaler.ts b/apps/worker/src/services/Scaler.ts index 6b2435f..f793bfb 100644 --- a/apps/worker/src/services/Scaler.ts +++ b/apps/worker/src/services/Scaler.ts @@ -1,5 +1,6 @@ import { Chunk, + Config, Context, Duration, Effect, @@ -31,14 +32,24 @@ export class Scaler extends Context.Tag("Scaler")< readonly capacity: number readonly alpha: number readonly slack: number + readonly maintenanceInterval: Duration.Duration } >() { - static readonly Default = Layer.succeed(Scaler.EMAConfig, { - window: 10, - capacity: Infinity, - alpha: 0.3, - slack: 1.5 - }) + static readonly Default = Layer.effect( + Scaler.EMAConfig, + Config.all({ + window: Config.integer("EMA_WINDOW").pipe(Config.withDefault(10)), + capacity: Config.integer("EMA_CAPACITY").pipe( + Config.withDefault(0), + Config.map(n => (n === 0 ? Infinity : n)) + ), + alpha: Config.number("EMA_ALPHA").pipe(Config.withDefault(0.3)), + slack: Config.number("EMA_SLACK").pipe(Config.withDefault(1.5)), + maintenanceInterval: Config.integer( + "EMA_MAINTENANCE_INTERVAL_SECONDS" + ).pipe(Config.withDefault(300), Config.map(Duration.seconds)) + }).pipe(Config.map(Scaler.EMAConfig.of)) + ) } static readonly EMA = Layer.scoped( @@ -59,7 +70,7 @@ export class Scaler extends Context.Tag("Scaler")< const maintenance = Effect.gen(function* () { yield* down() yield* Ref.set(min, yield* Ref.get(ema)) - }).pipe(Effect.repeat(Schedule.spaced(Duration.minutes(5)))) + }).pipe(Effect.repeat(Schedule.spaced(config.maintenanceInterval))) yield* maintenance.pipe(Effect.forkScoped) diff --git a/apps/worker/src/services/WorkerConfig.ts b/apps/worker/src/services/WorkerConfig.ts index cc2a17e..065ba34 100644 --- a/apps/worker/src/services/WorkerConfig.ts +++ b/apps/worker/src/services/WorkerConfig.ts @@ -1,4 +1,4 @@ -import {Context, Config, Layer} from "effect" +import {Config, Context, Duration, Layer} from "effect" import {v7} from "uuid" export class WorkerConfig extends Context.Tag("WorkerConfig")< @@ -10,6 +10,9 @@ export class WorkerConfig extends Context.Tag("WorkerConfig")< readonly baseUrl: string readonly secretKey: string + + readonly pollTimeout: number + readonly renewInterval: Duration.Duration } >() { static readonly Default = Layer.effect( @@ -19,7 +22,12 @@ export class WorkerConfig extends Context.Tag("WorkerConfig")< name: Config.string("NAME").pipe(Config.withDefault("worker")), tags: Config.array(Config.string(), "TAGS").pipe(Config.withDefault([])), baseUrl: Config.string("BASE_URL"), - secretKey: Config.string("SECRET_KEY") + secretKey: Config.string("SECRET_KEY"), + pollTimeout: Config.integer("POLL_TIMEOUT").pipe(Config.withDefault(30)), + renewInterval: Config.integer("RENEW_INTERVAL_SECONDS").pipe( + Config.withDefault(120), + Config.map(Duration.seconds) + ) }).pipe(Config.map(config => WorkerConfig.of(config))) ) } diff --git a/libs/api/src/dispatcher/groups/root.ts b/libs/api/src/dispatcher/groups/root.ts index c797235..bb18cae 100644 --- a/libs/api/src/dispatcher/groups/root.ts +++ b/libs/api/src/dispatcher/groups/root.ts @@ -11,7 +11,8 @@ const { CreatePayload, UpdatePayload, NextPayload, - NextQueryParams + NextQueryParams, + RenewLeasePayload } = TaskApi export default HttpApiGroup.make("dispatcher") @@ -78,3 +79,15 @@ export default HttpApiGroup.make("dispatcher") "Fetches the next oldest available task matching the provided query filters.\n\n获取匹配提供查询过滤器的下一个最旧可用任务。" ) ) + .add( + HttpApiEndpoint.patch("renewLease")`/tasks/:id/renew` + .setPath(Schema.Struct({id: Schema.UUID})) + .setPayload(RenewLeasePayload) + .addSuccess(Schema.Void) + .addError(TaskNotFoundError) + .annotate(OpenApi.Summary, "Renew task lease") + .annotate( + OpenApi.Description, + "Renews the lease for a task currently being processed by a worker. Must be called periodically to prevent the task from being reclaimed.\n\n为 Worker 正在处理的任务续约。必须定期调用,否则任务将被回收并重新排队。" + ) + ) diff --git a/libs/api/src/schemas/Task.ts b/libs/api/src/schemas/Task.ts index 955c816..3aff1dc 100644 --- a/libs/api/src/schemas/Task.ts +++ b/libs/api/src/schemas/Task.ts @@ -69,11 +69,24 @@ export const UpdatePayload = Schema.Struct({ }) .pipe(Schema.partial) -export const NextPayload = Schema.Struct({}).annotations({ +export const NextPayload = Schema.Struct({ + workerId: Schema.String.annotations({ + description: "Unique identifier of the worker claiming this task\n\n领取任务的 Worker 唯一标识" + }) +}).annotations({ identifier: "NextTaskPayload", description: "Payload for getting the next task\n\n获取下一个任务的载荷" }) +export const RenewLeasePayload = Schema.Struct({ + workerId: Schema.String.annotations({ + description: "Worker ID that holds the lease\n\n持有租约的 Worker ID" + }) +}).annotations({ + identifier: "RenewLeasePayload", + description: "Payload for renewing a task lease\n\n续约任务租约的载荷" +}) + export const NextQueryParams = QueryParams.pipe( Schema.omit("limit"), Schema.extend( @@ -96,5 +109,6 @@ export const TaskApi = { CreatePayload, UpdatePayload, NextPayload, - NextQueryParams + NextQueryParams, + RenewLeasePayload }