
☕ KPipe - A Modern Kafka Consumer


A modern, functional, high-performance Kafka consumer built on Java 24, using virtual threads, composable message processors, and DslJson for JSON processing. It offers robust error handling, configurable retries, built-in metrics, and support for both parallel and sequential processing. Ideal for large-scale systems.


🚀 Why This Library?

✅ Modern Java Features

  • Virtual Threads for massive concurrency with minimal overhead (see the dispatch sketch after this list)
  • Functional programming patterns for clean, maintainable code
  • High-performance JSON processing with DslJson
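
For context, the concurrency model amounts to handing each polled record to its own virtual thread. Below is a minimal sketch of that pattern with the plain Kafka client, not KPipe's internals; kafkaProps, running, and process are placeholders:

// Illustrative only: one virtual thread per polled record
// (kafkaProps, running, and process are placeholders)
try (var executor = Executors.newVirtualThreadPerTaskExecutor();
     var consumer = new KafkaConsumer<byte[], byte[]>(kafkaProps)) {
  consumer.subscribe(List.of("events"));
  while (running) {
    for (var record : consumer.poll(Duration.ofMillis(100))) {
      executor.submit(() -> process(record)); // each record runs on its own virtual thread
    }
  }
}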

🧩 Functional Processing Pipeline

  • Message processors are pure functions (Function<V, V>) that transform data without side effects

  • Build complex pipelines through function composition using Function::andThen or the registry (see the composition sketch after the example below)

  • Declarative processing lets you describe what to do, not how to do it

  • Higher-order functions enable conditional processing, retry logic, and error handling

  • Teams can register their own processors in a central registry via:

    // Register team-specific processors
    MessageProcessorRegistry.register("sanitizeData",
        JsonMessageProcessor.removeFields("password", "ssn"));

    // Register custom processors for your team's needs
    MessageProcessorRegistry.register("extractMetadata", message -> {
      // Custom extraction logic here
      return message;
    });

    // Create pipelines from registered processors
    final var pipeline = MessageProcessorRegistry.pipeline(
        "parseJson", "validateSchema", "sanitizeData", "addMetadata");

    // Or load the processor chain from configuration
    final String[] configuredProcessors = config.getStringArray("message.processors");
    final Function<byte[], byte[]> configuredPipeline =
        MessageProcessorRegistry.pipeline(configuredProcessors);

    // Create a consumer with the team-specific pipeline,
    // built-in error handling, and retry logic
    final var consumer = new FunctionalConsumer.<byte[], byte[]>builder()
        .withProperties(kafkaProps)
        .withTopic("team-topic")
        .withProcessor(MessageProcessorRegistry.pipeline(
            "parseJson",
            "validateSchema",
            "extractMetadata",
            "addTeamIdentifier"))
        .withErrorHandler(error -> publishToErrorTopic(error))
        .withRetry(3, Duration.ofSeconds(1))
        .build();

    // Use the consumer
    consumer.start();
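
Because processors are plain Function<V, V> values, the same pipeline can also be expressed with ordinary function composition. A small sketch, assuming the processors above are registered (MessageProcessorRegistry.get is the lookup used in the conditional-processing example later in this README):

// Equivalent pipeline via plain function composition
final Function<byte[], byte[]> composed = MessageProcessorRegistry
    .get("parseJson")
    .andThen(MessageProcessorRegistry.get("sanitizeData"))
    .andThen(MessageProcessorRegistry.get("addMetadata"));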

📦 Installation

Maven

<dependency>
    <groupId>io.github.eschizoid</groupId>
    <artifactId>kpipe</artifactId>
    <version>0.2.0</version>
</dependency>

Gradle (Groovy)

implementation 'io.github.eschizoid:kpipe:0.2.0'

Gradle (Kotlin)

implementation("io.github.eschizoid:kpipe:0.2.0")

SBT

libraryDependencies += "io.github.eschizoid" % "kpipe" % "0.2.0"

πŸ“ Project Structure

KPipe is organized into two main modules:

Library Module (lib)

The core library that provides the Kafka consumer functionality:

├── src/main/java/org/kpipe/
│   ├── consumer/                         # Core consumer components
│   │   ├── FunctionalConsumer.java       # Main functional consumer implementation
│   │   ├── OffsetManager.java            # Manages Kafka offsets for reliable processing
│   │   ├── MessageTracker.java           # Tracks message processing state
│   │   ├── RebalanceListener.java        # Handles Kafka consumer rebalancing
│   │   └── enums/                        # Enums for consumer states and commands
│   │
│   ├── processor/                        # Message processors
│   │   ├── JsonMessageProcessor.java     # JSON processing with DslJson
│   │   └── AvroMessageProcessor.java     # Avro processing with Apache Avro
│   │
│   ├── registry/                         # Registry components
│   │   ├── MessageProcessorRegistry.java # Registry for processor functions
│   │   ├── MessageSinkRegistry.java      # Registry for message sinks
│   │   ├── MessageFormat.java            # Enum for message format types
│   │   └── RegistryFunctions.java        # Shared utilities for registries
│   │
│   ├── sink/                             # Message sink implementations
│   │   ├── JsonConsoleSink.java          # Console sink for JSON messages
│   │   ├── AvroConsoleSink.java          # Console sink for Avro messages
│   │   └── MessageSink.java              # Message sink interface
│   │
│   ├── config/                           # Configuration components
│   │   ├── AppConfig.java                # Application configuration
│   │   └── KafkaConsumerConfig.java      # Kafka consumer configuration
│   │
│   └── metrics/                          # Metrics components
│       ├── ConsumerMetricsReporter.java  # Reports consumer metrics
│       ├── MetricsReporter.java          # Metrics reporting interface
│       └── ProcessorMetricsReporter.java # Reports processor metrics

Application Module (app)

A ready-to-use application that demonstrates the library:

├── src/main/java/org/kpipe/
│   ├── json/
│   │   └── App.java # Main application class to demonstrate JSON integration
│   │
│   └── avro/
│       └── App.java # Main application class to demonstrate Avro integration

βš™οΈ Example: Add Custom Processor

Extend the registry like this:

// Register a processor for JSON field transformations
MessageProcessorRegistry.register("uppercase", bytes ->
    JsonMessageProcessor.transformField("text", value -> {
        if (value instanceof String text) {
            return text.toUpperCase();
        }
        return value;
    }).apply(bytes)
);

// Register a processor that adds environment information
MessageProcessorRegistry.register("addEnvironment",
    JsonMessageProcessor.addField("environment", "production"));

// Create a reusable processor pipeline
final var pipeline = MessageProcessorRegistry.pipeline(
    "parseJson", "validateSchema", "addEnvironment", "uppercase", "addTimestamp"
);

// Use the pipeline with a consumer
final var consumer = new FunctionalConsumer.<byte[], byte[]>builder()
    .withProperties(kafkaProps)
    .withTopic("events")
    .withProcessor(pipeline)
    .withRetry(3, Duration.ofSeconds(1))
    .build();

// Start processing messages
consumer.start();

📊 Built-in Metrics

Monitor your consumer with built-in metrics:

// Access consumer metrics
Map<String, Long> metrics = consumer.getMetrics();
System.out.println("Messages received: " + metrics.get("messagesReceived"));
System.out.println("Successfully processed: " + metrics.get("messagesProcessed"));
System.out.println("Processing errors: " + metrics.get("processingErrors"));

Configure automatic metrics reporting:

new App(config)
  .withMetricsInterval(Duration.ofSeconds(30))
  .start();

πŸ›‘οΈ Graceful Shutdown

The consumer supports graceful shutdown with in-flight message handling:

// Initiate graceful shutdown with 5-second timeout
boolean allProcessed = kafkaApp.shutdownGracefully(5000);
if (allProcessed) {
  LOGGER.info("All messages processed successfully before shutdown");
} else {
  LOGGER.warning("Shutdown completed with some messages still in flight");
}

// Register as JVM shutdown hook
Runtime.getRuntime().addShutdownHook(
  new Thread(() -> app.shutdownGracefully(5000))
);

🔧 Working with Messages

JSON Processing

The JSON processors handle deserialization and transformation of JSON data:

// Add a timestamp field to messages
final var addTimestampProcessor = JsonMessageProcessor.addTimestamp("processedAt");

// Remove sensitive fields
final var sanitizeProcessor = JsonMessageProcessor.removeFields("password", "ssn", "creditCard");

// Transform specific fields
final var uppercaseSubjectProcessor = JsonMessageProcessor.transformField("subject", value -> {
    if (value instanceof String text) {
        return text.toUpperCase();
    }
    return value;
});

// Add metadata to messages
final var metadata = new HashMap<String, Object>();
metadata.put("version", "1.0");
metadata.put("environment", "production");
var addMetadataProcessor = JsonMessageProcessor.mergeWith(metadata);

// Combine processors into a pipeline
Function<byte[], byte[]> pipeline = message -> addMetadataProcessor.apply(
    uppercaseSubjectProcessor.apply(
        sanitizeProcessor.apply(
            addTimestampProcessor.apply(message)
        )
    )
);

// Or use the registry to build pipelines
final var registryPipeline = MessageProcessorRegistry.pipeline(
    "sanitize", "addTimestamp", "uppercaseSubject", "addMetadata"
);

Avro Processing

The Avro processors handle deserialization and transformation of Avro data:

// Register an Avro schema
AvroMessageProcessor.registerSchema("userSchema", userSchemaJson);

// Parse Avro messages
final var parseProcessor = AvroMessageProcessor.parseAvro("userSchema");

// Add a timestamp field to messages
final var addTimestampProcessor = AvroMessageProcessor.addTimestamp("userSchema", "processedAt");

// Remove sensitive fields
final var sanitizeProcessor = AvroMessageProcessor.removeFields("userSchema", "password", "creditCard");

// Transform specific fields
final var uppercaseNameProcessor = AvroMessageProcessor.transformField(
  "userSchema", 
  "name", 
  value -> {
    if (value instanceof String text) {
      return text.toUpperCase();
    }
    return value;
  }
);

// Add multiple fields at once
final var fieldsToAdd = Map.of(
  "version", "1.0",
  "environment", "production"
);
final var addFieldsProcessor = AvroMessageProcessor.addFields("userSchema", fieldsToAdd);

// Compose processors into a pipeline
final var pipeline = AvroMessageProcessor.compose(
  parseProcessor,
  addTimestampProcessor,
  sanitizeProcessor,
  uppercaseNameProcessor,
  addFieldsProcessor
);

// Or register in the registry and build pipelines
MessageProcessorRegistry.register("parseAvro", parseProcessor);
MessageProcessorRegistry.register("addTimestamp", addTimestampProcessor);
MessageProcessorRegistry.register("sanitize", sanitizeProcessor);
MessageProcessorRegistry.register("uppercaseName", uppercaseNameProcessor);
MessageProcessorRegistry.register("addFields", addFieldsProcessor);

final var registryPipeline = MessageProcessorRegistry.pipeline(
  "parseAvro", "addTimestamp", "sanitize", "uppercaseName", "addFields"
);

📤 Message Sinks

Message sinks provide destinations for processed messages. The MessageSink interface is a functional interface that defines a single method:

@FunctionalInterface
public interface MessageSink<K, V> {
  void send(final ConsumerRecord<K, V> record, final V processedValue);
}

Built-in Sinks

KPipe provides several built-in sinks:

// Create a JSON console sink that logs messages at INFO level
final var jsonConsoleSink = new JsonConsoleSink<>(
  System.getLogger("org.kpipe.sink.JsonConsoleSink"), 
  Level.INFO
);

// Create an Avro console sink that logs messages at INFO level
final var avroConsoleSink = new AvroConsoleSink<>(
  System.getLogger("org.kpipe.sink.AvroConsoleSink"), 
  Level.INFO
);

// Use a sink with a consumer
final var consumer = new FunctionalConsumer.<String, byte[]>builder()
  .withProperties(kafkaProps)
  .withTopic("events")
  .withProcessor(pipeline)
  .withMessageSink(jsonConsoleSink)
  .build();

Custom Sinks

You can create custom sinks using lambda expressions:

// Create a custom sink that writes to a database
MessageSink<String, byte[]> databaseSink = (record, processedValue) -> {
  try {
    // Parse the processed value
    Map<String, Object> data = JsonMessageProcessor.parseJson().apply(processedValue);

    // Write to database
    databaseService.insert(data);

    // Log success
    logger.info("Successfully wrote message to database: " + record.key());
  } catch (Exception e) {
    logger.error("Failed to write message to database", e);
  }
};

// Use the custom sink with a consumer
final var consumer = new FunctionalConsumer.<String, byte[]>builder()
  .withMessageSink(databaseSink)
  .build();

Message Sink Registry

The MessageSinkRegistry provides a centralized repository for registering and retrieving message sinks:

// Create a registry
final var registry = new MessageSinkRegistry();

// Register sinks
registry.register("console", new JsonConsoleSink<>(logger, Level.INFO));
registry.register("database", databaseSink);
registry.register("metrics", (record, value) -> metricsService.recordMessage(record.topic(), value.length));

// Create a pipeline of sinks
final var sinkPipeline = registry.<String, byte[]>pipeline("console", "database", "metrics");

// Use the sink pipeline with a consumer
final var consumer = new FunctionalConsumer.<String, byte[]>builder()
  .withMessageSink(sinkPipeline)
  .build();

Error Handling in Sinks

The registry provides utilities for adding error handling to sinks:

// Create a sink with error handling
final var safeSink = MessageSinkRegistry.withErrorHandling(
  riskySink,
  (record, value, error) -> logger.error("Error in sink: " + error.getMessage())
);

// Or use the registry's error handling
final var safePipeline = registry.<String, byte[]>pipelineWithErrorHandling(
  "console", "database", "metrics",
  (record, value, error) -> errorService.reportError(record.topic(), error)
);

🔄 Consumer Runner

The ConsumerRunner provides a high-level management layer for Kafka consumers, handling lifecycle, metrics, and graceful shutdown:

// Create a consumer runner with default settings
ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
  .build();

// Start the consumer
runner.start();

// Wait for shutdown
runner.awaitShutdown();

Advanced Configuration

The ConsumerRunner supports extensive configuration options:

// Create a consumer runner with advanced configuration
ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
  // Configure metrics reporting
  .withMetricsReporter(new ConsumerMetricsReporter(
    consumer::getMetrics,
    () -> System.currentTimeMillis() - startTime
  ))
  .withMetricsInterval(30000) // Report metrics every 30 seconds

  // Configure health checks
  .withHealthCheck(c -> c.getState() == ConsumerState.RUNNING)

  // Configure graceful shutdown
  .withShutdownTimeout(10000) // 10 seconds timeout for shutdown
  .withShutdownHook(true) // Register JVM shutdown hook

  // Configure custom start action
  .withStartAction(c -> {
    logger.info("Starting consumer for topic: " + c.getTopic());
    c.start();
  })

  // Configure custom graceful shutdown
  .withGracefulShutdown((c, timeoutMs) -> {
      logger.info("Initiating graceful shutdown with timeout: " + timeoutMs + "ms");
      return c.shutdownGracefully(timeoutMs);
  })

  .build();

Lifecycle Management

The ConsumerRunner manages the complete lifecycle of a consumer:

// Start the consumer (idempotent - safe to call multiple times)
runner.start();

// Check if the consumer is healthy
boolean isHealthy = runner.isHealthy();

// Wait for shutdown (blocks until shutdown completes)
boolean cleanShutdown = runner.awaitShutdown();

// Initiate shutdown
runner.close();

Metrics Integration

The ConsumerRunner integrates with metrics reporting:

// Add multiple metrics reporters
ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
  .withMetricsReporters(List.of(
    new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime),
    new ProcessorMetricsReporter(registry)
  ))
  .withMetricsInterval(60000) // Report every minute
  .build();

Using with AutoCloseable

The ConsumerRunner implements AutoCloseable for use with try-with-resources:

try (ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.builder(consumer).build()) {
  runner.start();
  // Application logic here
  // Runner will be automatically closed when exiting the try block
}

πŸ” Application Example

Here's a concise example of a KPipe application:

public class MyKafkaApp implements AutoCloseable {
  private final ConsumerRunner<FunctionalConsumer<byte[], byte[]>> runner;

  public static void main(final String[] args) {
    // Load configuration from environment variables
    final var config = AppConfig.fromEnv();

    try (final var app = new MyKafkaApp(config)) {
      app.start();
      app.awaitShutdown();
    } catch (final Exception e) {
      System.getLogger(MyKafkaApp.class.getName())
        .log(System.Logger.Level.ERROR, "Fatal error in application", e);
      System.exit(1);
    }
  }

  public MyKafkaApp(final AppConfig config) {
    // Create processor and sink registries
    final var processorRegistry = new MessageProcessorRegistry(config.appName());
    final var sinkRegistry = new MessageSinkRegistry();

    // Create the functional consumer
    final var functionalConsumer = FunctionalConsumer.<byte[], byte[]>builder()
      .withProperties(KafkaConsumerConfig.createConsumerConfig(
        config.bootstrapServers(), config.consumerGroup()))
      .withTopic(config.topic())
      .withProcessor(processorRegistry.pipeline(
        "parseJson", "addSource", "markProcessed", "addTimestamp"))
      .withMessageSink(sinkRegistry.<byte[], byte[]>pipeline("logging"))
      .withOffsetManagerProvider(consumer -> 
        OffsetManager.builder(consumer)
          .withCommitInterval(Duration.ofSeconds(30))
          .build())
      .withMetrics(true)
      .build();

    // Set up the consumer runner with metrics and shutdown hooks
    runner = ConsumerRunner.builder(functionalConsumer)
      .withMetricsInterval(config.metricsInterval().toMillis())
      .withShutdownTimeout(config.shutdownTimeout().toMillis())
      .withShutdownHook(true)
      .build();
  }

  public void start() { runner.start(); }
  public boolean awaitShutdown() { return runner.awaitShutdown(); }
  public void close() { runner.close(); }
}

Key Components:

  • Configuration from environment variables
  • Processor and sink registries for message handling
  • Processing pipeline with error handling
  • Metrics reporting and graceful shutdown

To Run:

# Set configuration
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_CONSUMER_GROUP=my-group
export KAFKA_TOPIC=json-events

# Run the application
./gradlew run

# Test with a sample message
echo '{"message":"Hello from KPipe!"}' | kcat -P -b localhost:9092 -t json-events

πŸ› οΈ Requirements

  • Java 24+
  • Gradle (for building the project)
  • kcat (for testing)
  • Docker (for local Kafka setup)

βš™οΈ Configuration

Configure via environment variables:

export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_CONSUMER_GROUP=my-consumer-group
export KAFKA_TOPIC=json-events
export KAFKA_PROCESSORS=parseJson,validateSchema,addTimestamp
export METRICS_INTERVAL_MS=30000
export SHUTDOWN_TIMEOUT_MS=5000
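
At startup these variables are loaded through AppConfig.fromEnv(), as in the application example above. A quick sanity check, assuming the accessors shown there (the exact env-var-to-accessor mapping is an assumption):

// Load configuration from the environment and echo the effective values
final var config = AppConfig.fromEnv();
System.out.println("Consuming " + config.topic()
    + " from " + config.bootstrapServers()
    + " as group " + config.consumerGroup());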

🧪 Testing

Follow these steps to test the KPipe Kafka Consumer:

Build and Run

# Format code and build the library module
./gradlew clean :lib:spotlessApply :lib:build

# Format code and build the applications module
./gradlew :app:clean :app:spotlessApply :app:build

# Build the consumer app container and start all services
docker compose build --no-cache --build-arg MESSAGE_FORMAT=<json|avro|protobuf>
docker compose down -v
docker compose up -d

# Publish a simple JSON message to the json-topic
echo '{"message":"Hello world"}' | kcat -P -b kafka:9092 -t json-topic

# For complex JSON messages, use a file
cat test-message.json | kcat -P -b kafka:9092 -t json-topic

# Publish multiple test messages
for i in {1..10}; do echo "{\"id\":$i,\"message\":\"Test message $i\"}" | \
  kcat -P -b kafka:9092 -t json-topic; done

Working with the Schema Registry and Avro

If you want to use Avro with a schema registry, follow these steps:

# Register an Avro schema
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "{\"schema\": $(cat lib/src/test/resources/avro/customer.avsc | jq tostring)}" \
  http://localhost:8081/subjects/com.kpipe.customer/versions

# Read registered schema 
curl -s http://localhost:8081/subjects/com.kpipe.customer/versions/latest | jq -r '.schema' | jq --indent 2 '.'

# Produce an Avro message using kafka-avro-console-producer
echo '{"id":1,"name":"Mariano Gonzalez","email":{"string":"mariano@example.com"},"active":true,"registrationDate":1635724800000,"address":{"com.kpipe.customer.Address":{"street":"123 Main St","city":"Chicago","zipCode":"00000","country":"USA"}},"tags":["premium","verified"],"preferences":{"notifications":"email"}}' \
| docker run -i --rm --network=host confluentinc/cp-schema-registry:latest \
    kafka-avro-console-producer \
    --broker-list localhost:9092 \
    --topic avro-topic \
    --property schema.registry.url=http://localhost:8081 \
    --property value.schema.id=1

The Kafka consumer will:

  • Connect to localhost:9092
  • Subscribe to avro-topic, json-topic, or protobuf-topic, depending on the configured message format
  • Compose the processing pipeline from the configured processors
  • Process each message concurrently using virtual threads

πŸ” Best Practices & Advanced Patterns

Composing Complex Processing Pipelines

For maintainable pipelines, group related processors:

// Create focused processor groups
final var securityProcessors = MessageProcessorRegistry.pipeline(
    "sanitizeData", "encryptSensitiveFields", "addAuditTrail");

final var enrichmentProcessors = MessageProcessorRegistry.pipeline(
    "addMetadata", "addTimestamp", "addEnvironment");

// Compose them into a master pipeline
final Function<byte[], byte[]> fullPipeline = message ->
    enrichmentProcessors.apply(securityProcessors.apply(message));

// Or register the composed pipeline
MessageProcessorRegistry.register("standardProcessing", fullPipeline);

Conditional Processing

The library provides a built-in when() method for conditional processing:

// Create a predicate that checks message type
Predicate<byte[]> isOrderMessage = message -> {
    try {
        Map<String, Object> parsed = JsonMessageProcessor.parseJson().apply(message);
        return "ORDER".equals(parsed.get("type"));
    } catch (Exception e) {
        return false;
    }
};

// Use the built-in conditional processor
Function<byte[], byte[]> conditionalProcessor = MessageProcessorRegistry.when(
    isOrderMessage,
    MessageProcessorRegistry.get("orderEnrichment"),
    MessageProcessorRegistry.get("defaultEnrichment")
);

// Register the conditional pipeline
MessageProcessorRegistry.register("smartProcessing", conditionalProcessor);

Thread-Safety Considerations

  • Message processors should be stateless and thread-safe (see the sketch after this list)
  • Avoid shared mutable state between processors
  • Use immutable data structures where possible
  • For processors with side effects (like database calls), consider using thread-local variables
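
As an illustration, the first registration below is safe to share across virtual threads, while the second closes over shared mutable state and is not. A hedged sketch; the processor names are made up:

// Safe: stateless, built only from immutable inputs
final Map<String, Object> metadata = Map.of("version", "1.0"); // immutable map
MessageProcessorRegistry.register("addMetadataSafe",
    JsonMessageProcessor.mergeWith(metadata));

// Unsafe: shared mutable state touched by many virtual threads
final var seen = new ArrayList<byte[]>(); // not thread-safe
MessageProcessorRegistry.register("collectUnsafe", message -> {
  seen.add(message); // racy under concurrent processing
  return message;
});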

Performance Optimization

  • Register frequently used processor combinations as single processors (see the sketch after this list)
  • For very large messages, consider streaming JSON processors
  • Profile your processor pipeline to identify bottlenecks
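
For example, a hot pipeline can be composed once and registered under a single name, so the composition cost is paid at registration time rather than on every lookup (a sketch using the registry API shown earlier; kafkaProps is a placeholder):

// Compose once, register under one name, reuse everywhere
final var standardPipeline = MessageProcessorRegistry.pipeline(
    "parseJson", "sanitizeData", "addTimestamp");
MessageProcessorRegistry.register("standard", standardPipeline);

// Consumers now resolve a single pre-composed function
final var consumer = new FunctionalConsumer.<byte[], byte[]>builder()
    .withProperties(kafkaProps)
    .withTopic("events")
    .withProcessor(MessageProcessorRegistry.pipeline("standard"))
    .build();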

📈 Performance Notes

  • One virtual thread per Kafka record, scaling to 100k+ messages/sec
  • Zero-GC JSON processing with DslJson
  • Safe and efficient memory model using modern Java features

📚 Inspiration

This library is inspired by the best practices from:


💬 Contributing

If you're a team using this library, feel free to:

  • Register custom processors
  • Add metrics/observability hooks
  • Share improvements or retry strategies

📄 License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.


🧠 Final Thoughts

This Kafka consumer is:

  • Functional
  • Extensible
  • Future-proof

Use it to modernize your Kafka stack with Java 24 elegance and performance.
