Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion crates/openfang-api/src/channel_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ use openfang_channels::discourse::DiscourseAdapter;
use openfang_channels::gitter::GitterAdapter;
use openfang_channels::gotify::GotifyAdapter;
use openfang_channels::linkedin::LinkedInAdapter;
use openfang_channels::mumble::MumbleAdapter;
use openfang_channels::mqtt::MqttAdapter;
use openfang_channels::mumble::MumbleAdapter;
use openfang_channels::ntfy::NtfyAdapter;
use openfang_channels::webhook::WebhookAdapter;
use openfang_channels::wecom::WeComAdapter;
Expand Down Expand Up @@ -880,6 +880,16 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
}
}

fn set_channel_context(
&self,
agent_id: AgentId,
context: openfang_types::ChannelCallbackContext,
) {
self.kernel
.active_channel_contexts
.insert(agent_id.0.to_string(), context);
}

async fn check_auto_reply(&self, agent_id: AgentId, message: &str) -> Option<String> {
// Check if auto-reply should fire for this message
let channel_type = "bridge"; // Generic; the bridge layer handles specifics
Expand Down
3 changes: 2 additions & 1 deletion crates/openfang-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ pub async fn get_agent_session(
msg.get_mut("tools").and_then(|v| v.as_array_mut())
{
if let Some(tool_obj) = tools_arr.get_mut(tool_idx) {
tool_obj["result"] = serde_json::Value::String(result.clone());
tool_obj["result"] =
serde_json::Value::String(result.clone());
tool_obj["is_error"] =
serde_json::Value::Bool(*is_error);
}
Expand Down
22 changes: 22 additions & 0 deletions crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,16 @@ pub trait ChannelBridgeHandle: Send + Sync {
// Default: no tracking
}

/// Store channel callback context for an agent so async tool results
/// can be delivered back to the originating channel.
fn set_channel_context(
&self,
_agent_id: AgentId,
_context: openfang_types::ChannelCallbackContext,
) {
// Default: no-op
}

/// Check if auto-reply is enabled and the message should trigger one.
/// Returns Some(reply_text) if auto-reply fires, None otherwise.
async fn check_auto_reply(&self, _agent_id: AgentId, _message: &str) -> Option<String> {
Expand Down Expand Up @@ -964,6 +974,18 @@ async fn dispatch_message(
text.clone()
};

// Store channel context so async tools can deliver results back here
handle.set_channel_context(
agent_id,
openfang_types::ChannelCallbackContext {
channel_type: adapter.name().to_string(),
reply_to_platform_id: message.sender.platform_id.clone(),
reply_to_display_name: message.sender.display_name.clone(),
thread_id: thread_id.map(String::from),
agent_id: agent_id.0.to_string(),
},
);

// Send to agent and relay response
let result = handle.send_message(agent_id, &prefixed_text).await;

Expand Down
2 changes: 1 addition & 1 deletion crates/openfang-channels/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ pub mod discourse;
pub mod gitter;
pub mod gotify;
pub mod linkedin;
pub mod mumble;
pub mod mqtt;
pub mod mumble;
pub mod ntfy;
pub mod webhook;
pub mod wecom;
8 changes: 3 additions & 5 deletions crates/openfang-channels/src/line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl LineAdapter {
diff |= a ^ b;
}
if diff != 0 {
let computed = base64::engine::general_purpose::STANDARD.encode(&result);
let computed = base64::engine::general_purpose::STANDARD.encode(result);
// Log first/last 4 chars of each signature for debugging without leaking full HMAC
let comp_redacted = format!(
"{}...{}",
Expand Down Expand Up @@ -381,8 +381,7 @@ impl ChannelAdapter for LineAdapter {
axum::routing::post({
let secret = Arc::clone(&channel_secret);
let tx = Arc::clone(&tx);
move |headers: axum::http::HeaderMap,
body: axum::body::Bytes| {
move |headers: axum::http::HeaderMap, body: axum::body::Bytes| {
let secret = Arc::clone(&secret);
let tx = Arc::clone(&tx);
async move {
Expand All @@ -404,8 +403,7 @@ impl ChannelAdapter for LineAdapter {
shutdown_rx: watch::channel(false).1,
};

if !signature.is_empty()
&& !adapter.verify_signature(&body, signature)
if !signature.is_empty() && !adapter.verify_signature(&body, signature)
{
warn!("LINE: invalid webhook signature");
return axum::http::StatusCode::UNAUTHORIZED;
Expand Down
8 changes: 6 additions & 2 deletions crates/openfang-channels/src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ impl MqttAdapter {
}

/// Parse host:port string.
fn parse_host_port(s: &str, default_port: u16) -> Result<(String, u16), Box<dyn std::error::Error>> {
fn parse_host_port(
s: &str,
default_port: u16,
) -> Result<(String, u16), Box<dyn std::error::Error>> {
let s = s.trim();
if let Some(colon_pos) = s.rfind(':') {
let host = s[..colon_pos].to_string();
Expand Down Expand Up @@ -239,7 +242,8 @@ impl ChannelAdapter for MqttAdapter {

async fn start(
&self,
) -> Result<Pin<Box<dyn Stream<Item = ChannelMessage> + Send>>, Box<dyn std::error::Error>> {
) -> Result<Pin<Box<dyn Stream<Item = ChannelMessage> + Send>>, Box<dyn std::error::Error>>
{
let options = self.build_mqtt_options()?;
let (client, mut eventloop) = AsyncClient::new(options, 10);

Expand Down
71 changes: 57 additions & 14 deletions crates/openfang-kernel/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ pub struct OpenFangKernel {
/// Hot-reloadable default model override (set via config hot-reload, read at agent spawn).
pub default_model_override:
std::sync::RwLock<Option<openfang_types::config::DefaultModelConfig>>,
/// Active channel callback contexts for async agent delegation.
/// Keyed by agent UUID string. Set by the channel bridge before each message.
pub active_channel_contexts: dashmap::DashMap<String, openfang_types::ChannelCallbackContext>,
/// Per-agent message locks — serializes LLM calls for the same agent to prevent
/// session corruption when multiple messages arrive concurrently (e.g. rapid voice
/// messages via Telegram). Different agents can still run in parallel.
Expand Down Expand Up @@ -1047,6 +1050,7 @@ impl OpenFangKernel {
whatsapp_gateway_pid: Arc::new(std::sync::Mutex::new(None)),
channel_adapters: dashmap::DashMap::new(),
default_model_override: std::sync::RwLock::new(None),
active_channel_contexts: dashmap::DashMap::new(),
agent_msg_locks: dashmap::DashMap::new(),
self_handle: OnceLock::new(),
};
Expand Down Expand Up @@ -2889,20 +2893,16 @@ impl OpenFangKernel {
model: &str,
explicit_provider: Option<&str>,
) -> KernelResult<()> {
let catalog_entry = self
.model_catalog
.read()
.ok()
.and_then(|catalog| {
// When the caller specifies a provider, use provider-aware lookup
// so we resolve the model on the correct provider — not a builtin
// from a different provider that happens to share the same name (#833).
if let Some(ep) = explicit_provider {
catalog.find_model_for_provider(model, ep).cloned()
} else {
catalog.find_model(model).cloned()
}
});
let catalog_entry = self.model_catalog.read().ok().and_then(|catalog| {
// When the caller specifies a provider, use provider-aware lookup
// so we resolve the model on the correct provider — not a builtin
// from a different provider that happens to share the same name (#833).
if let Some(ep) = explicit_provider {
catalog.find_model_for_provider(model, ep).cloned()
} else {
catalog.find_model(model).cloned()
}
});
let provider = if let Some(ep) = explicit_provider {
// User explicitly set the provider — use it as-is
Some(ep.to_string())
Expand Down Expand Up @@ -5904,6 +5904,49 @@ impl KernelHandle for OpenFangKernel {
.collect()
}

fn get_channel_context(
&self,
agent_id: &str,
) -> Option<openfang_types::ChannelCallbackContext> {
self.active_channel_contexts
.get(agent_id)
.map(|r| r.value().clone())
}

fn set_channel_context(&self, agent_id: &str, context: openfang_types::ChannelCallbackContext) {
self.active_channel_contexts
.insert(agent_id.to_string(), context);
}

async fn inject_async_callback(
&self,
context: openfang_types::ChannelCallbackContext,
hand_name: &str,
result_text: &str,
) -> Result<(), String> {
// Build callback message and send it to the caller agent
let callback_msg =
format!("{hand_name} wrote: {result_text}\n\n(Present these findings to the user.)");
let agent_id: AgentId = context
.agent_id
.parse()
.map_err(|_| "Invalid agent ID in callback context".to_string())?;
let result = self
.send_message(agent_id, &callback_msg)
.await
.map_err(|e| format!("Callback send_message failed: {e}"))?;

// Deliver the agent's response to the originating channel
self.send_channel_message(
&context.channel_type,
&context.reply_to_platform_id,
&result.response,
context.thread_id.as_deref(),
)
.await
.map(|_| ())
}

fn touch_agent(&self, agent_id: &str) {
if let Ok(id) = agent_id.parse::<AgentId>() {
self.registry.touch(id);
Expand Down
10 changes: 8 additions & 2 deletions crates/openfang-runtime/src/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,10 @@ mod tests {
Message::assistant("Done reading."),
];
let adjusted = adjust_split_for_tool_pairs(&messages, 2);
assert_eq!(adjusted, 1, "Should pull back split to keep ToolUse + ToolResult together");
assert_eq!(
adjusted, 1,
"Should pull back split to keep ToolUse + ToolResult together"
);
}

#[test]
Expand All @@ -1489,7 +1492,10 @@ mod tests {
Message::user("c"),
];
let adjusted = adjust_split_for_tool_pairs(&messages, 1);
assert_eq!(adjusted, 1, "Should not change split for plain text messages");
assert_eq!(
adjusted, 1,
"Should not change split for plain text messages"
);
}

#[test]
Expand Down
24 changes: 18 additions & 6 deletions crates/openfang-runtime/src/context_overflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ fn safe_drain_boundary(messages: &[Message], mut boundary: usize) -> usize {
// is in the last drained message (boundary - 1). Pull boundary back by 1.
if messages[boundary].role == Role::User {
if let MessageContent::Blocks(blocks) = &messages[boundary].content {
let has_tool_result = blocks.iter().any(|b| matches!(b, ContentBlock::ToolResult { .. }));
let has_tool_result = blocks
.iter()
.any(|b| matches!(b, ContentBlock::ToolResult { .. }));
if has_tool_result && boundary > 0 && messages[boundary - 1].role == Role::Assistant {
if let MessageContent::Blocks(asst_blocks) = &messages[boundary - 1].content {
let has_tool_use = asst_blocks.iter().any(|b| matches!(b, ContentBlock::ToolUse { .. }));
let has_tool_use = asst_blocks
.iter()
.any(|b| matches!(b, ContentBlock::ToolUse { .. }));
if has_tool_use {
boundary -= 1;
debug!(
Expand Down Expand Up @@ -135,7 +139,8 @@ pub fn recover_from_overflow(
debug!(
estimated_tokens = estimated,
removing = remove,
"Stage 1: moderate trim to last {} messages", messages.len() - remove
"Stage 1: moderate trim to last {} messages",
messages.len() - remove
);
messages.drain(..remove);
// Re-check after trim
Expand All @@ -156,7 +161,8 @@ pub fn recover_from_overflow(
warn!(
estimated_tokens = estimate_tokens(messages, system_prompt, tools),
removing = remove,
"Stage 2: aggressive overflow compaction to last {} messages", messages.len() - remove
"Stage 2: aggressive overflow compaction to last {} messages",
messages.len() - remove
);
let summary = Message::user(format!(
"[System: {} earlier messages were removed due to context overflow. \
Expand Down Expand Up @@ -373,7 +379,10 @@ mod tests {
];
// Boundary 2 would cut between the assistant(ToolUse) at [1] and user(ToolResult) at [2].
let adjusted = safe_drain_boundary(&msgs, 2);
assert_eq!(adjusted, 1, "Should pull boundary back to keep the ToolUse/ToolResult pair together");
assert_eq!(
adjusted, 1,
"Should pull boundary back to keep the ToolUse/ToolResult pair together"
);
}

#[test]
Expand All @@ -385,7 +394,10 @@ mod tests {
Message::assistant("d"),
];
let adjusted = safe_drain_boundary(&msgs, 2);
assert_eq!(adjusted, 2, "Should not change boundary for plain text messages");
assert_eq!(
adjusted, 2,
"Should not change boundary for plain text messages"
);
}

#[test]
Expand Down
25 changes: 25 additions & 0 deletions crates/openfang-runtime/src/kernel_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,31 @@ pub trait KernelHandle: Send + Sync {
Err("Channel file data send not available".to_string())
}

/// Get the channel callback context for an agent (if one is active).
fn get_channel_context(
&self,
agent_id: &str,
) -> Option<openfang_types::ChannelCallbackContext> {
let _ = agent_id;
None
}

/// Store a channel callback context for an agent.
fn set_channel_context(&self, agent_id: &str, context: openfang_types::ChannelCallbackContext) {
let _ = (agent_id, context);
}

/// Inject an async callback result into a channel, bypassing the normal agent loop.
async fn inject_async_callback(
&self,
context: openfang_types::ChannelCallbackContext,
hand_name: &str,
result_text: &str,
) -> Result<(), String> {
let _ = (context, hand_name, result_text);
Err("Async callback injection not available".to_string())
}

/// Refresh an agent's last_active timestamp without changing any other state.
/// Called by the agent loop before long LLM calls to prevent heartbeat false-positives.
fn touch_agent(&self, agent_id: &str) {
Expand Down
Loading
Loading