Membina Paip Data dengan Flink dan Kafka

1. Gambaran keseluruhan

Apache Flink adalah kerangka pemprosesan aliran yang dapat digunakan dengan mudah dengan Java. Apache Kafka adalah sistem pemprosesan aliran diedarkan yang menyokong toleransi kesalahan tinggi.

Dalam tutorial ini, kita akan melihat bagaimana membina saluran data menggunakan dua teknologi tersebut.

2. Pemasangan

Untuk memasang dan mengkonfigurasi Apache Kafka, sila rujuk panduan rasmi. Setelah memasang, kami dapat menggunakan perintah berikut untuk membuat topik baru yang disebut flink_input dan flink_output:

 bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic flink_output bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic flink_input

Demi tutorial ini, kami akan menggunakan konfigurasi lalai dan port lalai untuk Apache Kafka.

3. Penggunaan Flink

Apache Flink membolehkan teknologi pemprosesan aliran masa nyata. Rangka kerja ini membolehkan penggunaan beberapa sistem pihak ketiga sebagai sumber aliran atau sink .

Di Flink - terdapat pelbagai penyambung yang tersedia:

  • Apache Kafka (sumber / sink)
  • Apache Cassandra (sink)
  • Aliran Amazon Kinesis (sumber / sink)
  • Pencarian elastik (sink)
  • Sistem Fail Hadoop (sink)
  • RabbitMQ (sumber / sink)
  • Apache NiFi (sumber / sink)
  • API Streaming Twitter (sumber)

Untuk menambahkan Flink ke projek kami, kami perlu memasukkan kebergantungan Maven berikut:

 org.apache.flink flink-core 1.5.0   org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

Menambah kebergantungan tersebut akan membolehkan kita mengambil dan menghasilkan ke dan dari topik Kafka. Anda boleh mendapatkan versi Flink terkini di Maven Central.

4. Pengguna Kafka String

Untuk menggunakan data dari Kafka dengan Flink, kami perlu memberikan topik dan alamat Kafka. Kita juga harus memberikan id kumpulan yang akan digunakan untuk menahan offset sehingga kita tidak akan selalu membaca keseluruhan data dari awal.

Mari buat kaedah statik yang akan menjadikan penciptaan FlinkKafkaConsumer lebih mudah:

public static FlinkKafkaConsumer011 createStringConsumerForTopic( String topic, String kafkaAddress, String kafkaGroup ) { Properties props = new Properties(); props.setProperty("bootstrap.servers", kafkaAddress); props.setProperty("group.id",kafkaGroup); FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011( topic, new SimpleStringSchema(), props); return consumer; }

Kaedah ini mengambil topik, kafkaAddress, dan kafkaGroup dan membuat FlinkKafkaConsumer yang akan menggunakan data dari topik yang diberikan sebagai String kerana kami telah menggunakan SimpleStringSchema untuk menyahkod data.

Nombor 011 pada nama kelas merujuk kepada versi Kafka.

5. Penerbit String Kafka

Untuk menghasilkan data ke Kafka, kita perlu memberikan alamat dan topik Kafka yang ingin kita gunakan. Sekali lagi, kita dapat membuat kaedah statik yang akan membantu kita membuat pengeluar untuk topik yang berbeza:

public static FlinkKafkaProducer011 createStringProducer( String topic, String kafkaAddress){ return new FlinkKafkaProducer011(kafkaAddress, topic, new SimpleStringSchema()); }

Kaedah ini hanya menggunakan topik dan kafkaAddress sebagai hujah kerana tidak perlu memberikan id kumpulan semasa kita membuat topik Kafka.

6. Pemprosesan Aliran Rentetan

Apabila kita mempunyai pengguna dan pengeluar yang berfungsi sepenuhnya, kita dapat mencuba memproses data dari Kafka dan kemudian menyimpan hasilnya kembali ke Kafka. Senarai lengkap fungsi yang boleh digunakan untuk pemprosesan aliran boleh didapati di sini.

Dalam contoh ini, kita akan menggunakan huruf besar dalam setiap entri Kafka dan kemudian menulisnya kembali ke Kafka.

Untuk tujuan ini, kita perlu membuat MapFunction tersuai :

public class WordsCapitalizer implements MapFunction { @Override public String map(String s) { return s.toUpperCase(); } }

Setelah membuat fungsi, kita dapat menggunakannya dalam pemprosesan aliran:

public static void capitalize() { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String address = "localhost:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment .getExecutionEnvironment(); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic( inputTopic, address, consumerGroup); DataStream stringInputStream = environment .addSource(flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer( outputTopic, address); stringInputStream .map(new WordsCapitalizer()) .addSink(flinkKafkaProducer); }

Aplikasi akan membaca data dari topik flink_input , melakukan operasi di aliran dan kemudian menyimpan hasilnya ke topik flink_output di Kafka.

Kami telah melihat bagaimana menangani Strings menggunakan Flink dan Kafka. Tetapi selalunya diperlukan untuk melakukan operasi pada objek tersuai. Kita akan melihat bagaimana melakukan ini dalam bab-bab seterusnya.

7. Deserialisasi Objek Tersuai

Kelas berikut menunjukkan mesej ringkas dengan maklumat mengenai pengirim dan penerima:

@JsonSerialize public class InputMessage { String sender; String recipient; LocalDateTime sentAt; String message; }

Sebelumnya, kami menggunakan SimpleStringSchema untuk mendeserialisasikan mesej dari Kafka, tetapi sekarang kami ingin mendeseralisasikan data secara langsung ke objek khusus .

Untuk melakukan ini, kita memerlukan DeserializationSchema tersuai :

public class InputMessageDeserializationSchema implements DeserializationSchema { static ObjectMapper objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()); @Override public InputMessage deserialize(byte[] bytes) throws IOException { return objectMapper.readValue(bytes, InputMessage.class); } @Override public boolean isEndOfStream(InputMessage inputMessage) { return false; } @Override public TypeInformation getProducedType() { return TypeInformation.of(InputMessage.class); } }

