Skip to content

feat(mq): add RocksMQ input/output stub (config + protocol placeholder)#217

Closed
luoluoyuyu wants to merge 1 commit intoFunctionStream:mainfrom
luoluoyuyu:feature/rocksmq-mq
Closed

feat(mq): add RocksMQ input/output stub (config + protocol placeholder)#217
luoluoyuyu wants to merge 1 commit intoFunctionStream:mainfrom
luoluoyuyu:feature/rocksmq-mq

Conversation

@luoluoyuyu
Copy link
Collaborator

@luoluoyuyu luoluoyuyu commented Mar 11, 2026

Note

Low Risk
Primarily additive changes gated behind a new config type; existing Kafka paths are untouched, but selecting rocksmq will currently drop/ignore messages due to stubbed poll/send behavior.

Overview
Adds a new rocksmq input/output type to task configuration and wiring, so YAML can specify InputConfig::RocksMQ / OutputConfig::RocksMQ and providers construct corresponding runners.

Introduces placeholder RocksMQ protocol modules and config structs for input and output; the implementations are explicitly stubs (poll() always returns Ok(None), send() is a no-op) intended to be replaced once a real RocksMQ client is available.

Written by Cursor Bugbot for commit e8bb938. This will update automatically on new commits. Configure here.

@luoluoyuyu
Copy link
Collaborator Author

@cursor review

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

pub fn input_type(&self) -> &'static str {
match self {
InputConfig::Kafka { .. } => "kafka",
InputConfig::RocksMQ { .. } => "rocksmq",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serde tag and input_type/output_type value mismatch

High Severity

The RocksMQ variant on an enum with #[serde(rename_all = "kebab-case")] produces a serde tag value like "rocks-m-q" (serde inserts a hyphen before each uppercase letter). But input_type() returns "rocksmq" and output_type() returns "rocksmq". The validation in InputGroup::from_yaml_value and WasmTaskConfig::from_yaml_value compares the raw YAML tag against these method return values, so it will always fail with a type mismatch error for RocksMQ configs. Either add #[serde(rename = "rocksmq")] to the RocksMQ variants, or change the return values of input_type() and output_type() to match the serde-generated tag.

Additional Locations (1)
Fix in Cursor Fix in Web

@luoluoyuyu luoluoyuyu closed this Mar 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant