Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/runtime/input/input_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,30 @@ impl InputProvider {
runtime,
)))
}
InputConfig::RocksMQ {
path,
topic,
consumer_id,
extra,
runtime: _,
} => {
use crate::runtime::input::InputRunner;
use crate::runtime::input::protocol::rocksmq::{RocksMQConfig, RocksMQProtocol};

let rocksmq_config = RocksMQConfig::new(
path.clone(),
topic.clone(),
consumer_id.clone(),
extra.clone(),
);
let runtime = input_config.input_runtime_config();
Ok(Box::new(InputRunner::new(
RocksMQProtocol::new(rocksmq_config),
group_idx,
input_idx,
runtime,
)))
}
}
}
}
1 change: 1 addition & 0 deletions src/runtime/input/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
// limitations under the License.

pub mod kafka;
pub mod rocksmq;
43 changes: 43 additions & 0 deletions src/runtime/input/protocol/rocksmq/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! RocksMQ input config.
//!
//! RocksMQ is an embedded message queue (e.g. used by Milvus). There is no
//! standalone Rust client at present; this is a stub. Replace with a real
//! implementation when a client (e.g. Milvus gRPC or native crate) is available.

use std::collections::HashMap;

#[derive(Debug, Clone)]
pub struct RocksMQConfig {
pub path: String,
pub topic: String,
pub consumer_id: Option<String>,
pub properties: HashMap<String, String>,
}

impl RocksMQConfig {
pub fn new(
path: String,
topic: String,
consumer_id: Option<String>,
properties: HashMap<String, String>,
) -> Self {
Self {
path,
topic,
consumer_id,
properties,
}
}
}
17 changes: 17 additions & 0 deletions src/runtime/input/protocol/rocksmq/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod config;
pub mod rocksmq_protocol;

pub use config::RocksMQConfig;
pub use rocksmq_protocol::RocksMQProtocol;
51 changes: 51 additions & 0 deletions src/runtime/input/protocol/rocksmq/rocksmq_protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! RocksMQ input protocol (stub).
//!
//! RocksMQ (e.g. Milvus) has no official Rust client. This stub implements
//! InputProtocol but poll() always returns Ok(None). Replace with a real
//! consumer when a client is available (e.g. Milvus gRPC or future crate).

use super::config::RocksMQConfig;
use crate::runtime::buffer_and_event::BufferOrEvent;
use crate::runtime::input::input_protocol::InputProtocol;
use std::time::Duration;

pub struct RocksMQProtocol {
config: RocksMQConfig,
}

impl RocksMQProtocol {
pub fn new(config: RocksMQConfig) -> Self {
Self { config }
}
}

impl InputProtocol for RocksMQProtocol {
fn name(&self) -> String {
format!("rocksmq-{}", self.config.topic)
}

fn init(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
// Stub: no-op until a real RocksMQ client is integrated.
Ok(())
}

fn poll(
&self,
_timeout: Duration,
) -> Result<Option<BufferOrEvent>, Box<dyn std::error::Error + Send>> {
// Stub: always no message. Replace with real RocksMQ consumer when available.
Ok(None)
}
}
17 changes: 17 additions & 0 deletions src/runtime/output/output_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,23 @@ impl OutputProvider {
let runtime = output_config.output_runtime_config();
Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime)))
}
OutputConfig::RocksMQ {
path,
topic,
extra,
runtime: _,
} => {
use crate::runtime::output::output_runner::OutputRunner;
use crate::runtime::output::protocol::rocksmq::{
RocksMQOutputProtocol, RocksMQProducerConfig,
};

let rocksmq_config =
RocksMQProducerConfig::new(path.clone(), topic.clone(), extra.clone());
let protocol = RocksMQOutputProtocol::new(rocksmq_config);
let runtime = output_config.output_runtime_config();
Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime)))
}
}
}
}
1 change: 1 addition & 0 deletions src/runtime/output/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
// Provides implementations of various output protocols

pub mod kafka;
pub mod rocksmq;
17 changes: 17 additions & 0 deletions src/runtime/output/protocol/rocksmq/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod producer_config;
pub mod rocksmq_protocol;

pub use producer_config::RocksMQProducerConfig;
pub use rocksmq_protocol::RocksMQOutputProtocol;
32 changes: 32 additions & 0 deletions src/runtime/output/protocol/rocksmq/producer_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! RocksMQ producer config (stub). Replace with real client config when available.

