Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions proto/tim-api/tim/api/thread/v1alpha1/thread_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ service ThreadService {
};
option (google.api.method_signature) = "path,working_directory";
}

// CompactThread compacts the thread addressed by `path` in order to reduce
// the size of its context, using the supplied compaction configuration.
rpc CompactThread(CompactThreadRequest) returns (CompactThreadResponse) {
  option (google.api.method_signature) = "path,compaction_config";
  option (google.api.http) = {
    post: "/v1alpha1/{path=orgs/*/users/*/threads/*}:compact"
    body: "*"
  };
}
}

// GetThreadRequest is used to get a specific thread by its UID
Expand Down Expand Up @@ -293,6 +302,10 @@ message SubmitUserMessageRequest {
(buf.validate.field).required = true,
(google.api.field_behavior) = REQUIRED
];

// Optional compaction configuration for automatic compaction
// If provided and threshold is exceeded, the thread will be compacted before processing
optional CompactionConfig compaction_config = 3;
}

// Input for submitting a user message
Expand Down Expand Up @@ -365,3 +378,28 @@ message ConfigureThreadWorkingDirectoryRequest {
(buf.validate.field).string.min_len = 1
];
}

// CompactThreadRequest is the request message for manually compacting a
// thread.
message CompactThreadRequest {
  // Resource path of the thread to compact, shaped as
  // `orgs/{id}/users/{id}/threads/{id}`. Each id must be exactly 36
  // characters drawn from hex digits and dashes.
  string path = 1 [
    (buf.validate.field).string.pattern = "^orgs/[a-fA-F0-9-]{36}/users/[a-fA-F0-9-]{36}/threads/[a-fA-F0-9-]{36}$",
    (aep.api.field_info).resource_reference = "tim.settlerlabs.com/thread",
    (google.api.field_behavior) = REQUIRED
  ];

  // Configuration describing how the thread should be compacted. Must be set.
  CompactionConfig compaction_config = 2 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];
}

// CompactThreadResponse is the response message returned by CompactThread.
message CompactThreadResponse {
  // Outcome of the compaction. Must be set.
  CompactionResult result = 1 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];
}
108 changes: 108 additions & 0 deletions proto/tim-api/tim/api/thread/v1alpha1/thread_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,114 @@ message FileRestoration {
];
}

// CompactionConfig describes when and how a thread's context is compacted.
message CompactionConfig {
  // Fraction of the model's maximum capacity at which compaction is
  // triggered. Despite the field name, the value is a ratio in [0.0, 1.0]
  // rather than a 0-100 percentage: 0.8 means "compact when the context
  // reaches 80% of the model's max tokens".
  float threshold_percentage = 1 [(buf.validate.field).float = {gte: 0.0, lte: 1.0}];

  // Strategy used to compact the thread. Only values defined in
  // CompactionStrategy are accepted.
  CompactionStrategy strategy = 2 [(buf.validate.field).enum = {defined_only: true}];

  // Selects fork mode (true) or in-place compaction (false, reserved for
  // future use). Currently only fork mode is supported.
  // NOTE(review): an unset bool reads as false, i.e. the mode described as
  // not yet supported - confirm how the server treats that case.
  bool create_fork = 3;

  // Strategy-specific parameters; at most one member can be set.
  // NOTE(review): nothing in this message ties the chosen member to
  // `strategy` - confirm mismatched combinations are rejected server-side.
  oneof parameters {
    // Settings for the truncation strategy.
    TruncationParams truncation = 4;
    // Settings for the summarization strategy.
    SummarizationParams summarization = 5;
    // Settings for the importance-based strategy.
    ImportanceBasedParams importance_based = 6;
  }
}

// TruncationParams holds the settings for the truncation strategy.
message TruncationParams {
  // Number of recent messages to keep (default: 10). Validated to lie in
  // the range [1, 100].
  // NOTE(review): the default of 10 is not expressed in this schema and 0 is
  // outside the validated range - confirm where the default is applied.
  int32 keep_recent_messages = 1 [(buf.validate.field).int32 = {gte: 1, lte: 100}];
}

