How can i stop my kafka consumer from consuming messages ?

RMAG news

I am using below functions but my kafkaListener is keeps on consuming messages even if my consumer is in paused state.

import org.apache.kafka.clients.consumer.Consumer;

private Consumer<String, String> kafkaConsumer;

public void pauseKafkaConsumer() {
if (!kafkaConsumer.paused().isEmpty()) {
return;
}
// Pause all assigned partitions
Collection<TopicPartition> assignedPartitions =
kafkaConsumer.assignment();
kafkaConsumer.poll(0);
kafkaConsumer.pause(assignedPartitions);
}

// Resume the Kafka consumer
public void resumeKafkaConsumer() {
// Resume all paused partitions
Collection<TopicPartition> pausedPartitions =
kafkaConsumer.paused();
kafkaConsumer.resume(pausedPartitions);
}

@KafkaListener(topics = “#{‘${spring.kafka.consumer.topic}’}”, groupId = “#{‘${spring.kafka.consumer.groupId}’}”, containerFactory = “kafkaListenerContainerFactory”)
public void consume(String stream, Consumer<?, ?> consumer, Acknowledgment acknowledgment) {
this.kafkaConsumer = (Consumer<String, String>) consumer;

if (getEventCount() > 10) {
pauseKafkaConsumer();
return;
}
}

Please follow and like us:
Pin Share