Membina Paip Data dengan Kafka, Spark Streaming dan Cassandra

1. Gambaran keseluruhan

Apache Kafka adalah platform latensi rendah berskala tinggi, berprestasi tinggi yang membolehkan aliran data membaca dan menulis seperti sistem pesanan . Kita boleh memulakan dengan Kafka di Jawa dengan mudah.

Streaming Spark adalah sebahagian daripada platform Apache Spark yang membolehkan pemprosesan aliran data yang berskala tinggi, throughput tinggi, toleransi kesalahan . Walaupun ditulis dalam Scala, Spark menawarkan API Java untuk digunakan.

Apache Cassandra adalah stor data NoSQL yang diedarkan dan luas . Maklumat lebih lanjut mengenai Cassandra terdapat dalam artikel kami sebelumnya.

Dalam tutorial ini, kami akan menggabungkannya untuk membuat saluran data yang sangat berskala dan bertoleransi kesalahan untuk aliran data masa nyata .

2. Pemasangan

Untuk memulakan, kami memerlukan Kafka, Spark dan Cassandra dipasang secara tempatan di mesin kami untuk menjalankan aplikasi. Kami akan melihat bagaimana mengembangkan saluran data menggunakan platform ini semasa kami mengikuti.

Walau bagaimanapun, kami akan meninggalkan semua konfigurasi lalai termasuk port untuk semua pemasangan yang akan membantu menjadikan tutorial berjalan dengan lancar.

2.1. Kafka

Memasang Kafka pada mesin tempatan kami cukup mudah dan boleh didapati sebagai sebahagian daripada dokumentasi rasmi. Kami akan menggunakan keluaran 2.1.0 Kafka.

Sebagai tambahan, Kafka memerlukan Apache Zookeeper untuk berjalan tetapi untuk tujuan tutorial ini, kami akan memanfaatkan contoh Zookeeper nod tunggal yang dibungkus dengan Kafka.

Setelah kami berjaya memulakan Zookeeper dan Kafka secara tempatan mengikuti panduan rasmi, kami dapat terus membuat topik kami, yang diberi nama "mesej":

 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Perhatikan bahawa skrip di atas adalah untuk platform Windows, tetapi ada skrip serupa yang tersedia untuk platform seperti Unix juga.

2.2. Percikan api

Spark menggunakan perpustakaan pelanggan Hadoop untuk HDFS dan YARN. Oleh itu, sangat sukar untuk menyusun semua versi yang serasi . Walau bagaimanapun, muat turun rasmi Spark dilengkapi dengan versi Hadoop yang popular. Untuk tutorial ini, kami akan menggunakan pakej versi 2.3.0 "pra-dibangun untuk Apache Hadoop 2.7 dan yang lebih baru".

Setelah paket Spark yang betul dibongkar, skrip yang tersedia dapat digunakan untuk mengirimkan aplikasi. Kami akan melihatnya kemudian apabila kami mengembangkan aplikasi kami di Spring Boot.

2.3. Cassandra

DataStax menyediakan edisi komuniti Cassandra untuk platform yang berbeza termasuk Windows. Kami boleh memuat turun dan memasangnya di mesin tempatan kami dengan mudah mengikuti dokumentasi rasmi. Kami akan menggunakan versi 3.9.0.

Setelah berjaya memasang dan memulakan Cassandra pada mesin tempatan kami, kami boleh terus membuat ruang kunci dan jadual kami. Ini dapat dilakukan dengan menggunakan CQL Shell yang disertakan bersama pemasangan kami:

CREATE KEYSPACE vocabulary WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; USE vocabulary; CREATE TABLE words (word text PRIMARY KEY, count int);

Perhatikan bahawa kami telah membuat ruang nama yang disebut perbendaharaan kata dan jadual di dalamnya disebut perkataan dengan dua lajur, kata , dan kiraan .

3. Kebergantungan