// SummarizationParams holds the settings for the summarization strategy.
message SummarizationParams {
  // Approximate target length of the summary, in tokens. Validated to lie in
  // the range [100, 10000].
  int32 summary_length = 1 [(buf.validate.field).int32 = {gte: 100, lte: 10000}];

  // When true, system messages are preserved.
  bool preserve_system_messages = 2;

  // Number of most recent messages left unchanged (they are not summarized).
  // Validated to lie in the range [0, 50].
  int32 keep_recent_messages = 3 [(buf.validate.field).int32 = {gte: 0, lte: 50}];
}

// ImportanceBasedParams holds the settings for the importance-based strategy.
message ImportanceBasedParams {
  // When true, user messages are always preserved.
  bool preserve_user_messages = 1;

  // When true, tool calls and their results are always preserved.
  bool preserve_tool_calls = 2;

  // When true, assistant text blocks that are not adjacent to tool calls are
  // summarized.
  bool summarize_assistant_blocks = 3;

  // Number of most recent messages left unchanged regardless of importance.
  // Validated to lie in the range [0, 50].
  int32 keep_recent_messages = 4 [(buf.validate.field).int32 = {gte: 0, lte: 50}];
}

// CompactionResult reports the outcome of a single compaction operation.
message CompactionResult {
  // Forked thread that contains the compacted messages.
  Thread forked_thread = 1;

  // Strategy that was applied.
  CompactionStrategy strategy = 2;

  // Number of messages before compaction.
  int32 original_message_count = 3;

  // Number of messages after compaction.
  int32 compacted_message_count = 4;

  // Total token count before compaction.
  int64 original_token_count = 5;

  // Total token count after compaction.
  int64 compacted_token_count = 6;

  // UID of the thread context record, formatted as a UUID4.
  string thread_context_uid = 7 [(google.api.field_info) = {format: UUID4}];
}

// CompactionStrategy enumerates the available ways of compacting a thread.
enum CompactionStrategy {
  // Zero value; no strategy specified.
  COMPACTION_STRATEGY_UNSPECIFIED = 0;

  // Truncation: keep the first message and the last N messages, dropping the
  // middle of the conversation.
  COMPACTION_STRATEGY_TRUNCATION = 1;

  // Summarization: an LLM summarizes the middle section while recent
  // messages are kept.
  COMPACTION_STRATEGY_SUMMARIZATION = 2;

  // Importance-based: user messages and tool calls are kept while assistant
  // responses are summarized.
  COMPACTION_STRATEGY_IMPORTANCE_BASED = 3;
}

// An actor who may participate in creating LLM messages
enum LlmMessageRole {
// Default unspecified
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
syntax = "proto3";

package tim.api.thread_compaction.v1alpha1;

import "aep/api/field_info.proto";
import "buf/validate/validate.proto";
import "google/api/annotations.proto";
import "google/api/client.proto";
import "google/api/field_behavior.proto";
import "tim/api/thread/v1alpha1/thread_types.proto";

// ThreadCompactionService is an internal service through which workers hand
// thread compaction results back to the API server.
service ThreadCompactionService {
  // PushCompactionResult delivers a worker's compaction result for `thread`
  // to the API server.
  rpc PushCompactionResult(PushCompactionResultRequest) returns (PushCompactionResultResponse) {
    option (google.api.method_signature) = "thread,result";
    option (google.api.http) = {
      post: "/v1alpha1/{thread=orgs/*/users/*/threads/*}:pushCompactionResult"
      body: "*"
    };
  }
}

// PushCompactionResultRequest carries a compaction result from a worker to
// the API.
message PushCompactionResultRequest {
  // Resource path of the original thread that was compacted, shaped as
  // `orgs/{id}/users/{id}/threads/{id}`. Each id must be exactly 36
  // characters drawn from hex digits and dashes.
  string thread = 1 [
    (buf.validate.field).string.pattern = "^orgs/[a-fA-F0-9-]{36}/users/[a-fA-F0-9-]{36}/threads/[a-fA-F0-9-]{36}$",
    (aep.api.field_info).resource_reference = "tim.settlerlabs.com/thread",
    (google.api.field_behavior) = REQUIRED
  ];

  // The compaction result being pushed. Must be set.
  tim.api.thread.v1alpha1.CompactionResult result = 2 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];
}

// PushCompactionResultResponse is returned after a compaction result has
// been pushed.
message PushCompactionResultResponse {
  // Acknowledges that the result was received and processed.
  bool success = 1;
}
58 changes: 56 additions & 2 deletions shared/llm/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,60 @@ func ValidateModelID(modelID string) error {
}

