feat(mq): add Pulsar input and output support#216
feat(mq): add Pulsar input and output support#216luoluoyuyu wants to merge 4 commits intoFunctionStream:mainfrom
Conversation
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
| Box::new(std::io::Error::other(e)) | ||
| as Box<dyn std::error::Error + Send> | ||
| }) | ||
| })?; |
There was a problem hiding this comment.
SendFuture dropped without awaiting, losing delivery confirmation
High Severity
MessageBuilder::send_non_blocking() is an async method returning Result<SendFuture, Error>. The .await resolves the async method yielding a Result<SendFuture, ...>, and ? extracts the SendFuture, which is then silently dropped without being awaited. The SendFuture represents the actual broker delivery acknowledgment — dropping it means delivery errors are silently lost, risking undetected data loss. The SendFuture needs a second .await to confirm the message was actually delivered.
| thread_local! { | ||
| static PULSAR_RT: RefCell<Option<tokio::runtime::Runtime>> = const { RefCell::new(None) }; | ||
| static PULSAR_CONSUMER: RefCell<Option<Consumer<Vec<u8>, TokioExecutor>>> = const { RefCell::new(None) }; | ||
| } |
There was a problem hiding this comment.
Thread-local state shared across multiple protocol instances
Medium Severity
The Pulsar runtime and consumer/producer are stored in global thread_local! statics rather than in the struct. If two PulsarProtocol instances ever run on the same thread, the second would silently reuse the first's consumer/producer (configured for a different topic/subscription), causing messages to be consumed from or sent to the wrong topic. The Kafka implementation avoids this by storing state in the struct via OnceLock/Mutex.
Additional Locations (1)
There was a problem hiding this comment.
Pull request overview
This PR adds Apache Pulsar as a supported runtime input/output backend alongside Kafka by extending task config enums, wiring providers, and introducing new Pulsar protocol implementations plus dependencies.
Changes:
- Add
Pulsarvariants toInputConfigandOutputConfigand propagate runtime config accessors. - Implement Pulsar input/output protocols and corresponding config structs/modules.
- Wire Pulsar into
InputProvider/OutputProviderand addpulsar+futuresdependencies.
Reviewed changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
src/runtime/task/processor_config.rs |
Extends task configuration enums to include Pulsar inputs/outputs. |
src/runtime/input/protocol/mod.rs |
Exposes the new Pulsar input protocol module. |
src/runtime/input/protocol/pulsar/mod.rs |
Declares/re-exports Pulsar input protocol components. |
src/runtime/input/protocol/pulsar/config.rs |
Adds Pulsar consumer config struct. |
src/runtime/input/protocol/pulsar/pulsar_protocol.rs |
Implements the Pulsar input protocol (consumer/poll loop). |
src/runtime/input/input_provider.rs |
Instantiates Pulsar input runner from config. |
src/runtime/output/protocol/mod.rs |
Exposes the new Pulsar output protocol module. |
src/runtime/output/protocol/pulsar/mod.rs |
Declares/re-exports Pulsar output protocol components. |
src/runtime/output/protocol/pulsar/producer_config.rs |
Adds Pulsar producer config struct. |
src/runtime/output/protocol/pulsar/pulsar_protocol.rs |
Implements the Pulsar output protocol (producer/send/flush). |
src/runtime/output/output_provider.rs |
Instantiates Pulsar output runner from config. |
Cargo.toml |
Adds pulsar and futures dependencies. |
Cargo.lock |
Locks new transitive dependencies for Pulsar support. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| let sub_type = self | ||
| .config | ||
| .subscription_type | ||
| .as_deref() | ||
| .unwrap_or("Exclusive"); | ||
| let sub_type_enum = match sub_type.to_lowercase().as_str() { | ||
| "shared" => SubType::Shared, | ||
| "key_shared" => SubType::KeyShared, | ||
| "failover" => SubType::Failover, | ||
| _ => SubType::Exclusive, |
| match tokio::time::timeout(Duration::from_millis(timeout_ms), next_fut).await { | ||
| Ok(Some(Ok(msg))) => { | ||
| let payload = msg.payload.data.clone(); | ||
| let _ = consumer.ack(&msg).await; | ||
| Some(Ok(payload)) | ||
| } |
| pub struct PulsarConfig { | ||
| pub url: String, | ||
| pub topic: String, | ||
| pub subscription: String, | ||
| pub subscription_type: Option<String>, | ||
| pub properties: HashMap<String, String>, | ||
| } |
| pub properties: HashMap<String, String>, | ||
| } | ||
|
|
||
| impl PulsarProducerConfig { | ||
| pub fn new(url: String, topic: String, properties: HashMap<String, String>) -> Self { | ||
| Self { | ||
| url, | ||
| topic, | ||
| properties, |
| if url.is_empty() { | ||
| return Err(Box::new(std::io::Error::new( | ||
| std::io::ErrorKind::InvalidData, | ||
| format!( | ||
| "Invalid pulsar url in input config (group #{}): empty (topic: {})", | ||
| group_idx + 1, | ||
| topic | ||
| ), | ||
| )) as Box<dyn std::error::Error + Send>); | ||
| } |
| if url.is_empty() { | ||
| return Err(Box::new(std::io::Error::new( | ||
| std::io::ErrorKind::InvalidData, | ||
| format!( | ||
| "Invalid pulsar url in output config: empty (topic: {})", | ||
| topic | ||
| ), | ||
| )) as Box<dyn std::error::Error + Send>); | ||
| } |
| num_cpus = "1.0" | ||
| protocol = { path = "./protocol" } | ||
| rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] } | ||
| pulsar = { version = "6", default-features = false, features = ["tokio-runtime"] } |


Note
Medium Risk
Introduces a new message-queue backend with new networking/TLS dependencies and runtime lifecycle behavior (per-thread Tokio runtime and consumer/producer initialization), which could impact throughput, resource usage, and error handling in production.
Overview
Adds Apache Pulsar support for both ingestion and egress.
Configuration now accepts
input-type: pulsarandoutput-type: pulsarwith new fields (url,topic, and for inputssubscription/subscription-type), plus validation for empty URLs. The runtime wires these configs through newPulsarProtocol/PulsarOutputProtocolimplementations that create a per-thread Tokio runtime and Pulsar consumer/producer topollmessages (with ack) andsend/flushoutput.Build dependencies are updated to include
pulsar(Tokio runtime) andfutures, expanding the lockfile with the required TLS/openssl and async crates.Written by Cursor Bugbot for commit ab5c6bf. This will update automatically on new commits. Configure here.