Commit beddc97

feat: address optimizations
1 parent c50b52f commit beddc97

30 files changed

Lines changed: 421 additions & 394 deletions

docs/optimizations.md

Lines changed: 8 additions & 9 deletions
@@ -8,7 +8,7 @@

 - **PY `ReactiveMapBundle` parity — `.get(key)`, `.has(key)`, `.size` (noted 2026-04-07):**
   - **Level A: DONE (2026-04-07).** Added `.get(key)`, `.has(key)`, `.size` to PY `ReactiveMapBundle` matching TS signatures. PY harness `strategy.py` updated to use `.get(key)` instead of Versioned navigation.
-  - **Level B: Deferred (post-1.0).** `ReactiveMapBundle.node` (TS) / `.data` (PY) emits `Versioned<{ map: ReadonlyMap<K,V> }>` / `Versioned(version, MappingProxyType)`. The `Versioned` wrapper is a protocol optimization (efficient RESOLVED dedup via version comparison) that leaks into composition code when using the node as a derived dep. **Proposed fix:** `.node` / `.data` emits the unwrapped map directly; version-based equality handled internally via `equals` option on the state node. Consumers see `ReadonlyMap<K,V>` / `MappingProxyType`, not `Versioned`. Breaking change — defer to post-1.0 audit of all `Versioned` usage.
+  - **Level B: DONE (PY, 2026-04-07).** Removed `Versioned` wrapper from all reactive bundle APIs (ReactiveMap, ReactiveLog, ReactiveList, ReactiveIndex). `.data` / `.entries` / `.items` / `.ordered` now emit unwrapped domain types (`MappingProxyType`, `tuple`, etc.). Internal version counter drives efficient equality without leaking into composition code (spec §5.12). All downstream consumers updated (messaging, cqrs, ai, domain-templates, composite, harness/strategy).

 - **Whole-repo `emit` → `down` audit + `up` / backpressure / `message_tier` sweep (all phases, noted 2026-04-07):**
   - **TS: DONE (2026-04-07).** Renames: `emitWithBatch` → `downWithBatch`, `_emitToSinks` → `_downToSinks`, `_emitAutoValue` → `_downAutoValue`, `_boundEmitToSinks` → `_boundDownToSinks`, `_emitSequential` → `_downSequential`, `emitLine` → `flushLine` (reactive-layout). Batch param `emit` → `sink`. `up()` audit: no asymmetries. `messageTier()` audit: already clean. `NodeActions.emit()` kept (different semantics from `actions.down()`). CQRS `CommandActions.emit()` kept (domain concept). Spec updated (`_emitAutoValue` → `_downAutoValue`).
@@ -42,38 +42,37 @@ Non-blocking items tracked for later. **Keep this section identical in both repo

 | Item | Notes |
 |------|-------|
-| **`lastDepValues` + `Object.is` / referential equality (resolved 2026-03-31 — keep + document)** | Default `Object.is` identity check is correct for the common immutable-value case. The `node({ equals })` option already exists for custom comparison. Document clearly that mutable dep values should use a custom `equals` function. No code change needed. |
+| **`lastDepValues` + `Object.is` / referential equality (resolved 2026-03-31 — documented)** | Default identity check is correct for the common immutable-value case. The `node(equals=...)` option already exists for custom comparison. Mutable dep values should use a custom `equals` function. **Documented in `node()` docstring (2026-04-07).** |
 | **`sideEffects: false` in `package.json`** | TypeScript package only. Safe while the library has no import-time side effects. Revisit if global registration or polyfills are added at module load. |
-| **JSDoc / docstrings on `node()` and public APIs** | `docs/docs-guidance.md`: JSDoc on new TS exports; docstrings on new Python public APIs. |
+| **JSDoc / docstrings on `node()` and public APIs** | `docs/docs-guidance.md`: JSDoc on new TS exports; docstrings on new Python public APIs. `node()` equals guidance added (2026-04-07). `flat_map` ERROR behavior documented (2026-04-07). `from_redis_stream` COMPLETE/disconnect documented (2026-04-07). |
 | **Roadmap §0.3 checkboxes** | Mark Phase 0.3 items when the team agrees the milestone is complete. |

 ### Factory teardown — `dispose()` pattern (D1/D2, noted 2026-04-07)

 | Item | Status | Notes |
 |------|--------|-------|
-| **Phase 4+ factories don't register internal nodes on the graph** | Proposed (2026-04-07) | `harness_loop` (and other factories) create internal nodes (`triage_node`, `_router` effect, `_retry_effect`, `execute_node`, `verify_node`, `with_latest_from` combinators, etc.) that are never added to the parent graph via `graph.add()`. When `graph.destroy()` fires, these nodes retain their subscriptions and closures — memory leak on repeated create/destroy cycles. Keepalive subscriptions (`_router.subscribe(...)`, `_retry_effect.subscribe(...)`) also leak because their unsub handles are discarded. |
-| **Proposed: `dispose()` convention** | Proposed (2026-04-07) | Factories collect all cleanup functions into a `_disposers: list[Callable[[], None]]` list. Override `destroy()` on the returned graph class to drain the list. Keepalive unsubs are just more entries in the same list. This is a Phase 0 primitive addition — a lightweight contract that any factory can adopt. Alternatives considered: (A) `graph.add()` all internal nodes — simple but pollutes the node registry with internals; (B) manual bookkeeping of unsub handles — fragile, easy to miss one. Option C (dispose list) is composable, private, and hard to get wrong. D2 (leaked keepalive handles) collapses into D1 — same list, same `destroy()` drain. |
+| **Phase 4+ factories don't register internal nodes on the graph** | **DONE (TS + PY, 2026-04-07)** | Added `Graph.addDisposer(fn)` / `Graph.add_disposer(fn)` — general-purpose disposer registration drained on `destroy()` **before** TEARDOWN signal. TS: Fixed `harnessLoop`, `strategyModel`, `agentMemory`, `feedback`, `gate`, `contentModerationGraph`, `funnel` bridge, `ChatStreamGraph`, `ToolRegistryGraph`. PY: Fixed `harness_loop`, `reduction.py`, `ChatStreamGraph`, `ToolRegistryGraph`, `AgentMemoryGraph`. Dead `_version` counter removed from all reactive bundles (TS + PY). |
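
The disposer registration described in this row can be sketched roughly as follows. This is a minimal illustration, not the graphrefly implementation: the `Graph` class here is a simplified stand-in, the `events` list exists only to make the ordering visible, and the last-in-first-out drain order is an assumption (the changelog only states that disposers run before the TEARDOWN signal).

```python
from typing import Callable


class Graph:
    """Simplified stand-in for a graph with disposer registration (hypothetical)."""

    def __init__(self) -> None:
        self._disposers: list[Callable[[], None]] = []
        self.events: list[str] = []  # records ordering, for illustration only

    def add_disposer(self, fn: Callable[[], None]) -> None:
        self._disposers.append(fn)

    def destroy(self) -> None:
        # Drain disposers first so internal nodes and keepalive subscriptions
        # are released before the teardown signal goes out.
        while self._disposers:
            self._disposers.pop()()  # LIFO order is an assumption of this sketch
        self.events.append("TEARDOWN")


def make_factory_graph() -> Graph:
    graph = Graph()
    # Stand-ins for the unsubscribe handles of internal keepalive subscriptions.
    graph.add_disposer(lambda: graph.events.append("unsub router"))
    graph.add_disposer(lambda: graph.events.append("unsub retry"))
    return graph


g = make_factory_graph()
g.destroy()
print(g.events)
```

Keeping the handles in one private list means a factory never has to expose its internal nodes in the graph's node registry to get them cleaned up.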

 ### AI surface (Phase 4.4) — deferred optimizations

 | Item | Status | Notes |
 |------|--------|-------|
-| **Re-indexes entire store on every change** | Deferred | Decision: diff-based indexing using `Versioned` snapshot version field to track indexed entries. Deferred to after Phase 6 — current N is small enough that full re-index is acceptable pre-1.0. |
+| **Re-indexes entire store on every change** | Deferred | Decision: diff-based indexing using internal version counter to track indexed entries. Deferred to after Phase 6 — current N is small enough that full re-index is acceptable pre-1.0. |
 | **Budget packing always includes first item** | Documented behavior | The retrieval budget packer always includes the first ranked result even if it exceeds `maxTokens`. This is intentional "never return empty" semantics — a query that matches at least one entry always returns something. Callers who need strict budget enforcement should post-filter. |
 | **Retrieval pipeline auto-wires when vectors/KG enabled** | Documented behavior | When `embedFn` or `enableKnowledgeGraph` is set, the retrieval pipeline automatically wires vector search and KG expansion into the retrieval derived node. There is no explicit opt-in/opt-out per retrieval stage — the presence of the capability implies its use. Callers who need selective retrieval should use the individual nodes directly. |
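
The "never return empty" packing rule and the suggested post-filter might look like the sketch below. Both function names and the `(entry, token_cost)` pair shape are hypothetical, and stopping at the first entry that does not fit (rather than skipping it and continuing) is an assumption; the table only specifies that the first ranked result is always included.

```python
from typing import Sequence


def pack_with_budget(ranked: Sequence[tuple[str, int]], max_tokens: int) -> list[str]:
    """Pack ranked (entry, token_cost) pairs; the first entry is always kept."""
    picked: list[str] = []
    used = 0
    for i, (entry, cost) in enumerate(ranked):
        # Only entries after the first are subject to the budget check.
        if i > 0 and used + cost > max_tokens:
            break
        picked.append(entry)
        used += cost
    return picked


def enforce_strict(ranked: Sequence[tuple[str, int]], max_tokens: int) -> list[str]:
    """Post-filter for callers that need the budget to be a hard limit."""
    picked = pack_with_budget(ranked, max_tokens)
    costs = dict(ranked)
    total = sum(costs[entry] for entry in picked)
    return picked if total <= max_tokens else []


oversized_first = [("a", 120), ("b", 30)]
print(pack_with_budget(oversized_first, 100))  # first item kept despite the budget
print(enforce_strict(oversized_first, 100))    # strict callers drop it
```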

 ### Tier 2 extra operators — deferred semantics

 | Item | Status | Notes |
 |------|--------|-------|
-| **`mergeMap` / `merge_map` + `ERROR`** | Documented limitation (2026-03-31) | When the outer stream or one inner emits `ERROR`, other inner subscriptions may keep running until they complete or unsubscribe. Rx-style "first error cancels all sibling inners" is **not** specified or implemented. Current behavior (inner errors don't cascade) is arguably more useful for parallel work — no change needed. Document in JSDoc/docstrings. |
+| **`mergeMap` / `merge_map` + `ERROR`** | **Documented (PY docstring, 2026-04-07)** | Inner errors propagate downstream but do not cancel sibling inners. Outer ERROR cancels all inners. Current behavior is intentional for parallel work. **Documented in `flat_map` docstring.** |

 ### Ingest adapters — deferred items

 | Item | Status | Notes |
 |------|--------|-------|
-| **`fromRedisStream` / `from_redis_stream` never emits COMPLETE** | Documented limitation (2026-04-03) | Long-lived stream consumers intentionally never complete. The consumer loop runs until teardown. This is expected behavior for persistent stream sources (same as Kafka). Document in JSDoc/docstrings. |
-| **`fromRedisStream` / `from_redis_stream` does not disconnect client** | Documented limitation (2026-04-03) | The caller owns the Redis client lifecycle. The adapter does not call `disconnect()` on teardown — the caller is responsible for closing the connection. Same contract as `fromKafka` (caller owns `consumer.connect()`/`disconnect()`). |
+| **`fromRedisStream` / `from_redis_stream` never emits COMPLETE** | **Documented (PY docstring, 2026-04-07)** | Long-lived stream consumers intentionally never complete. **Documented in `from_redis_stream` docstring.** |
+| **`fromRedisStream` / `from_redis_stream` does not disconnect client** | **Documented (PY docstring, 2026-04-07)** | The caller owns the Redis client lifecycle. **Documented in `from_redis_stream` docstring.** |
 | **PY `from_csv` / `from_ndjson` thread not joined on cleanup** | Documented limitation (2026-04-03) | Python file-ingest adapters run in a daemon thread. On teardown, `active[0] = False` signals the thread to exit but does not `join()` it. The daemon flag ensures the thread does not block process exit. A future optimization could add optional `join(timeout)` on cleanup for stricter resource control. |
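
The optional `join(timeout)` mentioned in the `from_csv` / `from_ndjson` row could be sketched like this. It uses only the standard library; `start_reader`, its parameters, and the per-line sleep are hypothetical stand-ins for the adapter's reader loop, not the adapter's actual code.

```python
import threading
import time


def start_reader(lines, on_line, join_timeout=None):
    """Run a reader loop in a daemon thread and return a cleanup function."""
    active = [True]  # one-element list used as a mutable flag shared with the thread

    def run():
        for line in lines:
            if not active[0]:
                return  # teardown was requested
            on_line(line)
            time.sleep(0.001)  # stand-in for real I/O latency

    thread = threading.Thread(target=run, daemon=True)
    thread.start()

    def cleanup():
        active[0] = False
        if join_timeout is not None:
            # Stricter variant: wait (bounded) for the thread to actually exit.
            thread.join(join_timeout)
        return thread.is_alive()

    return cleanup


seen = []
cleanup = start_reader(range(100_000), seen.append, join_timeout=1.0)
still_running = cleanup()
```

With `join_timeout=None` the cleanup only flips the flag, which matches the behavior the table documents; passing a timeout trades a short blocking wait for a guarantee that the thread has stopped in the common case.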

 ### Intentional cross-language divergences

src/graphrefly/core/__init__.py

Lines changed: 0 additions & 1 deletion
@@ -26,7 +26,6 @@
     NodeImpl,
     NodeStatus,
     SubscribeHints,
-    cleanup_result,
     node,
 )
 from graphrefly.core.protocol import (

src/graphrefly/core/node.py

Lines changed: 8 additions & 0 deletions
@@ -976,6 +976,14 @@ def node(
         **kwargs: Any option key accepted by :class:`NodeImpl` (e.g. ``name``,
             ``initial``, ``equals``, ``guard``, ``thread_safe``).

+    **Equality (``equals``):** The default ``equals`` uses identity (``is``)
+    for immutable values. If your node produces *mutable* dep values
+    (dicts, lists) that are modified in place, provide a custom ``equals``
+    function — otherwise downstream nodes may skip updates because the
+    identity check sees the *same* object::
+
+        node([source], compute_fn, equals=lambda a, b: a == b)
+
     Returns:
         A new :class:`NodeImpl` instance.

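
The difference between the identity check and a value-based `equals` can be seen in plain Python, independent of graphrefly. The helper names below are illustrative only, and the sketch makes no claim about how `NodeImpl` caches dep values internally.

```python
def identity_equals(a, b):
    return a is b


def value_equals(a, b):
    return a == b


cached = {"count": 1}

# Case 1: the same dict mutated in place. Both checks compare the object
# with itself, so neither one can see the change.
same_obj = cached
same_obj["count"] += 1
in_place = (identity_equals(cached, same_obj), value_equals(cached, same_obj))

# Case 2: a fresh dict with identical content. Identity reports a change
# (a redundant update), while value equality recognizes it as unchanged.
rebuilt = dict(cached)
rebuilt_same = (identity_equals(cached, rebuilt), value_equals(cached, rebuilt))

# Case 3: a fresh dict with different content. Both checks report a change.
updated = {**cached, "count": cached["count"] + 1}
rebuilt_changed = (identity_equals(cached, updated), value_equals(cached, updated))
```

In this sketch a custom `equals` pays off mainly when the compute function returns a new object on each run; a value that is mutated in place and returned again compares equal to itself under either check.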

src/graphrefly/extra/__init__.py

Lines changed: 0 additions & 2 deletions
@@ -110,7 +110,6 @@
     ReactiveListBundle,
     ReactiveLogBundle,
     ReactiveMapBundle,
-    Versioned,
     log_slice,
     pubsub,
     reactive_index,
@@ -214,7 +213,6 @@
     "ReactiveListBundle",
     "ReactiveLogBundle",
     "ReactiveMapBundle",
-    "Versioned",
     "NS_PER_MS",
     "NS_PER_SEC",
     "CacheTier",

src/graphrefly/extra/adapters.py

Lines changed: 9 additions & 0 deletions
@@ -1848,6 +1848,15 @@ def from_redis_stream(

     Uses XREAD with BLOCK to reactively consume stream entries.

+    **Lifecycle notes:**
+
+    - This source intentionally **never emits** ``COMPLETE``. Long-lived
+      stream consumers are expected to run indefinitely; completion would
+      tear down downstream operators prematurely.
+    - The caller **owns the Redis client lifecycle**. ``from_redis_stream``
+      does not close or disconnect the client on teardown — call
+      ``client.close()`` yourself when the graph is destroyed.
+
     Args:
         client: Redis client instance with ``xread`` method (caller owns connection).
         key: Redis stream key.
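
The caller-owned lifecycle described in the docstring can be sketched with a `try`/`finally` around whatever builds and runs the graph. `FakeRedisClient` and `run_with_owned_client` are hypothetical; the fake mirrors only the `xread` and `close` methods the docstring mentions and does not talk to Redis.

```python
class FakeRedisClient:
    """Stand-in for a Redis client (hypothetical); records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def xread(self, streams, count=None, block=None):
        return []  # no entries; a real client would block on the stream

    def close(self) -> None:
        self.closed = True


def run_with_owned_client(client, build_and_run):
    """The code that created the client is also the code that closes it."""
    try:
        return build_and_run(client)
    finally:
        # The adapter does not do this on teardown, so the caller must.
        client.close()


client = FakeRedisClient()
result = run_with_owned_client(client, lambda c: c.xread({"events": "$"}, block=10))
```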

src/graphrefly/extra/composite.py

Lines changed: 8 additions & 9 deletions
@@ -113,11 +113,9 @@ class DistillBundle:


 def _snapshot_map(store: ReactiveMapBundle) -> Mapping[str, Any]:
-    snap = store.data.get()
+    snap = store.entries.get()
     if snap is None:
         return {}
-    if hasattr(snap, "value"):
-        return cast("Mapping[str, Any]", snap.value)
     return cast("Mapping[str, Any]", snap)


@@ -218,18 +216,18 @@ def _delete_keys(keys: list[str]) -> None:
     )

     compact = derived(
-        [store.data, context_node],
+        [store.entries, context_node],
         lambda deps, _a: _pack_compact(
-            deps[0].value if hasattr(deps[0], "value") else deps[0],
+            deps[0],
             deps[1],
             score,
             cost,
             budget,
         ),
     )
     size = derived(
-        [store.data],
-        lambda deps, _a: len(deps[0].value if hasattr(deps[0], "value") else deps[0]),
+        [store.entries],
+        lambda deps, _a: len(deps[0]) if deps[0] is not None else 0,
         initial=0,
     )
     compact.subscribe(lambda _msgs: None)
@@ -260,8 +258,9 @@ def _compute_evictions(
     evict: Callable[[str, Any], Any],
 ) -> list[str]:
     out: list[str] = []
-    current = get(store.data)
-    snapshot = current.value if hasattr(current, "value") else current
+    snapshot = get(store.entries)
+    if snapshot is None:
+        return out
     for key, mem in snapshot.items():
         verdict = evict(key, mem)
         if isinstance(verdict, Node):
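
After this change the call sites read the mapping directly instead of unwrapping a `.value` attribute. A short standard-library sketch of what a consumer can rely on if the emitted snapshot is a `MappingProxyType`, as the changelog states; the `_store` dict and the `snapshot_size` helper are illustrative, and whether graphrefly wraps a live dict or a copy is not shown in this diff.

```python
from types import MappingProxyType

_store = {"a": 1}                    # private backing dict (hypothetical)
snapshot = MappingProxyType(_store)  # read-only view handed to consumers


def snapshot_size(snap):
    """Mirror of the None-safe length check used by the `size` derived node."""
    return len(snap) if snap is not None else 0


try:
    snapshot["b"] = 2  # consumers cannot write through the proxy
    write_rejected = False
except TypeError:
    write_rejected = True

_store["b"] = 2  # the owner can still write; the proxy reflects it
size_after_write = snapshot_size(snapshot)
```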
