Integration testing with Spring Boot and embedded Kafka

In several projects, I have seen teams struggle to write integration tests for Spring Boot applications that use Kafka, and developers are often put off by the effort such tests seem to require. This post walks through a simple integration test that uses an embedded Kafka broker and the test utilities from the spring-kafka-test dependency, based on a small example application.
The sample application ingests messages from the not-enriched-user-data Kafka topic and then enriches them with data from a database. Finally, the enriched messages are published to the enriched-user-data Kafka topic.
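
To make the enrichment step concrete, here is a minimal plain-Java sketch of it, without Spring or Kafka. The shapes of UserData and EnrichedUserData are inferred from how the test further down uses them, UserDataEnricher is a hypothetical name, and a Map stands in for the database lookup. Returning an empty Optional for an unknown customer number is an assumption; the sample application may handle that case differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Incoming message; fields follow the constructor call used in the test. */
final class UserData {
    private final String userName;
    private final String customerNumber;

    UserData(String userName, String customerNumber) {
        this.userName = userName;
        this.customerNumber = customerNumber;
    }

    String getUserName() { return userName; }
    String getCustomerNumber() { return customerNumber; }
}

/** Outgoing message; getters follow the assertions used in the test. */
final class EnrichedUserData {
    private final String userName;
    private final String customerNumber;
    private final String enrichedInfo;

    EnrichedUserData(String userName, String customerNumber, String enrichedInfo) {
        this.userName = userName;
        this.customerNumber = customerNumber;
        this.enrichedInfo = enrichedInfo;
    }

    String getUserName() { return userName; }
    String getCustomerNumber() { return customerNumber; }
    String getEnrichedInfo() { return enrichedInfo; }
}

/** Hypothetical enrichment step: looks up additional information by customer number. */
final class UserDataEnricher {
    // Assumption: this map stands in for the database table behind the repository.
    private final Map<String, String> additionalInformationByCustomerNumber;

    UserDataEnricher(Map<String, String> additionalInformationByCustomerNumber) {
        this.additionalInformationByCustomerNumber = new HashMap<>(additionalInformationByCustomerNumber);
    }

    /** Returns the enriched message, or empty if nothing is stored for the customer number. */
    Optional<EnrichedUserData> enrich(UserData userData) {
        return Optional.ofNullable(additionalInformationByCustomerNumber.get(userData.getCustomerNumber()))
                .map(info -> new EnrichedUserData(userData.getUserName(), userData.getCustomerNumber(), info));
    }
}
```

In the real application this logic sits between the Kafka listener and the producer, which is exactly the path the integration test exercises end to end.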

You can find the code of the application here. The integration test for the application consists of the following steps:

1. Publish test data to the Kafka topic not-enriched-user-data.
2. The message is consumed by the Kafka listener.
3. The application enriches the message with data.
4. The application sends the enriched message to the topic enriched-user-data.
5. Verify that the topic enriched-user-data contains a message with the expected content.

Implementing the integration test

Before the actual test case can be written, the test class needs some setup: the Spring application context, an embedded Kafka broker, and the beans the test works with.

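    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.test.context.EmbeddedKafka;

    @SpringBootTest
    @EmbeddedKafka(ports = 9092)
    class EmbeddedKafkaIntegrationTest {

        // Used to publish the test message to the input topic
        @Autowired
        KafkaTemplate<String, UserData> kafkaTemplate;

        // Used to create a consumer that reads the output topic
        @Autowired
        ConsumerFactory<String, EnrichedUserData> consumerFactory;

        // Used to store the data the application enriches messages with
        @Autowired
        AdditionalUserInformationRepository additionalUserInformationRepository;

        @Test
        void executeIntegrationTest() {
            // …
        }

    }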

With @SpringBootTest, the Spring Boot application context is available during test execution. The KafkaTemplate, ConsumerFactory and AdditionalUserInformationRepository are injected from that context via @Autowired. The @EmbeddedKafka annotation starts an in-memory Kafka broker that is reachable on port 9092.
The following code shows the actual implementation of the test case.

    @Test
    void executeIntegrationTest() {
        //arrange
        final String customerNumber = "customerNumber";
        final String userName = "userName";
        final String interestingAdditionalInformation = "interesting additional information";

        // store the data the application will use for enrichment
        AdditionalUserInformation additionalUserInformation = new AdditionalUserInformation();
        additionalUserInformation.setAdditionalInformation(interestingAdditionalInformation);
        additionalUserInformation.setCustomerNumber(customerNumber);
        additionalUserInformationRepository.save(additionalUserInformation);

        // consumer that reads the application's output topic
        Consumer<String, EnrichedUserData> testConsumer = consumerFactory.createConsumer("test", "test");
        testConsumer.subscribe(List.of("enriched-user-data"));

        //act
        kafkaTemplate.send("not-enriched-user-data", new UserData(userName, customerNumber));

        //assert
        ConsumerRecord<String, EnrichedUserData> receivedRecord = KafkaTestUtils.getSingleRecord(testConsumer, "enriched-user-data");
        Assertions.assertAll("",
            () -> assertEquals(userName, receivedRecord.value().getUserName()),
            () -> assertEquals(customerNumber, receivedRecord.value().getCustomerNumber()),
            () -> assertEquals(interestingAdditionalInformation, receivedRecord.value().getEnrichedInfo())
        );
    }

First, an AdditionalUserInformation object is built and saved to the database via the injected additionalUserInformationRepository. Then the injected consumerFactory is used to create the Kafka consumer testConsumer, which subscribes to the enriched-user-data topic. With the autowired KafkaTemplate, a message is sent to the not-enriched-user-data topic.
The sent message is processed automatically by the application's Kafka listener. The getSingleRecord method of the KafkaTestUtils class makes the passed consumer testConsumer poll the enriched-user-data topic until it receives one record. The retrieved record is then used to verify that the message was processed correctly.
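
The idea of polling until exactly one record arrives can be illustrated without a broker. The following plain-Java sketch is only an analogy and not the spring-kafka-test implementation: a BlockingQueue stands in for the topic, and the hypothetical helper awaitSingleRecord waits until a record arrives or a deadline passes, then rejects any additional records that are already waiting.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that mimics "poll until one record or time out". */
final class SingleRecordPoller {

    private SingleRecordPoller() {
    }

    static <T> T awaitSingleRecord(BlockingQueue<T> topic, Duration timeout) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        T first = null;
        while (first == null) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new IllegalStateException("No record received within " + timeout);
            }
            try {
                // Wait in short slices so the deadline is re-checked regularly.
                long slice = Math.min(remainingNanos, TimeUnit.MILLISECONDS.toNanos(100));
                first = topic.poll(slice, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a record", e);
            }
        }
        // Anything else already queued means the "single record" expectation is violated.
        List<T> extras = new ArrayList<>();
        topic.drainTo(extras);
        if (!extras.isEmpty()) {
            throw new IllegalStateException(
                    "Expected exactly one record but found " + (1 + extras.size()));
        }
        return first;
    }
}
```

The point of the sketch is the shape of the assertion: the test blocks for a bounded time instead of sleeping for a fixed interval, so it finishes as soon as the application has produced its output and still fails clearly if nothing arrives.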

Conclusion

The combination of KafkaTestUtils and the embedded Kafka broker makes it possible to write integration tests without much Kafka-related overhead. A key advantage of an embedded Kafka instance is that no container images have to be pulled. As a result, execution is faster than with tests based on the Testcontainers framework, and the existing CI/CD infrastructure does not need to be changed to allow image pulls during test execution.