Kami dapat mengintegrasikan kebergantungan Kafka dan Spark ke dalam aplikasi kami melalui Maven. Kami akan menarik pergantungan ini dari Maven Central:

  • Teras Spark
  • SQL Spark
  • Streaming Spark
  • Streaming Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

Dan kami boleh menambahkannya ke pom kami dengan sewajarnya:

 org.apache.spark spark-core_2.11 2.3.0 provided   org.apache.spark spark-sql_2.11 2.3.0 provided   org.apache.spark spark-streaming_2.11 2.3.0 provided   org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.0   com.datastax.spark spark-cassandra-connector_2.11 2.3.0   com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Perhatikan bahawa beberapa kebergantungan ini ditandai sebagai disediakan dalam skop. Ini kerana ini akan disediakan oleh pemasangan Spark di mana kami akan mengemukakan aplikasi untuk pelaksanaan menggunakan spark-submit.

4. Spark Streaming - Strategi Integrasi Kafka

Pada ketika ini, ada baiknya kita berbicara sebentar mengenai strategi integrasi untuk Spark dan Kafka.

Kafka memperkenalkan API pengguna baru antara versi 0.8 dan 0.10. Oleh itu, pakej Spark Streaming yang sesuai tersedia untuk kedua-dua versi broker. Penting untuk memilih pakej yang tepat bergantung pada broker yang ada dan ciri yang diinginkan.

4.1. Streaming Spark Kafka 0.8

Versi 0.8 adalah API integrasi yang stabil dengan pilihan menggunakan Penerima atau Pendekatan Langsung . Kami tidak akan membahas perincian pendekatan ini yang dapat kami temukan dalam dokumentasi rasmi. Perkara penting yang perlu diperhatikan di sini ialah pakej ini serasi dengan Kafka Broker versi 0.8.2.1 atau lebih tinggi.

4.2. Spark Streaming Kafka 0.10

Ini kini dalam keadaan eksperimen dan serasi dengan Kafka Broker versi 0.10.0 atau lebih tinggi sahaja. Pakej ini hanya menawarkan Pendekatan Langsung, sekarang menggunakan API pengguna Kafka yang baru . Kami boleh mendapatkan lebih banyak maklumat mengenai perkara ini dalam dokumentasi rasmi. Yang penting, ia tidak serasi dengan versi Kafka Broker yang lebih lama .

Harap maklum bahawa untuk tutorial ini, kami akan menggunakan pakej 0.10. Ketergantungan yang disebutkan di bahagian sebelumnya merujuk kepada ini sahaja.

5. Membangunkan Saluran Data

We'll create a simple application in Java using Spark which will integrate with the Kafka topic we created earlier. The application will read the messages as posted and count the frequency of words in every message. This will then be updated in the Cassandra table we created earlier.

Let's quickly visualize how the data will flow:

5.1. Getting JavaStreamingContext

Firstly, we'll begin by initializing the JavaStreamingContext which is the entry point for all Spark Streaming applications:

SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("WordCountingApp"); sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1));

5.2. Getting DStream from Kafka

Now, we can connect to the Kafka topic from the JavaStreamingContext:

Map kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection topics = Arrays.asList("messages"); JavaInputDStream
    
      messages = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams));
    

Please note that we've to provide deserializers for key and value here. For common data types like String, the deserializer is available by default. However, if we wish to retrieve custom data types, we'll have to provide custom deserializers.

Here, we've obtained JavaInputDStream which is an implementation of Discretized Streams or DStreams, the basic abstraction provided by Spark Streaming. Internally DStreams is nothing but a continuous series of RDDs.

5.3. Processing Obtained DStream

We'll now perform a series of operations on the JavaInputDStream to obtain word frequencies in the messages:

JavaPairDStream results = messages .mapToPair( record -> new Tuple2(record.key(), record.value()) ); JavaDStream lines = results .map( tuple2 -> tuple2._2() ); JavaDStream words = lines .flatMap( x -> Arrays.asList(x.split("\\s+")).iterator() ); JavaPairDStream wordCounts = words .mapToPair( s -> new Tuple2(s, 1) ).reduceByKey( (i1, i2) -> i1 + i2 );

