Skip to content

feat(mq): add Pulsar input and output support#216

Open
luoluoyuyu wants to merge 4 commits intoFunctionStream:mainfrom
luoluoyuyu:feature/pulsar-mq
Open

feat(mq): add Pulsar input and output support#216
luoluoyuyu wants to merge 4 commits intoFunctionStream:mainfrom
luoluoyuyu:feature/pulsar-mq

Conversation

@luoluoyuyu
Copy link
Collaborator

@luoluoyuyu luoluoyuyu commented Mar 11, 2026

Note

Medium Risk
Introduces a new message-queue backend with new networking/TLS dependencies and runtime lifecycle behavior (per-thread Tokio runtime and consumer/producer initialization), which could impact throughput, resource usage, and error handling in production.

Overview
Adds Apache Pulsar support for both ingestion and egress.

Configuration now accepts input-type: pulsar and output-type: pulsar with new fields (url, topic, and for inputs subscription/subscription-type), plus validation for empty URLs. The runtime wires these configs through new PulsarProtocol/PulsarOutputProtocol implementations that create a per-thread Tokio runtime and Pulsar consumer/producer to poll messages (with ack) and send/flush output.

Build dependencies are updated to include pulsar (Tokio runtime) and futures, expanding the lockfile with the required TLS/openssl and async crates.

Written by Cursor Bugbot for commit ab5c6bf. This will update automatically on new commits. Configure here.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Box::new(std::io::Error::other(e))
as Box<dyn std::error::Error + Send>
})
})?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendFuture dropped without awaiting, losing delivery confirmation

High Severity

MessageBuilder::send_non_blocking() is an async method returning Result<SendFuture, Error>. The .await resolves the async method yielding a Result<SendFuture, ...>, and ? extracts the SendFuture, which is then silently dropped without being awaited. The SendFuture represents the actual broker delivery acknowledgment — dropping it means delivery errors are silently lost, risking undetected data loss. The SendFuture needs a second .await to confirm the message was actually delivered.

Fix in Cursor Fix in Web

thread_local! {
static PULSAR_RT: RefCell<Option<tokio::runtime::Runtime>> = const { RefCell::new(None) };
static PULSAR_CONSUMER: RefCell<Option<Consumer<Vec<u8>, TokioExecutor>>> = const { RefCell::new(None) };
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread-local state shared across multiple protocol instances

Medium Severity

The Pulsar runtime and consumer/producer are stored in global thread_local! statics rather than in the struct. If two PulsarProtocol instances ever run on the same thread, the second would silently reuse the first's consumer/producer (configured for a different topic/subscription), causing messages to be consumed from or sent to the wrong topic. The Kafka implementation avoids this by storing state in the struct via OnceLock/Mutex.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds Apache Pulsar as a supported runtime input/output backend alongside Kafka by extending task config enums, wiring providers, and introducing new Pulsar protocol implementations plus dependencies.

Changes:

  • Add Pulsar variants to InputConfig and OutputConfig and propagate runtime config accessors.
  • Implement Pulsar input/output protocols and corresponding config structs/modules.
  • Wire Pulsar into InputProvider / OutputProvider and add pulsar + futures dependencies.

Reviewed changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
src/runtime/task/processor_config.rs Extends task configuration enums to include Pulsar inputs/outputs.
src/runtime/input/protocol/mod.rs Exposes the new Pulsar input protocol module.
src/runtime/input/protocol/pulsar/mod.rs Declares/re-exports Pulsar input protocol components.
src/runtime/input/protocol/pulsar/config.rs Adds Pulsar consumer config struct.
src/runtime/input/protocol/pulsar/pulsar_protocol.rs Implements the Pulsar input protocol (consumer/poll loop).
src/runtime/input/input_provider.rs Instantiates Pulsar input runner from config.
src/runtime/output/protocol/mod.rs Exposes the new Pulsar output protocol module.
src/runtime/output/protocol/pulsar/mod.rs Declares/re-exports Pulsar output protocol components.
src/runtime/output/protocol/pulsar/producer_config.rs Adds Pulsar producer config struct.
src/runtime/output/protocol/pulsar/pulsar_protocol.rs Implements the Pulsar output protocol (producer/send/flush).
src/runtime/output/output_provider.rs Instantiates Pulsar output runner from config.
Cargo.toml Adds pulsar and futures dependencies.
Cargo.lock Locks new transitive dependencies for Pulsar support.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +63 to +72
let sub_type = self
.config
.subscription_type
.as_deref()
.unwrap_or("Exclusive");
let sub_type_enum = match sub_type.to_lowercase().as_str() {
"shared" => SubType::Shared,
"key_shared" => SubType::KeyShared,
"failover" => SubType::Failover,
_ => SubType::Exclusive,
Comment on lines +101 to +106
match tokio::time::timeout(Duration::from_millis(timeout_ms), next_fut).await {
Ok(Some(Ok(msg))) => {
let payload = msg.payload.data.clone();
let _ = consumer.ack(&msg).await;
Some(Ok(payload))
}
Comment on lines +17 to +23
pub struct PulsarConfig {
pub url: String,
pub topic: String,
pub subscription: String,
pub subscription_type: Option<String>,
pub properties: HashMap<String, String>,
}
Comment on lines +20 to +28
pub properties: HashMap<String, String>,
}

impl PulsarProducerConfig {
pub fn new(url: String, topic: String, properties: HashMap<String, String>) -> Self {
Self {
url,
topic,
properties,
Comment on lines +119 to +128
if url.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid pulsar url in input config (group #{}): empty (topic: {})",
group_idx + 1,
topic
),
)) as Box<dyn std::error::Error + Send>);
}
Comment on lines +102 to +110
if url.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid pulsar url in output config: empty (topic: {})",
topic
),
)) as Box<dyn std::error::Error + Send>);
}
num_cpus = "1.0"
protocol = { path = "./protocol" }
rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] }
pulsar = { version = "6", default-features = false, features = ["tokio-runtime"] }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants