diff --git a/clawloop/core/archive_recorder.py b/clawloop/core/archive_recorder.py new file mode 100644 index 00000000..9e660748 --- /dev/null +++ b/clawloop/core/archive_recorder.py @@ -0,0 +1,241 @@ +"""Archive-store recorder extracted from ``learning_loop``. + +The recorder encapsulates all writes to an ``ArchiveStore`` during a run +and owns the run-level counters (``run_id``, ``best_reward``, +``initial_reward``, ``total_cost``, ``prev_variant_hash``) that would +otherwise pollute ``learning_loop``'s local scope. + +All writes are wrapped in ``try / log.warning(..., exc_info=True)`` — +archive failures are non-fatal by design so that training continues +even if the backing store is misbehaving. +""" + +from __future__ import annotations + +import logging +import time +from typing import TYPE_CHECKING, Any + +from clawloop.archive.null_store import NullArchiveStore +from clawloop.archive.schema import ( + AgentVariant, + EpisodeRecord, + IterationRecord, + RunRecord, +) +from clawloop.archive.store import ArchiveStore +from clawloop.core.episode import Episode +from clawloop.core.state import StateID +from clawloop.core.types import FBResult +from clawloop.learning_layers.harness import Harness +from clawloop.utils.content_hash import canonical_hash + +if TYPE_CHECKING: + from clawloop.core.loop import AgentState + +log = logging.getLogger(__name__) + + +def _build_agent_config(agent_state: "AgentState") -> dict[str, Any]: + """Serializable snapshot of the agent's config for archive identity.""" + config: dict[str, Any] = {} + if isinstance(agent_state.harness, Harness): + config["system_prompts"] = dict(agent_state.harness.system_prompts) + config["playbook"] = agent_state.harness.playbook.to_dict() + config["router"] = agent_state.router.to_dict() + return config + + +def _variant_from_state( + agent_state: "AgentState", + variant_hash: str, + run_id: str, + created_at: float, +) -> AgentVariant: + harness = agent_state.harness + is_harness = 
isinstance(harness, Harness) + return AgentVariant( + variant_hash=variant_hash, + system_prompt=(next(iter(harness.system_prompts.values()), "") if is_harness else ""), + playbook_snapshot=harness.playbook.to_dict() if is_harness else {}, + model="", + tools=[], + first_seen_run_id=run_id, + created_at=created_at, + ) + + +class ArchiveRecorder: + """Owns all ``ArchiveStore`` writes for one ``learning_loop`` run. + + Construction writes the initial ``RunRecord`` + ``AgentVariant``. Each + iteration calls :meth:`record_episodes` and :meth:`record_iteration`. + :meth:`record_complete` writes the terminal ``log_run_complete`` row + with accumulated best-reward / improvement-delta / total-cost metrics. + """ + + def __init__( + self, + archive: ArchiveStore | None, + agent_state: "AgentState", + bench: str, + domain_tags: list[str] | None, + n_iterations: int, + ) -> None: + self._store: ArchiveStore = archive if archive is not None else NullArchiveStore() + self._run_id = RunRecord.new_id() + self._best_reward: float | None = None + self._initial_reward: float | None = None + self._total_cost = 0 + + initial_config = _build_agent_config(agent_state) + initial_hash = canonical_hash(initial_config) + self._prev_variant_hash = initial_hash + + now = time.time() + try: + self._store.log_run_start( + RunRecord( + run_id=self._run_id, + bench=bench, + domain_tags=list(domain_tags or []), + agent_config=initial_config, + config_hash=initial_hash, + n_iterations=n_iterations, + best_reward=0.0, + improvement_delta=0.0, + total_cost_tokens=0, + parent_run_id=None, + created_at=now, + completed_at=None, + ) + ) + except Exception: + log.warning("Archive: failed to log run start", exc_info=True) + + try: + self._store.log_variant( + _variant_from_state(agent_state, initial_hash, self._run_id, now) + ) + except Exception: + log.warning("Archive: failed to log initial variant", exc_info=True) + + @property + def run_id(self) -> str: + """Stable run-id used for all records emitted 
by this recorder.""" + return self._run_id + + def record_episodes(self, iteration: int, episodes: list[Episode]) -> None: + """Write one ``EpisodeRecord`` per episode. No-op when ``episodes`` is empty.""" + if not episodes: + return + records: list[EpisodeRecord] = [] + for ep in episodes: + tool_call_count = sum(len(m.tool_calls or []) for m in ep.messages) + token_usage: dict[str, Any] + if ep.summary.token_usage is not None: + token_usage = { + "prompt_tokens": ep.summary.token_usage.prompt_tokens, + "completion_tokens": ep.summary.token_usage.completion_tokens, + "total_tokens": ep.summary.token_usage.total_tokens, + } + else: + token_usage = {} + records.append( + EpisodeRecord( + run_id=self._run_id, + iteration_num=iteration, + episode_id=ep.id, + task_id=ep.task_id, + bench=ep.bench, + model=ep.model or "", + reward=ep.summary.normalized_reward(), + signals=( + { + k: {"value": s.value, "confidence": s.confidence} + for k, s in ep.summary.signals.items() + } + if ep.summary.signals + else {} + ), + n_steps=ep.n_steps(), + n_tool_calls=tool_call_count, + token_usage=token_usage, + latency_ms=int(ep.summary.timing.total_ms if ep.summary.timing else 0), + messages_ref=f"traces/{ep.id}.json", + created_at=ep.created_at if ep.created_at is not None else time.time(), + ) + ) + try: + self._store.log_episodes(records) + except Exception: + log.warning("Archive: failed to log episodes", exc_info=True) + + def record_iteration( + self, + iteration: int, + agent_state: "AgentState", + state_id: StateID, + fb_results: dict[str, FBResult], + episodes: list[Episode], + avg_reward: float, + prev_avg_reward: float, + ) -> None: + """Write the per-iteration ``IterationRecord`` and, if the agent's + config hash changed, a new ``AgentVariant``. Also updates + ``best_reward`` / ``initial_reward`` / ``total_cost`` counters used + by :meth:`record_complete`. 
+ """ + iter_cost = sum( + r.metrics.get("tokens_used", 0) for r in fb_results.values() if r.status == "ok" + ) + self._total_cost += iter_cost + + try: + cur_config = _build_agent_config(agent_state) + cur_variant_hash = canonical_hash(cur_config) + evolver_action: dict[str, Any] = {} + for name, result in fb_results.items(): + if result.status == "ok": + evolver_action[name] = result.metrics + self._store.log_iteration( + IterationRecord( + run_id=self._run_id, + iteration_num=iteration, + harness_snapshot_hash=state_id.harness_hash, + mean_reward=avg_reward, + reward_trajectory=[ep.summary.normalized_reward() for ep in episodes], + evolver_action=evolver_action, + cost_tokens=iter_cost, + parent_variant_hash=self._prev_variant_hash, + child_variant_hash=cur_variant_hash, + reward_delta=avg_reward - prev_avg_reward, + created_at=time.time(), + ) + ) + if cur_variant_hash != self._prev_variant_hash: + self._store.log_variant( + _variant_from_state(agent_state, cur_variant_hash, self._run_id, time.time()) + ) + self._prev_variant_hash = cur_variant_hash + except Exception: + log.warning("Archive: failed to log iteration %d", iteration, exc_info=True) + + if self._best_reward is None or avg_reward > self._best_reward: + self._best_reward = avg_reward + if self._initial_reward is None: + self._initial_reward = avg_reward + + def record_complete(self) -> None: + """Write the terminal ``log_run_complete`` with accumulated metrics.""" + try: + best = self._best_reward if self._best_reward is not None else 0.0 + initial = self._initial_reward if self._initial_reward is not None else 0.0 + self._store.log_run_complete( + self._run_id, + best, + best - initial, + total_cost_tokens=self._total_cost, + ) + except Exception: + log.warning("Archive: failed to log run complete", exc_info=True) diff --git a/clawloop/core/loop.py b/clawloop/core/loop.py index 52ff7e1d..3e654e25 100644 --- a/clawloop/core/loop.py +++ b/clawloop/core/loop.py @@ -8,32 +8,25 @@ from __future__ import 
annotations -import copy import json import logging -import random import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable, Protocol -from clawloop.archive.null_store import NullArchiveStore -from clawloop.archive.schema import ( - AgentVariant, - EpisodeRecord, - IterationRecord, - RunRecord, -) from clawloop.archive.store import ArchiveStore +from clawloop.core.archive_recorder import ArchiveRecorder from clawloop.core.episode import Episode from clawloop.core.evolution_log import EvolutionEntry, EvolutionLog from clawloop.core.intensity import AdaptiveIntensity +from clawloop.core.runner import EpisodeCollectorRunner from clawloop.core.state import StateID -from clawloop.core.types import Datum, FBResult +from clawloop.core.transaction import LayerTransaction +from clawloop.core.types import FBResult from clawloop.learning_layers.harness import Harness from clawloop.learning_layers.router import Router from clawloop.learning_layers.weights import Weights -from clawloop.utils.content_hash import canonical_hash log = logging.getLogger(__name__) @@ -244,14 +237,158 @@ class AdapterLike(Protocol): def run_episode(self, task: Any, agent_state: AgentState) -> Episode: ... 
-def _build_agent_config(agent_state: AgentState) -> dict[str, Any]: - """Extract serializable agent config snapshot for archive identity.""" - config: dict[str, Any] = {} - if isinstance(agent_state.harness, Harness): - config["system_prompts"] = dict(agent_state.harness.system_prompts) - config["playbook"] = agent_state.harness.playbook.to_dict() - config["router"] = agent_state.router.to_dict() - return config +def _avg_reward(episodes: list[Episode]) -> float: + """Mean ``total_reward`` across episodes; 0.0 when empty.""" + if not episodes: + return 0.0 + return sum(ep.summary.total_reward for ep in episodes) / len(episodes) + + +def _refresh_tinker_fields( + agent_state: AgentState, + active_layers: list[str] | None, +) -> tuple[AgentState, list[tuple[str, Any]]]: + """Refresh Tinker-driven fields on ``AgentState`` from current backend. + + Gated on ``hasattr`` so non-Tinker backends (e.g. SkyRL) are untouched. + """ + backend = getattr(agent_state.weights, "_backend", None) + if backend is None or not hasattr(backend, "current_sampling_client"): + return agent_state, agent_state.get_layers(active_layers) + + from dataclasses import replace as _replace + + agent_state = _replace( + agent_state, + sampling_client=backend.current_sampling_client(), + renderer=getattr(backend, "renderer", None), + tokenizer=getattr(backend, "tokenizer", None), + ) + return agent_state, agent_state.get_layers(active_layers) + + +def _set_evolver_context( + agent_state: AgentState, + intensity: AdaptiveIntensity | None, + iteration: int, +) -> None: + """Push reward history + stagnation signal onto the harness's evolver.""" + if not isinstance(agent_state.harness, Harness) or agent_state.harness.evolver is None: + return + from clawloop.core.evolver import EvolverContext + + agent_state.harness.set_evolver_context( + EvolverContext( + reward_history=list(intensity._rewards) if intensity else [], + is_stagnating=intensity.is_stagnating() if intensity else False, + 
iteration=iteration, + tried_paradigms=list(agent_state.tried_paradigms), + ) + ) + + +def _maybe_save_backend_state( + agent_state: AgentState, + weights_fb: FBResult | None, + iteration: int, +) -> None: + """Tinker save_state hook — persist intermediate weights between iters. + + Skips when the weights step produced zero datums (GRPO filtered every + group for zero variance) to avoid polluting the durable-path timeline. + """ + backend = getattr(agent_state.weights, "_backend", None) + if backend is None or not hasattr(backend, "save_state"): + return + n_datums = weights_fb.metrics.get("n_datums", 0) if weights_fb and weights_fb.metrics else 0 + if n_datums > 0: + try: + backend.save_state(f"iter_{iteration}").result() + except Exception: + log.exception("backend.save_state failed for iter_%d", iteration) + else: + log.info( + " [save_state] skipped iter_%d — no datums produced (GRPO filtered all groups)", + iteration, + ) + + +def _flush_generation_if_advanced(agent_state: AgentState, optim_failed: bool) -> None: + """Drop stale weights-buffer episodes when the playbook generation advances. + + Prevents RL from learning against pre-adaptation behavior. No-op when + optim failed (we already rolled back) or harness isn't a ``Harness``. 
+ """ + if optim_failed or not isinstance(agent_state.harness, Harness): + return + current_gen = agent_state.harness.playbook_generation + prev_gen = agent_state._prev_playbook_generation + if current_gen > prev_gen: + stale = agent_state.weights.pending_advantage_count() + agent_state.weights.clear_pending_state() + log.info( + " Generation %d->%d: flushed %d stale episodes from weights buffer", + prev_gen, + current_gen, + stale, + ) + agent_state._prev_playbook_generation = current_gen + + +def _log_evolution_entry( + evo_log: EvolutionLog, + agent_state: AgentState, + fb_results: dict[str, FBResult], + prev_hash: str, + new_hash: str, + prev_avg_reward: float, + avg_reward: float, + iteration: int, +) -> None: + """Append one ``EvolutionEntry`` iff any fb result produced a named action.""" + actions: list[str] = [] + for result in fb_results.values(): + if result.status != "ok": + continue + if result.metrics.get("insights_generated"): + actions.append("reflect") + if result.metrics.get("candidates_generated"): + actions.append("mutate") + if result.metrics.get("paradigm_shifted"): + actions.append("paradigm_shift") + if not actions: + return + backend = ( + agent_state.harness.evolver.name() + if isinstance(agent_state.harness, Harness) and agent_state.harness.evolver + else "none" + ) + evo_log.append( + EvolutionEntry( + iteration=iteration, + state_hash_before=prev_hash, + state_hash_after=new_hash, + actions=actions, + reward_before=prev_avg_reward, + reward_after=avg_reward, + backend=backend, + ) + ) + + +def _run_after_iteration( + cb: "Callable[[int, AgentState, list[Episode]], None] | None", + iteration: int, + agent_state: AgentState, + episodes: list[Episode], +) -> None: + """Invoke the optional after-iteration callback, logging any exception.""" + if cb is None: + return + try: + cb(iteration, agent_state, episodes) + except Exception: + log.exception("after_iteration callback failed") def learning_loop( @@ -307,436 +444,71 @@ def learning_loop( 
wandb_name=wandb_name, ) evo_log = EvolutionLog(output_dir) - _archive: ArchiveStore = archive if archive is not None else NullArchiveStore() - _run_id = RunRecord.new_id() - _initial_config = _build_agent_config(agent_state) - _initial_variant_hash = canonical_hash(_initial_config) - _now = time.time() - try: - _archive.log_run_start( - RunRecord( - run_id=_run_id, - bench=bench, - domain_tags=list(domain_tags or []), - agent_config=_initial_config, - config_hash=_initial_variant_hash, - n_iterations=n_iterations, - best_reward=0.0, - improvement_delta=0.0, - total_cost_tokens=0, - parent_run_id=None, - created_at=_now, - completed_at=None, - ) - ) - except Exception: - log.warning("Archive: failed to log run start", exc_info=True) - try: - _archive.log_variant( - AgentVariant( - variant_hash=_initial_variant_hash, - system_prompt=( - next(iter(agent_state.harness.system_prompts.values()), "") - if isinstance(agent_state.harness, Harness) - else "" - ), - playbook_snapshot=( - agent_state.harness.playbook.to_dict() - if isinstance(agent_state.harness, Harness) - else {} - ), - model="", - tools=[], - first_seen_run_id=_run_id, - created_at=_now, - ) - ) - except Exception: - log.warning("Archive: failed to log initial variant", exc_info=True) - _prev_variant_hash = _initial_variant_hash - _best_reward: float | None = None # None until first iteration — handles negative rewards - _initial_reward: float | None = None - _total_cost = 0 + runner = EpisodeCollectorRunner(adapter) + recorder = ArchiveRecorder( + archive, + agent_state, + bench=bench, + domain_tags=domain_tags, + n_iterations=n_iterations, + ) prev_avg_reward = 0.0 log.info("Starting learning loop — initial state: %s", state_id.combined_hash[:12]) for iteration in range(n_iterations): log.info("Iteration %d/%d", iteration + 1, n_iterations) - # 0. Refresh Tinker-driven fields on AgentState from current backend. - # Gated on hasattr so non-Tinker backends (e.g. SkyRL) are untouched. 
- backend = getattr(agent_state.weights, "_backend", None) - if backend is not None and hasattr(backend, "current_sampling_client"): - from dataclasses import replace as _replace - - agent_state = _replace( - agent_state, - sampling_client=backend.current_sampling_client(), - renderer=getattr(backend, "renderer", None), - tokenizer=getattr(backend, "tokenizer", None), - ) - # Refresh the layers view against the (possibly) new agent_state. - layers = agent_state.get_layers(active_layers) - - # 1. Collect episodes - if not tasks or n_episodes <= 0: - episodes: list[Episode] = [] - else: - if n_episodes <= len(tasks): - selected_tasks = random.sample(tasks, n_episodes) - else: - selected_tasks = random.choices(tasks, k=n_episodes) - - if hasattr(adapter, "run_batch") and callable(getattr(adapter, "run_batch", None)): - episodes = adapter.run_batch(agent_state, selected_tasks) - elif hasattr(adapter, "run_episodes_batch") and callable( - getattr(adapter, "run_episodes_batch", None) - ): - # Concurrent rollout path (OpenSpielGameAdapter): all - # episodes for this iter fan out under one event loop, - # Tinker queues them as parallel ConcurrentFutures. 
- episodes = adapter.run_episodes_batch(selected_tasks, agent_state) - else: - episodes = [] - for task in selected_tasks: - ep = adapter.run_episode(task, agent_state) - episodes.append(ep) - - avg_reward = ( - sum(ep.summary.total_reward for ep in episodes) / len(episodes) if episodes else 0.0 - ) + agent_state, layers = _refresh_tinker_fields(agent_state, active_layers) + episodes: list[Episode] = runner.collect(agent_state, tasks, n_episodes) + avg_reward = _avg_reward(episodes) log.info(" Collected %d episodes, avg reward: %.4f", len(episodes), avg_reward) - - if episodes: - _ep_records: list[EpisodeRecord] = [] - for ep in episodes: - tool_call_count = sum(len(m.tool_calls or []) for m in ep.messages) - _ep_records.append( - EpisodeRecord( - run_id=_run_id, - iteration_num=iteration, - episode_id=ep.id, - task_id=ep.task_id, - bench=ep.bench, - model=ep.model or "", - reward=ep.summary.normalized_reward(), - signals={ - k: {"value": s.value, "confidence": s.confidence} - for k, s in ep.summary.signals.items() - } - if ep.summary.signals - else {}, - n_steps=ep.n_steps(), - n_tool_calls=tool_call_count, - token_usage=( - { - "prompt_tokens": ep.summary.token_usage.prompt_tokens, - "completion_tokens": ep.summary.token_usage.completion_tokens, - "total_tokens": ep.summary.token_usage.total_tokens, - } - if ep.summary.token_usage - else {} - ), - latency_ms=int(ep.summary.timing.total_ms if ep.summary.timing else 0), - messages_ref=f"traces/{ep.id}.json", - created_at=ep.created_at if ep.created_at is not None else time.time(), - ) - ) - try: - _archive.log_episodes(_ep_records) - except Exception: - log.warning("Archive: failed to log episodes", exc_info=True) - - # Record reward for adaptive intensity + recorder.record_episodes(iteration, episodes) if intensity is not None: intensity.record_reward(avg_reward) - # 2. Build per-layer datums - # NOTE: Support-query split (failures→harness, - # successes→weights) is disabled. 
GRPO needs all episodes for - # advantage variance, and the on-policy vs off-policy boundary - # after harness updates needs more work. See roadmap Task 2.1. - layer_datums: dict[str, Datum] = { - "harness": Datum(episodes=episodes), - "weights": Datum(episodes=episodes), - "router": Datum(episodes=episodes), - } + _set_evolver_context(agent_state, intensity, iteration) - # 2b. Set evolver context on harness (for Evolver-based optimization) - if isinstance(agent_state.harness, Harness) and agent_state.harness.evolver is not None: - from clawloop.core.evolver import EvolverContext - - ctx = EvolverContext( - reward_history=list(intensity._rewards) if intensity else [], - is_stagnating=intensity.is_stagnating() if intensity else False, - iteration=iteration, - tried_paradigms=list(agent_state.tried_paradigms), - ) - agent_state.harness.set_evolver_context(ctx) - - # 3. Phase 1: forward_backward (all active layers) - fb_results: dict[str, FBResult] = {} - for name, layer in layers: - # Skip harness reflection when intensity says not to - if ( - name == "harness" - and intensity is not None - and not intensity.should_reflect(iteration) - ): - log.info(" skipping harness fb (adaptive intensity)") - fb_results[name] = FBResult(status="skipped") - continue - if name in layer_datums: - datum = layer_datums[name] - else: - log.warning(" unknown layer %s — using all episodes as fallback", name) - datum = Datum(episodes=episodes) - should_clear = False - try: - fut = layer.forward_backward(datum) - fb_result = fut.result() - fb_results[name] = fb_result - if fb_result.status in ("error", "skipped"): - should_clear = True - except Exception: - log.exception("forward_backward failed for %s", name) - fb_results[name] = FBResult(status="error") - should_clear = True - - if should_clear: - try: - layer.clear_pending_state() - except Exception: - log.exception("Failed to clear pending for %s", name) - - for name, result in fb_results.items(): - log.info(" fb %s: %s %s", name, 
result.status, result.metrics) - - # Track paradigm shifts before optim drains _pending - harness_fb = fb_results.get("harness") - if ( - harness_fb is not None - and harness_fb.metrics.get("paradigm_shifted") - and isinstance(agent_state.harness, Harness) - ): - for insight in agent_state.harness._pending.insights: - if "paradigm" in (insight.tags or []): - agent_state.tried_paradigms.append(insight.content) - - # 4. Phase 2: optim_step with cross-layer rollback - layers_to_optim = [ - (name, layer) - for name, layer in layers - if fb_results.get(name, FBResult(status="error")).status not in ("error", "skipped") - ] - - # Snapshot all layers before optim (for cross-layer rollback) - snapshots: dict[str, dict[str, Any]] = {} - try: - for name, layer in layers_to_optim: - snapshots[name] = copy.deepcopy(layer.to_dict()) - except Exception: - log.exception("Snapshot failed — skipping optim this iteration") - for name, layer in layers_to_optim: - try: - layer.clear_pending_state() - except Exception: - log.exception("Failed to clear pending for %s", name) - layers_to_optim = [] - - optim_failed = False - for name, layer in layers_to_optim: - try: - result = layer.optim_step().result() - log.info( - " optim %s: %s, %d updates", - name, - result.status, - result.updates_applied, - ) - if result.status == "error": - optim_failed = True - log.error( - " optim %s returned error — triggering rollback", - name, - ) - break - except Exception: - log.exception( - "optim_step failed for %s — triggering rollback", - name, - ) - optim_failed = True - break - - if optim_failed: - log.warning(" rolling back all layers to pre-optim state") - for name, layer in layers_to_optim: - if name in snapshots: - try: - lr = layer.load_state(snapshots[name]).result() - if lr.status != "ok": - log.error( - " rollback returned %s for %s", - lr.status, - name, - ) - except Exception: - log.exception(" rollback failed for %s", name) - - # 4b. 
Tinker save_state hook — persist intermediate weights and swap - # the backend's internal SamplingClient so the next iter picks up the - # freshly-trained adapter on its top-of-iter refresh. Gated on - # hasattr so SkyRL-style backends are unaffected. - # - # Skip the save entirely when the weights step produced zero datums - # (GRPO filtered every group for zero variance) — checkpointing an - # unchanged adapter wastes Tinker quota and pollutes the durable-path - # timeline. - weights_fb = fb_results.get("weights") - n_datums = ( - weights_fb.metrics.get("n_datums", 0) - if weights_fb is not None and weights_fb.metrics - else 0 - ) - if backend is not None and hasattr(backend, "save_state") and n_datums > 0: - try: - backend.save_state(f"iter_{iteration}").result() - except Exception: - log.exception("backend.save_state failed for iter_%d", iteration) - elif backend is not None and hasattr(backend, "save_state"): - log.info( - " [save_state] skipped iter_%d — no datums produced " - "(GRPO filtered all groups)", - iteration, - ) - - # Generation flush: when playbook_generation advances, clear stale - # episodes from weights buffer to prevent RL learning pre-adaptation behavior - if isinstance(agent_state.harness, Harness) and not optim_failed: - current_gen = agent_state.harness.playbook_generation - prev_gen = agent_state._prev_playbook_generation - if current_gen > prev_gen: - stale = agent_state.weights.pending_advantage_count() - agent_state.weights.clear_pending_state() - log.info( - " Generation %d->%d: flushed %d stale episodes from weights buffer", - prev_gen, - current_gen, - stale, - ) - agent_state._prev_playbook_generation = current_gen + tx_result = LayerTransaction( + layers, + intensity=intensity, + episodes=episodes, + agent_state=agent_state, + ).run(iteration) + fb_results = tx_result.fb_results + + _maybe_save_backend_state(agent_state, fb_results.get("weights"), iteration) + _flush_generation_if_advanced(agent_state, tx_result.optim_failed) - # 
5. Log iteration results harness_ref = agent_state.harness if isinstance(agent_state.harness, Harness) else None + backend = getattr(agent_state.weights, "_backend", None) exp_log.log_iteration(iteration, episodes, fb_results, harness_ref, backend=backend) - # 6. Recompute state identity and log evolution entry prev_hash = state_id.combined_hash state_id = agent_state.state_id() + _log_evolution_entry( + evo_log, + agent_state, + fb_results, + prev_hash, + state_id.combined_hash, + prev_avg_reward, + avg_reward, + iteration, + ) - # Build actions list from fb results for evolution log - actions: list[str] = [] - for name, result in fb_results.items(): - if result.status == "ok": - if result.metrics.get("insights_generated"): - actions.append("reflect") - if result.metrics.get("candidates_generated"): - actions.append("mutate") - if result.metrics.get("paradigm_shifted"): - actions.append("paradigm_shift") - if actions: - evo_log.append( - EvolutionEntry( - iteration=iteration, - state_hash_before=prev_hash, - state_hash_after=state_id.combined_hash, - actions=actions, - reward_before=prev_avg_reward, - reward_after=avg_reward, - backend=( - agent_state.harness.evolver.name() - if isinstance(agent_state.harness, Harness) and agent_state.harness.evolver - else "none" - ), - ) - ) - - try: - _cur_config = _build_agent_config(agent_state) - _cur_variant_hash = canonical_hash(_cur_config) - _evolver_action: dict[str, Any] = {} - for name, result in fb_results.items(): - if result.status == "ok": - _evolver_action[name] = result.metrics - _iter_cost = sum( - r.metrics.get("tokens_used", 0) for r in fb_results.values() if r.status == "ok" - ) - _total_cost += _iter_cost - _archive.log_iteration( - IterationRecord( - run_id=_run_id, - iteration_num=iteration, - harness_snapshot_hash=state_id.harness_hash, - mean_reward=avg_reward, - reward_trajectory=[ep.summary.normalized_reward() for ep in episodes], - evolver_action=_evolver_action, - cost_tokens=_iter_cost, - 
parent_variant_hash=_prev_variant_hash, - child_variant_hash=_cur_variant_hash, - reward_delta=avg_reward - prev_avg_reward, - created_at=time.time(), - ) - ) - if _cur_variant_hash != _prev_variant_hash: - _archive.log_variant( - AgentVariant( - variant_hash=_cur_variant_hash, - system_prompt=( - next(iter(agent_state.harness.system_prompts.values()), "") - if isinstance(agent_state.harness, Harness) - else "" - ), - playbook_snapshot=( - agent_state.harness.playbook.to_dict() - if isinstance(agent_state.harness, Harness) - else {} - ), - model="", - tools=[], - first_seen_run_id=_run_id, - created_at=time.time(), - ) - ) - _prev_variant_hash = _cur_variant_hash - except Exception: - log.warning("Archive: failed to log iteration %d", iteration, exc_info=True) - - if _best_reward is None or avg_reward > _best_reward: - _best_reward = avg_reward - if _initial_reward is None: - _initial_reward = avg_reward - + recorder.record_iteration( + iteration=iteration, + agent_state=agent_state, + state_id=state_id, + fb_results=fb_results, + episodes=episodes, + avg_reward=avg_reward, + prev_avg_reward=prev_avg_reward, + ) prev_avg_reward = avg_reward - # 7. Optional after-iteration callback (e.g. 
eval scoring) - if after_iteration is not None: - try: - after_iteration(iteration, agent_state, episodes) - except Exception: - log.exception("after_iteration callback failed") - - try: - final_best = _best_reward if _best_reward is not None else 0.0 - final_initial = _initial_reward if _initial_reward is not None else 0.0 - _archive.log_run_complete( - _run_id, - final_best, - final_best - final_initial, - total_cost_tokens=_total_cost, - ) - except Exception: - log.warning("Archive: failed to log run complete", exc_info=True) + _run_after_iteration(after_iteration, iteration, agent_state, episodes) + recorder.record_complete() log.info("Loop complete — final state: %s", state_id.combined_hash[:12]) return agent_state, state_id
diff --git a/clawloop/core/runner.py b/clawloop/core/runner.py
new file mode 100644
index 00000000..69dcc965
--- /dev/null
+++ b/clawloop/core/runner.py
@@ -0,0 +1,71 @@
+"""Episode-collection helper extracted from ``learning_loop``.
+
+The runner owns the per-iteration task-sampling + adapter-dispatch protocol.
+It is deliberately a thin wrapper: the loop still owns iteration control flow,
+and the runner just encapsulates the "get me ``n_episodes`` rollouts" step so
+``learning_loop`` stays readable end-to-end.
+"""
+
+from __future__ import annotations
+
+import random
+from typing import Any
+
+from clawloop.core.episode import Episode
+
+
+class EpisodeCollectorRunner:
+    """Samples tasks and dispatches to the adapter to collect a batch of episodes.
+
+    Parameters
+    ----------
+    adapter:
+        Environment adapter. Must expose ``run_episode(task, agent_state)`` and
+        may optionally expose ``run_batch(agent_state, tasks)`` or
+        ``run_episodes_batch(tasks, agent_state)`` for a faster batched path.
+        Typed as ``Any`` to avoid a circular import with the
+        ``AdapterLike`` Protocol, which lives in ``loop.py``.
+    """
+
+    def __init__(self, adapter: Any) -> None:
+        self._adapter = adapter
+
+    def collect(
+        self,
+        agent_state: Any,
+        tasks: list[Any],
+        n_episodes: int,
+    ) -> list[Episode]:
+        """Return ``n_episodes`` rollouts against ``tasks``.
+
+        Behavior mirrors the block previously inlined in ``learning_loop``:
+
+        * Empty ``tasks`` or ``n_episodes <= 0`` returns ``[]`` without
+          touching the adapter.
+        * Samples without replacement when ``n_episodes <= len(tasks)``,
+          with replacement otherwise.
+        * Dispatch order: ``adapter.run_batch(agent_state, selected_tasks)``,
+          then ``adapter.run_episodes_batch(selected_tasks, agent_state)``
+          (note the swapped argument order), then a per-task
+          ``run_episode(task, agent_state)`` loop.
+        """
+        if not tasks or n_episodes <= 0:
+            return []
+
+        if n_episodes <= len(tasks):
+            selected_tasks = random.sample(tasks, n_episodes)
+        else:
+            selected_tasks = random.choices(tasks, k=n_episodes)
+
+        run_batch = getattr(self._adapter, "run_batch", None)
+        if callable(run_batch):
+            return run_batch(agent_state, selected_tasks)
+
+        # The pre-extraction loop also honored ``run_episodes_batch``; keep
+        # that path so batch-capable adapters are not silently demoted to
+        # the sequential per-task fallback.
+        run_episodes_batch = getattr(self._adapter, "run_episodes_batch", None)
+        if callable(run_episodes_batch):
+            return run_episodes_batch(selected_tasks, agent_state)
+
+        return [self._adapter.run_episode(task, agent_state) for task in selected_tasks]
diff --git a/clawloop/core/transaction.py b/clawloop/core/transaction.py new file mode 100644 index 00000000..10e5236b --- /dev/null +++ b/clawloop/core/transaction.py @@ -0,0 +1,180 @@ +"""Two-phase fb→optim→rollback layer transaction. + +Extracted from ``learning_loop`` — this module owns the subtle +invariants of the per-iteration training protocol: + +1. Build one ``Datum`` per layer name. +2. Run ``forward_backward`` on every active layer, clearing pending + state on error/skipped results. The harness gets an intensity-gate + short-circuit that records ``status="skipped"`` without calling fb. +3. Track paradigm shifts onto ``agent_state.tried_paradigms`` *before* + optim drains ``_pending``. +4. Snapshot every layer that had ok fb. If snapshotting fails, + ``clear_pending_state`` fires on every snapshotted layer and optim is + skipped entirely. +5.
Run ``optim_step`` on every snapshotted layer. On the first error or + exception, roll back *every* snapshotted layer (even those whose + optim hadn't run yet) to its pre-optim state. +""" + +from __future__ import annotations + +import copy +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from clawloop.core.episode import Episode +from clawloop.core.intensity import AdaptiveIntensity +from clawloop.core.types import Datum, FBResult +from clawloop.learning_layers.harness import Harness + +if TYPE_CHECKING: + from clawloop.core.loop import AgentState + +log = logging.getLogger(__name__) + + +@dataclass +class TransactionResult: + """Outcome of one ``LayerTransaction.run()`` call.""" + + fb_results: dict[str, FBResult] + optim_failed: bool + + +class LayerTransaction: + """One iteration's fb→optim→rollback protocol across active layers.""" + + def __init__( + self, + layers: list[tuple[str, Any]], + intensity: AdaptiveIntensity | None, + episodes: list[Episode], + agent_state: "AgentState", + ) -> None: + self._layers = layers + self._intensity = intensity + self._episodes = episodes + self._agent_state = agent_state + + def run(self, iteration: int) -> TransactionResult: + """Execute the transaction and return fb_results + optim_failed flag.""" + layer_datums: dict[str, Datum] = { + "harness": Datum(episodes=self._episodes), + "weights": Datum(episodes=self._episodes), + "router": Datum(episodes=self._episodes), + } + fb_results = self._forward_backward(iteration, layer_datums) + self._track_paradigm_shifts(fb_results) + optim_failed = self._optim_with_rollback(fb_results) + return TransactionResult(fb_results=fb_results, optim_failed=optim_failed) + + def _forward_backward( + self, + iteration: int, + layer_datums: dict[str, Datum], + ) -> dict[str, FBResult]: + fb_results: dict[str, FBResult] = {} + for name, layer in self._layers: + if ( + name == "harness" + and self._intensity is not None + and not 
self._intensity.should_reflect(iteration) + ): + log.info(" skipping harness fb (adaptive intensity)") + fb_results[name] = FBResult(status="skipped") + continue + if name in layer_datums: + datum = layer_datums[name] + else: + log.warning(" unknown layer %s — using all episodes as fallback", name) + datum = Datum(episodes=self._episodes) + + should_clear = False + try: + fb_result = layer.forward_backward(datum).result() + fb_results[name] = fb_result + if fb_result.status in ("error", "skipped"): + should_clear = True + except Exception: + log.exception("forward_backward failed for %s", name) + fb_results[name] = FBResult(status="error") + should_clear = True + + if should_clear: + try: + layer.clear_pending_state() + except Exception: + log.exception("Failed to clear pending for %s", name) + + for name, result in fb_results.items(): + log.info(" fb %s: %s %s", name, result.status, result.metrics) + return fb_results + + def _track_paradigm_shifts(self, fb_results: dict[str, FBResult]) -> None: + """Append paradigm-tagged pending insights to ``tried_paradigms``. + + Must run before optim drains ``_pending`` — otherwise the insights + we need to remember have already been consumed. 
+ """ + harness_fb = fb_results.get("harness") + if ( + harness_fb is not None + and harness_fb.metrics.get("paradigm_shifted") + and isinstance(self._agent_state.harness, Harness) + ): + for insight in self._agent_state.harness.pending_paradigm_insights(): + self._agent_state.tried_paradigms.append(insight.content) + + def _optim_with_rollback(self, fb_results: dict[str, FBResult]) -> bool: + layers_to_optim = [ + (name, layer) + for name, layer in self._layers + if fb_results.get(name, FBResult(status="error")).status not in ("error", "skipped") + ] + + snapshots: dict[str, dict[str, Any]] = {} + try: + for name, layer in layers_to_optim: + snapshots[name] = copy.deepcopy(layer.to_dict()) + except Exception: + log.exception("Snapshot failed — skipping optim this iteration") + for name, layer in layers_to_optim: + try: + layer.clear_pending_state() + except Exception: + log.exception("Failed to clear pending for %s", name) + return False + + optim_failed = False + for name, layer in layers_to_optim: + try: + result = layer.optim_step().result() + log.info( + " optim %s: %s, %d updates", + name, + result.status, + result.updates_applied, + ) + if result.status == "error": + optim_failed = True + log.error(" optim %s returned error — triggering rollback", name) + break + except Exception: + log.exception("optim_step failed for %s — triggering rollback", name) + optim_failed = True + break + + if optim_failed: + log.warning(" rolling back all layers to pre-optim state") + for name, layer in layers_to_optim: + if name in snapshots: + try: + lr = layer.load_state(snapshots[name]).result() + if lr.status != "ok": + log.error(" rollback returned %s for %s", lr.status, name) + except Exception: + log.exception(" rollback failed for %s", name) + + return optim_failed diff --git a/clawloop/learning_layers/harness.py b/clawloop/learning_layers/harness.py index e42a5f33..800b703a 100644 --- a/clawloop/learning_layers/harness.py +++ b/clawloop/learning_layers/harness.py @@ 
def pending_paradigm_insights(self) -> list[Insight]:
    """Collect the still-pending insights whose tags include ``paradigm``.

    Insights with no tags (``None`` or empty) are never selected. The
    pending buffer itself is only read, not modified.
    """
    tagged: list[Insight] = []
    for insight in self._pending.insights:
        labels = insight.tags or []
        if "paradigm" in labels:
            tagged.append(insight)
    return tagged
