Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Spec version: `0.5.1`

### Added

- `SubscribeParams.delivery.maxLatencyMs` for clients to request a maximum
subscription delivery latency, including `0` for no intentional coalescing.
- `SessionState.inputNeeded` — a session-level aggregate of outstanding input
requests across all chats, so a client can discover and answer elicitations,
tool confirmations, and client-tool execution requests from the session
Expand Down
4 changes: 3 additions & 1 deletion clients/go/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ tag whose matching `## [X.Y.Z]` heading is missing from this file.

### Added

- `SubscribeParams.Delivery.MaxLatencyMs` and `Client.SubscribeWithDelivery`
for clients to request a maximum subscription delivery latency, including
`0` for no intentional coalescing.
- Optional `capabilities` field on `AgentInfo` (`AgentCapabilities` with a
nested `multipleChats` capability carrying `fork`) so clients gate multi-chat
and fork via advertised capabilities instead of provider-id switches.

- `SessionState.InputNeeded` — a session-level aggregate of outstanding input
requests across all chats (`SessionInputRequest` union with
`SessionChatInputRequest`, `SessionToolConfirmationRequest`, and
Expand Down
9 changes: 8 additions & 1 deletion clients/go/ahp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,9 +658,16 @@ func (c *Client) Reconnect(ctx context.Context, clientID string, lastSeenServerS
// Subscribe sends a `subscribe` request and returns the initial snapshot
// together with a per-URI [Subscription] handle.
func (c *Client) Subscribe(ctx context.Context, uri string) (*ahptypes.SubscribeResult, *Subscription, error) {
return c.SubscribeWithDelivery(ctx, uri, nil)
}

// SubscribeWithDelivery sends a `subscribe` request with advisory delivery
// preferences and returns the initial snapshot together with a per-URI
// [Subscription] handle.
func (c *Client) SubscribeWithDelivery(ctx context.Context, uri string, delivery *ahptypes.SubscriptionDeliveryOptions) (*ahptypes.SubscribeResult, *Subscription, error) {
sub := c.AttachSubscription(uri)
var out ahptypes.SubscribeResult
if err := c.Request(ctx, "subscribe", ahptypes.SubscribeParams{Channel: uri}, &out); err != nil {
if err := c.Request(ctx, "subscribe", ahptypes.SubscribeParams{Channel: uri, Delivery: delivery}, &out); err != nil {
sub.Close()
return nil, nil, err
}
Expand Down
15 changes: 15 additions & 0 deletions clients/go/ahptypes/commands.generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,21 @@ type ReconnectSnapshotResult struct {
type SubscribeParams struct {
// Channel URI this command targets.
Channel URI `json:"channel"`
// Optional delivery preferences for this subscription.
//
// Servers MAY use these preferences to buffer and coalesce high-frequency
// updates while preserving the same reduced state. Omit this field for the
// server's default delivery behavior.
Delivery *SubscriptionDeliveryOptions `json:"delivery,omitempty"`
}

// Advisory delivery preferences for a single subscription.
type SubscriptionDeliveryOptions struct {
// Maximum time, in milliseconds, that the server may intentionally delay
// delivery while buffering/coalescing updates for this subscription.
//
// A value of `0` requests immediate delivery with no intentional coalescing.
MaxLatencyMs *int64 `json:"maxLatencyMs,omitempty"`
}

// Result of the `subscribe` command.
Expand Down
3 changes: 2 additions & 1 deletion clients/kotlin/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ versions (`*-SNAPSHOT`) are explicitly rejected by the publish pipeline; bump

### Added

- `SubscribeParams.delivery.maxLatencyMs` for clients to request a maximum
subscription delivery latency, including `0` for no intentional coalescing.
- Optional `capabilities` field on `AgentInfo` (`AgentCapabilities` with a
nested `multipleChats` capability carrying `fork`) so clients gate multi-chat
and fork via advertised capabilities instead of provider-id switches.

- `SessionState.inputNeeded` — a session-level aggregate of outstanding input
requests across all chats (`SessionInputRequest` sealed interface with
`SessionChatInputRequest`, `SessionToolConfirmationRequest`, and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,26 @@ data class SubscribeParams(
/**
* Channel URI this command targets.
*/
val channel: String
val channel: String,
/**
* Optional delivery preferences for this subscription.
*
* Servers MAY use these preferences to buffer and coalesce high-frequency
* updates while preserving the same reduced state. Omit this field for the
* server's default delivery behavior.
*/
val delivery: SubscriptionDeliveryOptions? = null
)

@Serializable
data class SubscriptionDeliveryOptions(
/**
* Maximum time, in milliseconds, that the server may intentionally delay
* delivery while buffering/coalescing updates for this subscription.
*
* A value of `0` requests immediate delivery with no intentional coalescing.
*/
val maxLatencyMs: Long? = null
)

@Serializable
Expand Down
10 changes: 9 additions & 1 deletion clients/rust/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ matching `## [X.Y.Z]` heading is missing from this file.

### Added

- `SubscribeParams.delivery.max_latency_ms` and
`Client::subscribe_with_delivery` for clients to request a maximum
subscription delivery latency, including `0` for no intentional coalescing.
- Optional `capabilities` field on `AgentInfo` (`AgentCapabilities` with a
nested `multipleChats` capability carrying `fork`) so clients gate multi-chat
and fork via advertised capabilities instead of provider-id switches.

- `SessionState.input_needed` — a session-level aggregate of outstanding input
requests across all chats (`SessionInputRequest` enum with
`SessionChatInputRequest`, `SessionToolConfirmationRequest`, and
Expand All @@ -34,6 +36,12 @@ matching `## [X.Y.Z]` heading is missing from this file.
- Optional `model` and `tools` fields on `AgentCustomization` for a custom
agent's pinned model and tool allowlist.

### Changed

- Direct Rust struct literals for `SubscribeParams` must now include
`delivery: None`; use `SubscribeParams::new(channel)` or
`Client::subscribe` to keep the default delivery behavior.

## [0.5.0] — 2026-06-26

Implements AHP 0.5.0.
Expand Down
37 changes: 37 additions & 0 deletions clients/rust/crates/ahp-types/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,43 @@ pub struct ReconnectSnapshotResult {
pub struct SubscribeParams {
/// Channel URI this command targets.
pub channel: Uri,
/// Optional delivery preferences for this subscription.
///
/// Servers MAY use these preferences to buffer and coalesce high-frequency
/// updates while preserving the same reduced state. Omit this field for the
/// server's default delivery behavior.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delivery: Option<SubscriptionDeliveryOptions>,
}

impl SubscribeParams {
/// Create subscribe params with default delivery behavior.
pub fn new(channel: impl Into<Uri>) -> Self {
Self {
channel: channel.into(),
delivery: None,
}
}

/// Create subscribe params with advisory delivery preferences.
pub fn with_delivery(channel: impl Into<Uri>, delivery: SubscriptionDeliveryOptions) -> Self {
Self {
channel: channel.into(),
delivery: Some(delivery),
}
}
}

/// Advisory delivery preferences for a single subscription.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionDeliveryOptions {
/// Maximum time, in milliseconds, that the server may intentionally delay
/// delivery while buffering/coalescing updates for this subscription.
///
/// A value of `0` requests immediate delivery with no intentional coalescing.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_latency_ms: Option<i64>,
}

/// Result of the `subscribe` command.
Expand Down
20 changes: 18 additions & 2 deletions clients/rust/crates/ahp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::time::Duration;
use ahp_types::actions::{ActionEnvelope, StateAction};
use ahp_types::commands::{
DispatchActionParams, InitializeParams, InitializeResult, ReconnectParams, ReconnectResult,
SubscribeParams, SubscribeResult, UnsubscribeParams,
SubscribeParams, SubscribeResult, SubscriptionDeliveryOptions, UnsubscribeParams,
};
use ahp_types::common::{Uri, ROOT_RESOURCE_URI};
use ahp_types::messages::{
Expand Down Expand Up @@ -390,10 +390,26 @@ impl Client {
pub async fn subscribe(
&self,
uri: String,
) -> Result<(SubscribeResult, SessionSubscription), ClientError> {
self.subscribe_with_delivery(uri, None).await
}

/// Subscribe to a URI with advisory delivery preferences and obtain a
/// handle that streams [`SubscriptionEvent`]s for that channel.
pub async fn subscribe_with_delivery(
&self,
uri: String,
delivery: Option<SubscriptionDeliveryOptions>,
) -> Result<(SubscribeResult, SessionSubscription), ClientError> {
let sub = self.attach_subscription(&uri).await;
let result: SubscribeResult = self
.request("subscribe", SubscribeParams { channel: uri })
.request(
"subscribe",
SubscribeParams {
channel: uri,
delivery,
},
)
.await?;
Ok((result, sub))
}
Expand Down
1 change: 1 addition & 0 deletions clients/rust/crates/ahp/src/hosts/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ impl HostRuntime {
"subscribe",
SubscribeParams {
channel: uri.clone(),
delivery: None,
},
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,33 @@ public struct ReconnectSnapshotResult: Codable, Sendable {
public struct SubscribeParams: Codable, Sendable {
/// Channel URI this command targets.
public var channel: String
/// Optional delivery preferences for this subscription.
///
/// Servers MAY use these preferences to buffer and coalesce high-frequency
/// updates while preserving the same reduced state. Omit this field for the
/// server's default delivery behavior.
public var delivery: SubscriptionDeliveryOptions?

public init(
channel: String
channel: String,
delivery: SubscriptionDeliveryOptions? = nil
) {
self.channel = channel
self.delivery = delivery
}
}

public struct SubscriptionDeliveryOptions: Codable, Sendable {
/// Maximum time, in milliseconds, that the server may intentionally delay
/// delivery while buffering/coalescing updates for this subscription.
///
/// A value of `0` requests immediate delivery with no intentional coalescing.
public var maxLatencyMs: Int?

public init(
maxLatencyMs: Int? = nil
) {
self.maxLatencyMs = maxLatencyMs
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,15 @@ public actor AHPClient {
/// If the request fails (RPC error, timeout, transport drop), the local
/// listener is removed and its stream finished — callers don't need to
/// clean up the partially-attached subscription.
public func subscribe(_ uri: String) async throws -> (SubscribeResult, AsyncStream<SubscriptionEvent>) {
public func subscribe(
_ uri: String,
delivery: SubscriptionDeliveryOptions? = nil
) async throws -> (SubscribeResult, AsyncStream<SubscriptionEvent>) {
let (stream, listenerId) = attachSubscriptionInternal(uri)
do {
let result: SubscribeResult = try await request(
method: "subscribe",
params: SubscribeParams(channel: uri)
params: SubscribeParams(channel: uri, delivery: delivery)
)
return (result, stream)
} catch {
Expand Down
4 changes: 3 additions & 1 deletion clients/swift/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ the tag matches the version pinned in [`VERSION`](VERSION).

### Added

- `SubscribeParams.delivery.maxLatencyMs` and
`AHPClient.subscribe(_:delivery:)` for clients to request a maximum
subscription delivery latency, including `0` for no intentional coalescing.
- Optional `capabilities` field on `AgentInfo` (`AgentCapabilities` with a
nested `multipleChats` capability carrying `fork`) so clients gate multi-chat
and fork via advertised capabilities instead of provider-id switches.

- `SessionState.inputNeeded` — a session-level aggregate of outstanding input
requests across all chats (`SessionInputRequest` enum with
`SessionChatInputRequest`, `SessionToolConfirmationRequest`, and
Expand Down
4 changes: 3 additions & 1 deletion clients/typescript/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ hotfix escape hatch.

### Added

- `SubscribeParams.delivery.maxLatencyMs` and `AhpClient.subscribe` delivery
options for clients to request a maximum subscription delivery latency,
including `0` for no intentional coalescing.
- Optional `capabilities` field on `AgentInfo` (`AgentCapabilities` with a
nested `multipleChats` capability carrying `fork`) so clients gate multi-chat
and fork via advertised capabilities instead of provider-id switches.

- `SessionState.inputNeeded` — a session-level aggregate of outstanding input
requests across all chats (`SessionInputRequest` union with
`SessionChatInputRequest`, `SessionToolConfirmationRequest`, and
Expand Down
17 changes: 15 additions & 2 deletions clients/typescript/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type {
ReconnectParams,
ReconnectResult,
SubscribeParams,
SubscriptionDeliveryOptions,
SubscribeResult,
UnsubscribeParams,
} from '../types/common/commands.js';
Expand Down Expand Up @@ -75,6 +76,12 @@ export interface AhpClientConfig {
subscriptionBuffer?: number;
}

/** Optional preferences for a `subscribe` request. */
export interface SubscribeOptions {
/** Advisory delivery preferences for this subscription. */
delivery?: SubscriptionDeliveryOptions;
}

const DEFAULT_REQUEST_TIMEOUT_MS = 30_000;
const DEFAULT_SUBSCRIPTION_BUFFER = 4096;

Expand Down Expand Up @@ -291,10 +298,16 @@ export class AhpClient {
* before the `subscribe` request is sent, so no events delivered during
* the round-trip are missed.
*/
async subscribe(uri: URI): Promise<{ result: SubscribeResult; subscription: Subscription }> {
async subscribe(
uri: URI,
options: SubscribeOptions = {},
): Promise<{ result: SubscribeResult; subscription: Subscription }> {
const subscription = this.attachSubscription(uri);
try {
const params: SubscribeParams = { channel: uri };
const params: SubscribeParams = {
channel: uri,
...(options.delivery ? { delivery: options.delivery } : {}),
};
const result = await this.request('subscribe', params);
return { result, subscription };
} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion clients/typescript/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

export { AhpClient, Subscription } from './client.js';
export type { AhpClientConfig, DispatchHandle, ServerRequestHandler } from './client.js';
export type { AhpClientConfig, DispatchHandle, ServerRequestHandler, SubscribeOptions } from './client.js';
export type { ClientEvent, ClosedReason, ConnectionState, SubscriptionEvent } from './events.js';
export {
AhpClientError,
Expand Down
3 changes: 2 additions & 1 deletion clients/typescript/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ test('subscribe attaches before sending the request and fans out an action', asy
const client = new AhpClient(c);
client.connect();

const subPromise = client.subscribe('ahp-session:/s1');
const subPromise = client.subscribe('ahp-session:/s1', { delivery: { maxLatencyMs: 100 } });
const req = await readRequest(s);
assert.equal(req.method, 'subscribe');
assert.equal((req.params as SubscribeParams).channel, 'ahp-session:/s1');
assert.deepEqual((req.params as SubscribeParams).delivery, { maxLatencyMs: 100 });

const result: SubscribeResult = {
channel: 'ahp-session:/s1',
Expand Down
14 changes: 13 additions & 1 deletion docs/specification/subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ Future channel types (LSP relay, MCP relay, …) introduce their own URI schemes
"jsonrpc": "2.0",
"id": 1,
"method": "subscribe",
"params": { "channel": "ahp-session:/<uuid>" }
"params": {
"channel": "ahp-session:/<uuid>",
"delivery": { "maxLatencyMs": 100 }
}
}

// Server → Client (state-bearing channel)
Expand Down Expand Up @@ -73,6 +76,15 @@ Future channel types (LSP relay, MCP relay, …) introduce their own URI schemes

After subscribing, the client receives all messages scoped to that channel — both action envelopes (for state channels) and any channel-specific notifications.

### Delivery preferences

Clients MAY include `delivery.maxLatencyMs` on `subscribe` to request an upper
bound, in milliseconds, on intentional server-side buffering for that
subscription. Servers MAY use that budget to coalesce high-frequency updates
while preserving the same reduced state a client would observe from immediate
delivery. A value of `0` requests immediate delivery with no intentional
coalescing. Omitting `delivery` uses the server's default delivery behavior.

## Unsubscribe (Notification)

`unsubscribe` is a fire-and-forget client → server notification. Like every wire message, its params carry the channel URI being released.
Expand Down
Loading
Loading