Skip to content

feat(mq): add NATS input and output support#215

Open
luoluoyuyu wants to merge 5 commits intoFunctionStream:mainfrom
luoluoyuyu:feature/nats-mq
Open

feat(mq): add NATS input and output support#215
luoluoyuyu wants to merge 5 commits intoFunctionStream:mainfrom
luoluoyuyu:feature/nats-mq

Conversation

@luoluoyuyu
Copy link
Collaborator

@luoluoyuyu luoluoyuyu commented Mar 11, 2026

Note

Medium Risk
Introduces a new network messaging integration (NATS) and TLS/auth option handling, which can affect connectivity/security configuration despite being mostly additive.

Overview
Adds NATS transport support for both ingestion and publishing.

The task YAML config now accepts input-type: nats and output-type: nats (including optional queue_group for inputs), and the input/output providers construct new NatsProtocol/NatsOutputProtocol runners with basic URL validation. A shared NATS options builder was added to map extra properties into nats::Options (token/user+pass/creds, TLS flags and cert paths, connection name), and the dependency graph is updated to include the nats crate and its transitive crypto/TLS deps.

Written by Cursor Bugbot for commit 6a7fb15. This will update automatically on new commits. Configure here.

@cursor
Copy link

cursor bot commented Mar 11, 2026

You have run out of free Bugbot PR reviews for this billing cycle. This will reset on April 21.

To receive reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.

@luoluoyuyu
Copy link
Collaborator Author

@cursor review

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.


match sub.next_timeout(timeout) {
Ok(msg) => {
let payload = msg.data.to_vec();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary payload copy via to_vec() on owned Vec

Medium Severity

msg.data is already an owned Vec<u8> (since msg is consumed by value from next_timeout), but .to_vec() is called on it, which goes through Deref to [u8] and allocates a brand-new Vec with a full copy of the payload. Using msg.data directly would avoid this unnecessary allocation and copy for every single message received, which matters in a high-throughput messaging system.

Fix in Cursor Fix in Web


use super::producer_config::NatsProducerConfig;
use crate::runtime::buffer_and_event::BufferOrEvent;
use crate::runtime::input::protocol::nats::options;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shared options module misplaced under input protocol directory

Low Severity

The options module (containing nats_connect and build_nats_options) lives under src/runtime/input/protocol/nats/ but is imported by the output protocol via crate::runtime::input::protocol::nats::options. This creates an architectural coupling where the output module depends on the input module's internals. The module's own doc comment acknowledges it's "used by both NATS input and output protocols," indicating it belongs in a shared location.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds NATS as a new messaging transport in the runtime, enabling tasks to ingest from and publish to NATS via input-type: nats / output-type: nats, with shared auth/TLS option mapping via an extra properties map.

Changes:

  • Extend task YAML config enums to support NATS inputs/outputs (including optional queue group for inputs).
  • Implement NATS input/output protocols and provider wiring to instantiate the appropriate runners.
  • Add the nats crate dependency (and lockfile updates for transitive deps).

Reviewed changes

Copilot reviewed 13 out of 14 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
src/runtime/task/processor_config.rs Adds InputConfig::Nats and OutputConfig::Nats variants for YAML-driven configuration.
src/runtime/input/input_provider.rs Constructs NatsProtocol runners from InputConfig::Nats.
src/runtime/input/protocol/mod.rs Exposes the new nats input protocol module.
src/runtime/input/protocol/nats/mod.rs Declares NATS input protocol submodules and re-exports types.
src/runtime/input/protocol/nats/config.rs Defines NatsConfig for input-side configuration.
src/runtime/input/protocol/nats/options.rs Builds nats::Options from extra properties and provides a connect helper.
src/runtime/input/protocol/nats/nats_protocol.rs Implements the NATS input protocol (InputProtocol).
src/runtime/output/output_provider.rs Constructs NatsOutputProtocol runners from OutputConfig::Nats.
src/runtime/output/protocol/mod.rs Exposes the new nats output protocol module.
src/runtime/output/protocol/nats/mod.rs Declares NATS output protocol submodules and re-exports types.
src/runtime/output/protocol/nats/producer_config.rs Defines NatsProducerConfig for output-side configuration.
src/runtime/output/protocol/nats/nats_protocol.rs Implements the NATS output protocol (OutputProtocol).
Cargo.toml Adds the nats dependency.
Cargo.lock Locks transitive dependencies introduced by nats.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +118 to +127
if url.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid nats url in input config (group #{}): empty (subject: {})",
group_idx + 1,
subject
),
)) as Box<dyn std::error::Error + Send>);
}
Comment on lines +102 to +110
if url.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid nats url in output config: empty (subject: {})",
subject
),
)) as Box<dyn std::error::Error + Send>);
}
Comment on lines +147 to +156
Nats {
url: String,
subject: String,
#[serde(default)]
queue_group: Option<String>,
#[serde(flatten)]
extra: HashMap<String, String>,
#[serde(default)]
runtime: InputRuntimeConfig,
},
Comment on lines +13 to +16
use super::producer_config::NatsProducerConfig;
use crate::runtime::buffer_and_event::BufferOrEvent;
use crate::runtime::input::protocol::nats::options;
use crate::runtime::output::output_protocol::OutputProtocol;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants