diff --git a/src/runtime/input/input_provider.rs b/src/runtime/input/input_provider.rs index 3f6606cd..f015182c 100644 --- a/src/runtime/input/input_provider.rs +++ b/src/runtime/input/input_provider.rs @@ -105,6 +105,30 @@ impl InputProvider { runtime, ))) } + InputConfig::RocksMQ { + path, + topic, + consumer_id, + extra, + runtime: _, + } => { + use crate::runtime::input::InputRunner; + use crate::runtime::input::protocol::rocksmq::{RocksMQConfig, RocksMQProtocol}; + + let rocksmq_config = RocksMQConfig::new( + path.clone(), + topic.clone(), + consumer_id.clone(), + extra.clone(), + ); + let runtime = input_config.input_runtime_config(); + Ok(Box::new(InputRunner::new( + RocksMQProtocol::new(rocksmq_config), + group_idx, + input_idx, + runtime, + ))) + } } } } diff --git a/src/runtime/input/protocol/mod.rs b/src/runtime/input/protocol/mod.rs index b9574391..75cecc28 100644 --- a/src/runtime/input/protocol/mod.rs +++ b/src/runtime/input/protocol/mod.rs @@ -11,3 +11,4 @@ // limitations under the License. pub mod kafka; +pub mod rocksmq; diff --git a/src/runtime/input/protocol/rocksmq/config.rs b/src/runtime/input/protocol/rocksmq/config.rs new file mode 100644 index 00000000..1c6c7591 --- /dev/null +++ b/src/runtime/input/protocol/rocksmq/config.rs @@ -0,0 +1,43 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! RocksMQ input config. +//! +//! RocksMQ is an embedded message queue (e.g. used by Milvus). There is no +//! standalone Rust client at present; this is a stub. Replace with a real +//! implementation when a client (e.g. Milvus gRPC or native crate) is available. + +use std::collections::HashMap; + +#[derive(Debug, Clone)] +pub struct RocksMQConfig { + pub path: String, + pub topic: String, + pub consumer_id: Option, + pub properties: HashMap, +} + +impl RocksMQConfig { + pub fn new( + path: String, + topic: String, + consumer_id: Option, + properties: HashMap, + ) -> Self { + Self { + path, + topic, + consumer_id, + properties, + } + } +} diff --git a/src/runtime/input/protocol/rocksmq/mod.rs b/src/runtime/input/protocol/rocksmq/mod.rs new file mode 100644 index 00000000..e6b77be6 --- /dev/null +++ b/src/runtime/input/protocol/rocksmq/mod.rs @@ -0,0 +1,17 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod config; +pub mod rocksmq_protocol; + +pub use config::RocksMQConfig; +pub use rocksmq_protocol::RocksMQProtocol; diff --git a/src/runtime/input/protocol/rocksmq/rocksmq_protocol.rs b/src/runtime/input/protocol/rocksmq/rocksmq_protocol.rs new file mode 100644 index 00000000..57916f86 --- /dev/null +++ b/src/runtime/input/protocol/rocksmq/rocksmq_protocol.rs @@ -0,0 +1,51 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! RocksMQ input protocol (stub). +//! +//! RocksMQ (e.g. Milvus) has no official Rust client. This stub implements +//! InputProtocol but poll() always returns Ok(None). Replace with a real +//! consumer when a client is available (e.g. Milvus gRPC or future crate). + +use super::config::RocksMQConfig; +use crate::runtime::buffer_and_event::BufferOrEvent; +use crate::runtime::input::input_protocol::InputProtocol; +use std::time::Duration; + +pub struct RocksMQProtocol { + config: RocksMQConfig, +} + +impl RocksMQProtocol { + pub fn new(config: RocksMQConfig) -> Self { + Self { config } + } +} + +impl InputProtocol for RocksMQProtocol { + fn name(&self) -> String { + format!("rocksmq-{}", self.config.topic) + } + + fn init(&self) -> Result<(), Box> { + // Stub: no-op until a real RocksMQ client is integrated. + Ok(()) + } + + fn poll( + &self, + _timeout: Duration, + ) -> Result, Box> { + // Stub: always no message. Replace with real RocksMQ consumer when available. + Ok(None) + } +} diff --git a/src/runtime/output/output_provider.rs b/src/runtime/output/output_provider.rs index c6d01fef..49b49eb0 100644 --- a/src/runtime/output/output_provider.rs +++ b/src/runtime/output/output_provider.rs @@ -88,6 +88,23 @@ impl OutputProvider { let runtime = output_config.output_runtime_config(); Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime))) } + OutputConfig::RocksMQ { + path, + topic, + extra, + runtime: _, + } => { + use crate::runtime::output::output_runner::OutputRunner; + use crate::runtime::output::protocol::rocksmq::{ + RocksMQOutputProtocol, RocksMQProducerConfig, + }; + + let rocksmq_config = + RocksMQProducerConfig::new(path.clone(), topic.clone(), extra.clone()); + let protocol = RocksMQOutputProtocol::new(rocksmq_config); + let runtime = output_config.output_runtime_config(); + Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime))) + } } } } diff --git a/src/runtime/output/protocol/mod.rs b/src/runtime/output/protocol/mod.rs index 20e4d1c5..ee299d6f 100644 --- a/src/runtime/output/protocol/mod.rs +++ b/src/runtime/output/protocol/mod.rs @@ -15,3 +15,4 @@ // Provides implementations of various output protocols pub mod kafka; +pub mod rocksmq; diff --git a/src/runtime/output/protocol/rocksmq/mod.rs b/src/runtime/output/protocol/rocksmq/mod.rs new file mode 100644 index 00000000..c5e7ebb5 --- /dev/null +++ b/src/runtime/output/protocol/rocksmq/mod.rs @@ -0,0 +1,17 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod producer_config; +pub mod rocksmq_protocol; + +pub use producer_config::RocksMQProducerConfig; +pub use rocksmq_protocol::RocksMQOutputProtocol; diff --git a/src/runtime/output/protocol/rocksmq/producer_config.rs b/src/runtime/output/protocol/rocksmq/producer_config.rs new file mode 100644 index 00000000..29fab1f2 --- /dev/null +++ b/src/runtime/output/protocol/rocksmq/producer_config.rs @@ -0,0 +1,32 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! RocksMQ producer config (stub). Replace with real client config when available. + +use std::collections::HashMap; + +#[derive(Debug, Clone)] +pub struct RocksMQProducerConfig { + pub path: String, + pub topic: String, + pub properties: HashMap, +} + +impl RocksMQProducerConfig { + pub fn new(path: String, topic: String, properties: HashMap) -> Self { + Self { + path, + topic, + properties, + } + } +} diff --git a/src/runtime/output/protocol/rocksmq/rocksmq_protocol.rs b/src/runtime/output/protocol/rocksmq/rocksmq_protocol.rs new file mode 100644 index 00000000..3b283291 --- /dev/null +++ b/src/runtime/output/protocol/rocksmq/rocksmq_protocol.rs @@ -0,0 +1,49 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! RocksMQ output protocol (stub). +//! +//! RocksMQ has no official Rust client. This stub implements OutputProtocol +//! but send() is a no-op. Replace with a real producer when a client is available. + +use super::producer_config::RocksMQProducerConfig; +use crate::runtime::buffer_and_event::BufferOrEvent; +use crate::runtime::output::output_protocol::OutputProtocol; + +pub struct RocksMQOutputProtocol { + config: RocksMQProducerConfig, +} + +impl RocksMQOutputProtocol { + pub fn new(config: RocksMQProducerConfig) -> Self { + Self { config } + } +} + +impl OutputProtocol for RocksMQOutputProtocol { + fn name(&self) -> String { + format!("rocksmq-{}", self.config.topic) + } + + fn init(&self) -> Result<(), Box> { + Ok(()) + } + + fn send(&self, _data: BufferOrEvent) -> Result<(), Box> { + // Stub: no-op until a real RocksMQ producer is integrated. + Ok(()) + } + + fn flush(&self) -> Result<(), Box> { + Ok(()) + } +} diff --git a/src/runtime/task/processor_config.rs b/src/runtime/task/processor_config.rs index fe515647..5670d816 100644 --- a/src/runtime/task/processor_config.rs +++ b/src/runtime/task/processor_config.rs @@ -144,6 +144,17 @@ pub enum InputConfig { #[serde(default)] runtime: InputRuntimeConfig, }, + /// RocksMQ (e.g. Milvus embedded MQ). Stub implementation; requires a real client when available. + RocksMQ { + path: String, + topic: String, + #[serde(default)] + consumer_id: Option, + #[serde(flatten)] + extra: HashMap, + #[serde(default)] + runtime: InputRuntimeConfig, + }, } impl InputConfig { @@ -172,12 +183,14 @@ impl InputConfig { pub fn input_type(&self) -> &'static str { match self { InputConfig::Kafka { .. } => "kafka", + InputConfig::RocksMQ { .. } => "rocksmq", } } pub fn input_runtime_config(&self) -> InputRuntimeConfig { match self { InputConfig::Kafka { runtime, .. } => runtime.clone(), + InputConfig::RocksMQ { runtime, .. } => runtime.clone(), } } } @@ -520,6 +533,15 @@ pub enum OutputConfig { #[serde(default)] runtime: OutputRuntimeConfig, }, + /// RocksMQ (e.g. Milvus embedded MQ). Stub implementation; requires a real client when available. + RocksMQ { + path: String, + topic: String, + #[serde(flatten)] + extra: HashMap, + #[serde(default)] + runtime: OutputRuntimeConfig, + }, } impl OutputConfig { @@ -547,12 +569,14 @@ impl OutputConfig { pub fn output_type(&self) -> &'static str { match self { OutputConfig::Kafka { .. } => "kafka", + OutputConfig::RocksMQ { .. } => "rocksmq", } } pub fn output_runtime_config(&self) -> OutputRuntimeConfig { match self { OutputConfig::Kafka { runtime, .. } => runtime.clone(), + OutputConfig::RocksMQ { runtime, .. } => runtime.clone(), } } }