Pengenalan kepada Apache Kafka dengan Spring

Ketekunan atas

Saya baru sahaja mengumumkan kursus Learn Spring yang baru , yang berfokus pada asas-asas Spring 5 dan Spring Boot 2:

>> SEMAK KURSUS

1. Gambaran keseluruhan

Apache Kafka adalah sistem pemprosesan aliran yang diedarkan dan bertolak ansur.

Dalam artikel ini, kami akan membahas sokongan Spring untuk Kafka dan tahap abstraksi yang diberikannya melalui API klien Java Kafka asli.

Spring Kafka membawakan model pengaturcaraan templat Spring yang sederhana dan khas dengan POJO KafkaTemplate dan Pesan melalui anotasi @KafkaListener .

2. Pemasangan dan Persediaan

Untuk memuat turun dan memasang Kafka, sila rujuk panduan rasmi di sini.

Kami juga perlu menambahkan pergantungan spring-kafka ke pom.xml kami :

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

Versi terbaru artifak ini boleh didapati di sini.

Aplikasi contoh kami akan menjadi aplikasi Spring Boot.

Artikel ini menganggap bahawa pelayan mula menggunakan konfigurasi lalai dan tidak ada port pelayan yang diubah.

3. Mengkonfigurasi Topik

Sebelum ini kami biasa menjalankan alat baris perintah untuk membuat topik di Kafka seperti:

$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic

Tetapi dengan pengenalan AdminClient di Kafka, kita sekarang dapat membuat topik secara program.

Kita perlu menambah kacang KafkaAdmin Spring, yang secara automatik akan menambah topik untuk semua kacang jenis NewTopic:

@Configuration public class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map configs = new HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("baeldung", 1, (short) 1); } }

4. Menghasilkan Mesej

Untuk membuat mesej, pertama, kita perlu mengkonfigurasi ProducerFactory yang mengatur strategi untuk membuat instance Kafka Producer .

Maka kita memerlukan KafkaTemplate yang membungkus instance Producer dan menyediakan kaedah kemudahan untuk menghantar mesej ke topik Kafka.

Contoh pengeluar adalah selamat untuk thread dan oleh itu menggunakan satu contoh di seluruh konteks aplikasi akan memberikan prestasi yang lebih tinggi. Akibatnya, contoh KakfaTemplate juga selamat di thread dan penggunaan satu contoh disyorkan.

4.1. Konfigurasi Pengeluar

@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } }

4.2. Menerbitkan Mesej

Kami boleh menghantar mesej menggunakan kelas KafkaTemplate :

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg); }

The send API mengembalikan ListenableFuture objek. Sekiranya kita ingin menyekat utas pengiriman dan mendapatkan hasilnya mengenai pesan yang dikirim, kita dapat memanggil get API dari objek ListenableFuture . Benang akan menunggu hasilnya, tetapi ia akan memperlambat pengeluar.

Kafka adalah platform pemprosesan aliran pantas. Oleh itu, adalah lebih baik untuk menangani hasilnya secara serentak supaya mesej seterusnya tidak menunggu hasil dari mesej sebelumnya. Kami boleh melakukan ini melalui panggilan balik:

public void sendMessage(String message) { ListenableFuture
    
      future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback
     
      () { @Override public void onSuccess(SendResult result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); }
     
    

5. Mesej Menggunakan

5.1. Konfigurasi Pengguna

Untuk memakan mesej, kita perlu mengkonfigurasi ConsumerFactory dan KafkaListenerContainerFactory . Setelah kacang ini tersedia di kilang kacang Spring, pengguna berasaskan POJO dapat dikonfigurasi menggunakan anotasi @KafkaListener .

Anotasi @EnableKafka diperlukan pada kelas konfigurasi untuk membolehkan pengesanan anotasi @KafkaListener pada kacang yang diuruskan musim bunga:

@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory() { Map props = new HashMap(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory(props); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); return factory; } }

5.2. Mesej Memakai

@KafkaListener(topics = "topicName", groupId = "foo") public void listenGroupFoo(String message) { System.out.println("Received Message in group foo: " + message); }

Pelbagai pendengar dapat dilaksanakan untuk topik , masing-masing dengan Id kumpulan yang berbeza. Selanjutnya, satu pengguna dapat mendengar mesej dari pelbagai topik:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring juga menyokong pengambilan satu atau lebih tajuk mesej menggunakan anotasi @Header dalam pendengar:

@KafkaListener(topics = "topicName") public void listenWithHeaders( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

5.3. Consuming Messages from a Specific Partition

As you may have noticed, we had created the topic baeldung with only one partition. However, for a topic with multiple partitions, a @KafkaListener can explicitly subscribe to a particular partition of a topic with an initial offset:

@KafkaListener( topicPartitions = @TopicPartition(topic = "topicName", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "3", initialOffset = "0")}), containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

Since the initialOffset has been sent to 0 in this listener, all the previously consumed messages from partitions 0 and three will be re-consumed every time this listener is initialized. If setting the offset is not required, we can use the partitions property of @TopicPartition annotation to set only the partitions without the offset:

@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Adding Message Filter for Listeners

Listeners can be configured to consume specific types of messages by adding a custom filter. This can be done by setting a RecordFilterStrategy to the KafkaListenerContainerFactory:

@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory; }

A listener can then be configured to use this container factory:

@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter(String message) { System.out.println("Received Message in filtered listener: " + message); }

In this listener, all the messages matching the filter will be discarded.

6. Custom Message Converters

So far we have only covered sending and receiving Strings as messages. However, we can also send and receive custom Java objects. This requires configuring appropriate serializer in ProducerFactory and deserializer in ConsumerFactory.

Let's look at a simple bean class, which we will send as messages:

public class Greeting { private String msg; private String name; // standard getters, setters and constructor }

6.1. Producing Custom Messages

In this example, we will use JsonSerializer. Let's look at the code for ProducerFactory and KafkaTemplate:

@Bean public ProducerFactory greetingProducerFactory() { // ... configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate greetingKafkaTemplate() { return new KafkaTemplate(greetingProducerFactory()); }

This new KafkaTemplate can be used to send the Greeting message:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Consuming Custom Messages

Similarly, let's modify the ConsumerFactory and KafkaListenerContainerFactory to deserialize the Greeting message correctly:

@Bean public ConsumerFactory greetingConsumerFactory() { // ... return new DefaultKafkaConsumerFactory( props, new StringDeserializer(), new JsonDeserializer(Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; }

The spring-kafka JSON serializer and deserializer uses the Jackson library which is also an optional maven dependency for the spring-kafka project. So let's add it to our pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

Daripada menggunakan versi terbaru Jackson, sebaiknya gunakan versi yang ditambahkan ke pom.xml spring-kafka.

Akhirnya, kita perlu menulis pendengar untuk menggunakan mesej Ucapan :

@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener(Greeting greeting) { // process greeting message }

7. Kesimpulannya

Dalam artikel ini, kami membahas asas sokongan Spring untuk Apache Kafka. Kami melihat sekilas kelas yang digunakan untuk menghantar dan menerima mesej.

Kod sumber lengkap untuk artikel ini boleh didapati di GitHub. Sebelum melaksanakan kod, pastikan pelayan Kafka berjalan dan topik dibuat secara manual.

Ketekunan bawah

Saya baru sahaja mengumumkan kursus Learn Spring yang baru , yang berfokus pada asas-asas Spring 5 dan Spring Boot 2:

>> SEMAK KURSUS