use std::collections::HashMap;

#[derive(Debug, Clone)]
pub struct RocksMQProducerConfig {
pub path: String,
pub topic: String,
pub properties: HashMap<String, String>,
}

impl RocksMQProducerConfig {
pub fn new(path: String, topic: String, properties: HashMap<String, String>) -> Self {
Self {
path,
topic,
properties,
}
}
}
49 changes: 49 additions & 0 deletions src/runtime/output/protocol/rocksmq/rocksmq_protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! RocksMQ output protocol (stub).
//!
//! RocksMQ has no official Rust client. This stub implements OutputProtocol
//! but send() is a no-op. Replace with a real producer when a client is available.

use super::producer_config::RocksMQProducerConfig;
use crate::runtime::buffer_and_event::BufferOrEvent;
use crate::runtime::output::output_protocol::OutputProtocol;

pub struct RocksMQOutputProtocol {
config: RocksMQProducerConfig,
}

impl RocksMQOutputProtocol {
pub fn new(config: RocksMQProducerConfig) -> Self {
Self { config }
}
}

impl OutputProtocol for RocksMQOutputProtocol {
fn name(&self) -> String {
format!("rocksmq-{}", self.config.topic)
}

fn init(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
Ok(())
}

fn send(&self, _data: BufferOrEvent) -> Result<(), Box<dyn std::error::Error + Send>> {
// Stub: no-op until a real RocksMQ producer is integrated.
Ok(())
}

fn flush(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
Ok(())
}
}
24 changes: 24 additions & 0 deletions src/runtime/task/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ pub enum InputConfig {
#[serde(default)]
runtime: InputRuntimeConfig,
},
/// RocksMQ (e.g. Milvus embedded MQ). Stub implementation; requires a real client when available.
RocksMQ {
path: String,
topic: String,
#[serde(default)]
consumer_id: Option<String>,
#[serde(flatten)]
extra: HashMap<String, String>,
#[serde(default)]
runtime: InputRuntimeConfig,
},
}

impl InputConfig {
Expand Down Expand Up @@ -172,12 +183,14 @@ impl InputConfig {
pub fn input_type(&self) -> &'static str {
match self {
InputConfig::Kafka { .. } => "kafka",
InputConfig::RocksMQ { .. } => "rocksmq",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serde tag and input_type/output_type value mismatch

High Severity

The RocksMQ variant on an enum with #[serde(rename_all = "kebab-case")] produces a serde tag value like "rocks-m-q" (serde inserts a hyphen before each uppercase letter). But input_type() returns "rocksmq" and output_type() returns "rocksmq". The validation in InputGroup::from_yaml_value and WasmTaskConfig::from_yaml_value compares the raw YAML tag against these method return values, so it will always fail with a type mismatch error for RocksMQ configs. Either add #[serde(rename = "rocksmq")] to the RocksMQ variants, or change the return values of input_type() and output_type() to match the serde-generated tag.

Additional Locations (1)
Fix in Cursor Fix in Web

}
}

pub fn input_runtime_config(&self) -> InputRuntimeConfig {
match self {
InputConfig::Kafka { runtime, .. } => runtime.clone(),
InputConfig::RocksMQ { runtime, .. } => runtime.clone(),
}
}
}
Expand Down Expand Up @@ -520,6 +533,15 @@ pub enum OutputConfig {
#[serde(default)]
runtime: OutputRuntimeConfig,
},
/// RocksMQ (e.g. Milvus embedded MQ). Stub implementation; requires a real client when available.
RocksMQ {
path: String,
topic: String,
#[serde(flatten)]
extra: HashMap<String, String>,
#[serde(default)]
runtime: OutputRuntimeConfig,
},
}

impl OutputConfig {
Expand Down Expand Up @@ -547,12 +569,14 @@ impl OutputConfig {
pub fn output_type(&self) -> &'static str {
match self {
OutputConfig::Kafka { .. } => "kafka",
OutputConfig::RocksMQ { .. } => "rocksmq",
}
}

pub fn output_runtime_config(&self) -> OutputRuntimeConfig {
match self {
OutputConfig::Kafka { runtime, .. } => runtime.clone(),
OutputConfig::RocksMQ { runtime, .. } => runtime.clone(),
}
}
}
Expand Down
Loading