Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 287 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ async-trait = "0.1"
num_cpus = "1.0"
protocol = { path = "./protocol" }
rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] }
pulsar = { version = "6", default-features = false, features = ["tokio-runtime"] }
futures = "0.3"
crossbeam-channel = "0.5"
pest = "2.7"
pest_derive = "2.7"
Expand Down
37 changes: 37 additions & 0 deletions src/runtime/input/input_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,43 @@ impl InputProvider {
runtime,
)))
}
InputConfig::Pulsar {
url,
topic,
subscription,
subscription_type,
extra,
runtime: _,
} => {
use crate::runtime::input::InputRunner;
use crate::runtime::input::protocol::pulsar::{PulsarConfig, PulsarProtocol};

if url.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid pulsar url in input config (group #{}): empty (topic: {})",
group_idx + 1,
topic
),
)) as Box<dyn std::error::Error + Send>);
}
Comment on lines +119 to +128

let pulsar_config = PulsarConfig::new(
url.clone(),
topic.clone(),
subscription.clone(),
subscription_type.clone(),
extra.clone(),
);
let runtime = input_config.input_runtime_config();
Ok(Box::new(InputRunner::new(
PulsarProtocol::new(pulsar_config),
group_idx,
input_idx,
runtime,
)))
}
}
}
}
1 change: 1 addition & 0 deletions src/runtime/input/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
// limitations under the License.

pub mod kafka;
pub mod pulsar;
41 changes: 41 additions & 0 deletions src/runtime/input/protocol/pulsar/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

/// Connection and subscription settings for a Pulsar consumer.
///
/// This is a plain data carrier: nothing is validated when it is built.
#[derive(Debug, Clone)]
pub struct PulsarConfig {
    /// Pulsar service URL the client connects to.
    pub url: String,
    /// Topic the consumer subscribes to.
    pub topic: String,
    /// Name of the subscription used by the consumer.
    pub subscription: String,
    /// Optional subscription type name; `None` leaves the choice to the
    /// consumer implementation.
    pub subscription_type: Option<String>,
    /// Additional free-form key/value settings supplied with the config.
    pub properties: HashMap<String, String>,
}

impl PulsarConfig {
    /// Builds a [`PulsarConfig`] from its individual parts.
    ///
    /// The arguments are stored as given; no validation or normalisation is
    /// performed.
    pub fn new(
        url: String,
        topic: String,
        subscription: String,
        subscription_type: Option<String>,
        properties: HashMap<String, String>,
    ) -> Self {
        Self {
            url,
            topic,
            subscription,
            subscription_type,
            properties,
        }
    }
}
17 changes: 17 additions & 0 deletions src/runtime/input/protocol/pulsar/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod config;
pub mod pulsar_protocol;

pub use config::PulsarConfig;
pub use pulsar_protocol::PulsarProtocol;
127 changes: 127 additions & 0 deletions src/runtime/input/protocol/pulsar/pulsar_protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::config::PulsarConfig;
use crate::runtime::buffer_and_event::BufferOrEvent;
use crate::runtime::input::input_protocol::InputProtocol;
use futures::StreamExt;
use pulsar::message::proto::command_subscribe::SubType;
use pulsar::{Consumer, Pulsar, TokioExecutor};
use std::cell::RefCell;
use std::time::Duration;