Kami mengandaikan di sini bahawa mesej tersebut dianggap sebagai JSON di Kafka.

Oleh kerana kita mempunyai bidang jenis LocalDateTime , kita perlu menentukan JavaTimeModule, yang mengurus pemetaan objek LocalDateTime ke JSON.

Skema Flink tidak boleh mempunyai medan yang tidak dapat disenaraikan kerana semua pengendali (seperti skema atau fungsi) bersiri pada permulaan tugas.

Terdapat masalah serupa di Apache Spark. Salah satu penyelesaian yang diketahui untuk masalah ini adalah menginisialisasi bidang sebagai statik , seperti yang kita lakukan dengan ObjectMapper di atas. Ini bukan penyelesaian tercantik, tetapi ia agak mudah dan berjaya.

Kaedah isEndOfStream boleh digunakan untuk kes khas apabila aliran perlu diproses hanya sehingga beberapa data tertentu diterima. Tetapi itu tidak diperlukan dalam kes kita.

8. Serialisasi Objek Tersuai

Sekarang, anggaplah bahawa kita mahu sistem kita mempunyai kemungkinan untuk membuat cadangan mesej. Kami mahu prosesnya automatik, dan setiap sandaran harus terdiri daripada mesej yang dihantar selama satu hari penuh.

Juga, mesej sandaran harus mempunyai id unik yang diberikan.

Untuk tujuan ini, kita boleh membuat kelas berikut:

public class Backup { @JsonProperty("inputMessages") List inputMessages; @JsonProperty("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty("uuid") UUID uuid; public Backup(List inputMessages, LocalDateTime backupTimestamp) { this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID(); } }

Harap diingat bahawa mekanisme penjanaan UUID tidak sempurna, kerana ia membenarkan pendua. Walau bagaimanapun, ini cukup untuk ruang lingkup contoh ini.

Kami ingin menyimpan objek Sandaran kami sebagai JSON ke Kafka, jadi kami perlu membuat Skema Serialisasi kami :

public class BackupSerializationSchema implements SerializationSchema { ObjectMapper objectMapper; Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class); @Override public byte[] serialize(Backup backupMessage) { if(objectMapper == null) { objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()); } try { return objectMapper.writeValueAsString(backupMessage).getBytes(); } catch (com.fasterxml.jackson.core.JsonProcessingException e) { logger.error("Failed to parse JSON", e); } return new byte[0]; } }

9. Timestamping Messages

Since we want to create a backup for all messages of each day, messages need a timestamp.

Flink provides the three different time characteristics EventTime, ProcessingTime, and IngestionTime.

In our case, we need to use the time at which the message has been sent, so we'll use EventTime.

To use EventTimewe need a TimestampAssigner which will extract timestamps from our input data:

public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks { @Override public long extractTimestamp(InputMessage element, long previousElementTimestamp) { ZoneId zoneId = ZoneId.systemDefault(); return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000; } @Nullable @Override public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1500); } }

We need to transform our LocalDateTime to EpochSecond as this is the format expected by Flink. After assigning timestamps, all time-based operations will use time from sentAt field to operate.

Since Flink expects timestamps to be in milliseconds and toEpochSecond() returns time in seconds we needed to multiply it by 1000, so Flink will create windows correctly.

Flink defines the concept of a Watermark. Watermarks are useful in case of data that don't arrive in the order they were sent. A watermark defines the maximum lateness that is allowed for elements to be processed.

Elements that have timestamps lower than the watermark won't be processed at all.

10. Creating Time Windows

To assure that our backup gathers only messages sent during one day, we can use the timeWindowAll method on the stream, which will split messages into windows.

Walau bagaimanapun, kami masih perlu mengumpulkan mesej dari setiap tetingkap dan mengembalikannya sebagai Sandaran .

Untuk melakukan ini, kita memerlukan AggregateFunction tersuai :

public class BackupAggregator implements AggregateFunction
    
      { @Override public List createAccumulator() { return new ArrayList(); } @Override public List add( InputMessage inputMessage, List inputMessages) { inputMessages.add(inputMessage); return inputMessages; } @Override public Backup getResult(List inputMessages) { return new Backup(inputMessages, LocalDateTime.now()); } @Override public List merge(List inputMessages, List acc1) { inputMessages.addAll(acc1); return inputMessages; } }
    

11. Menyusun Sandaran

Setelah menetapkan cap waktu yang tepat dan melaksanakan AggregateFunction kami, akhirnya kami dapat mengambil input Kafka kami dan memprosesnya:

public static void createBackup () throws Exception { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest(); flinkKafkaConsumer.assignTimestampsAndWatermarks( new InputMessageTimestampAssigner()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource(flinkKafkaConsumer); inputMessagesStream .timeWindowAll(Time.hours(24)) .aggregate(new BackupAggregator()) .addSink(flinkKafkaProducer); environment.execute(); }

12. Kesimpulannya

Dalam artikel ini, kami telah memaparkan cara membuat saluran data sederhana dengan Apache Flink dan Apache Kafka.

Seperti biasa, kodnya boleh didapati di Github.