class _RecordingStore:
    """In-memory ``ArchiveStore`` double: each ``log_*`` call is appended to a list."""

    def __init__(self) -> None:
        # One list per write method so tests can assert on exactly what was emitted.
        self.runs: list[RunRecord] = []
        self.variants: list[AgentVariant] = []
        self.iterations: list[IterationRecord] = []
        self.episode_batches: list[list[EpisodeRecord]] = []
        # Entries are (run_id, best_reward, improvement_delta, total_cost_tokens).
        self.completions: list[tuple[str, float, float, int]] = []

    def log_run_start(self, run: RunRecord) -> None:
        self.runs.append(run)

    def log_iteration(self, iteration: IterationRecord) -> None:
        self.iterations.append(iteration)

    def log_episodes(self, episodes: list[EpisodeRecord]) -> None:
        # Store a copy of the batch rather than the caller's list object.
        self.episode_batches.append([*episodes])

    def log_variant(self, variant: AgentVariant) -> None:
        self.variants.append(variant)

    def log_run_complete(
        self,
        run_id: str,
        best_reward: float,
        improvement_delta: float,
        total_cost_tokens: int = 0,
    ) -> None:
        entry = (run_id, best_reward, improvement_delta, total_cost_tokens)
        self.completions.append(entry)


def _mk_episode(episode_id: str = "ep1", task_id: str = "t1", reward: float = 0.5) -> Any:
    """Build a ``MagicMock`` posing as an Episode with the given id, task and reward."""
    episode = MagicMock()
    episode.id = episode_id
    episode.task_id = task_id
    episode.bench = "unit"
    episode.model = "test-model"
    episode.messages = []
    episode.created_at = 1234567.0
    episode.n_steps.return_value = 3

    summary = episode.summary
    summary.total_reward = reward
    summary.normalized_reward.return_value = reward
    summary.signals = {}
    summary.token_usage = None
    summary.timing = None
    return episode


