Skip to content

feat: Implement Message Updates, Deletes, and Appends#697

Open
lmars wants to merge 5 commits intomainfrom
message-updates
Open

feat: Implement Message Updates, Deletes, and Appends#697
lmars wants to merge 5 commits intomainfrom
message-updates

Conversation

@lmars
Copy link
Member

@lmars lmars commented Feb 15, 2026

This implements message updates, deletes, and appends as proposed in #696.

Summary by CodeRabbit

  • New Features

    • Full message lifecycle: publish (sync/async) with per-message serials, update, delete, and append for REST and realtime; fetch message by serial and view version history
    • Protocol default updated to v5 for message operations (stats remain on v2)
  • Tests

    • New integration, unit tests and usage examples demonstrating publish/update/append/delete/versioning and async flows

Signed-off-by: Lewis Marshall <lewis.marshall@ably.com>
@lmars lmars requested a review from sacOO7 February 15, 2026 16:28
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 15, 2026

Walkthrough

Adds message versioning and actions (create/update/delete/append), publish-with-result APIs for REST and Realtime, serial-aware ACK delivery via an ackCallback wrapper, protocol default bumped to v5, per-message retrieval/version history, supporting decoders, and extensive tests and examples for message lifecycle flows.

Changes

Cohort / File(s) Summary
Core message types & tests
ably/proto_message.go, ably/proto_message_operations_test.go
Introduce MessageAction, MessageVersion, PublishResult, UpdateResult, update options and serialization (JSON/MsgPack); add unit tests for encodings and serial validation.
Protocol & wire changes
ably/proto_http.go, ably/proto_protocol_message.go, ably/paginated_result.go
Default protocol version changed to v5; add protocolPublishResult and Res field to protocol message wire type; allow paginatedRequest to carry custom http.Header.
REST channel & client
ably/rest_channel.go, ably/rest_client.go
Add PublishWithResult/PublishMultipleWithResult, Update/Delete/Append message ops, GetMessage, GetMessageVersions, per-message decoders (fullMessageDecoder), validateMessageSerial, getWithHeader, and conditional protocol header handling (Stats forced to v2).
Realtime channel & connection
ably/realtime_channel.go, ably/realtime_conn.go
Add context-aware and async PublishWithResult/PublishMultipleWithResult, Update/Delete/Append (sync+async), GetMessage/GetMessageVersions; introduce sendWithSerialCallback; refactor Connection.send to accept *ackCallback.
Callback mechanism & state
ably/state.go, ably/realtime_presence.go, ably/export_test.go
Replace plain onAck callbacks with ackCallback supporting error-only and serial-aware callbacks; update pendingEmitter, queues, enqueue/flush/fail/ack paths to use ackCallback.call(...).
Experimental objects & mocks
ably/realtime_experimental_objects.go, ably/realtime_experimental_objects_test.go
Update channel send signatures and channel mock to accept *ackCallback and adapt tests to call callback.call(...).
Integration, examples & sandbox
ably/message_updates_integration_test.go, ably/example_message_updates_test.go, ablytest/sandbox.go, ably/state_test.go
Add comprehensive integration tests and examples for REST and Realtime message update/append/delete flows; add pendingEmitter ACK unit tests; expose MutableMessages in sandbox defaults.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant REST_Channel as "REST Channel"
    participant Server
    Client->>REST_Channel: PublishWithResult(name, data)
    REST_Channel->>Server: POST /messages (Action=Create)
    Server-->>REST_Channel: { serials: ["s1"] }
    REST_Channel-->>Client: PublishResult{Serial: "s1"}

    Client->>REST_Channel: UpdateMessage(msg{Serial:s1}, opts)
    REST_Channel->>REST_Channel: validate serial, build MessageVersion
    REST_Channel->>Server: POST /messages (Action=Update, Version)
    Server-->>REST_Channel: { versionSerial: "v2" }
    REST_Channel-->>Client: UpdateResult{VersionSerial: "v2"}
