Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,12 @@ public boolean usesDistribution() {
public int testDurationMinutes;

public int warmupDurationMinutes = 1;

/**
* If > 0, after warmup the consumers are paused for exactly this many seconds before being
* automatically resumed and the measurement window starts. Use this when you want time-based
* backlog accumulation (e.g. to ensure data is offloaded to tiered storage before consumers
* start draining), as an alternative to size-based {@link #consumerBacklogSizeGB}.
*/
public int consumeDelaySeconds = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public TestResult run() throws Exception {
}

worker.resetStats();
applyConsumeDelayIfConfigured();
log.info("----- Starting benchmark traffic ({}m)------", workload.testDurationMinutes);

TestResult result = printAndCollectStats(workload.testDurationMinutes, TimeUnit.MINUTES);
Expand Down Expand Up @@ -253,6 +254,23 @@ public void close() throws Exception {
executor.shutdownNow();
}

private void applyConsumeDelayIfConfigured() throws IOException {
if (workload.consumeDelaySeconds <= 0) {
return;
}
worker.pauseConsumers();
log.info("----- Delaying consumption for {}s before measurement window -----",
workload.consumeDelaySeconds);
try {
Thread.sleep(workload.consumeDelaySeconds * 1000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
worker.resumeConsumers();
log.info("----- Consumers resumed after {}s delay -----", workload.consumeDelaySeconds);
}

private void createConsumers(List<String> topics) throws IOException {
ConsumerAssignment consumerAssignment = new ConsumerAssignment();

Expand Down