Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
641 changes: 611 additions & 30 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async-trait = "0.1"
num_cpus = "1.0"
protocol = { path = "./protocol" }
rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] }
nats = "0.18"
crossbeam-channel = "0.5"
pest = "2.7"
pest_derive = "2.7"
Expand Down
35 changes: 35 additions & 0 deletions src/runtime/input/input_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,41 @@ impl InputProvider {
runtime,
)))
}
InputConfig::Nats {
url,
subject,
queue_group,
extra,
runtime: _,
} => {
use crate::runtime::input::InputRunner;
use crate::runtime::input::protocol::nats::{NatsConfig, NatsProtocol};

if url.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid nats url in input config (group #{}): empty (subject: {})",
group_idx + 1,
subject
),
)) as Box<dyn std::error::Error + Send>);
}
Comment on lines +118 to +127

let nats_config = NatsConfig::new(
url.clone(),
subject.clone(),
queue_group.clone(),
extra.clone(),
);
let runtime = input_config.input_runtime_config();
Ok(Box::new(InputRunner::new(
NatsProtocol::new(nats_config),
group_idx,
input_idx,
runtime,
)))
}
}
}
}
1 change: 1 addition & 0 deletions src/runtime/input/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
// limitations under the License.

pub mod kafka;
pub mod nats;
42 changes: 42 additions & 0 deletions src/runtime/input/protocol/nats/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

/// NatsConfig - NATS consumer configuration
#[derive(Debug, Clone)]
pub struct NatsConfig {
/// NATS server URL(s), comma-separated
pub url: String,
/// Subject to subscribe to
pub subject: String,
/// Optional queue group for load balancing
pub queue_group: Option<String>,
/// Extra options (e.g. token, user, pass)
pub properties: HashMap<String, String>,
}

impl NatsConfig {
pub fn new(
url: String,
subject: String,
queue_group: Option<String>,
properties: HashMap<String, String>,
) -> Self {
Self {
url,
subject,
queue_group,
properties,
}
}
}
18 changes: 18 additions & 0 deletions src/runtime/input/protocol/nats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod config;
pub mod nats_protocol;
pub mod options;

pub use config::NatsConfig;
pub use nats_protocol::NatsProtocol;
83 changes: 83 additions & 0 deletions src/runtime/input/protocol/nats/nats_protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::config::NatsConfig;
use super::options;
use crate::runtime::buffer_and_event::BufferOrEvent;
use crate::runtime::input::input_protocol::InputProtocol;
use std::sync::OnceLock;
use std::time::Duration;

pub struct NatsProtocol {
config: NatsConfig,
subscription: OnceLock<nats::Subscription>,
}

impl NatsProtocol {
pub fn new(config: NatsConfig) -> Self {
Self {
config,
subscription: OnceLock::new(),
}
}
}

impl InputProtocol for NatsProtocol {
fn name(&self) -> String {
format!("nats-{}", self.config.subject)
}

fn init(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
let nc = options::nats_connect(&self.config.url, &self.config.properties)
.map_err(|e| Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)?;

let sub = if let Some(q) = &self.config.queue_group {
nc.queue_subscribe(&self.config.subject, q).map_err(|e| {
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
})?
} else {
nc.subscribe(&self.config.subject).map_err(|e| {
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
})?
};

self.subscription.set(sub).map_err(|_| {
Box::new(std::io::Error::other("NATS subscription already init"))
as Box<dyn std::error::Error + Send>
})?;
Ok(())
}

fn poll(
&self,
timeout: Duration,
) -> Result<Option<BufferOrEvent>, Box<dyn std::error::Error + Send>> {
let sub = self.subscription.get().ok_or_else(|| {
Box::new(std::io::Error::other("NATS subscription not init"))
as Box<dyn std::error::Error + Send>
})?;

match sub.next_timeout(timeout) {
Ok(msg) => {
let payload = msg.data.to_vec();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary payload copy via to_vec() on owned Vec

Medium Severity

msg.data is already an owned Vec<u8> (since msg is consumed by value from next_timeout), but .to_vec() is called on it, which goes through Deref to [u8] and allocates a brand-new Vec with a full copy of the payload. Using msg.data directly would avoid this unnecessary allocation and copy for every single message received, which matters in a high-throughput messaging system.

Fix in Cursor Fix in Web

Ok(Some(BufferOrEvent::new_buffer(
payload,
Some(self.config.subject.clone()),
false,
false,
)))
}
Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => Ok(None),
Err(e) => Err(Box::new(e) as Box<dyn std::error::Error + Send>),
}
}
}
95 changes: 95 additions & 0 deletions src/runtime/input/protocol/nats/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Build `nats::Options` from URL and properties map.
//! Used by both NATS input and output protocols so that auth and other options
//! are not silently ignored.