5.4. Persisting Processed DStream into Cassandra

Finally, we can iterate over the processed JavaPairDStream to insert them into our Cassandra table:

wordCounts.foreachRDD( javaRdd -> { Map wordCountMap = javaRdd.collectAsMap(); for (String key : wordCountMap.keySet()) { List wordList = Arrays.asList(new Word(key, wordCountMap.get(key))); JavaRDD rdd = streamingContext.sparkContext().parallelize(wordList); javaFunctions(rdd).writerBuilder( "vocabulary", "words", mapToRow(Word.class)).saveToCassandra(); } } );

5.5. Running the Application

As this is a stream processing application, we would want to keep this running:

streamingContext.start(); streamingContext.awaitTermination();

6. Leveraging Checkpoints

In a stream processing application, it's often useful to retain state between batches of data being processed.

For example, in our previous attempt, we are only able to store the current frequency of the words. What if we want to store the cumulative frequency instead? Spark Streaming makes it possible through a concept called checkpoints.

We'll now modify the pipeline we created earlier to leverage checkpoints:

Please note that we'll be using checkpoints only for the session of data processing. This does not provide fault-tolerance. However, checkpointing can be used for fault tolerance as well.

There are a few changes we'll have to make in our application to leverage checkpoints. This includes providing the JavaStreamingContext with a checkpoint location:

streamingContext.checkpoint("./.checkpoint");

Here, we are using the local filesystem to store checkpoints. However, for robustness, this should be stored in a location like HDFS, S3 or Kafka. More on this is available in the official documentation.

Next, we'll have to fetch the checkpoint and create a cumulative count of words while processing every partition using a mapping function:

JavaMapWithStateDStream
    
      cumulativeWordCounts = wordCounts .mapWithState( StateSpec.function( (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2 output = new Tuple2(word, sum); state.update(sum); return output; } ) );
    

Once we get the cumulative word counts, we can proceed to iterate and save them in Cassandra as before.

Please note that while data checkpointing is useful for stateful processing, it comes with a latency cost. Hence, it's necessary to use this wisely along with an optimal checkpointing interval.

7. Understanding Offsets

If we recall some of the Kafka parameters we set earlier:

kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false);

These basically mean that we don't want to auto-commit for the offset and would like to pick the latest offset every time a consumer group is initialized. Consequently, our application will only be able to consume messages posted during the period it is running.

If we want to consume all messages posted irrespective of whether the application was running or not and also want to keep track of the messages already posted, we'll have to configure the offset appropriately along with saving the offset state, though this is a bit out of scope for this tutorial.

This is also a way in which Spark Streaming offers a particular level of guarantee like “exactly once”. This basically means that each message posted on Kafka topic will only be processed exactly once by Spark Streaming.

8. Deploying Application

We can deploy our application using the Spark-submit script which comes pre-packed with the Spark installation:

$SPARK_HOME$\bin\spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local[2] \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Harap maklum bahawa balang yang kami buat menggunakan Maven harus berisi kebergantungan yang tidak ditandai seperti yang disediakan dalam skop.

Sebaik sahaja kami mengemukakan permohonan ini dan menghantar beberapa mesej dalam topik Kafka yang kami buat sebelumnya, kami akan melihat jumlah kata kumulatif diposkan dalam jadual Cassandra yang kami buat sebelumnya.

9. Kesimpulannya

Kesimpulannya, dalam tutorial ini, kami belajar bagaimana membuat saluran data sederhana menggunakan Kafka, Spark Streaming dan Cassandra. Kami juga belajar bagaimana memanfaatkan pusat pemeriksaan di Spark Streaming untuk menjaga keadaan antara kumpulan.

Seperti biasa, kod untuk contoh boleh didapati di GitHub.