Loading
sequenceDiagram
    participant Client
    participant Realtime_Channel as "Realtime Channel"
    participant Connection
    participant Server
    Client->>Realtime_Channel: PublishWithResultAsync(name, data, cb)
    Realtime_Channel->>Realtime_Channel: create ackCallback wrapper
    Realtime_Channel->>Connection: send(msg, callback)
    Connection->>Server: send PUBLISH
    Server-->>Connection: ACK { serials: ["s1"] }
    Connection->>Realtime_Channel: callback.call(serials, nil)
    Realtime_Channel-->>Client: cb(PublishResult{Serial:"s1"}, nil)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐇 I hopped through bytes and wires tonight,

serials flashed and shifted bright,
Update, Append, Delete—each new trail,
REST and Realtime tell the tale,
A rabbit's hop, versioned delight.

🚥 Pre-merge checks | ✅ 3 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 39.13% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and accurately describes the main feature implementation: message updates, deletes, and appends, which aligns with the substantial changes across REST and Realtime channels.
Merge Conflict Detection ✅ Passed ✅ No merge conflicts detected when merging into main

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch message-updates

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions bot temporarily deployed to staging/pull/697/godoc February 15, 2026 16:29 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/697/features February 15, 2026 16:29 Inactive
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ably/paginated_result.go (1)

189-194: ⚠️ Potential issue | 🟠 Major

Bug: copyHeader assigns the original slice instead of the copy.

Line 192 creates a copy d of the header values slice, but line 193 assigns the original v to dest[k], making the copy dead code. The destination header will share the same backing array as the source.

🐛 Proposed fix
 func copyHeader(dest, src http.Header) {
 	for k, v := range src {
 		d := make([]string, len(v))
 		copy(d, v)
-		dest[k] = v
+		dest[k] = d
 	}
 }
ably/rest_channel.go (1)

112-119: ⚠️ Potential issue | 🟡 Minor

publishMultiple mutates caller's message slice elements in place.

The *m, err = (*m).withEncodedData(cipher) dereferences and overwrites the pointed-to Message value. Since messages is []*Message, this modifies the caller's original messages (e.g., overwriting Data with encoded bytes and setting Encoding). This could surprise callers who reuse message objects. The same pattern exists in the UpdateMessage/DeleteMessage/AppendMessage methods, but those copy first (updateMsg := *msg), which is safer.

🤖 Fix all issues with AI agents
In `@ably/message_updates_integration_test.go`:
- Around line 315-329: The async callback passed to channel.AppendMessageAsync
currently calls t.Logf from a goroutine (in the loop over tokens) which can
panic if the test ends; change the callback to never call testing.T methods and
instead send results over the existing completed channel (e.g., send an index or
an error value/struct) so the main test goroutine can observe failures; update
the completion loop that reads from completed to detect and fail the test via
require/ t.Fatalf when an error result is received, and remove the t.Logf call
inside the callback (refer to the loop variable tokens, the
channel.AppendMessageAsync callback, and the completed channel when making the
change).

In `@ably/state.go`:
- Line 138: The Ack method on pendingEmitter has an unused conn *Connection
parameter; remove the parameter from the function signature (func (q
*pendingEmitter) Ack(msg *protocolMessage, errInfo *ErrorInfo)) and update all
callers (e.g., the calls from realtime_conn.go that currently pass a Connection)
to stop passing conn. Ensure you update any interface definitions or
implementations that referenced pendingEmitter.Ack to match the new signature
and run build/tests to catch remaining references.
🧹 Nitpick comments (6)
ably/rest_client.go (1)

812-819: Header ordering: custom headers copied before Accept is set.

Custom headers from r.header are copied at line 813 before Accept is unconditionally set at line 815. If a caller ever passes a custom Accept header, it would be silently overwritten. Currently this isn't an issue (only protocol version is overridden), but the conditional guard applied to ablyProtocolVersionHeader (line 817) is not applied to Accept.

Consider applying the same "set only if absent" pattern to Accept for consistency, or document that certain headers cannot be overridden.

ably/proto_message.go (1)

34-106: DRY: action↔numeric mapping is duplicated four times.

The same switch logic for converting between MessageAction and its numeric wire representation is repeated in MarshalJSON, UnmarshalJSON, CodecEncodeSelf, and CodecDecodeSelf. Consider extracting helpers like messageActionToNum and numToMessageAction to centralize the mapping.

Also, the default branch silently maps unknown actions to MessageActionCreate (0) during both marshaling and unmarshaling. This could mask bugs if an invalid action is accidentally used. Consider returning an error for unknown values instead.