use std::collections::HashMap;
use std::path::Path;

/// Get property by key, case-insensitive. Prefer exact match, then lowercase.
fn get_prop<'a>(props: &'a HashMap<String, String>, key: &str) -> Option<&'a str> {
props.get(key).map(String::as_str).or_else(|| {
let key_lower = key.to_lowercase();
props
.iter()
.find(|(k, _)| k.to_lowercase() == key_lower)
.map(|(_, v)| v.as_str())
})
}

/// Build NATS connection options from properties.
///
/// Supported properties (case-insensitive):
/// - `token`: auth token
/// - `user` / `username`: username (with `password` or `pass`)
/// - `password` / `pass`: password
/// - `name`: client connection name
/// - `tls_required`: "true" / "false"
/// - `client_cert`: path to client cert (with `client_key`)
/// - `client_key`: path to client key
/// - `credentials` / `creds`: path to .creds file
/// - `root_certificate` / `tls_ca`: path to root CA PEM
pub fn build_nats_options(properties: &HashMap<String, String>) -> nats::Options {
let opts = if let Some(token) = get_prop(properties, "token") {
nats::Options::with_token(token)
} else if let (Some(u), Some(p)) = (
get_prop(properties, "user").or_else(|| get_prop(properties, "username")),
get_prop(properties, "password").or_else(|| get_prop(properties, "pass")),
) {
nats::Options::with_user_pass(u, p)
} else if let Some(creds) =
get_prop(properties, "credentials").or_else(|| get_prop(properties, "creds"))
{
nats::Options::with_credentials(Path::new(creds))
} else {
nats::Options::new()
};

let opts = if let Some(name) = get_prop(properties, "name") {
opts.with_name(name)
} else {
opts
};

let opts = if let Some(s) = get_prop(properties, "tls_required") {
opts.tls_required(s.eq_ignore_ascii_case("true") || s.eq_ignore_ascii_case("1"))
} else {
opts
};

let opts = if let (Some(cert), Some(key)) = (
get_prop(properties, "client_cert"),
get_prop(properties, "client_key"),
) {
opts.client_cert(Path::new(cert), Path::new(key))
} else {
opts
};

if let Some(ca) =
get_prop(properties, "root_certificate").or_else(|| get_prop(properties, "tls_ca"))
{
opts.add_root_certificate(Path::new(ca))
} else {
opts
}
}

/// Connect to NATS using URL and properties (auth, name, TLS, etc.).
pub fn nats_connect(
url: &str,
properties: &HashMap<String, String>,
) -> std::io::Result<nats::Connection> {
build_nats_options(properties).connect(url)
}
27 changes: 27 additions & 0 deletions src/runtime/output/output_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,33 @@ impl OutputProvider {
let runtime = output_config.output_runtime_config();
Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime)))
}
OutputConfig::Nats {
url,
subject,
extra,
runtime: _,
} => {
use crate::runtime::output::output_runner::OutputRunner;
use crate::runtime::output::protocol::nats::{
NatsOutputProtocol, NatsProducerConfig,
};

if url.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid nats url in output config: empty (subject: {})",
subject
),
)) as Box<dyn std::error::Error + Send>);
}
Comment on lines +102 to +110

let nats_config =
NatsProducerConfig::new(url.clone(), subject.clone(), extra.clone());
let protocol = NatsOutputProtocol::new(nats_config);
let runtime = output_config.output_runtime_config();
Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime)))
}
}
}
}
1 change: 1 addition & 0 deletions src/runtime/output/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
// Provides implementations of various output protocols

pub mod kafka;
pub mod nats;
17 changes: 17 additions & 0 deletions src/runtime/output/protocol/nats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod nats_protocol;
pub mod producer_config;

pub use nats_protocol::NatsOutputProtocol;
pub use producer_config::NatsProducerConfig;
Loading
Loading