feat: add WebSocket subscription support for consensus events#129
feat: add WebSocket subscription support for consensus events#129AgustinRamiroDiaz wants to merge 14 commits intomainfrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds a WebSocket-based consensus subscription layer, a low-level genCall RPC and tests, updates ABIs/decoders to rename and extend transaction fields (e.g., Changes
Sequence DiagramsequenceDiagram
participant Client as GenLayerClient
participant SubActions as subscriptionActions
participant WsClient as WebSocket PublicClient
participant Contract as Consensus Contract
participant EventStream as AsyncIterator
Client->>SubActions: subscribeToX()
SubActions->>SubActions: validate WS URL & consensus contract
SubActions->>WsClient: getOrCreateWsClient()
WsClient-->>SubActions: return cached/new ws client
SubActions->>WsClient: watchContractEvent(eventName)
Contract-->>WsClient: emits log
WsClient->>SubActions: forward log
SubActions->>EventStream: decode & enqueue event
Client->>EventStream: next()
EventStream-->>Client: deliver typed event
Client->>EventStream: unsubscribe()
EventStream->>WsClient: teardown watcher
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/types/transactions.ts (1)
275-315:⚠️ Potential issue | 🟡 Minor
txReceiptfield inGenLayerTransactionis no longer populated after the rename toeqBlocksOutputs.
GenLayerRawTransactionreplacedtxReceiptwitheqBlocksOutputs, butGenLayerTransaction(line 208) still declarestxReceipt?: Hash. ThedecodeTransactionspread (...tx) passes througheqBlocksOutputsbut does not map it totxReceipt, so consumers readingtxReceiptfrom a decoded transaction will getundefined.Consider either:
- Adding a backward-compat alias in
decodeTransaction:txReceipt: tx.eqBlocksOutputs- Or removing
txReceiptfromGenLayerTransactionif it's intentionally deprecated.
🤖 Fix all issues with AI agents
In `@src/types/clients.ts`:
- Around line 11-19: Replace the relative import for the subscription types with
the project path alias so the import that currently brings in
ConsensusEventStream, NewTransactionEvent, TransactionAcceptedEvent,
TransactionActivatedEvent, TransactionUndeterminedEvent,
TransactionLeaderTimeoutEvent, and AppealStartedEvent from "./subscriptions"
should instead import from the alias path (e.g., "@/types/subscriptions") so it
uses the `@/` module alias per the coding guidelines.
In `@src/types/subscriptions.ts`:
- Around line 1-2: Update the two imports in src/types/subscriptions.ts to use
the project path alias instead of relative paths: replace import {Address} from
"./accounts" and import {TransactionHash} from "./transactions" with their
aliased equivalents (e.g., import {Address} from "@/types/accounts" and import
{TransactionHash} from "@/types/transactions") so they follow the "@/..."
convention.
In `@tests/genCall.test.ts`:
- Around line 1-65: Update the top imports to use the project path alias (change
imports of createClient, localnet, Address, createAccount, generatePrivateKey,
GenCallStatusCode, zeroAddress to use "@/..." style) so all src references use
the "@/..." alias, and ensure the global fetch stub is cleaned up after the test
suite by calling vi.unstubGlobal("fetch") (or vi.restoreAllMocks plus
vi.unstubGlobal("fetch")) in the afterEach/afterAll so the stubbed mockFetch and
vi.stubGlobal("fetch") do not leak to other tests; locate usages by the symbols
mockFetch and vi.stubGlobal("fetch") and adjust the afterEach/afterAll
accordingly.
In `@tests/subscriptions.test.ts`:
- Around line 2-8: Update the import paths in tests/subscriptions.test.ts to use
the configured path alias instead of relative ../src paths: replace imports like
"../src/subscriptions/actions" and "../src/types" with "@/subscriptions/actions"
and "@/types" respectively (so symbols WebSocketNotConfiguredError,
ConsensusContractNotInitializedError, subscriptionActions, GenLayerClient,
GenLayerChain are imported via the `@/` alias). Ensure all other test imports
follow the same alias convention.
🧹 Nitpick comments (1)
src/client/client.ts (1)
86-96: Mutating a potentially shared chain config object.When
config.chainis not provided,chainConfigreferences the module-levellocalnetsingleton. Lines 89 and 91-96 mutaterpcUrls.defaultin-place, which means multiplecreateClient()calls would clobber each other's endpoint settings on the shared object.This is a pre-existing issue (line 89), but the new WebSocket path at lines 91-96 extends the mutation surface. Consider cloning the chain config upfront:
♻️ Suggested defensive clone
export const createClient = (config: ClientConfig = {chain: localnet}): GenLayerClient<GenLayerChain> => { - const chainConfig = config.chain || localnet; + const chainConfig = structuredClone(config.chain || localnet); if (config.endpoint) { chainConfig.rpcUrls.default.http = [config.endpoint]; }
c32b139 to
fc610b5
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/client/client.ts`:
- Around line 18-19: Update the import to use the project path alias: replace
the relative import of subscriptionActions ("../subscriptions/actions") with the
alias form "@/subscriptions/actions" in src/client/client.ts so the
subscriptionActions symbol is imported via the `@/` path; ensure the
GenLayerClient/GenLayerChain imports remain unchanged.
In `@src/subscriptions/actions.ts`:
- Around line 108-147: handle the fatal WS error inside handleError by marking
the stream as terminated and ensuring any future calls to
stream[Symbol.asyncIterator]().next return done: true: set isUnsubscribed (or a
new flag like isTerminated) to true, call unwatch to stop the wsClient
subscription, clear/resolve/reject any pending resolveNext/rejectNext with the
error, and ensure the next() implementation checks isTerminated/isUnsubscribed
and returns Promise.resolve({value: undefined as any, done: true}) instead of
creating a new pending promise; update references to handleError, unwatch,
lastError, resolveNext/rejectNext and the stream[Symbol.asyncIterator]().next
logic accordingly.
🧹 Nitpick comments (6)
src/client/client.ts (1)
86-106: Avoid mutating sharedchainConfigwhen overriding endpoints.Mutating exported chain objects can leak endpoint overrides across clients. Consider building a resolved copy instead.
♻️ Suggested refactor
- const chainConfig = config.chain || localnet; - if (config.endpoint) { - chainConfig.rpcUrls.default.http = [config.endpoint]; - } - if (config.webSocketEndpoint) { - chainConfig.rpcUrls.default = { - ...chainConfig.rpcUrls.default, - webSocket: [config.webSocketEndpoint], - }; - } + const chainConfig = config.chain || localnet; + const resolvedChain = { + ...chainConfig, + rpcUrls: { + ...chainConfig.rpcUrls, + default: { + ...chainConfig.rpcUrls.default, + ...(config.endpoint ? {http: [config.endpoint]} : {}), + ...(config.webSocketEndpoint ? {webSocket: [config.webSocketEndpoint]} : {}), + }, + }, + };- const customTransport = custom(getCustomTransportConfig(config, chainConfig as GenLayerChain), {retryCount: 0, retryDelay: 0}); - const publicClient = createPublicClient(chainConfig as GenLayerChain, customTransport).extend( + const customTransport = custom(getCustomTransportConfig(config, resolvedChain as GenLayerChain), {retryCount: 0, retryDelay: 0}); + const publicClient = createPublicClient(resolvedChain as GenLayerChain, customTransport).extend( publicActions, );- const baseClient = createViemClient({ - chain: chainConfig, + const baseClient = createViemClient({ + chain: resolvedChain, transport: customTransport, ...(config.account ? {account: config.account} : {}), });tests/subscriptions.test.ts (2)
91-194: Test suite only covers configuration guards — core streaming logic is untested.The tests validate error-throwing for missing config and error class semantics, which is good. However, the async iterator machinery (event delivery, queue backpressure at 1000 events,
unsubscribe()lifecycle, error propagation throughnext(), and WeakMap caching) has zero coverage. Consider adding tests that exerciseprocessLog/handleErrorpaths via the mockedwatchContractEvent, e.g. by capturing theonLogs/onErrorcallbacks from the mock and invoking them directly.
118-129: Simplify withexpect().not.toThrow().The try/catch pattern works but is verbose. Vitest supports a more idiomatic assertion:
- try { - actions.subscribeToNewTransaction(); - } catch (error) { - expect(error).not.toBeInstanceOf(WebSocketNotConfiguredError); - } + expect(() => actions.subscribeToNewTransaction()).not.toThrow(WebSocketNotConfiguredError);src/types/clients.ts (1)
21-83: Consider extracting GenCall types to a dedicated file.Subscription event types live in
src/types/subscriptions.ts, but GenCall types (GenCallStatusCode,GenCallStatus,GenCallEvent,GenCallMessage,GenCallResult) are inlined here. For consistency and to keepclients.tsfocused on the client interface, consider moving them tosrc/types/genCall.tsand re-exporting from the barrel.src/subscriptions/actions.ts (2)
96-100: Silent event loss when queue is full — consider surfacing a warning.When the queue exceeds 1000 events, the oldest is silently dropped. A slow consumer may lose events without any indication. Consider emitting a
console.warnon the first drop (or periodically) so operators can diagnose issues, e.g.:if (eventQueue.length >= MAX_EVENT_QUEUE_SIZE) { + console.warn(`[genlayer-js] ${eventName} event queue full (${MAX_EVENT_QUEUE_SIZE}), dropping oldest event`); eventQueue.shift(); // Drop oldest event }
46-67: Cached WebSocket client is never explicitly closed.The
WeakMapallows GC once theGenLayerClientis collected, but the underlying WebSocket connection won't be closed until the transport is garbage-collected (and even then, it depends on the runtime's finalizer behavior). If a client unsubscribes from all events, the WS connection stays open.Consider adding an explicit teardown path or removing the entry from the cache when the last active subscription for a client is unsubscribed.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/subscriptions/actions.ts`:
- Around line 46-67: The cached WebSocket client in wsClientCache can remain
after a connection failure; update getOrCreateWsClient to attach failure/close
handlers to the created wsClient (and/or ensure the stream's handleError path)
so that on transport error/close you call wsClientCache.delete(client) to evict
the dead client; specifically, when creating wsClient in getOrCreateWsClient,
register an onError/onClose listener on the underlying WebSocket transport (or
in the existing handleError where streams are terminated) to remove the entry
for the corresponding GenLayerClient so subsequent calls to getOrCreateWsClient
will create a fresh PublicClient<WebSocketTransport>.
🧹 Nitpick comments (1)
src/subscriptions/actions.ts (1)
134-161: Multiple iterators from the same stream share mutable state, causing lost events.Each call to
[Symbol.asyncIterator]()returns a new iterator object, but they all shareresolveNext,rejectNext, andeventQueue. If a consumer inadvertently creates two iterators (e.g., twofor await...ofloops on the same stream), events will be non-deterministically split between them. Consider documenting thatConsensusEventStreamis single-consumer, or guarding against multiple active iterators.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/subscriptions.test.ts`:
- Around line 125-134: Add the missing assertions for subscribeToAppealStarted:
in the unit test that asserts all subscription methods throw
WebSocketNotConfiguredError, call expect(() =>
actions.subscribeToAppealStarted()).toThrow(WebSocketNotConfiguredError); and
likewise update the ConsensusEventStream interface check to include
subscribeToAppealStarted so the interface/test asserts that the method exists
and behaves like the other subscription methods; locate the tests referencing
subscriptionActions and ConsensusEventStream to add these two assertions
(subscribeToAppealStarted).
🧹 Nitpick comments (3)
src/subscriptions/actions.ts (2)
136-177: Each[Symbol.asyncIterator]()call creates a new iterator object sharing mutable state.If a consumer calls
[Symbol.asyncIterator]()more than once (e.g., twofor await...ofloops on the same stream), both iterators shareresolveNext/rejectNext/eventQueue, leading to lost events or races. This is a known limitation of single-consumer async iterables and likely acceptable here, but consider documenting that each stream supports only a single concurrent consumer.
87-106: Only the first queued event is delivered when a waiting consumer exists; remaining logs are queued correctly.The
processLogloop correctly delivers the first event to a waitingresolveNextconsumer and queues the rest. The bounded queue withshift()on overflow is a reasonable backpressure strategy. One subtlety: theconsole.warnon line 98 will fire on every dropped event if the producer consistently outpaces the consumer — consider rate-limiting the warning or logging the drop count periodically to avoid noisy logs.tests/subscriptions.test.ts (1)
109-112:afterEachwithvi.restoreAllMocks()is scoped only to the "Subscription Actions" describe block.The "ConsensusEventStream interface" and "Error Classes" describe blocks at the top level don't have cleanup. Since the ConsensusEventStream tests invoke
subscriptionActionsandsubscribeToNewTransaction(which trigger mockedcreatePublicClient), mock call counts and captured state may leak into later test blocks. This is currently benign because the cache eviction test explicitly clears the spy, but it's fragile.Consider hoisting the
afterEachto the top level or wrapping all test blocks in a single outerdescribe.
5ab184f to
09f8718
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/contracts/actions.ts`:
- Around line 374-386: The code currently defaults a missing resp.status to a
success which can hide real errors; change the logic around the returned status
(the resp and GenCallStatusCode references in this function) so that when
resp.status is null/undefined you return a non-success error status (e.g.,
GenCallStatusCode.FAILURE or UNKNOWN with a clear message like "missing status
from RPC response") and/or emit a warning/log entry so callers are not misled;
update the return object's status assignment to use that error-default value
instead of SUCCESS.
In `@tests/subscriptions.test.ts`:
- Around line 45-82: The mock defaultContract's ABI is missing the AppealStarted
event which will break decoding for subscribeToAppealStarted tests; add an ABI
entry named "AppealStarted" with type "event" and the appropriate inputs (e.g.,
indexed fields matching real contract like tx_id or txId and any other
addresses/fields used by your subscription) to the defaultContract constant so
tests can decode AppealStarted events correctly.
🧹 Nitpick comments (1)
src/contracts/actions.ts (1)
176-198: Consider extracting duplicatedNewTransactionevent parsing into a helper.The same pattern — checking
"hash" in result, parsingNewTransactionevents fromresult.receipt.logs, and throwing on empty events — is repeated verbatim in bothwriteContract(lines 184–198) anddeployContract(lines 237–251).♻️ Suggested helper extraction
+const _extractTxIdFromReceipt = ( + client: GenLayerClient<GenLayerChain>, + result: {receipt: TransactionReceipt} | {hash: `0x${string}`}, +): `0x${string}` => { + if ("hash" in result) { + return result.hash; + } + const newTxEvents = parseEventLogs({ + abi: client.chain.consensusMainContract?.abi as any, + eventName: "NewTransaction", + logs: result.receipt.logs, + }) as unknown as {args: {txId: `0x${string}`}}[]; + if (newTxEvents.length === 0) { + throw new Error("Transaction not processed by consensus"); + } + return newTxEvents[0].args["txId"]; +};Then in
writeContractanddeployContract:- if ("hash" in result) { - return result.hash; - } - const newTxEvents = parseEventLogs({...}); - if (newTxEvents.length === 0) { throw ... } - return newTxEvents[0].args["txId"]; + return _extractTxIdFromReceipt(client, result);Also applies to: 230-251
85909a5 to
883b554
Compare
|
@MuncleUscles can you review please? |
Add subscription methods to GenLayerClient for real-time consensus event streaming via WebSocket: - subscribeToNewTransaction - subscribeToTransactionAccepted - subscribeToTransactionActivated - subscribeToTransactionUndetermined - subscribeToTransactionLeaderTimeout Features: - Typed async iterable streams (ConsensusEventStream<T>) - Shared WebSocket client per GenLayerClient (WeakMap cache) - Bounded event queue (max 1000) to prevent memory leaks - Error propagation via async iterator - webSocketEndpoint config option for createClient() Throws WebSocketNotConfiguredError if no WS URL is configured. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add genCall method to genlayer-js client for direct gen_call RPC access: - Supports read, write, and deploy transaction types - Leader/validator mode via leader_results parameter - Returns full response: data, status, stdout, stderr, logs, events, messages - Add GenCallResult, GenCallStatus, GenCallEvent, GenCallMessage types Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add GenCallStatusCode enum with SUCCESS, USER_ERROR, VM_ERROR, INTERNAL_ERROR values for type-safe status code handling - Update GenCallStatus.code to use enum type instead of number - Add 12 unit tests covering: - Basic request/response handling - Account resolution (client, explicit, zero address) - Optional parameters (value, gas, blockNumber, status) - leaderResults → leader_results conversion - Response data normalization (0x prefix handling) - Default values for missing response fields - Error status codes - Deploy transaction type Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Update consensus contract ABIs for localnet and testnetAsimov - Add validUntil parameter to addTransaction - Rename txData to txCalldata, txReceipt to eqBlocksOutputs - Add txExecutionHash, saltNonce, validatorResultHash fields - Add consumedValidators array - Add subscribeToAppealStarted event subscription for appeal monitoring - Update transaction decoders to handle new field names with backward compatibility - Add AppealStartedEvent type with txId, appealer, appealBond, appealValidators Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Update GenCallResult interface to add eqOutputs field for non-deterministic operation results. - Modify contractActions function to handle eqOutputs in response format and ensure it defaults to an empty array if not present. This change improves the handling of equivalence outputs in contract interactions.
- Evict the cached WebSocket client when a WebSocket error occurs, ensuring that subsequent subscriptions create a fresh client. - Add tests to verify the behavior of client cache eviction upon WebSocket errors, confirming that the client is recreated as expected. This change enhances the reliability of WebSocket subscriptions by preventing stale connections.
…arrowing Move NewTransaction event parsing from _sendTransaction into each caller (writeContract, deployContract, appealTransaction) so external wallet flows return the consensus txId correctly. Add explicit return type to _sendTransaction for proper TypeScript narrowing. Update leaderResults type from Record to string[] and fix genCall test accordingly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…Finalized subscription Add dynamic contract address resolution via AddressManager for chains that configure an addressManagerAddress. Update consensus contract ABIs across all chain configs. Add TransactionFinalized event type and subscription. Update genCall to accept leaderResults as string[] and fix appeal transaction to return receipt hash for external wallets. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add AppealStarted and TransactionFinalized events to mock ABI in subscription tests - Add missing subscribeToAppealStarted and subscribeToTransactionFinalized assertions in all subscription method tests - Default genCall missing status to INTERNAL_ERROR instead of SUCCESS to avoid masking errors Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove obsolete V5/V6 dual ABI fallback tests (replaced by dynamic chain ABI approach), add WithFees (7-input) ABI test, fix gas expectations for 50% buffer, and remove dead V5/V6 constants. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
249b680 to
80f2ee0
Compare
249b680 to
80f2ee0
Compare
Summary
GenLayerClientfor real-time consensus event streamingsubscribeToNewTransaction,subscribeToTransactionAccepted,subscribeToTransactionActivated,subscribeToTransactionUndetermined,subscribeToTransactionLeaderTimeoutConsensusEventStream<T>) that can be consumed withfor await...ofwebSocketEndpointconfig option tocreateClient()Features
ConsensusEventStream<T>stream.unsubscribe()to stop listening and clean up resourcesUsage
Test plan
npm run build)tsc --noEmit)npm run lint)npm test)🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Changes
Tests