Merged · 16 commits
770 changes: 770 additions & 0 deletions docs/superpowers/plans/2026-04-07-parallel-game-extraction.md

Large diffs are not rendered by default.

148 changes: 148 additions & 0 deletions docs/superpowers/specs/2026-04-07-parallel-game-extraction-design.md
@@ -0,0 +1,148 @@
# Parallel Game Extraction in IndexWorker

**Status:** Design
**Date:** 2026-04-07
**Scope:** Option 1 from `domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md` — parallelize per-game feature extraction within a month, keeping DB writes, batching, and ordering unchanged.

## Background

`IndexWorker.process()` currently walks each month's games strictly sequentially: for each `PlayedGame`, it calls `FeatureExtractor.extract(pgn)`, builds a row, and appends to a batch that is flushed at `BATCH_SIZE = 100`. The CPU-bound cost is dominated by `extract()` (PGN replay + motif detectors). DB writes are already batched.

This design parallelizes **only** the `extract()` step across a bounded pool. The DB write path and all observable semantics (batch size, flush boundaries, per-month period upsert, progress updates) remain identical.

## Non-goals

- Parallelizing across months (Option 4 in the parent doc).
- Parallelizing across index requests / worker threads (Option 2).
- Any schema, API, or queue changes.
- Changes to reanalysis / unification (Option 6).

## Design

### Injected executor

`IndexWorker` gains a new constructor parameter:

```java
public IndexWorker(
    ChessClient chessClient,
    FeatureExtractor featureExtractor,
    IndexingRequestStore requestStore,
    GameFeatureStore gameFeatureStore,
    IndexedPeriodStore periodStore,
    ExecutorService extractionExecutor) { ... }
```

The executor is provided by `IndexerModule` as a new `@Context` bean:

```java
@Context
@Bean(preDestroy = "shutdown")
public ExecutorService indexExtractionExecutor() {
  int threads = parseThreads(System.getenv("INDEXER_EXTRACTION_THREADS"), 4);
  // COUNTER is a static AtomicInteger on the module, used only for thread naming.
  ThreadFactory tf = r -> {
    Thread t = new Thread(r);
    t.setName("index-extract-" + COUNTER.incrementAndGet());
    t.setDaemon(true);
    return t;
  };
  return Executors.newFixedThreadPool(threads, tf);
}
```

- **Config:** env var `INDEXER_EXTRACTION_THREADS`, default **4**. Values ≤ 0 or unparseable fall back to the default with a warning log.
- **Lifecycle:** Micronaut's `@Bean(preDestroy = "shutdown")` invokes `ExecutorService.shutdown()` on context close.
- **Thread naming:** `index-extract-N` for log/profile visibility.
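The fallback rule can be pinned down as a small static helper. This sketch mirrors the `parseThreads` method `IndexerModule` gains in this change, with `System.err` standing in for the module's warning log so the snippet stays dependency-free:

```java
// Sketch of the env-var parsing rule: unset/blank, unparseable, and
// non-positive values all fall back to the default with a warning.
final class ThreadConfig {
  static int parseThreads(String raw, int defaultValue) {
    if (raw == null || raw.isBlank()) {
      return defaultValue;
    }
    try {
      int parsed = Integer.parseInt(raw.strip());
      if (parsed <= 0) {
        System.err.println("Invalid thread count " + raw + "; using " + defaultValue);
        return defaultValue;
      }
      return parsed;
    } catch (NumberFormatException e) {
      System.err.println("Unparseable thread count " + raw + "; using " + defaultValue);
      return defaultValue;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseThreads(System.getenv("INDEXER_EXTRACTION_THREADS"), 4));
  }
}
```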

### Inner loop rewrite

Current per-month loop (pseudocode):

```
for each game in month:
  try { features = extract(pgn); append to batch; flush if full }
  catch { log and skip }
flush remaining
```

New per-month loop:

```
submit all games to executor, collecting List<Future<ExtractResult>> in source order
for each future in source order:
  try {
    result = future.get()
    append result to batch
    flush if full and update status
  } catch (ExecutionException) { log and skip }
flush remaining
```
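The submit-then-drain shape can be seen in isolation with a self-contained toy. The `extractAll` helper and the `"bad"` marker are hypothetical: `String` stands in for `PlayedGame` and `ExtractResult`, and a marker substring stands in for a PGN that fails extraction. The real loop additionally flushes at `BATCH_SIZE` and updates request status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDrainSketch {
  // Submit every item to the pool up front, then drain futures in
  // submission order; a task that throws is logged and skipped.
  static List<String> extractAll(ExecutorService pool, List<String> games)
      throws InterruptedException {
    List<Future<String>> futures = new ArrayList<>();
    for (String game : games) {
      futures.add(pool.submit(() -> {
        if (game.contains("bad")) {
          throw new IllegalStateException("extraction failed: " + game);
        }
        return "features:" + game; // stand-in for FeatureExtractor.extract
      }));
    }
    List<String> batch = new ArrayList<>();
    for (Future<String> f : futures) {
      try {
        batch.add(f.get()); // in-order drain; order is not load-bearing
      } catch (ExecutionException e) {
        System.err.println("skipping: " + e.getCause().getMessage());
      }
    }
    return batch;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      System.out.println(extractAll(pool, List.of("g1", "bad-g2", "g3")));
      // prints [features:g1, features:g3]
    } finally {
      pool.shutdown();
    }
  }
}
```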

Key properties:
- **Bounded in-flight work:** at most `threads` extractions run at once, plus one in-progress batch. Completed `ExtractResult`s do buffer in their futures until the drain catches up, but those are finished rows and occurrence maps, not replay state, so transient memory stays dominated by the running extractions.
- **In-order drain (not a correctness requirement):** futures are drained in submission order purely because it's the simplest possible drain loop. Row order in `insertBatch` is not load-bearing — `game_features` rows are keyed by URL, IDs are DB-assigned, and the occurrences map is keyed by URL too. An `ExecutorCompletionService`-based drain would be valid but more code for no observable win at this scope (we still flush at month-end either way).
- **Failure isolation:** an extraction that throws is logged and skipped; other games still land. Mirrors the current per-game `try/catch`.
- **Batching untouched:** `BATCH_SIZE = 100`, flush-at-boundary, flush-at-month-end, and `requestStore.updateStatus` after each flush all remain.
- **No executor shutdown inside `process()`** — the pool is shared across requests and owned by the DI container.

### `ExtractResult` helper

A small private record inside `IndexWorker`:

```java
private record ExtractResult(GameFeature row, String gameUrl,
Map<Motif, List<MotifOccurrence>> occurrences) {}
```

Built inside the submitted task so that `buildGameFeature(...)` also runs on the pool thread (cheap, but keeps the drain loop pure glue).

### Thread-safety audit (implementation step)

`FeatureExtractor.extract()` creates local `positions`, `foundMotifs`, and `allOccurrences` per call, but its injected collaborators — `PgnParser`, `GameReplayer`, and `List<MotifDetector>` — are shared across threads. Before the first green concurrency test, verify each of these is stateless (no mutable instance fields touched inside `extract()` / `replay()` / `detect()`).

`MotifDetector` implementations are required to be stateless by contract. If the audit finds one that isn't, that is a design bug to fix at the source — not something to paper over with `ThreadLocal` or per-task construction. `PgnParser` and `GameReplayer` are held to the same standard.

This audit is a discrete step in the plan and blocks the concurrency test from being marked green.
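One mechanical aid for the audit, offered as a sketch rather than part of the plan: a reflection helper a unit test can point at `PgnParser`, `GameReplayer`, and each detector class. It only proves instance fields are `final` (a final field can still reference mutable state), so it narrows the manual reading rather than replacing it.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public final class StatelessnessCheck {
  // Returns true if every instance field declared by the class (and its
  // superclasses, up to Object) is final. A final field can still point at
  // a mutable object, so this is a first-pass filter, not a proof.
  static boolean hasOnlyFinalInstanceFields(Class<?> type) {
    for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
      for (Field f : c.getDeclaredFields()) {
        if (!Modifier.isStatic(f.getModifiers())
            && !Modifier.isFinal(f.getModifiers())) {
          return false;
        }
      }
    }
    return true;
  }
}
```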

## Test plan (TDD)

All new tests live in `IndexWorkerTest`, constructing `IndexWorker` directly with a real `ExecutorService` and shutting it down in `@AfterEach`.

1. **Concurrency proof (write first, should fail/deadlock)**
A `FeatureExtractor` mock whose `extract()` decrements a `CountDownLatch(2)` then `await()`s it. Run with a pool size of 2 and ≥ 2 games in the month. With the current sequential loop, the test deadlocks (the latch never reaches zero); after the loop rewrite it completes. Use a JUnit timeout of a few seconds so "deadlocked" surfaces as a test failure rather than a hang.

2. **Failure isolation**
Mock `extract()` to throw on one specific PGN; assert all other games in the month land in `insertBatch` and `updateStatus`'s `totalIndexed` reflects survivors only.

3. **All games land (set, not order)**
Mock returns distinguishable features per game; assert that the union of `List<GameFeature>` arguments passed to `insertBatch` across all flushes equals the input set. Order is not asserted — the drain is in-order today, but that's an implementation detail, not a contract.

4. **Existing tests stay green**
Update the existing `IndexWorkerTest` setup to construct the worker with a fixed pool (e.g. size 4). No assertion changes needed — this is the behavior-preservation bar.

5. **Module wiring**
A small unit test on `IndexerModule.indexExtractionExecutor()` (or the helper `parseThreads`) verifying: default when env var unset, respects a valid value, falls back to default on invalid input. No Micronaut context required — test the factory method directly, or extract `parseThreads` as package-private.
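The latch construction in test 1 can be exercised in isolation with a hypothetical `ranConcurrently` helper (names are illustrative, not from the plan): each task counts the latch down and then awaits it, so the await can only succeed while another task is simultaneously in flight.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public final class ConcurrencyProof {
  // Returns true only if two tasks were in flight at once: each task
  // decrements the latch, then waits for it to hit zero. A sequential
  // executor runs task 1 to completion first, so its await times out
  // and the method returns false instead of hanging forever.
  static boolean ranConcurrently(ExecutorService pool) throws Exception {
    CountDownLatch latch = new CountDownLatch(2);
    Callable<Boolean> task = () -> {
      latch.countDown();
      return latch.await(2, TimeUnit.SECONDS); // false on timeout
    };
    List<Future<Boolean>> futures = List.of(pool.submit(task), pool.submit(task));
    boolean ok = true;
    for (Future<Boolean> f : futures) {
      ok &= f.get();
    }
    return ok;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    System.out.println("concurrent: " + ranConcurrently(pool));
    pool.shutdown();
  }
}
```

The real test inverts this: it asserts completion (via a JUnit timeout) rather than returning a boolean, since a deadlock should read as a failure.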

### TDD order

1. Concurrency-proof test → red.
2. Add executor constructor param; plumb through `IndexerModule`; update existing tests to pass the pool → existing tests green, concurrency test still red.
3. Thread-safety audit of `FeatureExtractor` collaborators.
4. Rewrite inner loop (submit + drain) → concurrency test green.
5. Failure-isolation test → red → passes once exception handling in the drain loop is wired.
6. "All games land" test → should be green immediately after step 4.
7. Module wiring test.
8. `./scripts/format-all` + `bazel test //...` before pushing.

## Constraints & risks

- **Memory:** ~20 MB per concurrent replay, per `IN_PROCESS_MODE.md`. At the default `threads=4` that is ~80 MB of transient replay state, well within headroom. The cap is documented here so ops knows not to set it to 32.
- **Chess.com rate limits:** Unaffected — fetching is still one HTTP call per month.
- **DB writes:** Unaffected — still single-threaded, still batched at 100.
- **Thread safety of `FeatureExtractor` collaborators:** the audit step is blocking. `MotifDetector`, `PgnParser`, and `GameReplayer` are required to be stateless; any violation found during the audit is fixed at the source as part of this change, not worked around.

## Out of scope / follow-ups

- Option 2 (multiple worker threads) remains the natural next step once single-request latency is good.
- Option 6 (unification with reanalysis) is orthogonal and not affected by this change.
- Metrics for games/sec and batch flush rate would help tune `INDEXER_EXTRACTION_THREADS`; not implemented here, noted for later.
2 changes: 1 addition & 1 deletion domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md
@@ -40,7 +40,7 @@ Currently indexing is **strictly sequential**: one worker thread, one index requ
### Suggested order of work (Java, before or without a rewrite)

1. ~~**Batch inserts**~~ ✓ — `GameFeatureStore.insertBatch` and `insertOccurrencesBatch` implemented; `IndexWorker` collects batches of 100 games before flushing.
2. **Parallel games within a month** — Fixed thread pool (e.g. 4–8), submit each game to the pool, collect `GameFeature` + occurrences, then batch insert when a batch is full or the month is done. Keeps a single "logical" worker and request ordering; only the CPU part is parallel.
2. ~~**Parallel games within a month**~~ ✓ — `IndexWorker` now submits each game's extraction to an injected `ExecutorService` (default 4 threads, env `INDEXER_EXTRACTION_THREADS`) and drains futures into the existing batch buffer; DB writes remain single-threaded and batched.
3. **Multiple worker threads** — If a single request is still slow, run 2–4 index-worker threads so multiple index requests are processed concurrently. Tune pool size and DB/API limits.

**6. Unify indexing and reanalysis paths**
@@ -34,6 +34,7 @@
import com.muchq.platform.http_client.core.HttpClient;
import com.muchq.platform.http_client.jdk.Jdk11HttpClient;
import com.muchq.platform.json.JsonUtils;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Factory;
import java.io.IOException;
@@ -42,6 +43,10 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
@@ -173,15 +178,56 @@ public FeatureExtractor featureExtractor(
return new FeatureExtractor(pgnParser, replayer, detectors);
}

private static final AtomicInteger EXTRACT_THREAD_COUNTER = new AtomicInteger();

static int parseThreads(String raw, int defaultValue) {
if (raw == null || raw.isBlank()) {
return defaultValue;
}
try {
int parsed = Integer.parseInt(raw.strip());
if (parsed <= 0) {
LOG.warn("Invalid INDEXER_EXTRACTION_THREADS={}; falling back to {}", raw, defaultValue);
return defaultValue;
}
return parsed;
} catch (NumberFormatException e) {
LOG.warn("Unparseable INDEXER_EXTRACTION_THREADS={}; falling back to {}", raw, defaultValue);
return defaultValue;
}
}

@Context
@Bean(preDestroy = "shutdown")
@jakarta.inject.Named("indexExtraction")
public ExecutorService indexExtractionExecutor() {
int threads = parseThreads(System.getenv("INDEXER_EXTRACTION_THREADS"), 4);
ThreadFactory tf =
r -> {
Thread t = new Thread(r);
t.setName("index-extract-" + EXTRACT_THREAD_COUNTER.incrementAndGet());
t.setDaemon(true);
return t;
};
LOG.info("Index extraction executor: fixed pool of {} threads", threads);
return Executors.newFixedThreadPool(threads, tf);
}

@Context
public IndexWorker indexWorker(
ChessClient chessClient,
FeatureExtractor featureExtractor,
IndexingRequestStore requestStore,
GameFeatureStore gameFeatureStore,
IndexedPeriodStore periodStore) {
IndexedPeriodStore periodStore,
@jakarta.inject.Named("indexExtraction") ExecutorService indexExtractionExecutor) {
return new IndexWorker(
chessClient, featureExtractor, requestStore, gameFeatureStore, periodStore);
chessClient,
featureExtractor,
requestStore,
gameFeatureStore,
periodStore,
indexExtractionExecutor);
}

@Context
@@ -24,6 +24,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
@@ -51,18 +54,21 @@ public class IndexWorker {
private final IndexingRequestStore requestStore;
private final GameFeatureStore gameFeatureStore;
private final IndexedPeriodStore periodStore;
private final ExecutorService extractionExecutor;

public IndexWorker(
ChessClient chessClient,
FeatureExtractor featureExtractor,
IndexingRequestStore requestStore,
GameFeatureStore gameFeatureStore,
IndexedPeriodStore periodStore) {
IndexedPeriodStore periodStore,
ExecutorService extractionExecutor) {
this.chessClient = chessClient;
this.featureExtractor = featureExtractor;
this.requestStore = requestStore;
this.gameFeatureStore = gameFeatureStore;
this.periodStore = periodStore;
this.extractionExecutor = extractionExecutor;
}

public void process(IndexMessage message) {
@@ -107,26 +113,51 @@ public void process(IndexMessage message) {
Map<String, Map<Motif, List<GameFeatures.MotifOccurrence>>> occurrencesBatch =
new LinkedHashMap<>();

int monthCount = 0;
// Submit each surviving game to the extraction pool, preserving source order.
List<Future<ExtractResult>> futures = new ArrayList<>();
for (PlayedGame game : response.get().games()) {
if (message.excludeBullet() && "bullet".equals(game.timeClass())) {
continue;
}
futures.add(
extractionExecutor.submit(
() -> {
try {
GameFeatures features = featureExtractor.extract(game.pgn());
GameFeature row = buildGameFeature(message, game, features);
return new ExtractResult(row, game.url(), features.occurrences());
} catch (Exception e) {
// TODO: pair futures with their game URLs in the drain loop instead of
// smuggling the URL through an exception message — the wrapper is only
// here so the warn log below can identify which game failed.
throw new RuntimeException(
"Failed to extract features for game " + game.url(), e);
}
}));
}

int monthCount = 0;
for (Future<ExtractResult> future : futures) {
ExtractResult result;
try {
GameFeatures features = featureExtractor.extract(game.pgn());
GameFeature row = buildGameFeature(message, game, features);
featureBatch.add(row);
if (!features.occurrences().isEmpty()) {
occurrencesBatch.put(game.url(), features.occurrences());
}
monthCount++;
totalIndexed++;
if (featureBatch.size() >= BATCH_SIZE) {
flushBatch(featureBatch, occurrencesBatch);
requestStore.updateStatus(message.requestId(), "PROCESSING", null, totalIndexed);
}
} catch (Exception e) {
LOG.warn("Failed to index game {}", game.url(), e);
result = future.get();
} catch (ExecutionException e) {
LOG.warn("{}", e.getCause().getMessage(), e.getCause());
continue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while draining extraction futures", e);
break;
}
featureBatch.add(result.row());
if (!result.occurrences().isEmpty()) {
occurrencesBatch.put(result.gameUrl(), result.occurrences());
}
monthCount++;
totalIndexed++;
if (featureBatch.size() >= BATCH_SIZE) {
flushBatch(featureBatch, occurrencesBatch);
requestStore.updateStatus(message.requestId(), "PROCESSING", null, totalIndexed);
}
}
flushBatch(featureBatch, occurrencesBatch);
@@ -199,6 +230,11 @@ private String extractEcoFromPgn(String pgn) {
return m.find() ? m.group(1) : null;
}

private record ExtractResult(
GameFeature row,
String gameUrl,
Map<Motif, List<GameFeatures.MotifOccurrence>> occurrences) {}

private static boolean isLockConflict(Throwable e) {
Throwable cause = e;
while (cause != null) {
@@ -76,6 +76,33 @@ public void readJdbcUrl_throwsUncheckedIOException_onNonFileNotFoundIOError() {
.hasCauseInstanceOf(IOException.class);
}

@Test
public void parseThreads_returnsDefault_whenNull() {
assertThat(IndexerModule.parseThreads(null, 4)).isEqualTo(4);
}

@Test
public void parseThreads_returnsDefault_whenBlank() {
assertThat(IndexerModule.parseThreads(" ", 4)).isEqualTo(4);
}

@Test
public void parseThreads_returnsDefault_whenUnparseable() {
assertThat(IndexerModule.parseThreads("abc", 4)).isEqualTo(4);
}

@Test
public void parseThreads_returnsDefault_whenNonPositive() {
assertThat(IndexerModule.parseThreads("0", 4)).isEqualTo(4);
assertThat(IndexerModule.parseThreads("-3", 4)).isEqualTo(4);
}

@Test
public void parseThreads_respectsValidValue() {
assertThat(IndexerModule.parseThreads("8", 4)).isEqualTo(8);
assertThat(IndexerModule.parseThreads(" 16 ", 4)).isEqualTo(16);
}

private Path missingPath() {
return tmp.getRoot().toPath().resolve("nonexistent_db_config");
}