相关文章推荐
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <scope>test</scope>
</dependency>

Enabling Transactional Behavior in the Producer

To ensure that we deliver each exactly once, we must configure some properties in our Spring Boot application.properties or application.yaml file:

spring.kafka.producer.enable-idempotence: true
spring.kafka.producer.transactional-id: prod-A
spring.kafka.producer.transaction-id-prefix: transaction
kafkaTemplate.executeInTransaction(kt -> {
  ListenableFuture<SendResult<String, String>> result = kt.send(event);
  result.addCallback(
      successResult -> log.debug("Message {} sent ok with result: {}", getEventAsString(event), successResult),
      exception -> {
        log.error("Non retriable exception found in send operation: {}", exception.getClass().getName());
  return result;

By using the above code snippet, the broker recognizes that we are sending each message within a transaction identified by its transactional-id and a sequence number or timestamp.

Enabling Transactional Behavior in the Consumer

The consumer consumes messages from a broker in the order they arrive. However, to ensure transactional behavior, we need to inform the broker to wait for transactional reads. We can achieve this in the application.properties or application.yaml file as follows:

spring.kafka.consumer.properties.isolation.level: read_committed
		

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

 
推荐文章