def _make_recorder(
    store: Any,
    *,
    n_iterations: int = 1,
    agent_state: Any = None,
) -> ArchiveRecorder:
    """Construct a recorder with the ``bench="b"`` / no-tags setup most tests share."""
    state = AgentState() if agent_state is None else agent_state
    return ArchiveRecorder(store, state, bench="b", domain_tags=None, n_iterations=n_iterations)


def test_init_writes_run_start_and_initial_variant() -> None:
    store = _RecordingStore()
    recorder = ArchiveRecorder(
        store, AgentState(), bench="unit-bench", domain_tags=["a", "b"], n_iterations=5
    )

    # Exactly one run row, echoing the constructor arguments with zeroed metrics.
    assert len(store.runs) == 1
    started = store.runs[0]
    assert started.bench == "unit-bench"
    assert started.domain_tags == ["a", "b"]
    assert started.n_iterations == 5
    assert started.best_reward == 0.0
    assert started.improvement_delta == 0.0
    assert started.total_cost_tokens == 0
    assert started.parent_run_id is None
    assert started.completed_at is None
    assert started.run_id  # must be non-empty
    # The recorder exposes the same id it wrote, so the loop need not mint one.
    assert recorder.run_id == started.run_id

    # Exactly one variant row, tied to that run and its config hash.
    assert len(store.variants) == 1
    first_variant = store.variants[0]
    assert first_variant.variant_hash == started.config_hash
    assert first_variant.first_seen_run_id == started.run_id


