From 690b722d8b7fd7963b9da4b817ad6a627d175ea9 Mon Sep 17 00:00:00 2001 From: lankerens Date: Sun, 7 Jun 2026 12:26:40 +0800 Subject: [PATCH] feat(channel): add WeChat typing indicator support --- crates/aionui-channel/src/manager.rs | 16 ++++ crates/aionui-channel/src/plugin.rs | 6 ++ .../aionui-channel/src/plugins/weixin/api.rs | 78 ++++++++++++++++++- .../src/plugins/weixin/plugin.rs | 49 +++++++++++- .../src/plugins/weixin/types.rs | 21 +++++ crates/aionui-channel/src/stream_relay.rs | 39 ++++++++++ 6 files changed, 206 insertions(+), 3 deletions(-) diff --git a/crates/aionui-channel/src/manager.rs b/crates/aionui-channel/src/manager.rs index 2d874e275..ae2b61ccf 100644 --- a/crates/aionui-channel/src/manager.rs +++ b/crates/aionui-channel/src/manager.rs @@ -523,6 +523,22 @@ impl crate::stream_relay::ChannelSender for ChannelManager { ) -> Result<(), crate::error::ChannelError> { self.edit_message(plugin_id, chat_id, message_id, message).await } + + async fn start_typing(&self, plugin_id: &str, chat_id: &str) { + if let Some(plugin) = self.plugins.get(plugin_id) { + plugin.start_typing(chat_id).await; + } else { + warn!(%plugin_id, %chat_id, "start_typing: plugin not found"); + } + } + + async fn stop_typing(&self, plugin_id: &str, chat_id: &str) { + if let Some(plugin) = self.plugins.get(plugin_id) { + plugin.stop_typing(chat_id).await; + } else { + warn!(%plugin_id, %chat_id, "stop_typing: plugin not found"); + } + } } #[cfg(test)] diff --git a/crates/aionui-channel/src/plugin.rs b/crates/aionui-channel/src/plugin.rs index f192f7e7e..c905e149d 100644 --- a/crates/aionui-channel/src/plugin.rs +++ b/crates/aionui-channel/src/plugin.rs @@ -77,6 +77,12 @@ pub trait ChannelPlugin: Send + Sync { /// The most recent error message, if status is `Error`. fn last_error(&self) -> Option<&str>; + + /// Start typing indicator for a chat. Default no-op. + async fn start_typing(&self, _chat_id: &str) {} + + /// Stop typing indicator for a chat. Default no-op. + async fn stop_typing(&self, _chat_id: &str) {} } #[cfg(test)] diff --git a/crates/aionui-channel/src/plugins/weixin/api.rs b/crates/aionui-channel/src/plugins/weixin/api.rs index 8f15fa4f2..3cca3736f 100644 --- a/crates/aionui-channel/src/plugins/weixin/api.rs +++ b/crates/aionui-channel/src/plugins/weixin/api.rs @@ -11,8 +11,9 @@ use crate::constants::{WEIXIN_API_TIMEOUT, WEIXIN_POLL_TIMEOUT}; use crate::error::ChannelError; use super::types::{ - GetUpdatesRequest, GetUpdatesResponse, ILinkResponse, ITEM_TYPE_TEXT, QrCodeData, QrCodeStatusData, - SendMessageItem, SendMessageMsg, SendMessageRequest, SendTextItem, + GetConfigRequest, GetUpdatesRequest, GetUpdatesResponse, ILinkResponse, ITEM_TYPE_TEXT, + QrCodeData, QrCodeStatusData, SendMessageItem, SendMessageMsg, SendMessageRequest, + SendTextItem, SendTypingRequest, }; /// HTTP client for the WeChat iLink Bot API. @@ -213,6 +214,79 @@ impl WeixinApi { Ok(()) } + + /// Fetch bot config including typing_ticket. + /// + /// `POST /ilink/bot/getconfig` + /// Requires `ilink_user_id` and optionally `context_token` (from incoming message). + pub async fn get_config( + &self, + ilink_user_id: &str, + context_token: Option<&str>, + ) -> Result { + let body = GetConfigRequest { + ilink_user_id: ilink_user_id.to_string(), + context_token: context_token.map(String::from), + }; + let resp: serde_json::Value = self + .authenticated_post("ilink/bot/getconfig", &body, WEIXIN_API_TIMEOUT) + .await + .map_err(|e| ChannelError::PlatformApi(format!("getconfig failed: {e}")))?; + + // Check for API-level errors + let ret = resp.get("ret").and_then(|v| v.as_i64()).unwrap_or(0); + if ret != 0 { + let errcode = resp.get("errcode").and_then(|v| v.as_i64()).unwrap_or(0); + let errmsg = resp + .get("errmsg") + .and_then(|v| v.as_str()) + .unwrap_or("unknown error"); + return Err(ChannelError::PlatformApi(format!( + "getconfig error: ret={ret}, errcode={errcode}, errmsg={errmsg}" + ))); + } + + resp.get("typing_ticket") + .and_then(|v| v.as_str()) + .map(String::from) + .ok_or_else(|| ChannelError::PlatformApi("getconfig missing typing_ticket".into())) + } + + /// Send or stop typing indicator. + /// + /// `POST /ilink/bot/sendtyping` + /// status: 1 = start, 2 = stop + pub async fn send_typing( + &self, + ilink_user_id: &str, + typing_ticket: &str, + status: i32, + ) -> Result<(), ChannelError> { + let body = SendTypingRequest { + ilink_user_id: ilink_user_id.to_string(), + typing_ticket: typing_ticket.to_string(), + status, + }; + let resp: serde_json::Value = self + .authenticated_post("ilink/bot/sendtyping", &body, WEIXIN_API_TIMEOUT) + .await + .map_err(|e| ChannelError::PlatformApi(format!("sendtyping failed: {e}")))?; + + // Check for API-level errors + let ret = resp.get("ret").and_then(|v| v.as_i64()).unwrap_or(0); + if ret != 0 { + let errcode = resp.get("errcode").and_then(|v| v.as_i64()).unwrap_or(0); + let errmsg = resp + .get("errmsg") + .and_then(|v| v.as_str()) + .unwrap_or("unknown error"); + return Err(ChannelError::PlatformApi(format!( + "sendtyping error: ret={ret}, errcode={errcode}, errmsg={errmsg}" + ))); + } + + Ok(()) + } } #[cfg(test)] diff --git a/crates/aionui-channel/src/plugins/weixin/plugin.rs b/crates/aionui-channel/src/plugins/weixin/plugin.rs index d832c3bf9..22be840dd 100644 --- a/crates/aionui-channel/src/plugins/weixin/plugin.rs +++ b/crates/aionui-channel/src/plugins/weixin/plugin.rs @@ -16,7 +16,7 @@ use crate::types::{ }; use super::api::WeixinApi; -use super::types::{ITEM_TYPE_TEXT, ITEM_TYPE_VOICE, WeixinRawItem, WeixinRawMessage}; +use super::types::{ITEM_TYPE_TEXT, ITEM_TYPE_VOICE, TYPING_START, TYPING_STOP, WeixinRawItem, WeixinRawMessage}; /// Default base URL for the iLink Bot API. const DEFAULT_BASE_URL: &str = "https://ilinkai.weixin.qq.com"; @@ -34,6 +34,7 @@ pub struct WeixinPlugin { poll_handle: Option>, shutdown_tx: Option>, context_tokens: Arc>, + typing_tickets: Arc>, // user_id → typing_ticket } impl Default for WeixinPlugin { @@ -46,6 +47,7 @@ impl Default for WeixinPlugin { poll_handle: None, shutdown_tx: None, context_tokens: Arc::new(DashMap::new()), + typing_tickets: Arc::new(DashMap::new()), } } } @@ -147,6 +149,7 @@ impl ChannelPlugin for WeixinPlugin { self.api = None; self.context_tokens.clear(); + self.typing_tickets.clear(); self.status = PluginStatus::Stopped; info!("WeChat plugin stopped"); Ok(()) @@ -195,6 +198,50 @@ impl ChannelPlugin for WeixinPlugin { fn last_error(&self) -> Option<&str> { self.last_error.as_deref() } + + async fn start_typing(&self, chat_id: &str) { + let api = match &self.api { + Some(api) => api, + None => return, + }; + + // Fetch typing ticket if not cached + if !self.typing_tickets.contains_key(chat_id) { + let context_token = self.context_tokens.get(chat_id).map(|v| v.clone()); + debug!(chat_id=%chat_id, has_context_token=context_token.is_some(), "Fetching typing ticket via get_config"); + match api.get_config(chat_id, context_token.as_deref()).await { + Ok(ticket) => { + debug!(chat_id=%chat_id, ticket_len=ticket.len(), "Got typing ticket from get_config"); + self.typing_tickets.insert(chat_id.to_string(), ticket); + } + Err(e) => { + warn!(chat_id=%chat_id, error=%e, "Failed to fetch typing ticket from WeChat get_config"); + return; + } + } + } + + if let Some(ticket) = self.typing_tickets.get(chat_id) { + if let Err(e) = api.send_typing(chat_id, &ticket, TYPING_START).await { + warn!(chat_id=%chat_id, error=%e, "Failed to send typing start to WeChat"); + self.typing_tickets.remove(chat_id); + } + } + } + + async fn stop_typing(&self, chat_id: &str) { + let api = match &self.api { + Some(api) => api, + None => return, + }; + + if let Some(ticket) = self.typing_tickets.get(chat_id) { + if let Err(e) = api.send_typing(chat_id, &ticket, TYPING_STOP).await { + warn!(chat_id=%chat_id, error=%e, "Failed to send typing stop to WeChat"); + self.typing_tickets.remove(chat_id); + } + } + } } // --------------------------------------------------------------------------- diff --git a/crates/aionui-channel/src/plugins/weixin/types.rs b/crates/aionui-channel/src/plugins/weixin/types.rs index ec2ced513..b07078e54 100644 --- a/crates/aionui-channel/src/plugins/weixin/types.rs +++ b/crates/aionui-channel/src/plugins/weixin/types.rs @@ -178,6 +178,27 @@ pub(crate) struct SendTextItem { // SSE event payloads (frontend-facing — DO NOT CHANGE field names) // --------------------------------------------------------------------------- +// --- typing ticket --- +#[derive(Debug, Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) struct GetConfigRequest { + pub ilink_user_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub context_token: Option, +} + +// --- sendtyping --- +pub(crate) const TYPING_START: i32 = 1; +pub(crate) const TYPING_STOP: i32 = 2; + +#[derive(Debug, Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) struct SendTypingRequest { + pub ilink_user_id: String, + pub typing_ticket: String, + pub status: i32, +} + #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub(crate) struct SseQrEvent { diff --git a/crates/aionui-channel/src/stream_relay.rs b/crates/aionui-channel/src/stream_relay.rs index 892be0f82..e3f2390f1 100644 --- a/crates/aionui-channel/src/stream_relay.rs +++ b/crates/aionui-channel/src/stream_relay.rs @@ -39,6 +39,12 @@ pub trait ChannelSender: Send + Sync { message_id: &str, message: UnifiedOutgoingMessage, ) -> Result<(), ChannelError>; + + /// Start typing indicator for a chat. Default no-op. + async fn start_typing(&self, _plugin_id: &str, _chat_id: &str) {} + + /// Stop typing indicator for a chat. Default no-op. + async fn stop_typing(&self, _plugin_id: &str, _chat_id: &str) {} } /// Relays agent stream events to an IM platform. @@ -72,6 +78,19 @@ impl ChannelStreamRelay { let mut text_buffer = String::new(); let mut has_content = false; + // Start continuous typing indicator — refreshes every 2s like Hermes + let keep_typing = { + let sender = Arc::clone(&self.sender); + let plugin_id = self.config.plugin_id.clone(); + let chat_id = self.config.chat_id.clone(); + tokio::spawn(async move { + loop { + sender.start_typing(&plugin_id, &chat_id).await; + tokio::time::sleep(Duration::from_secs(2)).await; + } + }) + }; + loop { match rx.recv().await { Ok(event) => match ChannelMessageService::process_stream_event(&event) { @@ -100,6 +119,14 @@ impl ChannelStreamRelay { .send_message(&self.config.plugin_id, &self.config.chat_id, final_msg) .await; } + + keep_typing.abort(); + + // Stop typing after sending message to avoid gap + self.sender + .stop_typing(&self.config.plugin_id, &self.config.chat_id) + .await; + info!( plugin_id = %self.config.plugin_id, chat_id = %self.config.chat_id, @@ -126,6 +153,12 @@ impl ChannelStreamRelay { .sender .send_message(&self.config.plugin_id, &self.config.chat_id, error_msg) .await; + + keep_typing.abort(); + + self.sender + .stop_typing(&self.config.plugin_id, &self.config.chat_id) + .await; break; } None => {} @@ -139,6 +172,12 @@ impl ChannelStreamRelay { .send_message(&self.config.plugin_id, &self.config.chat_id, final_msg) .await; } + + keep_typing.abort(); + + self.sender + .stop_typing(&self.config.plugin_id, &self.config.chat_id) + .await; break; } Err(broadcast::error::RecvError::Lagged(n)) => {