Skip to content

Commit 9433126

Browse files
committed
update test
1 parent 1bbbb92 commit 9433126

3 files changed

Lines changed: 58 additions & 73 deletions

File tree

examples/examples-validator/kafka_test.rs

Lines changed: 54 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,11 @@ use clap::Parser;
1515
use rdkafka::config::ClientConfig;
1616
use rdkafka::consumer::{Consumer, StreamConsumer};
1717
use rdkafka::message::{BorrowedMessage, Message};
18-
use rdkafka::producer::{FutureProducer, FutureRecord};
19-
use rdkafka::util::Timeout;
18+
use rdkafka::producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer};
2019
use serde::Deserialize;
2120
use std::collections::HashMap;
2221
use std::sync::Arc;
2322
use std::time::Duration;
24-
use tokio::signal;
2523
use tokio::sync::Notify;
2624
use tracing::{error, info, warn};
2725
#[derive(Parser, Debug, Clone)]
@@ -85,25 +83,33 @@ async fn main() -> Result<()> {
8583
// Create a notification channel for graceful shutdown
8684
let shutdown_notify = Arc::new(Notify::new());
8785
let shutdown_consumer = shutdown_notify.clone();
86+
let consumer: StreamConsumer = ClientConfig::new()
87+
.set("group.id", &config.group_id)
88+
.set("bootstrap.servers", &config.brokers)
89+
.set("enable.partition.eof", "false")
90+
.set("session.timeout.ms", "6000")
91+
// Auto-commit is enabled, but strictly robust apps might manage offsets manually
92+
.set("enable.auto.commit", "true")
93+
.set("auto.offset.reset", "earliest")
94+
.create()
95+
.context("Failed to create Kafka Consumer")?;
96+
97+
consumer
98+
.subscribe(&[&config.output_topic])
99+
.context("Failed to subscribe to topic")?;
100+
info!(
101+
"Initializing Consumer listening on topic: {}",
102+
config.output_topic
103+
);
104+
105+
let consumer_handle = tokio::spawn(run_consumer(config.clone(), shutdown_consumer, consumer));
88106

89107
// Spawn Producer and Consumer as concurrent async tasks
90-
let producer_handle = tokio::spawn(run_producer(config.clone()));
91-
let consumer_handle = tokio::spawn(run_consumer(config.clone(), shutdown_consumer));
92-
93-
// Wait for system signals (Ctrl+C / SIGTERM)
94-
match signal::ctrl_c().await {
95-
Ok(()) => {
96-
info!("Shutdown signal received. Stopping services gracefully...");
97-
// Notify the consumer loop to break
98-
shutdown_notify.notify_waiters();
99-
}
100-
Err(err) => {
101-
error!("Unable to listen for shutdown signal: {}", err);
102-
}
103-
}
108+
let _ = tokio::spawn(run_producer(config.clone()));
104109

110+
info!("Application execution completed.1");
105111
// Wait for tasks to finish (cleanup)
106-
let _ = tokio::join!(producer_handle, consumer_handle);
112+
let _ = tokio::join!(consumer_handle);
107113

108114
info!("Application execution completed.");
109115
Ok(())
@@ -114,9 +120,12 @@ async fn main() -> Result<()> {
114120
// ============================================================================
115121

116122
async fn run_producer(config: AppConfig) -> Result<()> {
117-
info!("Initializing Producer targeting topic: {}", config.input_topic);
123+
info!(
124+
"Initializing Producer targeting topic: {}",
125+
config.input_topic
126+
);
118127

119-
let producer: FutureProducer = ClientConfig::new()
128+
let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
120129
.set("bootstrap.servers", &config.brokers)
121130
.set("message.timeout.ms", "5000")
122131
// Enable idempotence for exactly-once semantics
@@ -125,84 +134,60 @@ async fn run_producer(config: AppConfig) -> Result<()> {
125134
.context("Failed to create Kafka Producer")?;
126135

127136
let test_data = vec!["apple", "banana", "cherry", "date", "elderberry"];
128-
129137
// Simulate data stream
130138
for i in 0..config.msg_count {
131139
let data = test_data[i % test_data.len()];
132140
let key = format!("key-{}", i);
133141

134142
// Construct the record
135-
let record = FutureRecord::to(&config.input_topic)
136-
.key(&key)
137-
.payload(data);
143+
let record = BaseRecord::to(&config.input_topic).key(&key).payload(data);
138144

139145
// Send asynchronously with a timeout
140-
let delivery_status = producer
141-
.send(record, Timeout::After(Duration::from_secs(5)))
142-
.await;
143-
144-
match delivery_status {
145-
Ok(delivery) => {
146-
// Log progress every 1000 messages to reduce noise
147-
if (i + 1) % 1000 == 0 {
148-
info!(
149-
"✓ Produced [{}/{}] -> Partition: {}, Offset: {}",
150-
i + 1, config.msg_count, delivery.partition, delivery.offset
151-
);
152-
}
153-
}
154-
Err((e, _)) => error!("✗ Failed to send message #{}: {:?}", i, e),
155-
}
146+
let _ = producer.send(record);
156147

157-
// Slight delay to simulate real-world throughput (prevents local buffer overflow)
158-
tokio::time::sleep(Duration::from_millis(1)).await;
148+
if (i + 1) % config.msg_count == 0 {
149+
let _ = producer.flush(Duration::from_secs(10));
150+
}
159151
}
160152

161-
info!("Producer finished. Total messages sent: {}", config.msg_count);
153+
info!(
154+
"Producer finished. Total messages sent: {}",
155+
config.msg_count
156+
);
162157
Ok(())
163158
}
164159

165160
// ============================================================================
166161
// 4. Consumer Logic
167162
// ============================================================================
168163

169-
async fn run_consumer(config: AppConfig, shutdown: Arc<Notify>) -> Result<()> {
170-
info!("Initializing Consumer listening on topic: {}", config.output_topic);
171-
172-
let consumer: StreamConsumer = ClientConfig::new()
173-
.set("group.id", &config.group_id)
174-
.set("bootstrap.servers", &config.brokers)
175-
.set("enable.partition.eof", "false")
176-
.set("session.timeout.ms", "6000")
177-
// Auto-commit is enabled, but strictly robust apps might manage offsets manually
178-
.set("enable.auto.commit", "true")
179-
.set("auto.offset.reset", "earliest")
180-
.create()
181-
.context("Failed to create Kafka Consumer")?;
182-
183-
consumer
184-
.subscribe(&[&config.output_topic])
185-
.context("Failed to subscribe to topic")?;
186-
164+
async fn run_consumer(
165+
config: AppConfig,
166+
shutdown: Arc<Notify>,
167+
consumer: StreamConsumer,
168+
) -> Result<()> {
187169
info!("Consumer started. Waiting for messages...");
188170

171+
let mut i = 0;
172+
189173
loop {
190174
tokio::select! {
191-
// Branch 1: Receive message from Kafka
192175
recv_result = consumer.recv() => {
193176
match recv_result {
194177
Ok(m) => {
195-
// Process the message (Validation & Logging)
196178
if let Err(e) = process_message(&m) {
197-
error!("Business logic error while processing message: {:?}", e);
179+
error!("Error: {:?}", e);
180+
}
181+
182+
i += 1;
183+
if i >= config.msg_count {
184+
return Ok(());
198185
}
199186
},
200-
Err(e) => warn!("Kafka error during consumption: {}", e),
187+
Err(e) => warn!("Kafka error: {}", e),
201188
}
202189
}
203-
// Branch 2: Handle graceful shutdown signal
204190
_ = shutdown.notified() => {
205-
info!("Consumer received shutdown signal. Exiting loop.");
206191
break;
207192
}
208193
}
@@ -250,7 +235,7 @@ fn process_message(message: &BorrowedMessage) -> Result<()> {
250235
"✗ Data Validation FAILED: Sum mismatch"
251236
);
252237
}
253-
},
238+
}
254239
Err(e) => {
255240
// Handling malformed JSON or schema mismatch
256241
error!(
@@ -264,4 +249,4 @@ fn process_message(message: &BorrowedMessage) -> Result<()> {
264249
}
265250

266251
Ok(())
267-
}
252+
}

src/runtime/input/protocol/kafka/input_source.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ use std::time::Duration;
3333
// ==================== Constants ====================
3434

3535
/// Default channel capacity (maximum number of messages in fixed-length channel)
36-
const DEFAULT_CHANNEL_CAPACITY: usize = 1000;
36+
const DEFAULT_CHANNEL_CAPACITY: usize = 10000;
3737

3838
/// Maximum number of messages for single batch consumption (to avoid continuous consumption preventing control signals from being processed)
39-
const MAX_BATCH_CONSUME_SIZE: usize = 50;
39+
const MAX_BATCH_CONSUME_SIZE: usize = 10000;
4040

4141
/// Control operation timeout (milliseconds)
4242
const CONTROL_OPERATION_TIMEOUT_MS: u64 = 5000;

src/runtime/output/protocol/kafka/output_sink.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ use std::sync::Mutex;
2626
// ==================== Constants ====================
2727

2828
/// Default channel capacity (maximum messages in bounded channel)
29-
const DEFAULT_CHANNEL_CAPACITY: usize = 1000;
29+
const DEFAULT_CHANNEL_CAPACITY: usize = 10000;
3030

3131
/// Maximum batch size per consume (prevents control signals from being blocked)
32-
const MAX_BATCH_CONSUME_SIZE: usize = 100;
32+
const MAX_BATCH_CONSUME_SIZE: usize = 10000;
3333

3434
/// Default flush timeout (milliseconds)
3535
const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5000;

0 commit comments

Comments
 (0)