diff --git a/docs/superpowers/plans/2026-04-07-parallel-game-extraction.md b/docs/superpowers/plans/2026-04-07-parallel-game-extraction.md new file mode 100644 index 00000000..07b9cd0d --- /dev/null +++ b/docs/superpowers/plans/2026-04-07-parallel-game-extraction.md @@ -0,0 +1,770 @@ +# Parallel Game Extraction Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Parallelize per-game `FeatureExtractor.extract()` calls within a month inside `IndexWorker`, behind an injected `ExecutorService`, while preserving DB write batching, status updates, and failure-isolation semantics. + +**Architecture:** `IndexWorker` gains a constructor-injected `ExecutorService`. Inside the per-month loop, each surviving game is submitted as a task that runs `featureExtractor.extract` and builds the `GameFeature` row. Futures are drained in submission order into the existing batch buffer, which is flushed at `BATCH_SIZE` and at month-end exactly as today. The pool is wired in `IndexerModule` as a fixed-size pool (default 4, env override `INDEXER_EXTRACTION_THREADS`) with Micronaut-managed shutdown. + +**Tech Stack:** Java 21, Micronaut DI (`@Factory` / `@Context`), JUnit 4, AssertJ, Bazel build, Failsafe (already used). + +**Spec:** `docs/superpowers/specs/2026-04-07-parallel-game-extraction-design.md` + +--- + +## File Structure + +**Modified:** +- `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java` — new `ExecutorService` constructor param; rewritten per-month inner loop; new private `ExtractResult` record. +- `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/IndexerModule.java` — new `@Context` factory method `indexExtractionExecutor()`; new package-private static helper `parseThreads`; updated `indexWorker(...)` factory signature. 
+- `domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java` — new helper `newPool()` + `@After tearDown()`; existing setup constructs worker with the pool; three new tests. + +**Created:** +- `domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/IndexerModuleTest.java` — unit test for `parseThreads`. + +**Audited (no expected code change):** +- All `MotifDetector` implementations under `.../one_d4/motifs/` +- `FeatureExtractor`, `PgnParser`, `GameReplayer` under `.../one_d4/engine/` + +--- + +## Task 1: Statelessness audit + +`MotifDetector`, `PgnParser`, and `GameReplayer` are required by contract to be stateless. This task verifies that and is the gate for the concurrency test. + +**Files:** read-only audit of: +- `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/motifs/*.java` +- `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/engine/{FeatureExtractor,PgnParser,GameReplayer}.java` + +- [ ] **Step 1: Grep for non-static instance fields in detectors** + +Run (via Grep tool): +- pattern: `^\s*private (?!static)` +- path: `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/motifs` +- output_mode: `content` + +Expected: no matches (or only `final` collaborator references that are themselves stateless). + +- [ ] **Step 2: Grep for non-static instance fields in engine helpers** + +Run (via Grep tool): +- pattern: `^\s*private (?!static)` +- path: `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/engine` +- output_mode: `content` + +Expected: only `private final` references in `FeatureExtractor` (`pgnParser`, `replayer`, `detectors`) — all of which are themselves stateless. 
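As a sanity check on the pattern used in Steps 1 and 2, the sketch below applies the same regular expression to a few sample source lines. The class name `FieldAuditSketch` and the sample lines are hypothetical and not taken from the repository. It assumes the Grep tool treats the lookahead the way `java.util.regex` does.

```java
import java.util.List;
import java.util.regex.Pattern;

final class FieldAuditSketch {
  // Same pattern as the Grep steps: optional leading whitespace, then "private ",
  // not immediately followed by "static".
  static final Pattern NON_STATIC_PRIVATE = Pattern.compile("^\\s*private (?!static)");

  /** True if the audit pattern would flag this single source line. */
  static boolean flagged(String line) {
    return NON_STATIC_PRIVATE.matcher(line).find();
  }

  public static void main(String[] args) {
    // Hypothetical sample lines, for illustration only.
    List<String> samples =
        List.of(
            "  private static final Logger LOG = null;",
            "  private final PgnParser pgnParser;",
            "  private int lastSeenPly;",
            "  private boolean isPinned(int square) {");
    for (String line : samples) {
      System.out.println(flagged(line) + "  " + line);
    }
  }
}
```

The pattern flags `private final` collaborators and private helper methods as well as mutable fields, so a non-empty result is not by itself a finding. Each match still needs a quick read to decide whether it is mutable per-instance state.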
+ +- [ ] **Step 3: Spot-read each detector's `detect(...)` method** + +Read each of: `PinDetector`, `CrossPinDetector`, `SkewerDetector`, `AttackDetector`, `CheckDetector`, `BackRankMateDetector`, `SmotheredMateDetector`, `PromotionDetector`, `PromotionWithCheckDetector`, `PromotionWithCheckmateDetector`, `DiscoveredAttackDetector`. + +Verify: `detect(List)` takes input, allocates only locals, returns a fresh list. No writes to instance fields. + +- [ ] **Step 4: Spot-read `GameReplayer.replay()` and `PgnParser.parse()`** + +Verify: each call allocates fresh state. No mutable instance fields touched. + +- [ ] **Step 5: Decision point** + +If all classes are stateless: proceed to Task 2. No commit needed (no code changed). + +If any class is stateful: **stop** and surface the finding to the user. Per the spec, the fix is to make the class stateless at the source — that becomes a prerequisite task before the rest of the plan continues. + +--- + +## Task 2: Inject `ExecutorService` into `IndexWorker` (existing tests stay green) + +This task adds the new constructor parameter and threads it through `IndexerModule` and existing tests. No behavior change yet — the inner loop still runs sequentially. Existing tests must remain green at the end. + +**Files:** +- Modify: `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java` +- Modify: `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/IndexerModule.java` +- Modify: `domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java` + +- [ ] **Step 1: Add `ExecutorService` constructor parameter to `IndexWorker`** + +Edit `IndexWorker.java`. 
Add the import: + +```java +import java.util.concurrent.ExecutorService; +``` + +Add a field: + +```java +private final ExecutorService extractionExecutor; +``` + +Update the constructor signature and body: + +```java +public IndexWorker( + ChessClient chessClient, + FeatureExtractor featureExtractor, + IndexingRequestStore requestStore, + GameFeatureStore gameFeatureStore, + IndexedPeriodStore periodStore, + ExecutorService extractionExecutor) { + this.chessClient = chessClient; + this.featureExtractor = featureExtractor; + this.requestStore = requestStore; + this.gameFeatureStore = gameFeatureStore; + this.periodStore = periodStore; + this.extractionExecutor = extractionExecutor; +} +``` + +Do not change `process()` yet. The field is unused for now — that's intentional. + +- [ ] **Step 2: Wire executor through `IndexerModule`** + +Edit `IndexerModule.java`. Add imports: + +```java +import io.micronaut.context.annotation.Bean; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +``` + +Add a static counter and a new `@Context` factory method **above** `indexWorker(...)`: + +```java +private static final AtomicInteger EXTRACT_THREAD_COUNTER = new AtomicInteger(); + +static int parseThreads(String raw, int defaultValue) { + if (raw == null || raw.isBlank()) { + return defaultValue; + } + try { + int parsed = Integer.parseInt(raw.strip()); + if (parsed <= 0) { + LOG.warn("Invalid INDEXER_EXTRACTION_THREADS={}; falling back to {}", raw, defaultValue); + return defaultValue; + } + return parsed; + } catch (NumberFormatException e) { + LOG.warn("Unparseable INDEXER_EXTRACTION_THREADS={}; falling back to {}", raw, defaultValue); + return defaultValue; + } +} + +@Context +@Bean(preDestroy = "shutdown") +public ExecutorService indexExtractionExecutor() { + int threads = parseThreads(System.getenv("INDEXER_EXTRACTION_THREADS"), 4); + ThreadFactory 
tf = + r -> { + Thread t = new Thread(r); + t.setName("index-extract-" + EXTRACT_THREAD_COUNTER.incrementAndGet()); + t.setDaemon(true); + return t; + }; + LOG.info("Index extraction executor: fixed pool of {} threads", threads); + return Executors.newFixedThreadPool(threads, tf); +} +``` + +Update the existing `indexWorker(...)` factory method to take and forward the executor: + +```java +@Context +public IndexWorker indexWorker( + ChessClient chessClient, + FeatureExtractor featureExtractor, + IndexingRequestStore requestStore, + GameFeatureStore gameFeatureStore, + IndexedPeriodStore periodStore, + ExecutorService indexExtractionExecutor) { + return new IndexWorker( + chessClient, + featureExtractor, + requestStore, + gameFeatureStore, + periodStore, + indexExtractionExecutor); +} +``` + +- [ ] **Step 3: Update `IndexWorkerTest` setup to construct worker with a real pool** + +Edit `IndexWorkerTest.java`. Add imports: + +```java +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.After; +``` + +Add a field next to the others: + +```java +private ExecutorService extractionExecutor; +``` + +Update `setUp()` to allocate the pool and pass it through: + +```java +@Before +public void setUp() { + stubChessClient = new StubChessClient(); + requestStore = new RecordingRequestStore(); + periodStore = new StubPeriodStore(); + extractionExecutor = Executors.newFixedThreadPool(4); + List detectors = + List.of( + new PinDetector(), new CrossPinDetector(), new SkewerDetector(), new AttackDetector()); + featureExtractor = new FeatureExtractor(new PgnParser(), new GameReplayer(), detectors); + worker = + new IndexWorker( + stubChessClient, + featureExtractor, + requestStore, + new NoOpGameFeatureStore(), + periodStore, + extractionExecutor); +} + +@After +public void tearDown() { + extractionExecutor.shutdownNow(); +} +``` + +Then update **every** other place in `IndexWorkerTest.java` that constructs an `IndexWorker` to pass 
`extractionExecutor` as the sixth argument. Specifically:
+- `process_whenGameHasMotifs_callsInsertOccurrencesWithOccurrences` — `workerWithRecording`
+- `process_bulletGamesNotSkippedWhenExcludeBulletFalse` — `w`
+- `process_bulletGamesSkippedWhenExcludeBulletTrue` — `w`
+
+Each becomes:
+
+```java
+new IndexWorker(
+    stubChessClient,
+    featureExtractor /* or the local one */,
+    requestStore,
+    recordingStore,
+    periodStore,
+    extractionExecutor)
+```
+
+- [ ] **Step 4: Build and run existing tests**
+
+Run:
+```bash
+bazel test //domains/games/apis/one_d4:IndexWorkerTest
+```
+
+Expected: PASS. All existing tests still green; the field on `IndexWorker` is unused but compiles cleanly.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java \
+  domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/IndexerModule.java \
+  domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java
+git commit -m "refactor(one_d4): inject ExecutorService into IndexWorker (unused, plumbing)"
+```
+
+---
+
+## Task 3: Concurrency-proof test (red — proves the loop is sequential today)
+
+Add a test that proves real parallelism. With the current sequential loop the two `extract()` calls never overlap, so the latch cannot release. The 3-second `latch.await` inside the test, backed by the 5-second JUnit timeout, turns that into a clean test failure instead of a hang.
+
+**Files:**
+- Modify: `domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java`
+
+- [ ] **Step 1: Write the failing test**
+
+Add to `IndexWorkerTest.java` (anywhere among the `@Test` methods):
+
+```java
+@Test(timeout = 5000)
+public void process_runsExtractionsConcurrently_acrossPoolThreads() throws Exception {
+  // Two games in one month. Each extract() decrements a 2-latch then awaits it.
+  // If extract() runs sequentially, the second call cannot start while the first
+  // is waiting, so the first await gives up and the test fails.
With a pool size >= 2, both + // games are in flight at once and the latch releases immediately. + java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(2); + FeatureExtractor latchExtractor = + new FeatureExtractor(new PgnParser(), new GameReplayer(), List.of()) { + @Override + public GameFeatures extract(String pgn) { + latch.countDown(); + try { + if (!latch.await(3, java.util.concurrent.TimeUnit.SECONDS)) { + throw new RuntimeException("latch never released — extraction is sequential"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return new GameFeatures( + java.util.EnumSet.noneOf(Motif.class), 0, java.util.Map.of()); + } + }; + + ExecutorService pool = Executors.newFixedThreadPool(2); + try { + RecordingGameFeatureStore store = new RecordingGameFeatureStore(); + IndexWorker concurrentWorker = + new IndexWorker( + stubChessClient, latchExtractor, requestStore, store, periodStore, pool); + stubChessClient.setResponse( + java.time.YearMonth.of(2024, 1), + List.of( + playedGame("https://chess.com/g/1", MINIMAL_PGN, "blitz"), + playedGame("https://chess.com/g/2", MINIMAL_PGN, "blitz"))); + + concurrentWorker.process( + new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", false)); + + assertThat(store.getInsertCount()).isEqualTo(2); + assertThat(requestStore.getLastStatus()).isEqualTo("COMPLETED"); + } finally { + pool.shutdownNow(); + } +} +``` + +Note: this test subclasses `FeatureExtractor` to override `extract`. `FeatureExtractor.extract` is currently non-final — confirm that's the case before relying on the override. If `extract` is `final`, change it to non-final in `FeatureExtractor.java` as part of this step (single-line change), since making test seams overridable is preferable to introducing a Mockito dependency in this test file. 
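The latch technique behind this test can be tried in isolation. The sketch below is a minimal stand-alone version under stated assumptions: `LatchProbeSketch` and `bothInFlight` are hypothetical names, and the two tasks stand in for two `extract()` calls. It shows that a pool of two releases the latch, while a single worker thread (equivalent to a sequential loop) lets the first await expire.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class LatchProbeSketch {
  /** Returns true only if both tasks were in flight at the same time. */
  static boolean bothInFlight(int poolSize, long awaitMillis) {
    CountDownLatch latch = new CountDownLatch(2);
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      // Each task counts down, then waits for the other task to do the same.
      Callable<Boolean> task =
          () -> {
            latch.countDown();
            return latch.await(awaitMillis, TimeUnit.MILLISECONDS);
          };
      List<Future<Boolean>> futures = new ArrayList<>();
      futures.add(pool.submit(task));
      futures.add(pool.submit(task));
      boolean allReleased = true;
      for (Future<Boolean> future : futures) {
        allReleased &= future.get();
      }
      return allReleased;
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("pool of 2: " + bothInFlight(2, 2000));
    System.out.println("pool of 1: " + bothInFlight(1, 200));
  }
}
```

With one thread the first task's await expires before the second task ever runs, which mirrors what the test above observes against a sequential loop: a bounded wait followed by a failure rather than an unbounded hang.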
+
+- [ ] **Step 2: Run the test to verify it fails**
+
+Run:
+```bash
+bazel test //domains/games/apis/one_d4:IndexWorkerTest --test_filter=process_runsExtractionsConcurrently_acrossPoolThreads
+```
+
+Expected: FAIL. With the sequential loop, the first `extract()` call waits out its 3-second `latch.await` and throws `latch never released`, so the run typically fails on that error or on the insert-count assertion (1 instead of 2). The 5-second JUnit timeout is only a backstop against a real hang. Either way the test is red, which shows the inner loop is currently sequential.
+
+- [ ] **Step 3: Commit the failing test**
+
+```bash
+git add domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java
+git commit -m "test(one_d4): add failing concurrency test for IndexWorker extraction"
+```
+
+(Optional but recommended: lets the next commit show the fix in isolation.)
+
+---
+
+## Task 4: Rewrite the inner loop to submit + drain (concurrency test goes green)
+
+**Files:**
+- Modify: `domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java`
+
+- [ ] **Step 1: Add the `ExtractResult` private record and rewrite the per-month inner loop**
+
+In `IndexWorker.java`, add imports:
+
+```java
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+```
+
+Add a private nested record near the bottom of the class:
+
+```java
+private record ExtractResult(
+    GameFeature row,
+    String gameUrl,
+    Map> occurrences) {}
+```
+
+Replace the inner per-game loop in `process()` (currently `IndexWorker.java:106-131`) with:
+
+```java
+List<GameFeature> featureBatch = new ArrayList<>();
+Map>> occurrencesBatch =
+    new LinkedHashMap<>();
+
+// Submit each surviving game to the extraction pool, preserving source order.
+List<Future<ExtractResult>> futures = new ArrayList<>();
+for (PlayedGame game : response.get().games()) {
+  if (message.excludeBullet() && "bullet".equals(game.timeClass())) {
+    continue;
+  }
+  futures.add(
+      extractionExecutor.submit(
+          () -> {
+            GameFeatures features = featureExtractor.extract(game.pgn());
+            GameFeature row = buildGameFeature(message, game, features);
+            return new ExtractResult(row, game.url(), features.occurrences());
+          }));
+}
+
+// Drain in submission order; a failed extraction is logged and skipped.
+int monthCount = 0;
+for (Future<ExtractResult> future : futures) {
+  ExtractResult result;
+  try {
+    result = future.get();
+  } catch (ExecutionException e) {
+    LOG.warn("Failed to index game", e.getCause());
+    continue;
+  } catch (InterruptedException e) {
+    Thread.currentThread().interrupt();
+    LOG.warn("Interrupted while draining extraction futures", e);
+    break;
+  }
+  featureBatch.add(result.row());
+  if (!result.occurrences().isEmpty()) {
+    occurrencesBatch.put(result.gameUrl(), result.occurrences());
+  }
+  monthCount++;
+  totalIndexed++;
+  if (featureBatch.size() >= BATCH_SIZE) {
+    flushBatch(featureBatch, occurrencesBatch);
+    requestStore.updateStatus(message.requestId(), "PROCESSING", null, totalIndexed);
+  }
+}
+flushBatch(featureBatch, occurrencesBatch);
+```
+
+Leave the period upsert and final `updateStatus` calls below this block exactly as they are. The `int finalMonthCount = monthCount;` capture for the lambda still works.
+
+- [ ] **Step 2: Run the concurrency test to verify it passes**
+
+Run:
+```bash
+bazel test //domains/games/apis/one_d4:IndexWorkerTest --test_filter=process_runsExtractionsConcurrently_acrossPoolThreads
+```
+
+Expected: PASS within 5 seconds.
+
+- [ ] **Step 3: Run the full IndexWorkerTest to verify nothing regressed**
+
+Run:
+```bash
+bazel test //domains/games/apis/one_d4:IndexWorkerTest
+```
+
+Expected: all tests PASS.
+ +- [ ] **Step 4: Commit** + +```bash +git add domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java +git commit -m "feat(one_d4): parallelize per-month feature extraction in IndexWorker" +``` + +--- + +## Task 5: Failure-isolation test + +Verify that an extraction throwing for one game does not break the rest of the month. + +**Files:** +- Modify: `domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java` + +- [ ] **Step 1: Write the failing test** + +Add to `IndexWorkerTest.java`: + +```java +@Test +public void process_oneFailingExtraction_doesNotPreventOthers() { + String poisonUrl = "https://chess.com/g/poison"; + FeatureExtractor selectivelyFailing = + new FeatureExtractor(new PgnParser(), new GameReplayer(), List.of()) { + @Override + public GameFeatures extract(String pgn) { + if (pgn.contains("POISON")) { + throw new RuntimeException("boom"); + } + return new GameFeatures( + java.util.EnumSet.noneOf(Motif.class), 0, java.util.Map.of()); + } + }; + + String poisonPgn = + """ + [Event "POISON"] + [Site "Chess.com"] + [White "W"] + [Black "B"] + [Result "1-0"] + [ECO "C20"] + + 1. 
e4 e5 1-0 + """; + + RecordingGameFeatureStore store = new RecordingGameFeatureStore(); + IndexWorker w = + new IndexWorker( + stubChessClient, selectivelyFailing, requestStore, store, periodStore, extractionExecutor); + stubChessClient.setResponse( + java.time.YearMonth.of(2024, 1), + List.of( + playedGame("https://chess.com/g/ok1", MINIMAL_PGN, "blitz"), + playedGame(poisonUrl, poisonPgn, "blitz"), + playedGame("https://chess.com/g/ok2", MINIMAL_PGN, "blitz"))); + + w.process(new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", false)); + + assertThat(store.getInsertCount()).isEqualTo(2); + assertThat(requestStore.getLastStatus()).isEqualTo("COMPLETED"); + assertThat(requestStore.getLastGamesIndexed()).isEqualTo(2); +} +``` + +- [ ] **Step 2: Run it to verify it passes** + +Run: +```bash +bazel test //domains/games/apis/one_d4:IndexWorkerTest --test_filter=process_oneFailingExtraction_doesNotPreventOthers +``` + +Expected: PASS — the drain loop's `ExecutionException` catch already handles this. If it fails, inspect the unwrapping: `future.get()` wraps the cause in `ExecutionException`, which the loop logs and skips. + +- [ ] **Step 3: Commit** + +```bash +git add domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java +git commit -m "test(one_d4): add failure isolation test for parallel extraction" +``` + +--- + +## Task 6: All-games-land test (set equality, not order) + +Asserts that every input game appears in `insertBatch` calls — without asserting order, leaving room for a future switch to completion-order draining. + +**Files:** +- Modify: `domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java` + +- [ ] **Step 1: Extend `RecordingGameFeatureStore` to capture inserted URLs** + +In `IndexWorkerTest.java`, edit the `RecordingGameFeatureStore` inner class. 
Add a field and capture URLs in `insertBatch`: + +```java +private static final class RecordingGameFeatureStore extends NoOpGameFeatureStore { + private final Map>> + allInsertedOccurrences = new HashMap<>(); + private final List insertedUrls = + java.util.Collections.synchronizedList(new ArrayList<>()); + private int insertCount = 0; + + Map>> getAllInsertedOccurrences() { + return allInsertedOccurrences; + } + + int getInsertCount() { + return insertCount; + } + + List getInsertedUrls() { + return new ArrayList<>(insertedUrls); + } + + @Override + public void insertBatch(List features) { + insertCount += features.size(); + for (GameFeature f : features) { + insertedUrls.add(f.gameUrl()); + } + } + + @Override + public void insertOccurrencesBatch( + Map>> occurrencesByGame) { + allInsertedOccurrences.putAll(occurrencesByGame); + } +} +``` + +Note: `GameFeature.gameUrl()` is the accessor — verified against `GameFeature.java` (record component named `gameUrl`). + +- [ ] **Step 2: Add the test** + +```java +@Test +public void process_allGamesLandInBatch_regardlessOfOrder() { + List urls = + List.of( + "https://chess.com/g/a", + "https://chess.com/g/b", + "https://chess.com/g/c", + "https://chess.com/g/d"); + List games = new ArrayList<>(); + for (String u : urls) { + games.add(playedGame(u, MINIMAL_PGN, "blitz")); + } + stubChessClient.setResponse(java.time.YearMonth.of(2024, 1), games); + + RecordingGameFeatureStore store = new RecordingGameFeatureStore(); + IndexWorker w = + new IndexWorker( + stubChessClient, featureExtractor, requestStore, store, periodStore, extractionExecutor); + + w.process(new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", false)); + + assertThat(store.getInsertedUrls()).containsExactlyInAnyOrderElementsOf(urls); + assertThat(requestStore.getLastStatus()).isEqualTo("COMPLETED"); +} +``` + +- [ ] **Step 3: Run it to verify it passes** + +Run: +```bash +bazel test //domains/games/apis/one_d4:IndexWorkerTest 
--test_filter=process_allGamesLandInBatch_regardlessOfOrder +``` + +Expected: PASS. + +- [ ] **Step 4: Commit** + +```bash +git add domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java +git commit -m "test(one_d4): assert all games land via insertBatch (set equality)" +``` + +--- + +## Task 7: `IndexerModule.parseThreads` unit test + +Cover the env-var parsing helper directly. No Micronaut context needed. + +**Files:** +- Create: `domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/IndexerModuleTest.java` +- Modify: `domains/games/apis/one_d4/BUILD.bazel` (only if a new test target is needed; otherwise add the source to an existing test target) + +- [ ] **Step 1: Inspect the existing test BUILD wiring** + +Read `domains/games/apis/one_d4/BUILD.bazel` and locate the `java_test` (or `java_library` test target) that compiles `IndexWorkerTest.java`. Determine whether `IndexerModuleTest.java` can be added to that target's `srcs` (preferred — no new target) or whether a sibling target is the right home. + +Adjust the file path of the new test to match the package the existing test target expects (likely `com/muchq/games/one_d4/IndexerModuleTest.java` directly under `src/test/java/`). 
+ +- [ ] **Step 2: Write the test** + +Create `domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/IndexerModuleTest.java`: + +```java +package com.muchq.games.one_d4; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Test; + +public class IndexerModuleTest { + + @Test + public void parseThreads_returnsDefault_whenNull() { + assertThat(IndexerModule.parseThreads(null, 4)).isEqualTo(4); + } + + @Test + public void parseThreads_returnsDefault_whenBlank() { + assertThat(IndexerModule.parseThreads(" ", 4)).isEqualTo(4); + } + + @Test + public void parseThreads_returnsDefault_whenUnparseable() { + assertThat(IndexerModule.parseThreads("abc", 4)).isEqualTo(4); + } + + @Test + public void parseThreads_returnsDefault_whenNonPositive() { + assertThat(IndexerModule.parseThreads("0", 4)).isEqualTo(4); + assertThat(IndexerModule.parseThreads("-3", 4)).isEqualTo(4); + } + + @Test + public void parseThreads_respectsValidValue() { + assertThat(IndexerModule.parseThreads("8", 4)).isEqualTo(8); + assertThat(IndexerModule.parseThreads(" 16 ", 4)).isEqualTo(16); + } +} +``` + +- [ ] **Step 3: Add the test source to BUILD.bazel if needed** + +If the existing `IndexWorkerTest` target uses an explicit `srcs = [...]` list, add the new file. If it uses a glob (e.g., `glob(["src/test/java/**/*.java"])`), no edit is needed. + +- [ ] **Step 4: Run the test** + +Run: +```bash +bazel test //domains/games/apis/one_d4:IndexerModuleTest +``` + +(Or whatever target now contains the file — discover via `bazel query 'tests(//domains/games/apis/one_d4/...)'` if unsure.) + +Expected: 5 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/IndexerModuleTest.java \ + domains/games/apis/one_d4/BUILD.bazel +git commit -m "test(one_d4): cover IndexerModule.parseThreads" +``` + +--- + +## Task 8: Format, full test sweep, push + +**Files:** none modified directly; this is a verification gate. 
+ +- [ ] **Step 1: Format** + +Run: +```bash +./scripts/format-all +``` + +Expected: clean exit. If files are reformatted, stage and amend the relevant prior commit OR create a small `chore: format` commit on top — prefer the latter to avoid amending shared history. + +- [ ] **Step 2: Full Bazel test sweep** + +Run: +```bash +bazel test //... +``` + +Expected: all tests PASS. Investigate any failure before pushing. + +- [ ] **Step 3: Push the branch** + +Run: +```bash +git push -u origin parallel +``` + +(Or current branch name — confirm with `git branch --show-current` first.) + +- [ ] **Step 4: Update the parent doc to mark Option 1 as done** + +Edit `domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md`. In the "Suggested order of work" list, mark item 2 as complete the same way item 1 already is: + +Find: +``` +2. **Parallel games within a month** — Fixed thread pool (e.g. 4–8), submit each game to the pool, collect `GameFeature` + occurrences, then batch insert when a batch is full or the month is done. Keeps a single "logical" worker and request ordering; only the CPU part is parallel. +``` + +Replace with: +``` +2. ~~**Parallel games within a month**~~ ✓ — `IndexWorker` now submits each game's extraction to an injected `ExecutorService` (default 4 threads, env `INDEXER_EXTRACTION_THREADS`), drains futures into the existing batch buffer; DB writes remain single-threaded and batched. +``` + +- [ ] **Step 5: Commit the doc update** + +```bash +git add domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md +git commit -m "docs(one_d4): mark parallel-extraction step as complete" +git push +``` + +--- + +## Notes for the engineer + +- **Why an in-order drain rather than `ExecutorCompletionService`?** Order is not load-bearing (rows are keyed by URL, IDs are DB-assigned). The in-order drain is just the simplest possible loop. Test 6 deliberately uses `containsExactlyInAnyOrder` so a future switch to completion-order draining doesn't break it. 
+- **Why not `parallelStream()`?** It uses the common ForkJoinPool — can't size, can't isolate, can't inject in tests. +- **Pool sizing:** default 4 is conservative. Each concurrent extraction holds ~20 MB of replay state (`IN_PROCESS_MODE.md`), so 4 ≈ 80 MB transient. Don't bump the default without checking memory headroom. +- **Failure mode if `extract` is final:** the test uses an anonymous subclass of `FeatureExtractor` to inject latch/throw behavior. If `FeatureExtractor.extract` is `final`, drop the `final` modifier — the codebase doesn't use Mockito for this test file and the alternative (introducing a mock framework just for these tests) is heavier. +- **Interrupted exception in the drain:** the rewritten loop sets the interrupt flag, logs, and breaks out of the drain. The remaining futures will be cancelled when the executor is shut down by the DI container; not perfect but correct for our use case (we'd be tearing down anyway). +- **`IndexWorker` field is "unused" between Task 2 and Task 4.** That's fine — it lets each commit be small and individually green. Some linters may flag this; tolerate it for the duration of one commit. diff --git a/docs/superpowers/specs/2026-04-07-parallel-game-extraction-design.md b/docs/superpowers/specs/2026-04-07-parallel-game-extraction-design.md new file mode 100644 index 00000000..6fc2f4fb --- /dev/null +++ b/docs/superpowers/specs/2026-04-07-parallel-game-extraction-design.md @@ -0,0 +1,148 @@ +# Parallel Game Extraction in IndexWorker + +**Status:** Design +**Date:** 2026-04-07 +**Scope:** Option 1 from `domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md` — parallelize per-game feature extraction within a month, keeping DB writes, batching, and ordering unchanged. + +## Background + +`IndexWorker.process()` currently walks each month's games strictly sequentially: for each `PlayedGame`, it calls `FeatureExtractor.extract(pgn)`, builds a row, and appends to a batch that is flushed at `BATCH_SIZE = 100`. 
The CPU-bound cost is dominated by `extract()` (PGN replay + motif detectors). DB writes are already batched. + +This design parallelizes **only** the `extract()` step across a bounded pool. The DB write path and all observable semantics (batch size, flush boundaries, per-month period upsert, progress updates) remain identical. + +## Non-goals + +- Parallelizing across months (Option 4 in the parent doc). +- Parallelizing across index requests / worker threads (Option 2). +- Any schema, API, or queue changes. +- Changes to reanalysis / unification (Option 6). + +## Design + +### Injected executor + +`IndexWorker` gains a new constructor parameter: + +```java +public IndexWorker( + ChessClient chessClient, + FeatureExtractor featureExtractor, + IndexingRequestStore requestStore, + GameFeatureStore gameFeatureStore, + IndexedPeriodStore periodStore, + ExecutorService extractionExecutor) { ... } +``` + +The executor is provided by `IndexerModule` as a new `@Context` bean: + +```java +@Context +@Bean(preDestroy = "shutdown") +public ExecutorService indexExtractionExecutor() { + int threads = parseThreads(System.getenv("INDEXER_EXTRACTION_THREADS"), 4); + ThreadFactory tf = r -> { + Thread t = new Thread(r); + t.setName("index-extract-" + COUNTER.incrementAndGet()); + t.setDaemon(true); + return t; + }; + return Executors.newFixedThreadPool(threads, tf); +} +``` + +- **Config:** env var `INDEXER_EXTRACTION_THREADS`, default **4**. Values ≤ 0 or unparseable fall back to the default with a warning log. +- **Lifecycle:** Micronaut's `@Bean(preDestroy = "shutdown")` invokes `ExecutorService.shutdown()` on context close. +- **Thread naming:** `index-extract-N` for log/profile visibility. 
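The thread naming and lifecycle bullets above can be checked without a Micronaut context. The following is a minimal sketch, assuming plain `java.util.concurrent` only: `ExtractionPoolSketch` and its helper methods are hypothetical names, and the explicit `pool.shutdown()` call stands in for what `preDestroy = "shutdown"` is expected to trigger on context close.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ExtractionPoolSketch {
  /** Fixed pool whose daemon threads are named index-extract-N. */
  static ExecutorService newPool(int threads) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory tf =
        r -> {
          Thread t = new Thread(r);
          t.setName("index-extract-" + counter.incrementAndGet());
          t.setDaemon(true);
          return t;
        };
    return Executors.newFixedThreadPool(threads, tf);
  }

  /** Name of the pool thread that ran a trivial task. */
  static String workerThreadName(ExecutorService pool) {
    Callable<String> task = () -> Thread.currentThread().getName();
    try {
      return pool.submit(task).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }

  /** True if the pool refuses new work, as it does once shutdown() has been called. */
  static boolean rejectsNewWork(ExecutorService pool) {
    Callable<String> task = () -> "ignored";
    try {
      pool.submit(task);
      return false;
    } catch (RejectedExecutionException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = newPool(4);
    System.out.println(workerThreadName(pool));
    pool.shutdown(); // stand-in for the container's preDestroy hook
    System.out.println(rejectsNewWork(pool));
  }
}
```

`shutdown()` lets already-submitted tasks finish and only rejects new submissions, which is why it fits a pool that is shared across requests. A `process()` call that is still submitting during teardown would see `RejectedExecutionException`, so that path may be worth a glance during review.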
+ +### Inner loop rewrite + +Current per-month loop (pseudocode): + +``` +for each game in month: + try { features = extract(pgn); append to batch; flush if full } + catch { log and skip } +flush remaining +``` + +New per-month loop: + +``` +submit all games to executor, collecting List> in source order +for each future in source order: + try { + result = future.get() + append result to batch + flush if full and update status + } catch (ExecutionException) { log and skip } +flush remaining +``` + +Key properties: +- **Bounded in-flight work:** at most `threads` extractions running + one in-progress batch buffered. +- **In-order drain (not a correctness requirement):** futures are drained in submission order purely because it's the simplest possible drain loop. Row order in `insertBatch` is not load-bearing — `game_features` rows are keyed by URL, IDs are DB-assigned, and the occurrences map is keyed by URL too. An `ExecutorCompletionService`-based drain would be valid but more code for no observable win at this scope (we still flush at month-end either way). +- **Failure isolation:** an extraction that throws is logged and skipped; other games still land. Mirrors the current per-game `try/catch`. +- **Batching untouched:** `BATCH_SIZE = 100`, flush-at-boundary, flush-at-month-end, and `requestStore.updateStatus` after each flush all remain. +- **No executor shutdown inside `process()`** — the pool is shared across requests and owned by the DI container. + +### `ExtractResult` helper + +A small private record inside `IndexWorker`: + +```java +private record ExtractResult(GameFeature row, String gameUrl, + Map> occurrences) {} +``` + +Built inside the submitted task so that `buildGameFeature(...)` also runs on the pool thread (cheap, but keeps the drain loop pure glue). 
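The submit-then-drain loop described in this design can be sketched in isolation as follows. This is an illustration under stated assumptions, not the `IndexWorker` code: `SubmitDrainSketch`, `run`, and `demo` are hypothetical names, strings stand in for games, and a returned list of batches stands in for `flushBatch`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class SubmitDrainSketch {
  /**
   * Submits work for every input, then drains the futures in submission order into
   * batches of at most batchSize. Inputs whose work throws are skipped.
   */
  static <I, O> List<List<O>> run(
      ExecutorService pool, List<I> inputs, Function<I, O> work, int batchSize) {
    List<Future<O>> futures = new ArrayList<>();
    for (I input : inputs) {
      Callable<O> task = () -> work.apply(input);
      futures.add(pool.submit(task));
    }
    List<List<O>> flushed = new ArrayList<>();
    List<O> batch = new ArrayList<>();
    for (Future<O> future : futures) {
      try {
        batch.add(future.get());
      } catch (ExecutionException e) {
        continue; // failure isolation: skip this input, keep draining
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
      if (batch.size() >= batchSize) {
        flushed.add(List.copyOf(batch)); // stand-in for a full-batch flush
        batch.clear();
      }
    }
    if (!batch.isEmpty()) {
      flushed.add(List.copyOf(batch)); // stand-in for the final flush
    }
    return flushed;
  }

  /** Five inputs, one of which fails, drained in batches of two. */
  static List<List<String>> demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Function<String, String> work =
          s -> {
            if (s.equals("boom")) {
              throw new IllegalStateException("simulated extraction failure");
            }
            return s.toUpperCase();
          };
      return run(pool, List.of("a", "b", "boom", "c", "d"), work, 2);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [[A, B], [C, D]]
  }
}
```

Only the work function runs on pool threads. Batching and flushing stay on the calling thread, which is the property that keeps DB writes single-threaded in the design above.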
+ +### Thread-safety audit (implementation step) + +`FeatureExtractor.extract()` creates local `positions`, `foundMotifs`, and `allOccurrences` per call, but its injected collaborators — `PgnParser`, `GameReplayer`, and `List<MotifDetector>` — are shared across threads. Before the first green concurrency test, verify each of these is stateless (no mutable instance fields touched inside `extract()` / `replay()` / `detect()`). + +`MotifDetector` implementations are required to be stateless by contract. If the audit finds one that isn't, that is a design bug to fix at the source — not something to paper over with `ThreadLocal` or per-task construction. `PgnParser` and `GameReplayer` are held to the same standard. + +This audit is a discrete step in the plan and blocks the concurrency test from being marked green. + +## Test plan (TDD) + +All new tests live in `IndexWorkerTest`, constructing `IndexWorker` directly with a real `ExecutorService` and shutting it down in `@After` (the project uses JUnit 4). + +1. **Concurrency proof (write first, should fail/deadlock)** + A `FeatureExtractor` mock where `extract()` decrements a `CountDownLatch(2)` then `await()`s it. Run with a pool size of 2 and ≥ 2 games in the month. With the current sequential loop, the test deadlocks (latch never reaches zero); after the loop rewrite it completes. Use a JUnit `@Test(timeout = ...)` of a few seconds so "deadlocked" surfaces as a test failure rather than a hang. + +2. **Failure isolation** + Mock `extract()` to throw on one specific PGN; assert all other games in the month land in `insertBatch` and `updateStatus`'s `totalIndexed` reflects survivors only. + +3. **All games land (set, not order)** + Mock returns distinguishable features per game; assert that the union of `List<GameFeature>` arguments passed to `insertBatch` across all flushes equals the input set. Order is not asserted — the drain is in-order today, but that's an implementation detail, not a contract. + +4. 
**Existing tests stay green** + Update the existing `IndexWorkerTest` setup to construct the worker with a fixed pool (e.g. size 4). No assertion changes needed — this is the behavior-preservation bar. + +5. **Module wiring** + A small unit test on `IndexerModule.indexExtractionExecutor()` (or the helper `parseThreads`) verifying: default when env var unset, respects a valid value, falls back to default on invalid input. No Micronaut context required — test the factory method directly, or extract `parseThreads` as package-private. + +### TDD order + +1. Concurrency-proof test → red. +2. Add executor constructor param; plumb through `IndexerModule`; update existing tests to pass the pool → existing tests green, concurrency test still red. +3. Thread-safety audit of `FeatureExtractor` collaborators. +4. Rewrite inner loop (submit + drain) → concurrency test green. +5. Failure-isolation test → red → passes once exception handling in the drain loop is wired. +6. "All games land" test → should be green immediately after step 4. +7. Module wiring test. +8. `./scripts/format-all` + `bazel test //...` before pushing. + +## Constraints & risks + +- **Memory:** ~20 MB per concurrent replay per `IN_PROCESS_MODE.md`. At default `threads=4` that is ~80 MB of transient replay state, well within headroom. Documenting the cap so ops knows not to set it to 32. +- **Chess.com rate limits:** Unaffected — fetching is still one HTTP call per month. +- **DB writes:** Unaffected — still single-threaded, still batched at 100. +- **Thread safety of `FeatureExtractor` collaborators:** the audit step is blocking. `MotifDetector`, `PgnParser`, and `GameReplayer` are required to be stateless; any violation found during the audit is fixed at the source as part of this change, not worked around. + +## Out of scope / follow-ups + +- Option 2 (multiple worker threads) remains the natural next step once single-request latency is good. 
+- Option 6 (unification with reanalysis) is orthogonal and not affected by this change. +- Metrics for games/sec and batch flush rate would help tune `INDEXER_EXTRACTION_THREADS`; not implemented here, noted for later. diff --git a/domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md b/domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md index 59865bf1..7c21c1aa 100644 --- a/domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md +++ b/domains/games/apis/one_d4/docs/PARALLELIZING_INDEXING.md @@ -40,7 +40,7 @@ Currently indexing is **strictly sequential**: one worker thread, one index requ ### Suggested order of work (Java, before or without a rewrite) 1. ~~**Batch inserts**~~ ✓ — `GameFeatureStore.insertBatch` and `insertOccurrencesBatch` implemented; `IndexWorker` collects batches of 100 games before flushing. -2. **Parallel games within a month** — Fixed thread pool (e.g. 4–8), submit each game to the pool, collect `GameFeature` + occurrences, then batch insert when a batch is full or the month is done. Keeps a single "logical" worker and request ordering; only the CPU part is parallel. +2. ~~**Parallel games within a month**~~ ✓ — `IndexWorker` now submits each game's extraction to an injected `ExecutorService` (default 4 threads, env `INDEXER_EXTRACTION_THREADS`) and drains futures into the existing batch buffer; DB writes remain single-threaded and batched. 3. **Multiple worker threads** — If a single request is still slow, run 2–4 index-worker threads so multiple index requests are processed concurrently. Tune pool size and DB/API limits. **6. 
Unify indexing and reanalysis paths** diff --git a/domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/IndexerModule.java b/domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/IndexerModule.java index 304ab2e5..9f39f8f5 100644 --- a/domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/IndexerModule.java +++ b/domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/IndexerModule.java @@ -34,6 +34,7 @@ import com.muchq.platform.http_client.core.HttpClient; import com.muchq.platform.http_client.jdk.Jdk11HttpClient; import com.muchq.platform.json.JsonUtils; +import io.micronaut.context.annotation.Bean; import io.micronaut.context.annotation.Context; import io.micronaut.context.annotation.Factory; import java.io.IOException; @@ -42,6 +43,10 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import javax.sql.DataSource; import org.jdbi.v3.core.Jdbi; import org.slf4j.Logger; @@ -173,15 +178,56 @@ public FeatureExtractor featureExtractor( return new FeatureExtractor(pgnParser, replayer, detectors); } + private static final AtomicInteger EXTRACT_THREAD_COUNTER = new AtomicInteger(); + + static int parseThreads(String raw, int defaultValue) { + if (raw == null || raw.isBlank()) { + return defaultValue; + } + try { + int parsed = Integer.parseInt(raw.strip()); + if (parsed <= 0) { + LOG.warn("Invalid INDEXER_EXTRACTION_THREADS={}; falling back to {}", raw, defaultValue); + return defaultValue; + } + return parsed; + } catch (NumberFormatException e) { + LOG.warn("Unparseable INDEXER_EXTRACTION_THREADS={}; falling back to {}", raw, defaultValue); + return defaultValue; + } + } + + @Context + @Bean(preDestroy = "shutdown") + @jakarta.inject.Named("indexExtraction") + public ExecutorService indexExtractionExecutor() { + 
int threads = parseThreads(System.getenv("INDEXER_EXTRACTION_THREADS"), 4); + ThreadFactory tf = + r -> { + Thread t = new Thread(r); + t.setName("index-extract-" + EXTRACT_THREAD_COUNTER.incrementAndGet()); + t.setDaemon(true); + return t; + }; + LOG.info("Index extraction executor: fixed pool of {} threads", threads); + return Executors.newFixedThreadPool(threads, tf); + } + @Context public IndexWorker indexWorker( ChessClient chessClient, FeatureExtractor featureExtractor, IndexingRequestStore requestStore, GameFeatureStore gameFeatureStore, - IndexedPeriodStore periodStore) { + IndexedPeriodStore periodStore, + @jakarta.inject.Named("indexExtraction") ExecutorService indexExtractionExecutor) { return new IndexWorker( - chessClient, featureExtractor, requestStore, gameFeatureStore, periodStore); + chessClient, + featureExtractor, + requestStore, + gameFeatureStore, + periodStore, + indexExtractionExecutor); } @Context diff --git a/domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java b/domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java index 0f001e00..991c1cb0 100644 --- a/domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java +++ b/domains/games/apis/one_d4/src/main/java/com/muchq/games/one_d4/worker/IndexWorker.java @@ -24,6 +24,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.slf4j.Logger; @@ -51,18 +54,21 @@ public class IndexWorker { private final IndexingRequestStore requestStore; private final GameFeatureStore gameFeatureStore; private final IndexedPeriodStore periodStore; + private final ExecutorService extractionExecutor; public IndexWorker( ChessClient chessClient, FeatureExtractor featureExtractor, IndexingRequestStore 
requestStore, GameFeatureStore gameFeatureStore, - IndexedPeriodStore periodStore) { + IndexedPeriodStore periodStore, + ExecutorService extractionExecutor) { this.chessClient = chessClient; this.featureExtractor = featureExtractor; this.requestStore = requestStore; this.gameFeatureStore = gameFeatureStore; this.periodStore = periodStore; + this.extractionExecutor = extractionExecutor; } public void process(IndexMessage message) { @@ -107,26 +113,51 @@ public void process(IndexMessage message) { Map>> occurrencesBatch = new LinkedHashMap<>(); - int monthCount = 0; + // Submit each surviving game to the extraction pool, preserving source order. + List<Future<ExtractResult>> futures = new ArrayList<>(); for (PlayedGame game : response.get().games()) { if (message.excludeBullet() && "bullet".equals(game.timeClass())) { continue; } + futures.add( + extractionExecutor.submit( + () -> { + try { + GameFeatures features = featureExtractor.extract(game.pgn()); + GameFeature row = buildGameFeature(message, game, features); + return new ExtractResult(row, game.url(), features.occurrences()); + } catch (Exception e) { + // TODO: pair futures with their game URLs in the drain loop instead of + // smuggling the URL through an exception message — the wrapper is only + // here so the warn log below can identify which game failed. 
+ throw new RuntimeException( + "Failed to extract features for game " + game.url(), e); + } + })); + } + + int monthCount = 0; + for (Future<ExtractResult> future : futures) { + ExtractResult result; try { - GameFeatures features = featureExtractor.extract(game.pgn()); - GameFeature row = buildGameFeature(message, game, features); - featureBatch.add(row); - if (!features.occurrences().isEmpty()) { - occurrencesBatch.put(game.url(), features.occurrences()); - } - monthCount++; - totalIndexed++; - if (featureBatch.size() >= BATCH_SIZE) { - flushBatch(featureBatch, occurrencesBatch); - requestStore.updateStatus(message.requestId(), "PROCESSING", null, totalIndexed); - } - } catch (Exception e) { - LOG.warn("Failed to index game {}", game.url(), e); + result = future.get(); + } catch (ExecutionException e) { + LOG.warn("{}", e.getCause().getMessage(), e.getCause()); + continue; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while draining extraction futures", e); + break; + } + featureBatch.add(result.row()); + if (!result.occurrences().isEmpty()) { + occurrencesBatch.put(result.gameUrl(), result.occurrences()); + } + monthCount++; + totalIndexed++; + if (featureBatch.size() >= BATCH_SIZE) { + flushBatch(featureBatch, occurrencesBatch); + requestStore.updateStatus(message.requestId(), "PROCESSING", null, totalIndexed); } } flushBatch(featureBatch, occurrencesBatch); @@ -199,6 +230,11 @@ private String extractEcoFromPgn(String pgn) { return m.find() ? 
m.group(1) : null; } + private record ExtractResult( + GameFeature row, + String gameUrl, + Map> occurrences) {} + private static boolean isLockConflict(Throwable e) { Throwable cause = e; while (cause != null) { diff --git a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/IndexerModuleTest.java b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/IndexerModuleTest.java index c6a5425d..5ad53532 100644 --- a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/IndexerModuleTest.java +++ b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/IndexerModuleTest.java @@ -76,6 +76,33 @@ public void readJdbcUrl_throwsUncheckedIOException_onNonFileNotFoundIOError() { .hasCauseInstanceOf(IOException.class); } + @Test + public void parseThreads_returnsDefault_whenNull() { + assertThat(IndexerModule.parseThreads(null, 4)).isEqualTo(4); + } + + @Test + public void parseThreads_returnsDefault_whenBlank() { + assertThat(IndexerModule.parseThreads(" ", 4)).isEqualTo(4); + } + + @Test + public void parseThreads_returnsDefault_whenUnparseable() { + assertThat(IndexerModule.parseThreads("abc", 4)).isEqualTo(4); + } + + @Test + public void parseThreads_returnsDefault_whenNonPositive() { + assertThat(IndexerModule.parseThreads("0", 4)).isEqualTo(4); + assertThat(IndexerModule.parseThreads("-3", 4)).isEqualTo(4); + } + + @Test + public void parseThreads_respectsValidValue() { + assertThat(IndexerModule.parseThreads("8", 4)).isEqualTo(8); + assertThat(IndexerModule.parseThreads(" 16 ", 4)).isEqualTo(16); + } + private Path missingPath() { return tmp.getRoot().toPath().resolve("nonexistent_db_config"); } diff --git a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/IndexE2ETest.java b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/IndexE2ETest.java index b41b5101..fb0b727f 100644 --- a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/IndexE2ETest.java +++ 
b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/IndexE2ETest.java @@ -55,6 +55,12 @@ public class IndexE2ETest { private FakeChessClient fakeChessClient; private IndexingRequestStore requestStore; private IndexedPeriodStore periodStore; + private java.util.concurrent.ExecutorService extractionExecutor; + + @org.junit.After + public void tearDownExecutor() { + if (extractionExecutor != null) extractionExecutor.shutdownNow(); + } @Before public void setUp() { @@ -76,9 +82,15 @@ public void setUp() { new AttackDetector()); FeatureExtractor featureExtractor = new FeatureExtractor(new PgnParser(), new GameReplayer(), detectors); + extractionExecutor = java.util.concurrent.Executors.newSingleThreadExecutor(); worker = new IndexWorker( - fakeChessClient, featureExtractor, requestStore, gameFeatureStore, periodStore); + fakeChessClient, + featureExtractor, + requestStore, + gameFeatureStore, + periodStore, + extractionExecutor); controller = new IndexController(requestStore, queue, new IndexRequestValidator()); } diff --git a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/MotifE2ETest.java b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/MotifE2ETest.java index f64fda41..9e36f570 100644 --- a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/MotifE2ETest.java +++ b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/MotifE2ETest.java @@ -110,6 +110,12 @@ public class MotifE2ETest { private FakeChessClient fakeChessClient; private IndexingRequestStore requestStore; private IndexedPeriodStore periodStore; + private java.util.concurrent.ExecutorService extractionExecutor; + + @org.junit.After + public void tearDownExecutor() { + if (extractionExecutor != null) extractionExecutor.shutdownNow(); + } @Before public void setUp() { @@ -136,9 +142,15 @@ public void setUp() { new SmotheredMateDetector()); FeatureExtractor featureExtractor = new FeatureExtractor(new PgnParser(), new 
GameReplayer(), detectors); + extractionExecutor = java.util.concurrent.Executors.newSingleThreadExecutor(); worker = new IndexWorker( - fakeChessClient, featureExtractor, requestStore, gameFeatureStore, periodStore); + fakeChessClient, + featureExtractor, + requestStore, + gameFeatureStore, + periodStore, + extractionExecutor); controller = new IndexController(requestStore, queue, new IndexRequestValidator()); } diff --git a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/ReanalysisE2ETest.java b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/ReanalysisE2ETest.java index d1216ccc..717181b8 100644 --- a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/ReanalysisE2ETest.java +++ b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/e2e/ReanalysisE2ETest.java @@ -93,6 +93,12 @@ public class ReanalysisE2ETest { private IndexingRequestStore requestStore; private IndexedPeriodStore periodStore; private FeatureExtractor featureExtractor; + private java.util.concurrent.ExecutorService extractionExecutor; + + @org.junit.After + public void tearDownExecutor() { + if (extractionExecutor != null) extractionExecutor.shutdownNow(); + } @Before public void setUp() { @@ -119,9 +125,15 @@ public void setUp() { new SmotheredMateDetector()); featureExtractor = new FeatureExtractor(new PgnParser(), new GameReplayer(), detectors); + extractionExecutor = java.util.concurrent.Executors.newSingleThreadExecutor(); worker = new IndexWorker( - fakeChessClient, featureExtractor, requestStore, gameFeatureStore, periodStore); + fakeChessClient, + featureExtractor, + requestStore, + gameFeatureStore, + periodStore, + extractionExecutor); controller = new IndexController(requestStore, queue, new IndexRequestValidator()); adminController = new AdminController(gameFeatureStore, featureExtractor); } diff --git a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java 
b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java index 75835467..3eb8005a 100644 --- a/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java +++ b/domains/games/apis/one_d4/src/test/java/com/muchq/games/one_d4/worker/IndexWorkerTest.java @@ -33,6 +33,9 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -47,12 +50,14 @@ public class IndexWorkerTest { private StubPeriodStore periodStore; private IndexWorker worker; private FeatureExtractor featureExtractor; + private ExecutorService extractionExecutor; @Before public void setUp() { stubChessClient = new StubChessClient(); requestStore = new RecordingRequestStore(); periodStore = new StubPeriodStore(); + extractionExecutor = Executors.newFixedThreadPool(4); List detectors = List.of( new PinDetector(), new CrossPinDetector(), new SkewerDetector(), new AttackDetector()); @@ -63,7 +68,13 @@ public void setUp() { featureExtractor, requestStore, new NoOpGameFeatureStore(), - periodStore); + periodStore, + extractionExecutor); + } + + @After + public void tearDown() { + extractionExecutor.shutdownNow(); } @Test @@ -132,7 +143,12 @@ public void process_whenGameHasMotifs_callsInsertOccurrencesWithOccurrences() { new FeatureExtractor(new PgnParser(), new GameReplayer(), detectors); IndexWorker workerWithRecording = new IndexWorker( - stubChessClient, featureExtractor, requestStore, recordingStore, periodStore); + stubChessClient, + featureExtractor, + requestStore, + recordingStore, + periodStore, + extractionExecutor); IndexMessage message = new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", false); @@ -156,7 +172,12 @@ public void process_bulletGamesNotSkippedWhenExcludeBulletFalse() { RecordingGameFeatureStore recordingStore = new 
RecordingGameFeatureStore(); IndexWorker w = new IndexWorker( - stubChessClient, featureExtractor, requestStore, recordingStore, periodStore); + stubChessClient, + featureExtractor, + requestStore, + recordingStore, + periodStore, + extractionExecutor); w.process(new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", false)); @@ -180,6 +201,51 @@ public void process_onUnhandledException_storesOpaqueErrorMessage() { .isEqualTo("Indexing failed due to an internal error"); } + @Test(timeout = 5000) + public void process_runsExtractionsConcurrently_acrossPoolThreads() throws Exception { + // Two games in one month. Each extract() decrements a 2-latch then awaits it. + // If extract() runs sequentially, the second call never starts and the latch + // never reaches zero -> the test times out. With a pool size >= 2, both + // games are in flight at once and the latch releases immediately. + java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(2); + FeatureExtractor latchExtractor = + new FeatureExtractor(new PgnParser(), new GameReplayer(), List.of()) { + @Override + public GameFeatures extract(String pgn) { + latch.countDown(); + try { + // No timeout: if the loop is sequential the second game never starts, + // so the latch never reaches 0 and this blocks forever -> JUnit timeout fires. 
+ latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return new GameFeatures(java.util.EnumSet.noneOf(Motif.class), 0, java.util.Map.of()); + } + }; + + ExecutorService pool = Executors.newFixedThreadPool(2); + try { + RecordingGameFeatureStore store = new RecordingGameFeatureStore(); + IndexWorker concurrentWorker = + new IndexWorker(stubChessClient, latchExtractor, requestStore, store, periodStore, pool); + stubChessClient.setResponse( + java.time.YearMonth.of(2024, 1), + List.of( + playedGame("https://chess.com/g/1", MINIMAL_PGN, "blitz"), + playedGame("https://chess.com/g/2", MINIMAL_PGN, "blitz"))); + + concurrentWorker.process( + new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", false)); + + assertThat(store.getInsertCount()).isEqualTo(2); + assertThat(requestStore.getLastStatus()).isEqualTo("COMPLETED"); + } finally { + pool.shutdownNow(); + } + } + @Test public void process_bulletGamesSkippedWhenExcludeBulletTrue() { String gameUrl = "https://chess.com/game/bullet-skip"; @@ -188,7 +254,12 @@ public void process_bulletGamesSkippedWhenExcludeBulletTrue() { RecordingGameFeatureStore recordingStore = new RecordingGameFeatureStore(); IndexWorker w = new IndexWorker( - stubChessClient, featureExtractor, requestStore, recordingStore, periodStore); + stubChessClient, + featureExtractor, + requestStore, + recordingStore, + periodStore, + extractionExecutor); w.process(new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", true)); @@ -196,6 +267,85 @@ public void process_bulletGamesSkippedWhenExcludeBulletTrue() { assertThat(requestStore.getLastStatus()).isEqualTo("COMPLETED"); } + @Test + public void process_oneFailingExtraction_doesNotPreventOthers() { + String poisonUrl = "https://chess.com/g/poison"; + FeatureExtractor selectivelyFailing = + new FeatureExtractor(new PgnParser(), new GameReplayer(), List.of()) { + @Override + public GameFeatures 
extract(String pgn) { + if (pgn.contains("POISON")) { + throw new RuntimeException("boom"); + } + return new GameFeatures(java.util.EnumSet.noneOf(Motif.class), 0, java.util.Map.of()); + } + }; + + String poisonPgn = + """ + [Event "POISON"] + [Site "Chess.com"] + [White "W"] + [Black "B"] + [Result "1-0"] + [ECO "C20"] + + 1. e4 e5 1-0 + """; + + RecordingGameFeatureStore store = new RecordingGameFeatureStore(); + IndexWorker w = + new IndexWorker( + stubChessClient, + selectivelyFailing, + requestStore, + store, + periodStore, + extractionExecutor); + stubChessClient.setResponse( + java.time.YearMonth.of(2024, 1), + List.of( + playedGame("https://chess.com/g/ok1", MINIMAL_PGN, "blitz"), + playedGame(poisonUrl, poisonPgn, "blitz"), + playedGame("https://chess.com/g/ok2", MINIMAL_PGN, "blitz"))); + + w.process(new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", false)); + + assertThat(store.getInsertCount()).isEqualTo(2); + assertThat(requestStore.getLastStatus()).isEqualTo("COMPLETED"); + assertThat(requestStore.getLastGamesIndexed()).isEqualTo(2); + } + + @Test + public void process_allGamesLandInBatch_regardlessOfOrder() { + List<String> urls = + List.of( + "https://chess.com/g/a", + "https://chess.com/g/b", + "https://chess.com/g/c", + "https://chess.com/g/d"); + List<PlayedGame> games = new ArrayList<>(); + for (String u : urls) { + games.add(playedGame(u, MINIMAL_PGN, "blitz")); + } + stubChessClient.setResponse(java.time.YearMonth.of(2024, 1), games); + + RecordingGameFeatureStore store = new RecordingGameFeatureStore(); + IndexWorker w = + new IndexWorker( + stubChessClient, + featureExtractor, + requestStore, + store, + periodStore, + extractionExecutor); + + w.process(new IndexMessage(REQUEST_ID, PLAYER, PLATFORM, "2024-01", "2024-01", false)); + + assertThat(store.getInsertedUrls()).containsExactlyInAnyOrderElementsOf(urls); + assertThat(requestStore.getLastStatus()).isEqualTo("COMPLETED"); + } + private static PlayedGame playedGame(String gameUrl, 
String 
pgn, String timeClass) { return new PlayedGame( gameUrl, @@ -356,6 +506,7 @@ public List fetchForReanalysis(int limit, int offset) { private static final class RecordingGameFeatureStore extends NoOpGameFeatureStore { private final Map>> allInsertedOccurrences = new HashMap<>(); + private final List insertedUrls = new ArrayList<>(); private int insertCount = 0; Map>> getAllInsertedOccurrences() { @@ -366,9 +517,16 @@ int getInsertCount() { return insertCount; } + List getInsertedUrls() { + return insertedUrls; + } + @Override public void insertBatch(List features) { insertCount += features.size(); + for (GameFeature f : features) { + insertedUrls.add(f.gameUrl()); + } } @Override