def test_archive_none_uses_null_store_silently() -> None:
    # Passing archive=None must fall back to NullArchiveStore: no errors, no writes.
    recorder = _make_recorder(None)
    recorder.record_episodes(0, [_mk_episode()])
    recorder.record_complete()
    # Reaching this point without an exception is the main assertion.
    assert recorder.run_id  # an id is generated regardless


def test_record_episodes_writes_one_batch_with_correct_fields() -> None:
    store = _RecordingStore()
    recorder = _make_recorder(store)

    first = _mk_episode("ep-a", "task-a", reward=0.7)
    second = _mk_episode("ep-b", "task-b", reward=0.3)
    recorder.record_episodes(iteration=2, episodes=[first, second])

    assert len(store.episode_batches) == 1
    rows = store.episode_batches[0]
    assert len(rows) == 2
    assert [row.episode_id for row in rows] == ["ep-a", "ep-b"]
    assert all(row.iteration_num == 2 for row in rows)
    assert all(row.run_id == recorder.run_id for row in rows)
    assert rows[0].messages_ref == "traces/ep-a.json"


def test_record_episodes_noop_on_empty_list() -> None:
    store = _RecordingStore()
    recorder = _make_recorder(store)

    recorder.record_episodes(iteration=0, episodes=[])

    # An empty episode list must not produce an empty batch write.
    assert store.episode_batches == []


