Tepat Sekali Memproses di Kafka dengan Java

1. Gambaran keseluruhan

Dalam tutorial ini, kita akan melihat bagaimana Kafka memastikan penghantaran tepat sekali antara aplikasi pengeluar dan pengguna melalui Transactional API yang baru diperkenalkan.

Selain itu, kami akan menggunakan API ini untuk melaksanakan pengeluar dan pengguna urus niaga untuk mencapai penghantaran tepat sekali ke hujung dalam contoh WordCount.

2. Penghantaran Mesej di Kafka

Kerana pelbagai kegagalan, sistem pesanan tidak dapat menjamin penyampaian mesej antara aplikasi pengeluar dan pengguna. Bergantung pada bagaimana aplikasi klien berinteraksi dengan sistem sedemikian, semantik mesej berikut adalah mungkin:

  • Sekiranya sistem pemesejan tidak akan menduplikasi mesej tetapi mungkin terlepas mesej sesekali, kami memanggilnya paling banyak sekali
  • Atau, jika tidak akan terlepas mesej tetapi mungkin menduplikasi mesej sekali-sekala, kami memanggilnya sekurang-kurangnya sekali
  • Tetapi, jika selalu menyampaikan semua mesej tanpa pendua, itu betul-betul sekali

Pada mulanya, Kafka hanya menyokong penghantaran mesej paling banyak sekali dan sekurang-kurangnya sekali.

Walau bagaimanapun, pengenalan Transaksi antara broker Kafka dan aplikasi pelanggan memastikan penghantaran tepat sekali di Kafka . Untuk memahaminya dengan lebih baik, mari kita tinjau API klien transaksi dengan cepat.

3. Pergantungan Maven

Untuk bekerja dengan API transaksi, kami memerlukan klien Java Kafka di pom kami:

 org.apache.kafka kafka-clients 2.0.0 

4. Gelung hasil penggunaan-transformasi Transaksional

Sebagai contoh, kita akan menggunakan mesej dari topik input, ayat .

Kemudian untuk setiap ayat, kami akan menghitung setiap perkataan dan menghantar jumlah perkataan secara individu ke topik output, jumlahnya .

Dalam contohnya, kita akan menganggap bahawa sudah ada data transaksi dalam topik ayat .

4.1. Pengeluar Perhatian Transaksi

Oleh itu mari kita tambahkan pengeluar khas Kafka.

Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:9092");

Selain itu, kita perlu menentukan transactional.id dan mengaktifkan idempotensi :

producerProps.put("enable.idempotence", "true"); producerProps.put("transactional.id", "prod-1"); KafkaProducer producer = new KafkaProducer(producerProps);

Oleh kerana kami telah mengaktifkan idempotensi, Kafka akan menggunakan id transaksi ini sebagai sebahagian daripada algoritma untuk menduplikasi setiap mesej yang dihantar oleh pengeluar ini , dan memastikan idempotensi.

Ringkasnya, jika pengeluar secara tidak sengaja mengirim mesej yang sama kepada Kafka lebih dari satu kali, tetapan ini memungkinkan untuk diperhatikan.

Yang perlu kita lakukan adalah memastikan id transaksi berbeza untuk setiap pengeluar , walaupun konsisten semasa memulakan semula.

4.2. Membolehkan Pengeluar untuk Transaksi

Setelah kami bersedia, maka kami juga perlu menghubungi initTransaction untuk mempersiapkan pengeluar menggunakan transaksi:

producer.initTransactions();

Ini mendaftarkan pengeluar dengan broker sebagai salah satu yang dapat menggunakan transaksi, mengenal pasti dengan transactional.id dan nombor urutan, atau zaman . Pada gilirannya, broker akan menggunakan ini untuk menulis tindakan apa pun ke log transaksi.

Dan akibatnya, broker akan membuang sebarang tindakan dari log yang menjadi milik pengeluar dengan id transaksi yang sama dan zaman sebelumnya , dengan anggapan ia berasal dari transaksi yang tidak berfungsi.

4.3. Pengguna Transaksi-Sedar

