diff --git a/Cargo.lock b/Cargo.lock index 26f07400..6733df24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,6 +210,18 @@ dependencies = [ "num", ] +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -469,7 +481,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8144c22e24bbcf26ade86cb6501a0916c46b7e4787abdb0045a467eb1645a1d" dependencies = [ "ambient-authority", - "rand", + "rand 0.8.5", ] [[package]] @@ -637,6 +649,15 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-random" version = "0.1.18" @@ -657,6 +678,16 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -822,6 +853,21 @@ version = "0.128.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4e378a54e7168a689486d67ee1f818b7e5356e54ae51a1d7a53f4f13f7f8b7a" +[[package]] +name = "crc" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.5.0" @@ -1053,6 +1099,27 @@ version = "3.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59" +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -1116,6 +1183,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -1149,6 +1231,7 @@ dependencies = [ "bincode", "clap", "crossbeam-channel", + "futures", "log", "lru", "num_cpus", @@ -1157,6 +1240,7 @@ dependencies = [ "pest_derive", "proctitle", "protocol", + "pulsar", "rdkafka", "rocksdb", "serde", @@ -1199,6 +1283,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -1221,12 +1306,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -1248,6 +1355,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1649,7 +1757,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1955a75fa080c677d3972822ec4bad316169ab1cfc6c257a942c2265dbe5fe" dependencies = [ "bitmaps", - "rand_core", + "rand_core 0.6.4", "rand_xoshiro", "sized-chunks", "typenum", @@ -2061,6 +2169,29 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "murmur3" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" + +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -2237,6 +2368,38 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + [[package]] name = "openssl-sys" version = "0.9.111" @@ -2259,6 +2422,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -2288,6 +2457,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64", + "serde_core", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -2556,6 +2735,35 @@ dependencies = [ "syn", ] +[[package]] +name = "pulsar" +version = "6.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4493cb89dac20434cceeab8c1d21b59f703892a033e64e1de06a1a5d8f88169f" +dependencies = [ + "async-channel", + "async-trait", + "bytes", + "chrono", + "crc", + "futures", + "log", + "murmur3", + "native-tls", + "nom", + "pem", + "prost", + "prost-build", + "prost-derive", + "rand 0.8.5", + "regex", + "tokio", + "tokio-native-tls", + "tokio-util", + "url", + "uuid", +] + [[package]] name = "quote" version = "1.0.42" @@ -2588,8 +2796,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.5", ] [[package]] @@ -2599,7 +2817,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.5", ] [[package]] @@ -2611,13 +2839,22 @@ dependencies = [ "getrandom 0.2.16", ] +[[package]] +name = "rand_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" +dependencies = [ + "getrandom 0.3.4", +] + [[package]] name = "rand_xoshiro" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" dependencies = [ - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -2856,12 +3093,44 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "schannel" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "3.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d17b898a6d6948c3a8ee4372c17cb384f90d2e6e912ef00895b14fd7ab54ec38" +dependencies = [ + "bitflags 2.10.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.27" @@ -3267,6 +3536,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.18" @@ -3397,7 +3676,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -3605,6 +3884,7 @@ checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "getrandom 0.3.4", "js-sys", + "rand 0.9.2", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index 4b855aa9..ca059c3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,8 @@ async-trait = "0.1" num_cpus = "1.0" protocol = { path = "./protocol" } rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] } +pulsar = { version = "6", default-features = false, features = ["tokio-runtime"] } +futures = "0.3" crossbeam-channel = "0.5" pest = "2.7" pest_derive = "2.7" diff --git a/src/runtime/input/input_provider.rs b/src/runtime/input/input_provider.rs index 3f6606cd..e9fd20c1 100644 --- a/src/runtime/input/input_provider.rs +++ b/src/runtime/input/input_provider.rs @@ -105,6 +105,43 @@ impl InputProvider { runtime, ))) } + InputConfig::Pulsar { + url, + topic, + subscription, + subscription_type, + extra, + runtime: _, + } => { + use crate::runtime::input::InputRunner; + use crate::runtime::input::protocol::pulsar::{PulsarConfig, PulsarProtocol}; + + if url.is_empty() { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Invalid pulsar url in input config (group #{}): empty (topic: {})", + group_idx + 1, + topic + ), + )) as Box); + } + + let pulsar_config = PulsarConfig::new( + url.clone(), + topic.clone(), + subscription.clone(), + subscription_type.clone(), + extra.clone(), + ); + let runtime = input_config.input_runtime_config(); + Ok(Box::new(InputRunner::new( + PulsarProtocol::new(pulsar_config), + group_idx, + input_idx, + runtime, + ))) + } } } } diff --git a/src/runtime/input/protocol/mod.rs b/src/runtime/input/protocol/mod.rs index b9574391..773172f3 100644 --- a/src/runtime/input/protocol/mod.rs +++ b/src/runtime/input/protocol/mod.rs @@ -11,3 +11,4 @@ // limitations under the License. pub mod kafka; +pub mod pulsar; diff --git a/src/runtime/input/protocol/pulsar/config.rs b/src/runtime/input/protocol/pulsar/config.rs new file mode 100644 index 00000000..d8a25e66 --- /dev/null +++ b/src/runtime/input/protocol/pulsar/config.rs @@ -0,0 +1,41 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +/// PulsarConfig - Pulsar consumer configuration +#[derive(Debug, Clone)] +pub struct PulsarConfig { + pub url: String, + pub topic: String, + pub subscription: String, + pub subscription_type: Option, + pub properties: HashMap, +} + +impl PulsarConfig { + pub fn new( + url: String, + topic: String, + subscription: String, + subscription_type: Option, + properties: HashMap, + ) -> Self { + Self { + url, + topic, + subscription, + subscription_type, + properties, + } + } +} diff --git a/src/runtime/input/protocol/pulsar/mod.rs b/src/runtime/input/protocol/pulsar/mod.rs new file mode 100644 index 00000000..2f85909f --- /dev/null +++ b/src/runtime/input/protocol/pulsar/mod.rs @@ -0,0 +1,17 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod config; +pub mod pulsar_protocol; + +pub use config::PulsarConfig; +pub use pulsar_protocol::PulsarProtocol; diff --git a/src/runtime/input/protocol/pulsar/pulsar_protocol.rs b/src/runtime/input/protocol/pulsar/pulsar_protocol.rs new file mode 100644 index 00000000..1fb5395d --- /dev/null +++ b/src/runtime/input/protocol/pulsar/pulsar_protocol.rs @@ -0,0 +1,127 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::config::PulsarConfig; +use crate::runtime::buffer_and_event::BufferOrEvent; +use crate::runtime::input::input_protocol::InputProtocol; +use futures::StreamExt; +use pulsar::message::proto::command_subscribe::SubType; +use pulsar::{Consumer, Pulsar, TokioExecutor}; +use std::cell::RefCell; +use std::time::Duration; + +thread_local! { + static PULSAR_RT: RefCell> = const { RefCell::new(None) }; + static PULSAR_CONSUMER: RefCell, TokioExecutor>>> = const { RefCell::new(None) }; +} + +pub struct PulsarProtocol { + config: PulsarConfig, +} + +impl PulsarProtocol { + pub fn new(config: PulsarConfig) -> Self { + Self { config } + } +} + +impl InputProtocol for PulsarProtocol { + fn name(&self) -> String { + format!("pulsar-{}", self.config.topic) + } + + fn init(&self) -> Result<(), Box> { + // Lazy init is done in poll() on the worker thread which owns the runtime/consumer. + Ok(()) + } + + fn poll( + &self, + timeout: Duration, + ) -> Result, Box> { + PULSAR_RT.with(|rt_cell| { + PULSAR_CONSUMER.with(|consumer_cell| { + let mut rt_opt = rt_cell.borrow_mut(); + let mut consumer_opt = consumer_cell.borrow_mut(); + + if consumer_opt.is_none() { + let rt = tokio::runtime::Runtime::new().map_err(|e| { + Box::new(std::io::Error::other(e)) as Box + })?; + let url = self.config.url.clone(); + let topic = self.config.topic.clone(); + let subscription = self.config.subscription.clone(); + let sub_type = self + .config + .subscription_type + .as_deref() + .unwrap_or("Exclusive"); + let sub_type_enum = match sub_type.to_lowercase().as_str() { + "shared" => SubType::Shared, + "key_shared" => SubType::KeyShared, + "failover" => SubType::Failover, + _ => SubType::Exclusive, + }; + + let consumer: Consumer, _> = rt + .block_on(async { + let pulsar = Pulsar::builder(&url, TokioExecutor).build().await?; + let builder = pulsar + .consumer() + .with_topic(&topic) + .with_subscription(&subscription) + .with_subscription_type(sub_type_enum); + let consumer = builder.build().await?; + Result::<_, pulsar::Error>::Ok(consumer) + }) + .map_err(|e| { + Box::new(std::io::Error::other(e)) as Box + })?; + + *rt_opt = Some(rt); + *consumer_opt = Some(consumer); + } + + let rt = rt_opt.as_ref().unwrap(); + let consumer = consumer_opt.as_mut().unwrap(); + + let timeout_ms = timeout.as_millis() as u64; + let topic = self.config.topic.clone(); + let result = rt.block_on(async { + let next_fut = consumer.next(); + match tokio::time::timeout(Duration::from_millis(timeout_ms), next_fut).await { + Ok(Some(Ok(msg))) => { + let payload = msg.payload.data.clone(); + let _ = consumer.ack(&msg).await; + Some(Ok(payload)) + } + Ok(Some(Err(e))) => Some(Err(e)), + Ok(None) | Err(_) => None, + } + }); + + match result { + Some(Ok(payload)) => Ok(Some(BufferOrEvent::new_buffer( + payload, + Some(topic), + false, + false, + ))), + Some(Err(e)) => { + Err(Box::new(std::io::Error::other(e)) as Box) + } + None => Ok(None), + } + }) + }) + } +} diff --git a/src/runtime/output/output_provider.rs b/src/runtime/output/output_provider.rs index c6d01fef..7bce5597 100644 --- a/src/runtime/output/output_provider.rs +++ b/src/runtime/output/output_provider.rs @@ -88,6 +88,33 @@ impl OutputProvider { let runtime = output_config.output_runtime_config(); Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime))) } + OutputConfig::Pulsar { + url, + topic, + extra, + runtime: _, + } => { + use crate::runtime::output::output_runner::OutputRunner; + use crate::runtime::output::protocol::pulsar::{ + PulsarOutputProtocol, PulsarProducerConfig, + }; + + if url.is_empty() { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Invalid pulsar url in output config: empty (topic: {})", + topic + ), + )) as Box); + } + + let pulsar_config = + PulsarProducerConfig::new(url.clone(), topic.clone(), extra.clone()); + let protocol = PulsarOutputProtocol::new(pulsar_config); + let runtime = output_config.output_runtime_config(); + Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime))) + } } } } diff --git a/src/runtime/output/protocol/mod.rs b/src/runtime/output/protocol/mod.rs index 20e4d1c5..3ff03fa6 100644 --- a/src/runtime/output/protocol/mod.rs +++ b/src/runtime/output/protocol/mod.rs @@ -15,3 +15,4 @@ // Provides implementations of various output protocols pub mod kafka; +pub mod pulsar; diff --git a/src/runtime/output/protocol/pulsar/mod.rs b/src/runtime/output/protocol/pulsar/mod.rs new file mode 100644 index 00000000..37e4feb9 --- /dev/null +++ b/src/runtime/output/protocol/pulsar/mod.rs @@ -0,0 +1,17 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod producer_config; +pub mod pulsar_protocol; + +pub use producer_config::PulsarProducerConfig; +pub use pulsar_protocol::PulsarOutputProtocol; diff --git a/src/runtime/output/protocol/pulsar/producer_config.rs b/src/runtime/output/protocol/pulsar/producer_config.rs new file mode 100644 index 00000000..e5f7ad52 --- /dev/null +++ b/src/runtime/output/protocol/pulsar/producer_config.rs @@ -0,0 +1,31 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +/// PulsarProducerConfig - Pulsar producer configuration +#[derive(Debug, Clone)] +pub struct PulsarProducerConfig { + pub url: String, + pub topic: String, + pub properties: HashMap, +} + +impl PulsarProducerConfig { + pub fn new(url: String, topic: String, properties: HashMap) -> Self { + Self { + url, + topic, + properties, + } + } +} diff --git a/src/runtime/output/protocol/pulsar/pulsar_protocol.rs b/src/runtime/output/protocol/pulsar/pulsar_protocol.rs new file mode 100644 index 00000000..2474fc0f --- /dev/null +++ b/src/runtime/output/protocol/pulsar/pulsar_protocol.rs @@ -0,0 +1,110 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::producer_config::PulsarProducerConfig; +use crate::runtime::buffer_and_event::BufferOrEvent; +use crate::runtime::output::output_protocol::OutputProtocol; +use pulsar::{Producer, Pulsar, TokioExecutor}; +use std::cell::RefCell; + +thread_local! { + static PULSAR_OUT_RT: RefCell> = const { RefCell::new(None) }; + static PULSAR_PRODUCER: RefCell>> = const { RefCell::new(None) }; +} + +pub struct PulsarOutputProtocol { + config: PulsarProducerConfig, +} + +impl PulsarOutputProtocol { + pub fn new(config: PulsarProducerConfig) -> Self { + Self { config } + } +} + +impl OutputProtocol for PulsarOutputProtocol { + fn name(&self) -> String { + format!("pulsar-{}", self.config.topic) + } + + fn init(&self) -> Result<(), Box> { + Ok(()) + } + + fn send(&self, data: BufferOrEvent) -> Result<(), Box> { + if let Some(payload) = data.into_buffer() { + PULSAR_OUT_RT.with(|rt_cell| { + PULSAR_PRODUCER.with( + |producer_cell| -> Result<(), Box> { + let mut rt_opt = rt_cell.borrow_mut(); + let mut producer_opt = producer_cell.borrow_mut(); + + if producer_opt.is_none() { + let rt = tokio::runtime::Runtime::new().map_err(|e| { + Box::new(std::io::Error::other(e)) + as Box + })?; + let url = self.config.url.clone(); + let topic = self.config.topic.clone(); + + let producer: Producer = rt + .block_on(async { + let pulsar = + Pulsar::builder(&url, TokioExecutor).build().await?; + let producer = + pulsar.producer().with_topic(&topic).build().await?; + Result::<_, pulsar::Error>::Ok(producer) + }) + .map_err(|e| { + Box::new(std::io::Error::other(e)) + as Box + })?; + + *rt_opt = Some(rt); + *producer_opt = Some(producer); + } + + let rt = rt_opt.as_ref().unwrap(); + let producer = producer_opt.as_mut().unwrap(); + + rt.block_on(async { + producer + .create_message() + .with_content(payload) + .send_non_blocking() + .await + .map_err(|e| { + Box::new(std::io::Error::other(e)) + as Box + }) + })?; + Ok(()) + }, + ) + })?; + } + Ok(()) + } + + fn flush(&self) -> Result<(), Box> { + PULSAR_OUT_RT.with(|rt_cell| { + PULSAR_PRODUCER.with(|producer_cell| { + let rt_opt = rt_cell.borrow(); + let mut producer_opt = producer_cell.borrow_mut(); + if let (Some(rt), Some(producer)) = (rt_opt.as_ref(), producer_opt.as_mut()) { + let _ = rt.block_on(producer.send_batch()); + } + }); + }); + Ok(()) + } +} diff --git a/src/runtime/task/processor_config.rs b/src/runtime/task/processor_config.rs index fe515647..7230926a 100644 --- a/src/runtime/task/processor_config.rs +++ b/src/runtime/task/processor_config.rs @@ -144,6 +144,17 @@ pub enum InputConfig { #[serde(default)] runtime: InputRuntimeConfig, }, + Pulsar { + url: String, + topic: String, + subscription: String, + #[serde(default)] + subscription_type: Option, + #[serde(flatten)] + extra: HashMap, + #[serde(default)] + runtime: InputRuntimeConfig, + }, } impl InputConfig { @@ -172,12 +183,14 @@ impl InputConfig { pub fn input_type(&self) -> &'static str { match self { InputConfig::Kafka { .. } => "kafka", + InputConfig::Pulsar { .. } => "pulsar", } } pub fn input_runtime_config(&self) -> InputRuntimeConfig { match self { InputConfig::Kafka { runtime, .. } => runtime.clone(), + InputConfig::Pulsar { runtime, .. } => runtime.clone(), } } } @@ -520,6 +533,14 @@ pub enum OutputConfig { #[serde(default)] runtime: OutputRuntimeConfig, }, + Pulsar { + url: String, + topic: String, + #[serde(flatten)] + extra: HashMap, + #[serde(default)] + runtime: OutputRuntimeConfig, + }, } impl OutputConfig { @@ -547,12 +568,14 @@ impl OutputConfig { pub fn output_type(&self) -> &'static str { match self { OutputConfig::Kafka { .. } => "kafka", + OutputConfig::Pulsar { .. } => "pulsar", } } pub fn output_runtime_config(&self) -> OutputRuntimeConfig { match self { OutputConfig::Kafka { runtime, .. } => runtime.clone(), + OutputConfig::Pulsar { runtime, .. } => runtime.clone(), } } }