def test_record_iteration_writes_iteration_record_and_no_variant_when_unchanged() -> None:
    store = _RecordingStore()
    state = AgentState()
    recorder = _make_recorder(store, n_iterations=3, agent_state=state)
    variants_before = len(store.variants)

    recorder.record_iteration(
        iteration=0,
        agent_state=state,
        state_id=state.state_id(),
        fb_results={"harness": FBResult(status="ok", metrics={"tokens_used": 100})},
        episodes=[_mk_episode()],
        avg_reward=0.5,
        prev_avg_reward=0.0,
    )

    assert len(store.iterations) == 1
    row = store.iterations[0]
    assert row.iteration_num == 0
    assert row.run_id == recorder.run_id
    assert row.mean_reward == 0.5
    assert row.reward_delta == 0.5
    assert row.cost_tokens == 100
    assert row.evolver_action == {"harness": {"tokens_used": 100}}
    # Config is unchanged since construction, so no extra variant row appears.
    assert len(store.variants) == variants_before


def test_record_iteration_writes_new_variant_when_config_changes() -> None:
    store = _RecordingStore()
    state = AgentState()
    recorder = _make_recorder(store, agent_state=state)
    variants_before = len(store.variants)

    # Mutate the harness after construction so the next record sees a new config.
    state.harness.system_prompts["main"] = "you are a changed agent"

    recorder.record_iteration(
        iteration=0,
        agent_state=state,
        state_id=state.state_id(),
        fb_results={},
        episodes=[_mk_episode()],
        avg_reward=0.1,
        prev_avg_reward=0.0,
    )

    assert len(store.variants) == variants_before + 1
    latest = store.variants[-1]
    assert latest.first_seen_run_id == recorder.run_id


