Producing messages to Kafka Topics with Spring Kafka

From Luis Gallego Hurtado - Not Another IT guy
Jump to: navigation, search


Producing messages with KafkaTemplate

We can produce messages with clas org.springframework.kafka.core.KafkaTemplate. In that case, we just need to inject the instance of a typed KafkaTemplate, and just invoking the send method.

There are several signatures for send method, some of them receiving only data, some of them receiving also key, and some of them receiving the message instance.

You can even specify the partition and timestamp of the message to be sent.

An instance of org.springframework.messaging.Message class can be built with the org.springframework.messaging.support.MessageBuilder class.

All send are asynchronous and they return a CompletableFuture, that we can block just by invoking the get method.

Additions to Kafka Producer API

There are additions to Kafka Producer API that allow producer to be idempotent and/or transactional. For further information, read javadoc of KafkaProducer class.

Important Spring properties

You can see all available spring kafka properties at https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#appendix.application-properties.integration

Common Spring properties

  • spring.kafka.client-id
  • spring.kafka-bootstrap-servers
  • spring.kafka.properties.sasl.mechanism
  • spring.kafka.properties.ssl.endpoint.identification.algorithm
  • spring.kafka.jaas.control-flag
  • spring.kafka.jaas.enabled
  • spring.kafka.jaas.login-module
  • spring.kafka.jaas.options.username
  • spring.kafka.jaas.options.password
  • spring.kafka.security.protocol

Producer related Spring properties

  • spring.kafka.producer.acks
  • spring.kafka.producer.compression-type
  • spring.kafka.producer.key-serializer
  • spring.kafka.producer.value-serializer
  • spring.kafka.producer.properties.spring.json.add.type.headers
  • spring.kafka.producer.retries

Note that most of the properties have the appropriate constant in class org.apache.kafka.clients.consumer.ProducerConfig.