♻️ Proposed helper extraction
+var messageActionToNum = map[MessageAction]int{
+	MessageActionCreate: 0,
+	MessageActionUpdate: 1,
+	MessageActionDelete: 2,
+	MessageActionAppend: 5,
+}
+
+var numToMessageAction = map[int]MessageAction{
+	0: MessageActionCreate,
+	1: MessageActionUpdate,
+	2: MessageActionDelete,
+	5: MessageActionAppend,
+}
+
 func (a MessageAction) MarshalJSON() ([]byte, error) {
-	var num int
-	switch a {
-	case MessageActionCreate:
-		num = 0
-	case MessageActionUpdate:
-		num = 1
-	case MessageActionDelete:
-		num = 2
-	case MessageActionAppend:
-		num = 5
-	default:
-		num = 0
+	num, ok := messageActionToNum[a]
+	if !ok {
+		return nil, fmt.Errorf("unknown MessageAction: %q", a)
 	}
 	return json.Marshal(num)
 }

Apply the same pattern to UnmarshalJSON, CodecEncodeSelf, and CodecDecodeSelf.

ably/realtime_channel.go (1)

852-1046: Significant DRY opportunity: UpdateMessageAsync, DeleteMessageAsync, and AppendMessageAsync are nearly identical.

The three async methods differ only in the MessageAction constant assigned. The blocking variants follow the same boilerplate pattern too. Consider extracting a shared helper to reduce ~180 lines of near-duplicate code.

♻️ Sketch of a shared helper
+func (c *RealtimeChannel) messageOperationAsync(msg *Message, action MessageAction, onAck func(*UpdateResult, error), options ...UpdateOption) error {
+	if err := validateMessageSerial(msg); err != nil {
+		return err
+	}
+	var opts updateOptions
+	for _, o := range options {
+		o(&opts)
+	}
+	version := &MessageVersion{
+		Description: opts.description,
+		ClientID:    opts.clientID,
+		Metadata:    opts.metadata,
+	}
+	opMsg := *msg
+	opMsg.Action = action
+	opMsg.Version = version
+	protoMsg := &protocolMessage{
+		Action:   actionMessage,
+		Channel:  c.Name,
+		Messages: []*Message{&opMsg},
+	}
+	return c.sendWithSerialCallback(protoMsg, func(serials []string, err error) {
+		if err != nil {
+			onAck(nil, err)
+			return
+		}
+		result := &UpdateResult{}
+		if len(serials) > 0 {
+			result.VersionSerial = serials[0]
+		}
+		onAck(result, nil)
+	})
+}
+
+func (c *RealtimeChannel) messageOperation(ctx context.Context, msg *Message, action MessageAction, options ...UpdateOption) (*UpdateResult, error) {
+	type resultOrError struct {
+		result *UpdateResult
+		err    error
+	}
+	listen := make(chan resultOrError, 1)
+	if err := c.messageOperationAsync(msg, action, func(result *UpdateResult, err error) {
+		listen <- resultOrError{result, err}
+	}, options...); err != nil {
+		return nil, err
+	}
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case result := <-listen:
+		return result.result, result.err
+	}
+}

Then each public method becomes a one-liner:

func (c *RealtimeChannel) UpdateMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) {
	return c.messageOperation(ctx, msg, MessageActionUpdate, options...)
}
ably/message_updates_integration_test.go (1)

205-209: Assertion on version ordering may be fragile.

The test asserts that versions[0] is MessageActionCreate and versions[1]/versions[2] are MessageActionUpdate. This depends on the server returning versions in a specific order. If the API's default ordering changes, this test will break. Consider documenting or parameterizing the expected direction.

ably/rest_channel.go (2)

208-209: Remove debug log from production code.

This Debugf line logs response details including serials. While it's at debug level, it appears to be a development artifact (// Debug: log response). Consider removing it or ensuring it's intentional.

Proposed fix
-	// Debug: log response
-	c.log().Debugf("PublishMultipleWithResult response: serials=%v, count=%d", response.Serials, len(response.Serials))
-
 	// Build results from serials

221-363: Significant code duplication across UpdateMessage, DeleteMessage, and AppendMessage.

These three methods are nearly identical, differing only in the Action field (MessageActionUpdate, MessageActionDelete, MessageActionAppend). Consider extracting a shared helper.