def test_record_complete_writes_completion_with_accumulated_metrics() -> None:
    store = _RecordingStore()
    recorder = _make_recorder(store, n_iterations=2)

    # (tokens_used, avg_reward, prev_avg_reward) for iterations 0 and 1.
    schedule = [(50, 0.2, 0.0), (75, 0.8, 0.2)]
    for index, (tokens, reward, previous) in enumerate(schedule):
        recorder.record_iteration(
            iteration=index,
            agent_state=AgentState(),
            state_id=AgentState().state_id(),
            fb_results={"w": FBResult(status="ok", metrics={"tokens_used": tokens})},
            episodes=[_mk_episode(reward=reward)],
            avg_reward=reward,
            prev_avg_reward=previous,
        )

    recorder.record_complete()

    assert len(store.completions) == 1
    run_id, best_reward, improvement_delta, total_cost = store.completions[0]
    assert run_id == recorder.run_id
    assert best_reward == 0.8
    assert improvement_delta == pytest.approx(0.8 - 0.2)  # best minus initial
    assert total_cost == 125


def test_record_complete_handles_zero_iterations() -> None:
    # With no iterations recorded, best/initial reward are still None internally;
    # completion must not crash and must report zeros.
    store = _RecordingStore()
    recorder = _make_recorder(store, n_iterations=0)

    recorder.record_complete()

    assert len(store.completions) == 1
    run_id, best, delta, cost = store.completions[0]
    assert best == 0.0
    assert delta == 0.0
    assert cost == 0


def test_archive_exceptions_are_swallowed_with_warning(caplog: pytest.LogCaptureFixture) -> None:
    class _BrokenStore(_RecordingStore):
        def log_episodes(self, episodes: list[EpisodeRecord]) -> None:
            raise RuntimeError("archive disk full")

    recorder = _make_recorder(_BrokenStore())

    with caplog.at_level(logging.WARNING, logger="clawloop.core.archive_recorder"):
        recorder.record_episodes(iteration=0, episodes=[_mk_episode()])

    # The RuntimeError stays inside the recorder; only a warning surfaces.
    messages = [record.message.lower() for record in caplog.records]
    assert any("failed to log episodes" in message for message in messages)


