Consuming messages from Kafka Topics with Spring Kafka

From Luis Gallego Hurtado - Not Another IT guy


Creating a Listener

Once you have defined the properties required for the listener and for consumption, you can create a listener simply by implementing an annotated method.

For example, if the key is to be deserialized as a String, the value as a RentedVehicle, and you want manual acknowledgement, your listener could look like the following:

@KafkaListener(topics = {"topic-to-consume"})
public void consumeMessage(final ConsumerRecord<String, RentedVehicle> kafkaMessage, final Acknowledgment acknowledgment) {
  // process kafkaMessage, then commit the offset manually
  acknowledgment.acknowledge();
}

See below for all properties related to consumption and listeners.

Deserializing with JsonDeserializer

When we deserialize with org.springframework.kafka.support.serializer.JsonDeserializer, it is important to take two Spring properties into consideration:

  • spring.kafka.consumer.properties.spring.json.trusted.packages
  • spring.kafka.consumer.properties.spring.json.value.default.type
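For example, assuming the value should be deserialized into the RentedVehicle type used above (the com.example.rental package name here is illustrative), the configuration could look like this:

```properties
# Use the Spring Kafka JSON deserializer for the record value
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Packages the deserializer is allowed to instantiate classes from
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.rental
# Type to deserialize into when no type information is present in the headers
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.rental.RentedVehicle
```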

Deserializing with ErrorHandlingDeserializer

We use org.springframework.kafka.support.serializer.ErrorHandlingDeserializer when we want the listener to receive the message even if it could not be deserialized. This deserializer requires defining the property spring.kafka.consumer.properties.spring.deserializer.value.delegate.class with the deserializer to be used when there is no error.
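For instance, to wrap a JsonDeserializer as the delegate, the configuration could look like this:

```properties
# ErrorHandlingDeserializer wraps the real deserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
# Delegate used when deserialization succeeds
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
```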

When there is no error, this deserializer returns the message deserialized by the delegate class. However, on error, it returns a null value and stores the deserialization exception in a Kafka header. To retrieve it, we just need to use SerializationUtils.getExceptionFromHeader.

It is important to remark here that the getExceptionFromHeader method requires a LogAccessor, a class from Spring's spring-core module that wraps Apache Commons Logging (the internal logging library of Spring). The full example below shows how to use it on a project with slf4j, where ErrorHandlingDeserializer is used to deserialize the value of the Kafka message:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {
  private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer.class);
  private final LogAccessor logAccessor;

  public MyConsumer() {
    this.logAccessor = new LogAccessor(LOGGER.getName());
  }

  @KafkaListener(topics = {"topic-to-consume"})
  public void consumeMessage(final ConsumerRecord<String, RentedVehicle> kafkaMessage) {
    if (kafkaMessage.value() == null) {
      final DeserializationException deserializationException = SerializationUtils.getExceptionFromHeader(
          kafkaMessage, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logAccessor);
      if (deserializationException == null) {
        // unexpected error: value is null but no deserialization exception was stored
      } else {
        // handle deserialization exception
      }
    } else {
      // consume message and value of message
    }
  }
}

Retrying consumption while blocking the thread

You can retry consumption by defining the ErrorHandler to use in the listener. In this case, we define the same error handler for all listeners, with a fixed back-off that stops after a maximum number of attempts.

We can do so by declaring a DefaultErrorHandler bean with a FixedBackOff policy.

When manual acknowledgement is used, it is important to define here whether the error handler should commit recovered messages.

See example below:

private final long intervalInMilliseconds = 500L;
private final long maxAttempts = 3L;

@Bean
public DefaultErrorHandler errorHandler() {
  final BackOff fixedBackOff = new FixedBackOff(intervalInMilliseconds, maxAttempts);
  final DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
    // handle the error once the maximum number of attempts is reached
  }, fixedBackOff);
  errorHandler.setCommitRecovered(true);
  return errorHandler;
}
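In a Spring Boot application, a single CommonErrorHandler bean such as the one above is applied to the auto-configured listener container factory. When that auto-configuration is not in play, the handler can be attached explicitly; a minimal sketch (bean names and the consumer factory are illustrative assumptions):

```java
// Sketch: explicitly attaching the error handler to the container factory,
// assuming a ConsumerFactory<String, RentedVehicle> bean exists elsewhere.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, RentedVehicle> kafkaListenerContainerFactory(
    final ConsumerFactory<String, RentedVehicle> consumerFactory,
    final DefaultErrorHandler errorHandler) {
  final ConcurrentKafkaListenerContainerFactory<String, RentedVehicle> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  // Applies the retry/back-off policy to all listeners created by this factory
  factory.setCommonErrorHandler(errorHandler);
  return factory;
}
```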

Important Spring properties

You can see all available spring kafka properties at https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#appendix.application-properties.integration

Common Spring properties

  • spring.kafka.client-id
  • spring.kafka.bootstrap-servers
  • spring.kafka.properties.sasl.mechanism
  • spring.kafka.properties.ssl.endpoint.identification.algorithm
  • spring.kafka.jaas.control-flag
  • spring.kafka.jaas.enabled
  • spring.kafka.jaas.login-module
  • spring.kafka.jaas.options.username
  • spring.kafka.jaas.options.password
  • spring.kafka.security.protocol

Consumer related Spring properties

  • spring.kafka.consumer.auto-offset-reset
  • spring.kafka.consumer.enable-auto-commit
  • spring.kafka.consumer.max-poll-records
  • spring.kafka.consumer.key-deserializer
  • spring.kafka.consumer.value-deserializer
  • spring.kafka.consumer.properties.spring.deserializer.value.delegate.class
  • spring.kafka.consumer.properties.spring.json.trusted.packages
  • spring.kafka.consumer.properties.spring.json.value.default.type
  • spring.kafka.listener.ack-mode
  • spring.kafka.listener.type
  • spring.kafka.consumer.group-id
  • spring.kafka.listener.concurrency

Note that most of these properties have a corresponding constant in the class org.apache.kafka.clients.consumer.ConsumerConfig.