Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions apps/server/src/api/groups/root.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {TaskNotFoundError} from "@ecrawler/api/schemas/Task.ts"
import {HttpApiBuilder} from "@effect/platform"
import {Array, Effect, Layer, Schedule, Duration, pipe} from "effect"
import {and, arrayContains, asc, eq, gte, lt, SQL} from "drizzle-orm"

const LEASE_DURATION_MS = Duration.toMillis(Duration.minutes(5))
import * as schema from "../../database/schemas"
import {Database} from "../../database/index.ts"

Expand Down Expand Up @@ -190,7 +192,7 @@ const dispatcherGroup = Layer.unwrapEffect(
UnknownError.mapError
)
)
.handle("nextTask", ({urlParams}) =>
.handle("nextTask", ({urlParams, payload}) =>
pipe(
db.transaction(tx =>
Effect.gen(function* () {
Expand Down Expand Up @@ -224,7 +226,11 @@ const dispatcherGroup = Layer.unwrapEffect(

const [updated] = yield* tx
.update(schema.tasks)
.set({status: "processing"})
.set({
status: "processing",
worker_id: payload.workerId,
lease_expires_at: new Date(Date.now() + LEASE_DURATION_MS)
})
.where(eq(schema.tasks.id, task.id))
.returning()

Expand All @@ -242,6 +248,28 @@ const dispatcherGroup = Layer.unwrapEffect(
})
)
)
.handle("renewLease", ({path, payload}) =>
pipe(
db
.update(schema.tasks)
.set({lease_expires_at: new Date(Date.now() + LEASE_DURATION_MS)})
.where(
and(
eq(schema.tasks.id, path.id),
eq(schema.tasks.worker_id, payload.workerId),
eq(schema.tasks.status, "processing")
)
)
.returning({id: schema.tasks.id}),
UnknownError.mapError,
Effect.flatMap(rows =>
Array.head(rows).pipe(
Effect.mapError(() => new TaskNotFoundError()),
Effect.asVoid
)
)
)
)
)
})
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE "tasks" ADD COLUMN "worker_id" text;--> statement-breakpoint
ALTER TABLE "tasks" ADD COLUMN "lease_expires_at" timestamp;
3 changes: 3 additions & 0 deletions apps/server/src/database/schemas/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ export const tasks = pgTable("tasks", {
link: text("link").notNull(),
meta: jsonb("meta"),

worker_id: text("worker_id"),
lease_expires_at: timestamp("lease_expires_at"),

created_at: timestamp("created_at").notNull().defaultNow(),
updated_at: timestamp("updated_at").notNull().defaultNow()
})
3 changes: 2 additions & 1 deletion apps/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import "dotenv/config"
import {Effect, Layer} from "effect"
import Api from "./api/index.ts"
import {Database} from "./database/index.ts"
import {LeaseReaper} from "./services/LeaseReaper.ts"
import {NodeContext, NodeRuntime} from "@effect/platform-node"

const ApiLayer = Api.pipe(
const ApiLayer = Layer.mergeAll(Api, LeaseReaper).pipe(
Layer.provide(Database.layer),
Layer.provide(NodeContext.layer)
)
Expand Down
37 changes: 37 additions & 0 deletions apps/server/src/services/LeaseReaper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {Config, Duration, Effect, Layer, Schedule} from "effect"
import {and, eq, isNotNull, lt} from "drizzle-orm"
import {Database} from "../database/index.ts"
import * as schema from "../database/schemas/index.ts"

const reapExpiredLeases = Effect.gen(function* () {
const db = yield* Database
const rows = yield* db
.update(schema.tasks)
.set({status: "pending", worker_id: null, lease_expires_at: null})
.where(
and(
eq(schema.tasks.status, "processing"),
isNotNull(schema.tasks.lease_expires_at),
lt(schema.tasks.lease_expires_at, new Date())
)
)
.returning({id: schema.tasks.id})

if (rows.length > 0) {
yield* Effect.log(`LeaseReaper: reclaimed ${rows.length} expired task(s)`)
}
})

export const LeaseReaper = Layer.scopedDiscard(
Effect.gen(function* () {
const intervalSeconds = yield* Config.integer(
"LEASE_REAPER_INTERVAL_SECONDS"
).pipe(Config.withDefault(60))
yield* Effect.forkScoped(
Effect.repeat(
reapExpiredLeases,
Schedule.spaced(Duration.seconds(intervalSeconds))
)
)
})
)
21 changes: 13 additions & 8 deletions apps/worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {Scaler} from "./services/Scaler.ts"
import {Client} from "./services/Client.ts"

export const program = Effect.gen(function* () {
const {queue, submit} = yield* Client
const {queue, submit, renewLease} = yield* Client
const {init, next} = yield* Scaler
const extract = yield* Extractor

Expand All @@ -15,14 +15,19 @@ export const program = Effect.gen(function* () {
const results = yield* Queue.takeUpTo(queue, _concurrency).pipe(
Effect.map(
Chunk.map(task =>
extract.extract(task).pipe(
Effect.timed,
Effect.tap(([duration, result]) =>
next({task, result, duration}).pipe(
Effect.flatMap(target => Ref.set(concurrency, target))
Effect.scoped(
Effect.gen(function* () {
yield* renewLease(task.id).pipe(Effect.forkScoped)
return yield* extract.extract(task).pipe(
Effect.timed,
Effect.tap(([duration, result]) =>
next({task, result, duration}).pipe(
Effect.flatMap(target => Ref.set(concurrency, target))
)
),
Effect.map(Tuple.getSecond)
)
),
Effect.map(Tuple.getSecond)
})
)
)
),
Expand Down
16 changes: 14 additions & 2 deletions apps/worker/src/services/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class Client extends Effect.Tag("Client")<
readonly submit: (
input: ExtractorResult
) => Effect.Effect<void, never, never>
readonly renewLease: (taskId: string) => Effect.Effect<void, never, never>
}
>() {
static readonly Default = Layer.scoped(
Expand Down Expand Up @@ -42,12 +43,13 @@ export class Client extends Effect.Tag("Client")<

const taskQueue = yield* Queue.unbounded<Task.Task>()

const pollTimeout = 30
const pollTimeout = config.pollTimeout
const tags = config.tags
const workerId = config.id

yield* Effect.gen(function* () {
const task = yield* dispatcherClient.dispatcher.nextTask({
payload: {},
payload: {workerId},
urlParams: {tags, timeout: pollTimeout}
})
yield* Queue.offer(taskQueue, task)
Expand All @@ -61,6 +63,16 @@ export class Client extends Effect.Tag("Client")<

return Client.of({
queue: taskQueue,
renewLease: taskId =>
Effect.schedule(
dispatcherClient.dispatcher
.renewLease({
path: {id: taskId},
payload: {workerId}
})
.pipe(Effect.ignore),
Schedule.spaced(config.renewInterval)
),
submit: result =>
Effect.gen(function* () {
const firstLink = yield* Iterable.head(result.links)
Expand Down
25 changes: 18 additions & 7 deletions apps/worker/src/services/Scaler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
Chunk,
Config,
Context,
Duration,
Effect,
Expand Down Expand Up @@ -31,14 +32,24 @@ export class Scaler extends Context.Tag("Scaler")<
readonly capacity: number
readonly alpha: number
readonly slack: number
readonly maintenanceInterval: Duration.Duration
}
>() {
static readonly Default = Layer.succeed(Scaler.EMAConfig, {
window: 10,
capacity: Infinity,
alpha: 0.3,
slack: 1.5
})
static readonly Default = Layer.effect(
Scaler.EMAConfig,
Config.all({
window: Config.integer("EMA_WINDOW").pipe(Config.withDefault(10)),
capacity: Config.integer("EMA_CAPACITY").pipe(
Config.withDefault(0),
Config.map(n => (n === 0 ? Infinity : n))
),
alpha: Config.number("EMA_ALPHA").pipe(Config.withDefault(0.3)),
slack: Config.number("EMA_SLACK").pipe(Config.withDefault(1.5)),
maintenanceInterval: Config.integer(
"EMA_MAINTENANCE_INTERVAL_SECONDS"
).pipe(Config.withDefault(300), Config.map(Duration.seconds))
}).pipe(Config.map(Scaler.EMAConfig.of))
)
}

static readonly EMA = Layer.scoped(
Expand All @@ -59,7 +70,7 @@ export class Scaler extends Context.Tag("Scaler")<
const maintenance = Effect.gen(function* () {
yield* down()
yield* Ref.set(min, yield* Ref.get(ema))
}).pipe(Effect.repeat(Schedule.spaced(Duration.minutes(5))))
}).pipe(Effect.repeat(Schedule.spaced(config.maintenanceInterval)))

yield* maintenance.pipe(Effect.forkScoped)

Expand Down
12 changes: 10 additions & 2 deletions apps/worker/src/services/WorkerConfig.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Context, Config, Layer} from "effect"
import {Config, Context, Duration, Layer} from "effect"
import {v7} from "uuid"

export class WorkerConfig extends Context.Tag("WorkerConfig")<
Expand All @@ -10,6 +10,9 @@ export class WorkerConfig extends Context.Tag("WorkerConfig")<

readonly baseUrl: string
readonly secretKey: string

readonly pollTimeout: number
readonly renewInterval: Duration.Duration
}
>() {
static readonly Default = Layer.effect(
Expand All @@ -19,7 +22,12 @@ export class WorkerConfig extends Context.Tag("WorkerConfig")<
name: Config.string("NAME").pipe(Config.withDefault("worker")),
tags: Config.array(Config.string(), "TAGS").pipe(Config.withDefault([])),
baseUrl: Config.string("BASE_URL"),
secretKey: Config.string("SECRET_KEY")
secretKey: Config.string("SECRET_KEY"),
pollTimeout: Config.integer("POLL_TIMEOUT").pipe(Config.withDefault(30)),
renewInterval: Config.integer("RENEW_INTERVAL_SECONDS").pipe(
Config.withDefault(120),
Config.map(Duration.seconds)
)
}).pipe(Config.map(config => WorkerConfig.of(config)))
)
}
15 changes: 14 additions & 1 deletion libs/api/src/dispatcher/groups/root.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const {
CreatePayload,
UpdatePayload,
NextPayload,
NextQueryParams
NextQueryParams,
RenewLeasePayload
} = TaskApi

export default HttpApiGroup.make("dispatcher")
Expand Down Expand Up @@ -78,3 +79,15 @@ export default HttpApiGroup.make("dispatcher")
"Fetches the next oldest available task matching the provided query filters.\n\n获取匹配提供查询过滤器的下一个最旧可用任务。"
)
)
.add(
HttpApiEndpoint.patch("renewLease")`/tasks/:id/renew`
.setPath(Schema.Struct({id: Schema.UUID}))
.setPayload(RenewLeasePayload)
.addSuccess(Schema.Void)
.addError(TaskNotFoundError)
.annotate(OpenApi.Summary, "Renew task lease")
.annotate(
OpenApi.Description,
"Renews the lease for a task currently being processed by a worker. Must be called periodically to prevent the task from being reclaimed.\n\n为 Worker 正在处理的任务续约。必须定期调用,否则任务将被回收并重新排队。"
)
)
18 changes: 16 additions & 2 deletions libs/api/src/schemas/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,24 @@ export const UpdatePayload = Schema.Struct({
})
.pipe(Schema.partial)

export const NextPayload = Schema.Struct({}).annotations({
export const NextPayload = Schema.Struct({
workerId: Schema.String.annotations({
description: "Unique identifier of the worker claiming this task\n\n领取任务的 Worker 唯一标识"
})
}).annotations({
identifier: "NextTaskPayload",
description: "Payload for getting the next task\n\n获取下一个任务的载荷"
})

export const RenewLeasePayload = Schema.Struct({
workerId: Schema.String.annotations({
description: "Worker ID that holds the lease\n\n持有租约的 Worker ID"
})
}).annotations({
identifier: "RenewLeasePayload",
description: "Payload for renewing a task lease\n\n续约任务租约的载荷"
})

export const NextQueryParams = QueryParams.pipe(
Schema.omit("limit"),
Schema.extend(
Expand All @@ -96,5 +109,6 @@ export const TaskApi = {
CreatePayload,
UpdatePayload,
NextPayload,
NextQueryParams
NextQueryParams,
RenewLeasePayload
}