feat: Implement Message Updates, Deletes, and Appends#697
feat: Implement Message Updates, Deletes, and Appends#697
Conversation
Signed-off-by: Lewis Marshall <lewis.marshall@ably.com>
WalkthroughAdds message versioning and actions (create/update/delete/append), publish-with-result APIs for REST and Realtime, serial-aware ACK delivery via an ackCallback wrapper, protocol default bumped to v5, per-message retrieval/version history, supporting decoders, and extensive tests and examples for message lifecycle flows. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant REST_Channel as "REST Channel"
participant Server
Client->>REST_Channel: PublishWithResult(name, data)
REST_Channel->>Server: POST /messages (Action=Create)
Server-->>REST_Channel: { serials: ["s1"] }
REST_Channel-->>Client: PublishResult{Serial: "s1"}
Client->>REST_Channel: UpdateMessage(msg{Serial:s1}, opts)
REST_Channel->>REST_Channel: validate serial, build MessageVersion
REST_Channel->>Server: POST /messages (Action=Update, Version)
Server-->>REST_Channel: { versionSerial: "v2" }
REST_Channel-->>Client: UpdateResult{VersionSerial: "v2"}
sequenceDiagram
participant Client
participant Realtime_Channel as "Realtime Channel"
participant Connection
participant Server
Client->>Realtime_Channel: PublishWithResultAsync(name, data, cb)
Realtime_Channel->>Realtime_Channel: create ackCallback wrapper
Realtime_Channel->>Connection: send(msg, callback)
Connection->>Server: send PUBLISH
Server-->>Connection: ACK { serials: ["s1"] }
Connection->>Realtime_Channel: callback.call(serials, nil)
Realtime_Channel-->>Client: cb(PublishResult{Serial:"s1"}, nil)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
ably/paginated_result.go (1)
189-194:⚠️ Potential issue | 🟠 MajorBug:
copyHeaderassigns the original slice instead of the copy.Line 192 creates a copy
dof the header values slice, but line 193 assigns the originalvtodest[k], making the copy dead code. The destination header will share the same backing array as the source.🐛 Proposed fix
func copyHeader(dest, src http.Header) { for k, v := range src { d := make([]string, len(v)) copy(d, v) - dest[k] = v + dest[k] = d } }ably/rest_channel.go (1)
112-119:⚠️ Potential issue | 🟡 Minor
publishMultiplemutates caller's message slice elements in place.The
*m, err = (*m).withEncodedData(cipher)dereferences and overwrites the pointed-toMessagevalue. Sincemessagesis[]*Message, this modifies the caller's original messages (e.g., overwritingDatawith encoded bytes and settingEncoding). This could surprise callers who reuse message objects. The same pattern exists in theUpdateMessage/DeleteMessage/AppendMessagemethods, but those copy first (updateMsg := *msg), which is safer.
🤖 Fix all issues with AI agents
In `@ably/message_updates_integration_test.go`:
- Around line 315-329: The async callback passed to channel.AppendMessageAsync
currently calls t.Logf from a goroutine (in the loop over tokens) which can
panic if the test ends; change the callback to never call testing.T methods and
instead send results over the existing completed channel (e.g., send an index or
an error value/struct) so the main test goroutine can observe failures; update
the completion loop that reads from completed to detect and fail the test via
require/ t.Fatalf when an error result is received, and remove the t.Logf call
inside the callback (refer to the loop variable tokens, the
channel.AppendMessageAsync callback, and the completed channel when making the
change).
In `@ably/state.go`:
- Line 138: The Ack method on pendingEmitter has an unused conn *Connection
parameter; remove the parameter from the function signature (func (q
*pendingEmitter) Ack(msg *protocolMessage, errInfo *ErrorInfo)) and update all
callers (e.g., the calls from realtime_conn.go that currently pass a Connection)
to stop passing conn. Ensure you update any interface definitions or
implementations that referenced pendingEmitter.Ack to match the new signature
and run build/tests to catch remaining references.
🧹 Nitpick comments (6)
ably/rest_client.go (1)
812-819: Header ordering: custom headers copied beforeAcceptis set.Custom headers from
r.headerare copied at line 813 beforeAcceptis unconditionally set at line 815. If a caller ever passes a customAcceptheader, it would be silently overwritten. Currently this isn't an issue (only protocol version is overridden), but the conditional guard applied toablyProtocolVersionHeader(line 817) is not applied toAccept.Consider applying the same "set only if absent" pattern to
Acceptfor consistency, or document that certain headers cannot be overridden.ably/proto_message.go (1)
34-106: DRY: action↔numeric mapping is duplicated four times.The same switch logic for converting between
MessageActionand its numeric wire representation is repeated inMarshalJSON,UnmarshalJSON,CodecEncodeSelf, andCodecDecodeSelf. Consider extracting helpers likemessageActionToNumandnumToMessageActionto centralize the mapping.Also, the
defaultbranch silently maps unknown actions toMessageActionCreate(0) during both marshaling and unmarshaling. This could mask bugs if an invalid action is accidentally used. Consider returning an error for unknown values instead.♻️ Proposed helper extraction
+var messageActionToNum = map[MessageAction]int{ + MessageActionCreate: 0, + MessageActionUpdate: 1, + MessageActionDelete: 2, + MessageActionAppend: 5, +} + +var numToMessageAction = map[int]MessageAction{ + 0: MessageActionCreate, + 1: MessageActionUpdate, + 2: MessageActionDelete, + 5: MessageActionAppend, +} + func (a MessageAction) MarshalJSON() ([]byte, error) { - var num int - switch a { - case MessageActionCreate: - num = 0 - case MessageActionUpdate: - num = 1 - case MessageActionDelete: - num = 2 - case MessageActionAppend: - num = 5 - default: - num = 0 + num, ok := messageActionToNum[a] + if !ok { + return nil, fmt.Errorf("unknown MessageAction: %q", a) } return json.Marshal(num) }Apply the same pattern to
UnmarshalJSON,CodecEncodeSelf, andCodecDecodeSelf.ably/realtime_channel.go (1)
852-1046: Significant DRY opportunity:UpdateMessageAsync,DeleteMessageAsync, andAppendMessageAsyncare nearly identical.The three async methods differ only in the
MessageActionconstant assigned. The blocking variants follow the same boilerplate pattern too. Consider extracting a shared helper to reduce ~180 lines of near-duplicate code.♻️ Sketch of a shared helper
+func (c *RealtimeChannel) messageOperationAsync(msg *Message, action MessageAction, onAck func(*UpdateResult, error), options ...UpdateOption) error { + if err := validateMessageSerial(msg); err != nil { + return err + } + var opts updateOptions + for _, o := range options { + o(&opts) + } + version := &MessageVersion{ + Description: opts.description, + ClientID: opts.clientID, + Metadata: opts.metadata, + } + opMsg := *msg + opMsg.Action = action + opMsg.Version = version + protoMsg := &protocolMessage{ + Action: actionMessage, + Channel: c.Name, + Messages: []*Message{&opMsg}, + } + return c.sendWithSerialCallback(protoMsg, func(serials []string, err error) { + if err != nil { + onAck(nil, err) + return + } + result := &UpdateResult{} + if len(serials) > 0 { + result.VersionSerial = serials[0] + } + onAck(result, nil) + }) +} + +func (c *RealtimeChannel) messageOperation(ctx context.Context, msg *Message, action MessageAction, options ...UpdateOption) (*UpdateResult, error) { + type resultOrError struct { + result *UpdateResult + err error + } + listen := make(chan resultOrError, 1) + if err := c.messageOperationAsync(msg, action, func(result *UpdateResult, err error) { + listen <- resultOrError{result, err} + }, options...); err != nil { + return nil, err + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case result := <-listen: + return result.result, result.err + } +}Then each public method becomes a one-liner:
func (c *RealtimeChannel) UpdateMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) { return c.messageOperation(ctx, msg, MessageActionUpdate, options...) }ably/message_updates_integration_test.go (1)
205-209: Assertion on version ordering may be fragile.The test asserts that
versions[0]isMessageActionCreateandversions[1]/versions[2]areMessageActionUpdate. This depends on the server returning versions in a specific order. If the API's default ordering changes, this test will break. Consider documenting or parameterizing the expected direction.ably/rest_channel.go (2)
208-209: Remove debug log from production code.This
Debugfline logs response details including serials. While it's at debug level, it appears to be a development artifact (// Debug: log response). Consider removing it or ensuring it's intentional.Proposed fix
- // Debug: log response - c.log().Debugf("PublishMultipleWithResult response: serials=%v, count=%d", response.Serials, len(response.Serials)) - // Build results from serials
221-363: Significant code duplication acrossUpdateMessage,DeleteMessage, andAppendMessage.These three methods are nearly identical, differing only in the
Actionfield (MessageActionUpdate,MessageActionDelete,MessageActionAppend). Consider extracting a shared helper.Proposed refactor
+func (c *RESTChannel) performMessageAction(ctx context.Context, action MessageAction, msg *Message, options ...UpdateOption) (*UpdateResult, error) { + if err := validateMessageSerial(msg); err != nil { + return nil, err + } + + var opts updateOptions + for _, o := range options { + o(&opts) + } + + version := &MessageVersion{ + Description: opts.description, + ClientID: opts.clientID, + Metadata: opts.metadata, + } + + actionMsg := *msg + actionMsg.Action = action + actionMsg.Version = version + + cipher, _ := c.options.GetCipher() + var err error + actionMsg, err = actionMsg.withEncodedData(cipher) + if err != nil { + return nil, fmt.Errorf("encoding data for message: %w", err) + } + + var response publishResponse + res, err := c.client.post(ctx, c.baseURL+"/messages", []*Message{&actionMsg}, &response) + if err != nil { + return nil, err + } + defer res.Body.Close() + + result := &UpdateResult{} + if len(response.Serials) > 0 { + result.VersionSerial = response.Serials[0] + } + return result, nil +} + func (c *RESTChannel) UpdateMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) { - // ... ~45 lines of duplicated code + return c.performMessageAction(ctx, MessageActionUpdate, msg, options...) } func (c *RESTChannel) DeleteMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) { - // ... ~45 lines of duplicated code + return c.performMessageAction(ctx, MessageActionDelete, msg, options...) } func (c *RESTChannel) AppendMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) { - // ... ~45 lines of duplicated code + return c.performMessageAction(ctx, MessageActionAppend, msg, options...) }
|
It would have been nice as an experiment to derive the tests from https://github.com/ably/uts/blob/uts-experiments/test/completion-status.md |
|
@paddybyers yes apologies I did start with that but got a bit bogged down so I thought I'd just get this implemented (because I want it for load testing), and then follow up with you on the UTS stuff (we can try it out for annotations/summaries, which I haven't added here). |
There was a problem hiding this comment.
🧹 Nitpick comments (3)
ably/state_test.go (3)
20-161: Tests cover the happy path well. Consider adding edge-case coverage.The three sub-tests correctly verify per-message serial routing for the straightforward ACK case. The test logic and assertions are sound.
A few additional scenarios would strengthen confidence in the
Ackimplementation:
- Implicit NACK (serialShift > 0): Enqueue messages starting at serial N, send an ACK starting at serial N+1 — verify the first message gets
errImplictNACKand a nil result, while the rest get their correct serials.- NACK with errInfo: Call
Ackwith a non-nilerrInfoand verify the error propagates to all callbacks.- Nil callback safety: Enqueue a message with a nil
*ackCallbackand verifyAckdoesn't panic.onAck(non-serial) path: Enqueue with anackCallbackthat only setsonAck, verify it receives the error correctly.
21-23: Optional: Extract repeated logger setup into a test helper.
testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}}is duplicated in all three sub-tests. A small helper or asetupat the top ofTestPendingEmitter_AckResultwould reduce repetition.♻️ Suggested refactor
func TestPendingEmitter_AckResult(t *testing.T) { + testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}} + t.Run("two messages with single serial each", func(t *testing.T) { - testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}} emitter := newPendingEmitter(testLogger)
34-48: Optional: Assert thaterris nil in callbacks for ACK (non-NACK) scenarios.The callbacks capture
serialsbut silently discarderr. Addingassert.NoError(t, err)inside each callback would catch unexpected error propagation.♻️ Example for callback1
callback1 := &ackCallback{ - onAckWithSerials: func(serials []string, err error) { + onAckWithSerials: func(serials []string, err error) { + assert.NoError(t, err) msg1Serials = serials }, }
This implements message updates, deletes, and appends as proposed in #696.
Summary by CodeRabbit
New Features
Tests