feat(mq): add NATS input and output support#215
feat(mq): add NATS input and output support#215luoluoyuyu wants to merge 5 commits intoFunctionStream:mainfrom
Conversation
Made-with: Cursor
|
You have run out of free Bugbot PR reviews for this billing cycle. This will reset on April 21. To receive reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial. |
|
@cursor review |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
|
|
||
| match sub.next_timeout(timeout) { | ||
| Ok(msg) => { | ||
| let payload = msg.data.to_vec(); |
There was a problem hiding this comment.
Unnecessary payload copy via to_vec() on owned Vec
Medium Severity
msg.data is already an owned Vec<u8> (since msg is consumed by value from next_timeout), but .to_vec() is called on it, which goes through Deref to [u8] and allocates a brand-new Vec with a full copy of the payload. Using msg.data directly would avoid this unnecessary allocation and copy for every single message received, which matters in a high-throughput messaging system.
|
|
||
| use super::producer_config::NatsProducerConfig; | ||
| use crate::runtime::buffer_and_event::BufferOrEvent; | ||
| use crate::runtime::input::protocol::nats::options; |
There was a problem hiding this comment.
Shared options module misplaced under input protocol directory
Low Severity
The options module (containing nats_connect and build_nats_options) lives under src/runtime/input/protocol/nats/ but is imported by the output protocol via crate::runtime::input::protocol::nats::options. This creates an architectural coupling where the output module depends on the input module's internals. The module's own doc comment acknowledges it's "used by both NATS input and output protocols," indicating it belongs in a shared location.
Additional Locations (1)
There was a problem hiding this comment.
Pull request overview
Adds NATS as a new messaging transport in the runtime, enabling tasks to ingest from and publish to NATS via input-type: nats / output-type: nats, with shared auth/TLS option mapping via an extra properties map.
Changes:
- Extend task YAML config enums to support NATS inputs/outputs (including optional queue group for inputs).
- Implement NATS input/output protocols and provider wiring to instantiate the appropriate runners.
- Add the
natscrate dependency (and lockfile updates for transitive deps).
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| src/runtime/task/processor_config.rs | Adds InputConfig::Nats and OutputConfig::Nats variants for YAML-driven configuration. |
| src/runtime/input/input_provider.rs | Constructs NatsProtocol runners from InputConfig::Nats. |
| src/runtime/input/protocol/mod.rs | Exposes the new nats input protocol module. |
| src/runtime/input/protocol/nats/mod.rs | Declares NATS input protocol submodules and re-exports types. |
| src/runtime/input/protocol/nats/config.rs | Defines NatsConfig for input-side configuration. |
| src/runtime/input/protocol/nats/options.rs | Builds nats::Options from extra properties and provides a connect helper. |
| src/runtime/input/protocol/nats/nats_protocol.rs | Implements the NATS input protocol (InputProtocol). |
| src/runtime/output/output_provider.rs | Constructs NatsOutputProtocol runners from OutputConfig::Nats. |
| src/runtime/output/protocol/mod.rs | Exposes the new nats output protocol module. |
| src/runtime/output/protocol/nats/mod.rs | Declares NATS output protocol submodules and re-exports types. |
| src/runtime/output/protocol/nats/producer_config.rs | Defines NatsProducerConfig for output-side configuration. |
| src/runtime/output/protocol/nats/nats_protocol.rs | Implements the NATS output protocol (OutputProtocol). |
| Cargo.toml | Adds the nats dependency. |
| Cargo.lock | Locks transitive dependencies introduced by nats. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| if url.is_empty() { | ||
| return Err(Box::new(std::io::Error::new( | ||
| std::io::ErrorKind::InvalidData, | ||
| format!( | ||
| "Invalid nats url in input config (group #{}): empty (subject: {})", | ||
| group_idx + 1, | ||
| subject | ||
| ), | ||
| )) as Box<dyn std::error::Error + Send>); | ||
| } |
| if url.is_empty() { | ||
| return Err(Box::new(std::io::Error::new( | ||
| std::io::ErrorKind::InvalidData, | ||
| format!( | ||
| "Invalid nats url in output config: empty (subject: {})", | ||
| subject | ||
| ), | ||
| )) as Box<dyn std::error::Error + Send>); | ||
| } |
| Nats { | ||
| url: String, | ||
| subject: String, | ||
| #[serde(default)] | ||
| queue_group: Option<String>, | ||
| #[serde(flatten)] | ||
| extra: HashMap<String, String>, | ||
| #[serde(default)] | ||
| runtime: InputRuntimeConfig, | ||
| }, |
| use super::producer_config::NatsProducerConfig; | ||
| use crate::runtime::buffer_and_event::BufferOrEvent; | ||
| use crate::runtime::input::protocol::nats::options; | ||
| use crate::runtime::output::output_protocol::OutputProtocol; |


Note
Medium Risk
Introduces a new network messaging integration (NATS) and TLS/auth option handling, which can affect connectivity/security configuration despite being mostly additive.
Overview
Adds NATS transport support for both ingestion and publishing.
The task YAML config now accepts
input-type: natsandoutput-type: nats(including optionalqueue_groupfor inputs), and the input/output providers construct newNatsProtocol/NatsOutputProtocolrunners with basic URL validation. A shared NATS options builder was added to mapextraproperties intonats::Options(token/user+pass/creds, TLS flags and cert paths, connection name), and the dependency graph is updated to include thenatscrate and its transitive crypto/TLS deps.Written by Cursor Bugbot for commit 6a7fb15. This will update automatically on new commits. Configure here.