Skip to content

Commit e8c27b9

Browse files
committed
update input and output
1 parent fd27f46 commit e8c27b9

25 files changed

Lines changed: 1347 additions & 2445 deletions

src/runtime/input/input.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
use crate::runtime::buffer_and_event::BufferOrEvent;
14+
use crate::runtime::taskexecutor::InitContext;
15+
16+
pub use crate::runtime::common::ComponentState as InputState;
17+
18+
pub trait Input: Send + Sync {
19+
fn init_with_context(
20+
&mut self,
21+
init_context: &InitContext,
22+
) -> Result<(), Box<dyn std::error::Error + Send>>;
23+
24+
fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send>>;
25+
26+
fn stop(&mut self) -> Result<(), Box<dyn std::error::Error + Send>>;
27+
28+
fn close(&mut self) -> Result<(), Box<dyn std::error::Error + Send>>;
29+
30+
fn get_next(&mut self) -> Result<Option<BufferOrEvent>, Box<dyn std::error::Error + Send>>;
31+
32+
fn poll_next(&mut self) -> Result<Option<BufferOrEvent>, Box<dyn std::error::Error + Send>> {
33+
self.get_next()
34+
}
35+
36+
fn take_checkpoint(
37+
&mut self,
38+
checkpoint_id: u64,
39+
) -> Result<(), Box<dyn std::error::Error + Send>>;
40+
41+
fn finish_checkpoint(
42+
&mut self,
43+
checkpoint_id: u64,
44+
) -> Result<(), Box<dyn std::error::Error + Send>>;
45+
46+
fn get_group_id(&self) -> usize;
47+
48+
fn set_error_state(&self) -> Result<(), Box<dyn std::error::Error + Send>>;
49+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
use crate::runtime::buffer_and_event::BufferOrEvent;
14+
use std::time::Duration;
15+
16+
pub trait InputProtocol: Send + Sync + 'static {
17+
fn init(&self) -> Result<(), Box<dyn std::error::Error + Send>>;
18+
fn poll(
19+
&self,
20+
timeout: Duration,
21+
) -> Result<Option<BufferOrEvent>, Box<dyn std::error::Error + Send>>;
22+
fn name(&self) -> String;
23+
fn on_start(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
24+
Ok(())
25+
}
26+
fn on_stop(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
27+
Ok(())
28+
}
29+
fn on_close(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
30+
Ok(())
31+
}
32+
fn on_checkpoint(&self, _id: u64) -> Result<(), Box<dyn std::error::Error + Send>> {
33+
Ok(())
34+
}
35+
fn on_checkpoint_finish(&self, _id: u64) -> Result<(), Box<dyn std::error::Error + Send>> {
36+
Ok(())
37+
}
38+
}

src/runtime/input/input_source_provider.rs renamed to src/runtime/input/input_provider.rs

Lines changed: 9 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,16 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
// InputSourceProvider - Input source provider
14-
//
15-
// Creates InputSource instances from configuration objects
16-
17-
use crate::runtime::input::InputSource;
13+
use crate::runtime::input::Input;
1814
use crate::runtime::task::InputConfig;
1915

20-
/// InputSourceProvider - Input source provider
21-
///
22-
/// Creates InputSource instances from configuration objects
23-
pub struct InputSourceProvider;
16+
pub struct InputProvider;
2417

25-
impl InputSourceProvider {
26-
/// Create multiple InputSource from InputConfig list
27-
///
28-
/// # Arguments
29-
/// - `input_configs`: InputConfig list
30-
/// - `group_idx`: Input group index (used to identify which group the input source belongs to)
31-
///
32-
/// # Returns
33-
/// - `Ok(Vec<Box<dyn InputSource>>)`: Successfully created input source list
34-
/// - `Err(...)`: Configuration parsing or creation failed
18+
impl InputProvider {
3519
pub fn from_input_configs(
3620
input_configs: &[InputConfig],
3721
group_idx: usize,
38-
) -> Result<Vec<Box<dyn InputSource>>, Box<dyn std::error::Error + Send>> {
22+
) -> Result<Vec<Box<dyn Input>>, Box<dyn std::error::Error + Send>> {
3923
if input_configs.is_empty() {
4024
return Err(Box::new(std::io::Error::new(
4125
std::io::ErrorKind::InvalidData,
@@ -46,7 +30,6 @@ impl InputSourceProvider {
4630
)) as Box<dyn std::error::Error + Send>);
4731
}
4832

49-
// Check input source count limit (maximum 64)
5033
const MAX_INPUTS: usize = 64;
5134
if input_configs.len() > MAX_INPUTS {
5235
return Err(Box::new(std::io::Error::new(
@@ -60,7 +43,6 @@ impl InputSourceProvider {
6043
)) as Box<dyn std::error::Error + Send>);
6144
}
6245

63-
// Create InputSource for each InputConfig
6446
let mut inputs = Vec::new();
6547
for (input_idx, input_config) in input_configs.iter().enumerate() {
6648
let input = Self::from_input_config(input_config, group_idx, input_idx)?;
@@ -70,21 +52,11 @@ impl InputSourceProvider {
7052
Ok(inputs)
7153
}
7254

73-
/// Create InputSource from single InputConfig
74-
///
75-
/// # Arguments
76-
/// - `input_config`: Input source configuration
77-
/// - `group_idx`: Input group index (used to identify which group the input source belongs to)
78-
/// - `input_idx`: Input source index within group (used to identify different input sources within the same group)
79-
///
80-
/// # Returns
81-
/// - `Ok(Box<dyn InputSource>)`: Successfully created input source
82-
/// - `Err(...)`: Parsing failed
8355
fn from_input_config(
8456
input_config: &InputConfig,
8557
group_idx: usize,
8658
input_idx: usize,
87-
) -> Result<Box<dyn InputSource>, Box<dyn std::error::Error + Send>> {
59+
) -> Result<Box<dyn Input>, Box<dyn std::error::Error + Send>> {
8860
match input_config {
8961
InputConfig::Kafka {
9062
bootstrap_servers,
@@ -93,9 +65,9 @@ impl InputSourceProvider {
9365
group_id,
9466
extra,
9567
} => {
96-
use crate::runtime::input::protocol::kafka::{KafkaConfig, KafkaInputSource};
68+
use crate::runtime::input::InputRunner;
69+
use crate::runtime::input::protocol::kafka::{KafkaConfig, KafkaProtocol};
9770

98-
// Convert bootstrap_servers string to Vec<String>
9971
let servers: Vec<String> = bootstrap_servers
10072
.split(',')
10173
.map(|s| s.trim().to_string())
@@ -114,13 +86,8 @@ impl InputSourceProvider {
11486
)) as Box<dyn std::error::Error + Send>);
11587
}
11688

117-
// Convert partition from Option<u32> to Option<i32>
11889
let partition_i32 = partition.map(|p| p as i32);
119-
120-
// Merge extra configuration into properties
12190
let properties = extra.clone();
122-
123-
// Create KafkaConfig
12491
let kafka_config = KafkaConfig::new(
12592
servers,
12693
topic.clone(),
@@ -129,9 +96,8 @@ impl InputSourceProvider {
12996
properties,
13097
);
13198

132-
// Create KafkaInputSource, pass in group_idx and input_idx
133-
Ok(Box::new(KafkaInputSource::from_config(
134-
kafka_config,
99+
Ok(Box::new(InputRunner::new(
100+
KafkaProtocol::new(kafka_config),
135101
group_idx,
136102
input_idx,
137103
)))

0 commit comments

Comments
 (0)