def test_run_start_exception_does_not_block_construction(caplog: pytest.LogCaptureFixture) -> None:
    class _RunStartFails(_RecordingStore):
        def log_run_start(self, run: RunRecord) -> None:
            raise RuntimeError("db down")

    with caplog.at_level(logging.WARNING, logger="clawloop.core.archive_recorder"):
        recorder = _make_recorder(_RunStartFails())

    assert recorder.run_id  # the recorder is still usable
    messages = [record.message.lower() for record in caplog.records]
    assert any("run start" in message for message in messages)


def test_state_id_parameter_is_used_for_harness_snapshot_hash() -> None:
    store = _RecordingStore()
    state = AgentState()
    recorder = _make_recorder(store, agent_state=state)

    # A hand-built StateID with a recognizable harness hash.
    explicit_id = StateID(
        harness_hash="HHH",
        router_hash="R",
        weights_hash="W",
        combined_hash="C",
        created_at=0.0,
    )
    recorder.record_iteration(
        iteration=0,
        agent_state=state,
        state_id=explicit_id,
        fb_results={},
        episodes=[],
        avg_reward=0.0,
        prev_avg_reward=0.0,
    )

    assert store.iterations[0].harness_snapshot_hash == "HHH"


def test_cost_tracking_survives_archive_exception() -> None:
    # Regression guard: tokens_used must be added to the run total even when
    # log_iteration raises, i.e. cost tracking does not depend on the archive.
    class _BrokenIter(_RecordingStore):
        def log_iteration(self, iteration: IterationRecord) -> None:
            raise RuntimeError("db down")

    store = _BrokenIter()
    recorder = _make_recorder(store)

    recorder.record_iteration(
        iteration=0,
        agent_state=AgentState(),
        state_id=AgentState().state_id(),
        fb_results={"h": FBResult(status="ok", metrics={"tokens_used": 42})},
        episodes=[_mk_episode(reward=0.1)],
        avg_reward=0.1,
        prev_avg_reward=0.0,
    )
    recorder.record_complete()

    assert len(store.completions) == 1
    total_cost = store.completions[0][3]
    assert total_cost == 42  # accumulated although log_iteration raised
def _mk_episode(task_id: str) -> Any:
    """Return a minimal episode placeholder: a dict holding only ``task_id``.

    The runner hands adapter results straight back to the caller, so these
    tests do not need a real Episode.
    """
    placeholder = {"task_id": task_id}
    return placeholder


class _PerTaskAdapter:
    """Adapter with ``run_episode`` only, forcing the per-task fallback path."""

    def __init__(self) -> None:
        # (task, agent_state) for every run_episode call, in call order.
        self.calls: list[tuple[Any, Any]] = []

    def run_episode(self, task: Any, agent_state: Any) -> Any:
        self.calls.append((task, agent_state))
        return _mk_episode(str(task))


class _BatchAdapter:
    """Adapter with ``run_batch``, exercising the batched fast path."""

    def __init__(self) -> None:
        # (agent_state, copy of tasks) for every run_batch call.
        self.calls: list[tuple[Any, list[Any]]] = []

    def run_batch(self, agent_state: Any, tasks: list[Any]) -> list[Any]:
        self.calls.append((agent_state, list(tasks)))
        return [_mk_episode(str(task)) for task in tasks]


def test_empty_tasks_returns_empty_without_touching_adapter() -> None:
    adapter = _PerTaskAdapter()
    collected = EpisodeCollectorRunner(adapter).collect(
        agent_state=object(), tasks=[], n_episodes=3
    )
    assert collected == []
    assert adapter.calls == []


def test_zero_n_episodes_returns_empty_without_touching_adapter() -> None:
    adapter = _PerTaskAdapter()
    collected = EpisodeCollectorRunner(adapter).collect(
        agent_state=object(), tasks=["a", "b"], n_episodes=0
    )
    assert collected == []
    assert adapter.calls == []


def test_negative_n_episodes_returns_empty() -> None:
    adapter = _PerTaskAdapter()
    collected = EpisodeCollectorRunner(adapter).collect(
        agent_state=object(), tasks=["a"], n_episodes=-1
    )
    assert collected == []
    assert adapter.calls == []


def test_sample_branch_uses_sample_without_replacement(monkeypatch: pytest.MonkeyPatch) -> None:
    seen: dict[str, Any] = {}

    def fake_sample(pool: list[Any], k: int) -> list[Any]:
        seen["sample"] = (list(pool), k)
        return pool[:k]

    def fake_choices(pool: list[Any], k: int) -> list[Any]:  # pragma: no cover — must not run
        raise AssertionError("random.choices should not be called when n <= len(tasks)")

    monkeypatch.setattr(random, "sample", fake_sample)
    monkeypatch.setattr(random, "choices", fake_choices)

    runner = EpisodeCollectorRunner(_PerTaskAdapter())
    collected = runner.collect(agent_state="S", tasks=["a", "b", "c"], n_episodes=2)

    assert [episode["task_id"] for episode in collected] == ["a", "b"]
    assert seen["sample"] == (["a", "b", "c"], 2)


def test_choices_branch_used_when_n_exceeds_pool(monkeypatch: pytest.MonkeyPatch) -> None:
    seen: dict[str, Any] = {}

    def fake_sample(pool: list[Any], k: int) -> list[Any]:  # pragma: no cover
        raise AssertionError("random.sample should not be called when n > len(tasks)")

    def fake_choices(pool: list[Any], *, k: int) -> list[Any]:
        seen["choices"] = (list(pool), k)
        return [pool[0]] * k

    monkeypatch.setattr(random, "sample", fake_sample)
    monkeypatch.setattr(random, "choices", fake_choices)

    runner = EpisodeCollectorRunner(_PerTaskAdapter())
    collected = runner.collect(agent_state="S", tasks=["x"], n_episodes=3)

    assert len(collected) == 3
    assert seen["choices"] == (["x"], 3)


def test_prefers_run_batch_when_available(monkeypatch: pytest.MonkeyPatch) -> None:
    # Deterministic "sampling": always take the first k tasks.
    monkeypatch.setattr(random, "sample", lambda pool, k: list(pool[:k]))

    adapter = _BatchAdapter()
    agent = object()
    collected = EpisodeCollectorRunner(adapter).collect(
        agent_state=agent, tasks=["a", "b", "c"], n_episodes=2
    )

    assert len(collected) == 2
    assert len(adapter.calls) == 1
    passed_agent, passed_tasks = adapter.calls[0]
    assert passed_agent is agent
    assert passed_tasks == ["a", "b"]