// Per-thread slots holding the Tokio runtime and the Pulsar consumer.
// Both start out as `None`.
//
// NOTE(review): the slots are keyed by thread only, not by protocol instance,
// so every `PulsarProtocol` used on the same thread sees the same runtime and
// consumer.
thread_local! {
// Tokio runtime for this thread.
static PULSAR_RT: RefCell<Option<tokio::runtime::Runtime>> = const { RefCell::new(None) };
// Pulsar consumer for this thread, yielding raw `Vec<u8>` payloads.
static PULSAR_CONSUMER: RefCell<Option<Consumer<Vec<u8>, TokioExecutor>>> = const { RefCell::new(None) };
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread-local state shared across multiple protocol instances

Medium Severity

The Pulsar runtime and consumer/producer are stored in global thread_local! statics rather than in the struct. If two PulsarProtocol instances ever run on the same thread, the second would silently reuse the first's consumer/producer (configured for a different topic/subscription), causing messages to be consumed from or sent to the wrong topic. The Kafka implementation avoids this by storing state in the struct via OnceLock/Mutex.

Additional Locations (1)
Fix in Cursor Fix in Web


/// Input protocol that reads messages from a Pulsar topic.
///
/// Holds only the consumer configuration.
pub struct PulsarProtocol {
    /// Settings used to connect and subscribe.
    config: PulsarConfig,
}

impl PulsarProtocol {
    /// Wraps `config` in a new protocol instance.
    pub fn new(config: PulsarConfig) -> Self {
        PulsarProtocol { config }
    }
}

impl InputProtocol for PulsarProtocol {
fn name(&self) -> String {
format!("pulsar-{}", self.config.topic)
}

fn init(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
// Lazy init is done in poll() on the worker thread which owns the runtime/consumer.
Ok(())
}

fn poll(
&self,
timeout: Duration,
) -> Result<Option<BufferOrEvent>, Box<dyn std::error::Error + Send>> {
PULSAR_RT.with(|rt_cell| {
PULSAR_CONSUMER.with(|consumer_cell| {
let mut rt_opt = rt_cell.borrow_mut();
let mut consumer_opt = consumer_cell.borrow_mut();

if consumer_opt.is_none() {
let rt = tokio::runtime::Runtime::new().map_err(|e| {
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
})?;
let url = self.config.url.clone();
let topic = self.config.topic.clone();
let subscription = self.config.subscription.clone();
let sub_type = self
.config
.subscription_type
.as_deref()
.unwrap_or("Exclusive");
let sub_type_enum = match sub_type.to_lowercase().as_str() {
"shared" => SubType::Shared,
"key_shared" => SubType::KeyShared,
"failover" => SubType::Failover,
_ => SubType::Exclusive,
Comment on lines +63 to +72
};

let consumer: Consumer<Vec<u8>, _> = rt
.block_on(async {
let pulsar = Pulsar::builder(&url, TokioExecutor).build().await?;
let builder = pulsar
.consumer()
.with_topic(&topic)
.with_subscription(&subscription)
.with_subscription_type(sub_type_enum);
let consumer = builder.build().await?;
Result::<_, pulsar::Error>::Ok(consumer)
})
.map_err(|e| {
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
})?;

*rt_opt = Some(rt);
*consumer_opt = Some(consumer);
}

let rt = rt_opt.as_ref().unwrap();
let consumer = consumer_opt.as_mut().unwrap();

let timeout_ms = timeout.as_millis() as u64;
let topic = self.config.topic.clone();
let result = rt.block_on(async {
let next_fut = consumer.next();
match tokio::time::timeout(Duration::from_millis(timeout_ms), next_fut).await {
Ok(Some(Ok(msg))) => {
let payload = msg.payload.data.clone();
let _ = consumer.ack(&msg).await;
Some(Ok(payload))
}
Comment on lines +101 to +106
Ok(Some(Err(e))) => Some(Err(e)),
Ok(None) | Err(_) => None,
}
});

match result {
Some(Ok(payload)) => Ok(Some(BufferOrEvent::new_buffer(
payload,
Some(topic),
false,
false,
))),
Some(Err(e)) => {
Err(Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)
}
None => Ok(None),
}
})
})
}
}
27 changes: 27 additions & 0 deletions src/runtime/output/output_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,33 @@ impl OutputProvider {
let runtime = output_config.output_runtime_config();
Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime)))
}
OutputConfig::Pulsar {
url,
topic,
extra,
runtime: _,
} => {
use crate::runtime::output::output_runner::OutputRunner;
use crate::runtime::output::protocol::pulsar::{
PulsarOutputProtocol, PulsarProducerConfig,
};

if url.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid pulsar url in output config: empty (topic: {})",
topic
),
)) as Box<dyn std::error::Error + Send>);
}
Comment on lines +102 to +110

let pulsar_config =
PulsarProducerConfig::new(url.clone(), topic.clone(), extra.clone());
let protocol = PulsarOutputProtocol::new(pulsar_config);
let runtime = output_config.output_runtime_config();
Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime)))
}
}
}
}
1 change: 1 addition & 0 deletions src/runtime/output/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
// Provides implementations of various output protocols

pub mod kafka;
pub mod pulsar;
17 changes: 17 additions & 0 deletions src/runtime/output/protocol/pulsar/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod producer_config;
pub mod pulsar_protocol;

pub use producer_config::PulsarProducerConfig;
pub use pulsar_protocol::PulsarOutputProtocol;
31 changes: 31 additions & 0 deletions src/runtime/output/protocol/pulsar/producer_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

/// Connection settings for a Pulsar producer.
///
/// This is a plain data carrier: nothing is validated when it is built.
#[derive(Debug, Clone)]
pub struct PulsarProducerConfig {
    /// Pulsar service URL the client connects to.
    pub url: String,
    /// Topic messages are published to.
    pub topic: String,
    /// Additional free-form key/value settings supplied with the config.
    pub properties: HashMap<String, String>,
}

impl PulsarProducerConfig {
    /// Builds a [`PulsarProducerConfig`] from its individual parts.
    ///
    /// The arguments are stored as given; no validation is performed.
    pub fn new(url: String, topic: String, properties: HashMap<String, String>) -> Self {
        Self {
            url,
            topic,
            properties,
        }
    }
}
Loading
Loading