Proposed refactor
+func (c *RESTChannel) performMessageAction(ctx context.Context, action MessageAction, msg *Message, options ...UpdateOption) (*UpdateResult, error) {
+	if err := validateMessageSerial(msg); err != nil {
+		return nil, err
+	}
+
+	var opts updateOptions
+	for _, o := range options {
+		o(&opts)
+	}
+
+	version := &MessageVersion{
+		Description: opts.description,
+		ClientID:    opts.clientID,
+		Metadata:    opts.metadata,
+	}
+
+	actionMsg := *msg
+	actionMsg.Action = action
+	actionMsg.Version = version
+
+	cipher, _ := c.options.GetCipher()
+	var err error
+	actionMsg, err = actionMsg.withEncodedData(cipher)
+	if err != nil {
+		return nil, fmt.Errorf("encoding data for message: %w", err)
+	}
+
+	var response publishResponse
+	res, err := c.client.post(ctx, c.baseURL+"/messages", []*Message{&actionMsg}, &response)
+	if err != nil {
+		return nil, err
+	}
+	defer res.Body.Close()
+
+	result := &UpdateResult{}
+	if len(response.Serials) > 0 {
+		result.VersionSerial = response.Serials[0]
+	}
+	return result, nil
+}
+
 func (c *RESTChannel) UpdateMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) {
-	// ... ~45 lines of duplicated code
+	return c.performMessageAction(ctx, MessageActionUpdate, msg, options...)
 }
 
 func (c *RESTChannel) DeleteMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) {
-	// ... ~45 lines of duplicated code
+	return c.performMessageAction(ctx, MessageActionDelete, msg, options...)
 }
 
 func (c *RESTChannel) AppendMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) {
-	// ... ~45 lines of duplicated code
+	return c.performMessageAction(ctx, MessageActionAppend, msg, options...)
 }

@github-actions github-actions bot temporarily deployed to staging/pull/697/features February 15, 2026 16:48 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/697/godoc February 15, 2026 16:48 Inactive
@paddybyers
Copy link
Member

It would have been nice as an experiment to derive the tests from https://github.com/ably/uts/blob/uts-experiments/test/completion-status.md

@lmars
Copy link
Member Author

lmars commented Feb 15, 2026

@paddybyers yes apologies I did start with that but got a bit bogged down so I thought I'd just get this implemented (because I want it for load testing), and then follow up with you on the UTS stuff (we can try it out for annotations/summaries, which I haven't added here).

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (3)
ably/state_test.go (3)

20-161: Tests cover the happy path well. Consider adding edge-case coverage.

The three sub-tests correctly verify per-message serial routing for the straightforward ACK case. The test logic and assertions are sound.

A few additional scenarios would strengthen confidence in the Ack implementation:

  1. Implicit NACK (serialShift > 0): Enqueue messages starting at serial N, send an ACK starting at serial N+1 — verify the first message gets errImplictNACK and a nil result, while the rest get their correct serials.
  2. NACK with errInfo: Call Ack with a non-nil errInfo and verify the error propagates to all callbacks.
  3. Nil callback safety: Enqueue a message with a nil *ackCallback and verify Ack doesn't panic.
  4. onAck (non-serial) path: Enqueue with an ackCallback that only sets onAck, verify it receives the error correctly.

21-23: Optional: Extract repeated logger setup into a test helper.

testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}} is duplicated in all three sub-tests. A small helper or a setup at the top of TestPendingEmitter_AckResult would reduce repetition.

♻️ Suggested refactor
 func TestPendingEmitter_AckResult(t *testing.T) {
+	testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}}
+
 	t.Run("two messages with single serial each", func(t *testing.T) {
-		testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}}
 		emitter := newPendingEmitter(testLogger)

34-48: Optional: Assert that err is nil in callbacks for ACK (non-NACK) scenarios.

The callbacks capture serials but silently discard err. Adding assert.NoError(t, err) inside each callback would catch unexpected error propagation.

♻️ Example for callback1
 		callback1 := &ackCallback{
-			onAckWithSerials: func(serials []string, err error) {
+			onAckWithSerials: func(serials []string, err error) {
+				assert.NoError(t, err)
 				msg1Serials = serials
 			},
 		}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

2 participants