// Model describes a single LLM model entry.
type Model struct {
	// ID is the model's ModelID.
	ID ModelID
	// APIModel is presumably the model name sent to the provider's API;
	// confirm against the provider implementations.
	APIModel string
	// MaxContextTokens is the maximum number of context tokens for the model.
	MaxContextTokens int
}

// GetModelMaxTokens returns the maximum context size, in tokens, for modelID.
//
// It returns an error when modelID is not marked available in
// availableModelsMap, or when the model is available but has no limit
// defined in the table below.
func GetModelMaxTokens(modelID ModelID) (int, error) {
	if !availableModelsMap[modelID] {
		return 0, fmt.Errorf("invalid model ID: %s", modelID)
	}

	// Centralized reference table. These values are intended to match what the
	// providers return in their Models() map.
	switch modelID {
	// Anthropic models
	case ModelIDClaude45Sonnet, ModelIDClaude4Opus, ModelIDClaude45Haiku:
		return 200000, nil
	// OpenAI models
	// NOTE(review): 128000 looks conservative relative to the limits OpenAI
	// publishes for some of these models; under-estimating only triggers
	// compaction earlier, but confirm against the provider's Models() map.
	case ModelIDGPT5, ModelIDGPT5Mini, ModelIDGPT5Nano, ModelIDO4Mini:
		return 128000, nil
	case ModelIDO3:
		return 200000, nil
	// Google models
	// Gemini 2.5 Pro shares the 1M-token input limit of the Flash models. An
	// over-stated limit would place percentage thresholds beyond the real
	// context window, so compaction would trigger too late.
	case ModelIDGemini25Pro, ModelIDGemini25Flash, ModelIDGemini25FlashLite:
		return 1048576, nil // 1M tokens
	default:
		return 0, fmt.Errorf("max context tokens not defined for model: %s", modelID)
	}
}

// CalculateThresholdTokens returns the absolute token count that corresponds
// to thresholdPct of the maximum context size of modelID.
//
// thresholdPct is a fraction in the closed range [0.0, 1.0] (e.g. 0.8 for
// 80%). The result is truncated toward zero.
//
// An error is returned when GetModelMaxTokens fails for modelID, or when
// thresholdPct is NaN or lies outside [0.0, 1.0].
func CalculateThresholdTokens(modelID ModelID, thresholdPct float32) (int, error) {
	maxTokens, err := GetModelMaxTokens(modelID)
	if err != nil {
		return 0, err
	}

	// Written as a negated in-range test on purpose: every ordered comparison
	// involving NaN is false, so "x < 0 || x > 1" would let NaN through and
	// feed it into the float-to-int conversion below, whose result for NaN is
	// implementation-dependent.
	if !(thresholdPct >= 0.0 && thresholdPct <= 1.0) {
		return 0, fmt.Errorf("threshold percentage must be between 0.0 and 1.0, got: %f", thresholdPct)
	}

	return int(float32(maxTokens) * thresholdPct), nil
}
18 changes: 18 additions & 0 deletions tests/system/framework/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,24 @@ def configure_thread_working_directory(self, thread_path: str, working_directory
)
response.raise_for_status()

def compact_thread(self, thread_path: str, compaction_config: dict[str, Any]) -> dict[str, Any]:
    """Request compaction of a thread and return the decoded JSON response.

    Sends a POST to the thread's ``:compact`` endpoint with the configuration
    wrapped under the ``compaction_config`` key, calls ``raise_for_status()``
    on the response, and returns its JSON body.

    Args:
        thread_path: Path to the thread to compact.
        compaction_config: Compaction configuration including strategy and
            parameters.

    Returns:
        The compaction result containing the forked thread and stats.
    """
    endpoint = self._url(f"{thread_path}:compact")
    payload = {"compaction_config": compaction_config}
    reply = self.client.post(endpoint, headers=self._default_headers, json=payload)
    reply.raise_for_status()
    return reply.json()

def submit_tool_result(
self,
thread_path: str,
Expand Down
2 changes: 1 addition & 1 deletion tests/system/framework/streaming_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,6 @@ def stream_events_background(
except Exception as e:
collector.error = e
if verbose:
print(f"\n Stream error (expected for test): {type(e).__name__}", flush=True)
print(f"\n Stream connection closed: {type(e).__name__}", flush=True)
finally:
collector.stream_ended.set()
Loading
Loading