Pengenalan KafkaStreams di Java

1. Gambaran keseluruhan

Dalam artikel ini, kita akan melihat perpustakaan KafkaStreams .

KafkaStreams direkayasa oleh pencipta Apache Kafka . Matlamat utama perisian ini adalah untuk membolehkan pengaturcara membuat aplikasi penstriman masa nyata yang cekap yang boleh berfungsi sebagai Mikro.

KafkaStreams membolehkan kita mengambil dari topik Kafka, menganalisis atau mengubah data, dan berpotensi, menghantarnya ke topik Kafka yang lain.

Untuk menunjukkan KafkaStreams, kami akan membuat aplikasi mudah yang membaca ayat dari topik, mengira kejadian perkataan dan mencetak jumlah per kata.

Penting untuk diperhatikan adalah bahawa perpustakaan KafkaStreams tidak reaktif dan tidak mempunyai sokongan untuk operasi async dan pengendalian tekanan balik.

2. Ketergantungan Maven

Untuk mula menulis logik pemprosesan aliran menggunakan KafkaStreams, kita perlu menambahkan kebergantungan kepada kafka-stream dan klien kafka :

 org.apache.kafka kafka-streams 1.0.0   org.apache.kafka kafka-clients 1.0.0  

Kita juga perlu memasang dan memulakan Apache Kafka kerana kita akan menggunakan topik Kafka. Topik ini akan menjadi sumber data untuk pekerjaan streaming kami.

Kita boleh memuat turun Kafka dan pergantungan lain yang diperlukan dari laman web rasmi.

3. Mengkonfigurasi Input KafkaStreams

Perkara pertama yang akan kita lakukan ialah definisi topik Kafka input.

Kita boleh menggunakan alat Confluent yang kita muat turun - ia mengandungi Pelayan Kafka. Ini juga mengandungi pengeluar konsol kafka yang dapat kita gunakan untuk menerbitkan mesej kepada Kafka.

Untuk memulakan mari jalankan kluster Kafka kami:

./confluent start

Setelah Kafka bermula, kita dapat menentukan sumber data dan nama aplikasi kita menggunakan APPLICATION_ID_CONFIG :

String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties(); streamsConfiguration.put( StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Parameter konfigurasi penting adalah BOOTSTRAP_SERVER_CONFIG. Ini adalah URL untuk contoh Kafka tempatan kami yang baru sahaja kami mulakan:

private String bootstrapServers = "localhost:9092"; streamsConfiguration.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Seterusnya, kita perlu menyampaikan jenis kunci dan nilai mesej yang akan digunakan dari inputTopic:

streamsConfiguration.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

Pemprosesan aliran sering berlaku. Apabila kita ingin menyimpan hasil perantaraan, kita perlu menentukan parameter STATE_DIR_CONFIG .

Dalam ujian kami, kami menggunakan sistem fail tempatan:

streamsConfiguration.put( StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); 

4. Membina Topologi Streaming

Setelah kami menentukan topik input, kami dapat membuat Topologi Streaming - itulah definisi bagaimana peristiwa harus ditangani dan diubah.

Dalam contoh kami, kami ingin melaksanakan pembilang kata. Untuk setiap ayat yang dihantar ke inputTopic, kami ingin membaginya menjadi kata-kata dan mengira kejadian setiap perkataan.

Kita boleh menggunakan contoh kelas KStreamsBuilder untuk mula membina topologi kami:

KStreamBuilder builder = new KStreamBuilder(); KStream textLines = builder.stream(inputTopic); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) .groupBy((key, word) -> word) .count();

Untuk melaksanakan pengiraan kata, pertama, kita perlu membahagikan nilai menggunakan ungkapan biasa.

Kaedah split mengembalikan array. Kami menggunakan flatMapValues ​​() untuk meratakannya. Jika tidak, kita akan berakhir dengan senarai tatasusunan, dan tidak selesa untuk menulis kod menggunakan struktur seperti itu.

Akhirnya, kami mengumpulkan nilai untuk setiap perkataan dan memanggil kiraan () yang akan mengira kejadian kata tertentu.

5. Mengendalikan Hasil

Kami sudah mengira jumlah perkataan dari mesej input kami. Sekarang mari kita mencetak hasil pada output standard menggunakan kaedah foreach () :

wordCounts .foreach((w, c) -> System.out.println("word: " + w + " -> " + c));

Pada pengeluaran, kerja streaming seperti itu mungkin menerbitkan output ke topik Kafka yang lain.

Kita boleh melakukannya dengan kaedah to ():

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String(); Serde longSerde = Serdes.Long(); wordCounts.to(stringSerde, longSerde, outputTopic);

The Serde kelas memberikan kita dipratatarajahkan serializers untuk jenis Java yang akan digunakan untuk objek cerita bersambung kepada pelbagai bait. Susunan bait kemudian akan dihantar ke topik Kafka.

Kami menggunakan String sebagai kunci topik kami dan Panjang sebagai nilai untuk jumlah sebenar. Kaedah to () akan menyimpan data yang dihasilkan ke outputTopic .

6. Memulakan Pekerjaan KafkaStream

Hingga tahap ini, kami membina topologi yang dapat dilaksanakan. Namun, pekerjaan itu belum bermula.

Kita perlu memulakan tugas kita secara eksplisit dengan memanggil kaedah start () pada contoh KafkaStreams :

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); Thread.sleep(30000); streams.close();

Note that we are waiting 30 seconds for the job to finish. In a real-world scenario, that job would be running all the time, processing events from Kafka as they arrive.

We can test our job by publishing some events to our Kafka topic.

Let's start a kafka-console-producer and manually send some events to our inputTopic:

./kafka-console-producer --topic inputTopic --broker-list localhost:9092 >"this is a pony" >"this is a horse and pony" 

This way, we published two events to Kafka. Our application will consume those events and will print the following output:

word: -> 1 word: this -> 1 word: is -> 1 word: a -> 1 word: pony -> 1 word: -> 2 word: this -> 2 word: is -> 2 word: a -> 2 word: horse -> 1 word: and -> 1 word: pony -> 2

We can see that when the first message arrived, the word pony occurred only once. But when we sent the second message, the word pony happened for the second time printing: “word: pony -> 2″.

6. Conclusion

Artikel ini membincangkan cara membuat aplikasi pemprosesan aliran utama menggunakan Apache Kafka sebagai sumber data dan perpustakaan KafkaStreams sebagai perpustakaan pemprosesan aliran.

Semua contoh dan coretan kod ini boleh didapati di projek GitHub - ini adalah projek Maven, jadi mudah diimport dan dijalankan sebagaimana adanya.