def test_per_task_fallback_when_no_run_batch(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setattr(random, "sample", lambda pool, k: list(pool[:k]))

    adapter = _PerTaskAdapter()
    agent = object()
    collected = EpisodeCollectorRunner(adapter).collect(
        agent_state=agent, tasks=["a", "b"], n_episodes=2
    )

    assert [episode["task_id"] for episode in collected] == ["a", "b"]
    assert [state for _, state in adapter.calls] == [agent, agent]


def test_non_callable_run_batch_attribute_falls_back(monkeypatch: pytest.MonkeyPatch) -> None:
    """A ``run_batch`` attribute that is not callable must be ignored."""
    monkeypatch.setattr(random, "sample", lambda pool, k: list(pool[:k]))

    class _WeirdAdapter:
        run_batch = "not-a-callable"

        def __init__(self) -> None:
            self.calls: list[tuple[Any, Any]] = []

        def run_episode(self, task: Any, agent_state: Any) -> Any:
            self.calls.append((task, agent_state))
            return _mk_episode(str(task))

    adapter = _WeirdAdapter()
    collected = EpisodeCollectorRunner(adapter).collect(
        agent_state="S", tasks=["a"], n_episodes=1
    )

    assert [episode["task_id"] for episode in collected] == ["a"]
    assert adapter.calls == [("a", "S")]
class _StubLayer:
    """Layer double that counts calls and returns preconfigured results.

    Each ``*_raises`` flag makes the matching method raise ``RuntimeError``
    after the call has been recorded.
    """

    def __init__(
        self,
        fb_result: FBResult | None = None,
        fb_raises: bool = False,
        optim_result: OptimResult | None = None,
        optim_raises: bool = False,
        load_result: LoadResult | None = None,
        load_raises: bool = False,
        state: dict[str, Any] | None = None,
    ) -> None:
        # Canned behavior.
        self._fb_raises = fb_raises
        self._optim_raises = optim_raises
        self._load_raises = load_raises
        self._fb_result = fb_result or FBResult(status="ok", metrics={})
        self._optim_result = optim_result or OptimResult(status="ok", updates_applied=1)
        self._load_result = load_result or LoadResult(status="ok")
        self._state = state or {"version": 1}

        # Call log inspected by the tests.
        self.fb_calls: list[Datum] = []
        self.load_calls: list[dict[str, Any]] = []
        self.optim_calls = 0
        self.clear_calls = 0

    def forward_backward(self, datum: Datum) -> Future[FBResult]:
        self.fb_calls.append(datum)
        if self._fb_raises:
            raise RuntimeError("fb boom")
        return Future.immediate(self._fb_result)

    def optim_step(self) -> Future[OptimResult]:
        self.optim_calls += 1
        if self._optim_raises:
            raise RuntimeError("optim boom")
        return Future.immediate(self._optim_result)

    def load_state(self, state: dict[str, Any]) -> Future[LoadResult]:
        self.load_calls.append(state)
        if self._load_raises:
            raise RuntimeError("load boom")
        return Future.immediate(self._load_result)

    def clear_pending_state(self) -> None:
        self.clear_calls += 1

    def to_dict(self) -> dict[str, Any]:
        # Shallow copy of the configured state.
        return dict(self._state)


def _run_tx(
    layers: list[tuple[str, Any]],
    *,
    iteration: int = 0,
    intensity: Any = None,
    agent_state: Any = None,
) -> Any:
    """Build a ``LayerTransaction`` with no episodes and run it once."""
    state = AgentState() if agent_state is None else agent_state
    tx = LayerTransaction(layers, intensity=intensity, episodes=[], agent_state=state)
    return tx.run(iteration=iteration)


def test_happy_path_runs_all_layers_and_no_rollback() -> None:
    harness, router, weights = _StubLayer(), _StubLayer(), _StubLayer()
    stubs = (harness, router, weights)

    result = _run_tx([("harness", harness), ("router", router), ("weights", weights)])

    assert result.optim_failed is False
    assert set(result.fb_results.keys()) == {"harness", "router", "weights"}
    assert all(fb.status == "ok" for fb in result.fb_results.values())
    # Each layer went through optim once; nothing was restored or cleared.
    assert all(stub.optim_calls == 1 for stub in stubs)
    assert all(stub.load_calls == [] for stub in stubs)
    assert all(stub.clear_calls == 0 for stub in stubs)


def test_fb_exception_clears_pending_and_skips_that_layer_from_optim() -> None:
    harness = _StubLayer(fb_raises=True)
    weights = _StubLayer()

    result = _run_tx([("harness", harness), ("weights", weights)])

    # Harness fb raised: recorded as error, pending cleared, optim never called.
    assert result.fb_results["harness"].status == "error"
    assert harness.clear_calls == 1
    assert harness.optim_calls == 0
    # Weights is unaffected and completes optim.
    assert result.fb_results["weights"].status == "ok"
    assert weights.optim_calls == 1
    # No rollback for the failed layer, since it never reached optim.
    assert harness.load_calls == []
    assert result.optim_failed is False


def test_fb_skipped_status_also_clears_pending() -> None:
    # A "skipped" fb result returned by the layer itself must clear pending
    # state, the same way an "error" result does.
    harness = _StubLayer(fb_result=FBResult(status="skipped"))

    _run_tx([("harness", harness)])

    assert harness.clear_calls == 1
    assert harness.optim_calls == 0


def test_optim_error_rolls_back_all_snapshotted_layers() -> None:
    # One optim returning status="error" must cause load_state, with the
    # pre-optim snapshot, on every layer whose fb succeeded.
    harness = _StubLayer(state={"v": "h-before"})
    router = _StubLayer(
        state={"v": "r-before"},
        optim_result=OptimResult(status="error"),  # this is what triggers rollback
    )
    weights = _StubLayer(state={"v": "w-before"})

    result = _run_tx([("harness", harness), ("router", router), ("weights", weights)])

    assert result.optim_failed is True
    # Weights is restored too, although its optim never ran: the optim loop
    # stopped at router's error.
    assert harness.load_calls == [{"v": "h-before"}]
    assert router.load_calls == [{"v": "r-before"}]
    assert weights.load_calls == [{"v": "w-before"}]


def test_optim_exception_rolls_back_all_snapshotted_layers() -> None:
    # Same contract when optim_step raises instead of returning an error.
    harness = _StubLayer(state={"v": "h"})
    router = _StubLayer(state={"v": "r"}, optim_raises=True)

    result = _run_tx([("harness", harness), ("router", router)])

    assert result.optim_failed is True
    assert harness.load_calls == [{"v": "h"}]
    assert router.load_calls == [{"v": "r"}]


def test_snapshot_failure_skips_optim_and_clears_all() -> None:
    # If taking a snapshot raises, no layer may run optim, and every layer
    # with an ok fb must have clear_pending_state called.
    class _BadToDict(_StubLayer):
        def to_dict(self) -> dict[str, Any]:
            raise RuntimeError("cannot serialize")

    harness = _BadToDict()
    weights = _StubLayer()

    result = _run_tx([("harness", harness), ("weights", weights)])

    assert (harness.optim_calls, weights.optim_calls) == (0, 0)
    assert (harness.clear_calls, weights.clear_calls) == (1, 1)
    # Nothing to roll back: optim never ran.
    assert harness.load_calls == []
    assert weights.load_calls == []
    # A snapshot failure is not reported as an optim failure.
    assert result.optim_failed is False


def test_intensity_skips_harness_reflection_but_other_layers_proceed() -> None:
    class _NoReflect:
        def should_reflect(self, iteration: int) -> bool:
            return False

    harness = _StubLayer()
    weights = _StubLayer()

    result = _run_tx(
        [("harness", harness), ("weights", weights)],
        iteration=3,
        intensity=_NoReflect(),
    )

    # The gate short-circuits before fb is called and records "skipped".
    assert harness.fb_calls == []
    assert result.fb_results["harness"].status == "skipped"
    assert harness.optim_calls == 0
    # Weights is not gated.
    assert weights.fb_calls != []
    assert weights.optim_calls == 1


def test_paradigm_shift_on_harness_fb_mutates_tried_paradigms() -> None:
    # When harness fb reports metrics["paradigm_shifted"]=True and the real
    # harness holds pending insights tagged "paradigm", their contents must
    # end up on agent_state.tried_paradigms.
    from clawloop.learning_layers.harness import Insight, _HarnessPending

    state = AgentState()
    # Seed the harness's pending buffer directly with one paradigm-tagged insight.
    state.harness._pending = _HarnessPending(
        insights=[Insight(content="shift-to-x", tags=["paradigm"])],
    )
    pending_contents = [item.content for item in state.harness.pending_paradigm_insights()]
    assert pending_contents == ["shift-to-x"]

    class _HarnessShiftLayer(_StubLayer):
        def __init__(self) -> None:
            super().__init__(fb_result=FBResult(status="ok", metrics={"paradigm_shifted": True}))

    # Paradigm tracking reads agent_state.harness, not the stub registered
    # under the "harness" layer name.
    _run_tx([("harness", _HarnessShiftLayer())], agent_state=state)

    assert "shift-to-x" in state.tried_paradigms