Apabila kita memakannya, kita dapat membaca semua mesej pada partisi topik secara berurutan. Meskipun demikian, kita dapat menunjukkan dengan terasing. Tingkatkan bahawa kita harus menunggu untuk membaca pesan transaksi sehingga transaksi yang terkait telah dilakukan :

Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:9092"); consumerProps.put("group.id", "my-group-id"); consumerProps.put("enable.auto.commit", "false"); consumerProps.put("isolation.level", "read_committed"); KafkaConsumer consumer = new KafkaConsumer(consumerProps); consumer.subscribe(singleton(“sentences”));

Menggunakan nilai read_committed memastikan bahawa kita tidak membaca pesan transaksi sebelum transaksi selesai.

Nilai lalai isolation.level adalah read_uncommitted.

4.4. Mengkonsumsi dan Transformasi dengan Transaksi

Sekarang kita mempunyai pengeluar dan pengguna yang dikonfigurasi untuk menulis dan membaca secara urus niaga, kita dapat menggunakan catatan dari topik input kita dan mengira setiap kata dalam setiap catatan:

ConsumerRecords records = consumer.poll(ofSeconds(60)); Map wordCountMap = records.records(new TopicPartition("input", 0)) .stream() .flatMap(record -> Stream.of(record.value().split(" "))) .map(word -> Tuple.of(word, 1)) .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));

Note, that there is nothing transactional about the above code. But, since we used read_committed, it means that no messages that were written to the input topic in the same transaction will be read by this consumer until they are all written.

Now, we can send the calculated word count to the output topic.

Let's see how we can produce our results, also transactionally.

4.5. Send API

To send our counts as new messages, but in the same transaction, we call beginTransaction:

producer.beginTransaction();

Then, we can write each one to our “counts” topic with the key being the word and the count being the value:

wordCountMap.forEach((key,value) -> producer.send(new ProducerRecord("counts",key,value.toString())));

Note that because the producer can partition the data by the key, this means that transactional messages can span multiple partitions, each being read by separate consumers. Therefore, Kafka broker will store a list of all updated partitions for a transaction.

Note also that, within a transaction, a producer can use multiple threads to send records in parallel.

4.6. Committing Offsets

And finally, we need to commit our offsets that we just finished consuming. With transactions, we commit the offsets back to the input topic we read them from, like normal. Also though, we send them to the producer's transaction.

We can do all of this in a single call, but we first need to calculate the offsets for each topic partition:

Map offsetsToCommit = new HashMap(); for (TopicPartition partition : records.partitions()) { List
    
      partitionedRecords = records.records(partition); long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); }
    

Note that what we commit to the transaction is the upcoming offset, meaning we need to add 1.

Then we can send our calculated offsets to the transaction:

producer.sendOffsetsToTransaction(offsetsToCommit, "my-group-id");

4.7. Committing or Aborting the Transaction

And, finally, we can commit the transaction, which will atomically write the offsets to the consumer_offsets topic as well as to the transaction itself:

producer.commitTransaction();

This flushes any buffered message to the respective partitions. In addition, the Kafka broker makes all messages in that transaction available to the consumers.

Of course, if anything goes wrong while we are processing, for example, if we catch an exception, we can call abortTransaction:

try { // ... read from input topic // ... transform // ... write to output topic producer.commitTransaction(); } catch ( Exception e ) { producer.abortTransaction(); }

And drop any buffered messages and remove the transaction from the broker.

If we neither commit nor abort before the broker-configured max.transaction.timeout.ms, the Kafka broker will abort the transaction itself. The default value for this property is 900,000 milliseconds or 15 minutes.

5. Other consume-transform-produce Loops

What we've just seen is a basic consume-transform-produce loop which reads and writes to the same Kafka cluster.

Conversely, applications that must read and write to different Kafka clusters must use the older commitSync and commitAsync API. Typically, applications will store consumer offsets into their external state storage to maintain transactionality.

6. Conclusion

Untuk aplikasi yang kritikal data, pemprosesan tepat-ke-akhir sekali-sekali sering diperlukan.

Dalam tutorial ini, kami melihat bagaimana kami menggunakan Kafka untuk melakukan ini dengan tepat, menggunakan transaksi , dan kami menerapkan contoh penghitungan kata berdasarkan transaksi untuk menggambarkan prinsipnya.

Jangan ragu untuk melihat semua contoh kod di GitHub.