90 changes: 90 additions & 0 deletions consumer_kafka_avro/README.md
@@ -0,0 +1,90 @@
# Kafka + Avro Consumer — Spring Cloud Contract samples

This module contains a Spring Boot application that listens to the `book.returned` Kafka topic.
When a message arrives, it is deserialized from Avro into a `Book` object
and passed to `EmailService` to send a notification email.

**Source:** [`BooksReturnedListener`](src/main/java/com/example/kafka/consumer/BooksReturnedListener.java)

---

## What is a collaboration test (consumer side)?

A collaboration test verifies that the consumer can correctly handle the messages
that the producer promised to publish.
Instead of running a live producer,
it uses the **stubs jar** produced by the producer's contract tests.
Spring Cloud Contract's Stub Runner loads that jar,
and the test triggers a specific message label to replay a realistic message on the Kafka topic.
The consumer processes it just as it would in production.

**Prerequisite:** build and install the producer stubs first
(e.g. run `mvn clean install` in the sibling `producer_kafka_avro` module;
its contract tests generate the stubs jar and install it into the local Maven repository).

---

## Background: how Avro works on the wire

Confluent's serializer writes a compact 5-byte prefix before each message —
one magic byte (`0x00`) plus a 4-byte schema ID —
and registers the schema in a **Schema Registry**.
The deserializer reads that ID, fetches the schema, and decodes the rest of the bytes.
Tests in this module use a mock schema registry (`mock://test`)
so no real registry server is needed.
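
That framing can be sketched in plain Java. This is only an illustration of the wire layout — the schema ID and payload bytes below are made up, and the real serializer additionally performs the Avro encoding and the registry lookup:

```java
import java.nio.ByteBuffer;

public class WirePrefixDemo {

    // Confluent wire format: one magic byte (0x00), then the schema ID as a
    // 4-byte big-endian int, then the Avro-encoded payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(5 + avroPayload.length);
        buf.put((byte) 0x00);   // magic byte
        buf.putInt(schemaId);   // 4-byte schema ID (ByteBuffer is big-endian by default)
        buf.put(avroPayload);   // Avro binary data follows the prefix
        return buf.array();
    }

    // What the deserializer does first: validate the magic byte and read the ID.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0x00) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});  // hypothetical ID and payload
        System.out.println(framed.length);     // 5-byte prefix + 3 payload bytes
        System.out.println(schemaIdOf(framed));
    }
}
```

With the ID in hand, the deserializer fetches the schema from the registry (here the mock one) and decodes the remaining bytes.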

---

## Two flavors of collaboration test

The producer module defines two contracts, one per flavor.
There is a matching collaboration test here for each.

### Flavor 1 — JSON (human-readable)

**Test:** [`AvroJsonCollaborationTest`](src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java)
**Triggered label:** `book_returned`

**How the test works:**

1. Stub Runner triggers the `book_returned` label from the stubs jar.
2. The test's `MessageVerifierSender` receives a JSON string from the stub
(the contract body fields),
converts it into a `Book` object,
and sends it to Kafka using `KafkaAvroSerializer`.
3. `BooksReturnedListener` receives the Avro-deserialized `Book`
and calls `EmailService.sendEmail()`.
4. The test asserts that `EmailService` was called with the expected email content.

**Trade-off:** Two extra JSON ↔ Avro conversions happen during the test,
but failure messages are easy to read —
you see exactly which field had the wrong value.
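
Step 2's JSON-to-`Book` conversion can be sketched in plain Java. The `Map` stands in for the parsed contract body and the nested record for the Avro-generated `Book` class; the field values are hypothetical sample data, not the actual contract body:

```java
import java.util.Map;

public class ContractBodyDemo {

    // Simplified stand-in for the Avro-generated Book class.
    record Book(String isbn, String title, String author) {}

    // The stub delivers the contract body as JSON; once parsed into a map
    // (the real test uses a JSON mapper), each field maps onto the Avro object.
    static Book fromContractBody(Map<String, String> body) {
        return new Book(body.get("isbn"), body.get("title"), body.get("author"));
    }

    public static void main(String[] args) {
        Book book = fromContractBody(Map.of(
                "isbn", "1234567890",
                "title", "Test Book",
                "author", "Jane Doe"));
        System.out.println(book.title());
    }
}
```

The resulting object is then handed to `KafkaAvroSerializer`, which produces the same bytes the producer would.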

---

### Flavor 2 — Binary (exact wire format)

**Test:** [`AvroBinaryCollaborationTest`](src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java)
**Triggered label:** `book_returned_binary`

**How the test works:**

1. Stub Runner triggers the `book_returned_binary` label,
which delivers the raw bytes from the producer's pre-serialized
[`bookReturnedMessage.bin`](../producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin) fixture.
2. The test's `MessageVerifierSender` puts those bytes directly on the Kafka topic
using `ByteArraySerializer` —
no Avro serialization happens in the test itself.
3. `BooksReturnedListener` receives the bytes,
`KafkaAvroDeserializer` decodes them into a `Book`,
and `EmailService.sendEmail()` is called.
4. The test asserts that `EmailService` was called with the expected email content.
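
Step 4's assertion amounts to checking the formatted email body. A minimal sketch of that check, mirroring the listener's text block (the sample field values are hypothetical; the real test verifies the call on a mocked `EmailService` instead of formatting the body itself):

```java
public class EmailBodyDemo {

    // Mirrors the text block in BooksReturnedListener.
    static String emailBody(String title, String author, String isbn) {
        return """
                Dear User,

                The book you borrowed has been successfully returned:
                Title: %s, Author: %s, ISBN: %s

                """.formatted(title, author, isbn);
    }

    public static void main(String[] args) {
        String body = emailBody("Test Book", "Jane Doe", "1234567890");
        // The collaboration test asserts on content like this.
        if (!body.contains("Title: Test Book, Author: Jane Doe, ISBN: 1234567890")) {
            throw new AssertionError("unexpected email body");
        }
        System.out.println("email body matched");
    }
}
```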

**Trade-off:** The exact bytes that the producer emits in production
travel through the consumer's full deserialization stack,
with no intermediary conversion.
The downside is that failure messages show raw bytes and are harder to interpret.

> **Note:** Before the raw bytes can be deserialized,
> the `Book` Avro schema must be registered in the mock schema registry for this JVM.
> The test's `TestConfig` handles this automatically via a `@PostConstruct` method —
> see [`AvroBinaryCollaborationTest.TestConfig#registerBookSchema`](src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java)
> for the explanation.
170 changes: 170 additions & 0 deletions consumer_kafka_avro/pom.xml
@@ -0,0 +1,170 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>4.0.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>org.springframework.cloud</groupId>
    <artifactId>consumer_kafka_avro</artifactId>
    <version>5.0.2-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <kafka-avro-serializer.version>8.1.1</kafka-avro-serializer.version>
        <spring-cloud.version>2025.1.0</spring-cloud.version>
        <avro.version>1.12.0</avro.version>
    </properties>

    <repositories>
        <repository>
            <id>confluent</id>
            <name>Confluent Maven Repository</name>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-plugin-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>1.20.4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-kafka</artifactId>
                <version>4.0.1</version>
            </dependency>
            <dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-avro-serializer</artifactId>
                <version>${kafka-avro-serializer.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-contract-stub-runner</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>4.0.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
30 changes: 30 additions & 0 deletions consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java
@@ -0,0 +1,30 @@
package com.example.kafka.consumer;

import com.example.kafka.avro.Book;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
class BooksReturnedListener {

    private final EmailService emailService;

    BooksReturnedListener(EmailService emailService) {
        this.emailService = emailService;
    }

    @KafkaListener(topics = "book.returned")
    public void sendEmailOnBookReturned(Book book) {
        String emailBody = """
                Dear User,

                The book you borrowed has been successfully returned:
                Title: %s, Author: %s, ISBN: %s

                """.formatted(book.getTitle(), book.getAuthor(), book.getIsbn());

        emailService.sendEmail(emailBody);
    }

}
13 changes: 13 additions & 0 deletions consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java
@@ -0,0 +1,13 @@
package com.example.kafka.consumer;

import org.springframework.stereotype.Service;

@Service
public class EmailService {

    public void sendEmail(String emailBody) {
        // Simulate sending an email
        System.out.println("Sending email:\n" + emailBody);
    }

}
13 changes: 13 additions & 0 deletions consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java
@@ -0,0 +1,13 @@
package com.example.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaAvroConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaAvroConsumerApplication.class, args);
    }

}
19 changes: 19 additions & 0 deletions consumer_kafka_avro/src/main/resources/application.yml
@@ -0,0 +1,19 @@
spring:
  application:
    name: kafka-avro-consumer
  kafka:
    bootstrap-servers: localhost:9092
    # producer settings below are only used by the collaboration tests
    # (MessageVerifierSender in AvroBinary/JsonCollaborationTest publishes
    # the stub-triggered message onto Kafka)
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      group-id: kafka-avro-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      auto-offset-reset: earliest
      properties:
        specific.avro.reader: true
    properties:
      schema.registry.url: mock://test
19 changes: 19 additions & 0 deletions consumer_kafka_avro/src/main/resources/avro/Book.avsc
@@ -0,0 +1,19 @@
{
  "type": "record",
  "name": "Book",
  "namespace": "com.example.kafka.avro",
  "fields": [
    {
      "name": "isbn",
      "type": "string"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "author",
      "type": "string"
    }
  ]
}