Pengenalan Apache Flink dengan Java

1. Gambaran keseluruhan

Apache Flink adalah kerangka pemprosesan Data Besar yang membolehkan pengaturcara memproses sejumlah besar data dengan cara yang sangat efisien dan berskala.

Dalam artikel ini, kami akan memperkenalkan beberapa konsep inti API dan transformasi data standard yang tersedia di Apache Flink Java API . Gaya lancar API ini menjadikannya senang digunakan dengan konstruk pusat Flink - koleksi yang diedarkan.

Pertama, kita akan melihat transformasi Flink's DataSet API dan menggunakannya untuk melaksanakan program penghitungan kata. Kemudian kita akan melihat ringkas Flink's DataStream API, yang membolehkan anda memproses aliran acara secara real-time.

2. Ketergantungan Maven

Untuk memulakan, kita perlu menambahkan pergantungan Maven ke perpustakaan flink-java dan flink-test-utils :

 org.apache.flink flink-java 1.2.0   org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Konsep API Teras

Semasa bekerja dengan Flink, kita perlu mengetahui beberapa perkara yang berkaitan dengan APInya:

  • Setiap program Flink melakukan transformasi pada koleksi data yang diedarkan. Berbagai fungsi untuk mengubah data disediakan, termasuk penyaringan, pemetaan, penggabungan, pengelompokan, dan penggabungan
  • A sink operasi dalam Flink mencetuskan pelaksanaan sungai untuk menghasilkan keputusan yang dikehendaki program ini , seperti menyimpan hasil kepada sistem fail atau mencetaknya output standard
  • Transformasi flink adalah malas, yang bermaksud bahawa ia tidak akan dilaksanakan sehingga operasi sink dilakukan
  • Apache Flink API menyokong dua mod operasi - kumpulan dan masa nyata. Sekiranya anda berurusan dengan sumber data terhad yang dapat diproses dalam mod kumpulan, anda akan menggunakan API DataSet . Sekiranya anda ingin memproses aliran data tanpa batas dalam waktu nyata, anda perlu menggunakan API DataStream

4. Transformasi API DataSet

Titik masuk ke program Flink adalah contoh kelas ExecutionEnvironment - ini menentukan konteks di mana program dijalankan.

Mari buat ExecutionEnvironment untuk memulakan proses kami:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Perhatikan bahawa semasa anda melancarkan aplikasi di mesin tempatan, aplikasi akan melakukan pemprosesan pada JVM tempatan. Sekiranya anda ingin memulakan pemprosesan pada sekumpulan mesin, anda perlu memasang Apache Flink pada mesin tersebut dan mengkonfigurasi ExecutionEnvironment yang sesuai.

4.1. Membuat Set Data

Untuk mula melakukan transformasi data, kita perlu menyediakan data dengan program kita.

Mari buat contoh kelas DataSet menggunakan ExecutionEnvirgment kami :

DataSet amounts = env.fromElements(1, 29, 40, 50);

Anda boleh membuat set data daripada pelbagai sumber, seperti Apache Kafka, CSV, fail atau hampir mana-mana sumber data lain.

4.2. Tapis dan Kurangkan

Sebaik sahaja anda membuat contoh kelas DataSet , anda boleh menerapkan transformasi kepadanya.

Katakan anda mahu menapis nombor yang berada di atas ambang tertentu dan jumlahnya seterusnya . Anda boleh menggunakan penapis () dan mengurangkan () transformasi untuk mencapai ini:

int threshold = 30; List collect = amounts .filter(a -> a > threshold) .reduce((integer, t1) -> integer + t1) .collect(); assertThat(collect.get(0)).isEqualTo(90); 

Perhatikan bahawa kaedah collect () adalah operasi sink yang mencetuskan transformasi data sebenar.

4.3. Peta

Katakan bahawa anda mempunyai objek DataSet of Person :

private static class Person { private int age; private String name; // standard constructors/getters/setters }

Seterusnya, mari buat Set Data objek ini:

DataSet personDataSource = env.fromCollection( Arrays.asList( new Person(23, "Tom"), new Person(75, "Michael")));

Katakan bahawa anda ingin mengekstrak bidang umur hanya dari setiap objek koleksi. Anda boleh menggunakan transformasi peta () untuk mendapatkan hanya bidang tertentu dari kelas Orang :

List ages = personDataSource .map(p -> p.age) .collect(); assertThat(ages).hasSize(2); assertThat(ages).contains(23, 75);

4.4. Sertailah

Apabila anda mempunyai dua set data, anda mungkin mahu menyertainya di beberapa medan id . Untuk ini, anda boleh menggunakan transformasi bergabung () .

Mari buat koleksi transaksi dan alamat pengguna:

Tuple3 address = new Tuple3(1, "5th Avenue", "London"); DataSet
    
      addresses = env.fromElements(address); Tuple2 firstTransaction = new Tuple2(1, "Transaction_1"); DataSet
     
       transactions = env.fromElements(firstTransaction, new Tuple2(12, "Transaction_2")); 
     
    

Medan pertama di kedua-dua tupel adalah jenis Integer , dan ini adalah medan id di mana kita mahu bergabung dengan kedua set data.

Untuk melaksanakan logik penyertaan sebenar, kita perlu menerapkan antara muka KeySelector untuk alamat dan transaksi:

private static class IdKeySelectorTransaction implements KeySelector
    
      { @Override public Integer getKey(Tuple2 value) { return value.f0; } } private static class IdKeySelectorAddress implements KeySelector
     
       { @Override public Integer getKey(Tuple3 value) { return value.f0; } }
     
    

Setiap pemilih hanya mengembalikan bidang di mana penggabungan harus dilakukan.

Sayangnya, tidak mungkin menggunakan ungkapan lambda di sini kerana Flink memerlukan maklumat jenis generik.

Seterusnya, mari kita laksanakan penggabungan logik menggunakan pemilih tersebut:

List
    
     > joined = transactions.join(addresses) .where(new IdKeySelectorTransaction()) .equalTo(new IdKeySelectorAddress()) .collect(); assertThat(joined).hasSize(1); assertThat(joined).contains(new Tuple2(firstTransaction, address)); 
    

4.5. Susun

Katakan anda mempunyai koleksi Tuple2 berikut:

Tuple2 secondPerson = new Tuple2(4, "Tom"); Tuple2 thirdPerson = new Tuple2(5, "Scott"); Tuple2 fourthPerson = new Tuple2(200, "Michael"); Tuple2 firstPerson = new Tuple2(1, "Jack"); DataSet
    
      transactions = env.fromElements( fourthPerson, secondPerson, thirdPerson, firstPerson); 
    

Sekiranya anda ingin menyusun koleksi ini dengan bidang pertama tuple, anda boleh menggunakan transformasi sortPartitions () :

List
    
      sorted = transactions .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING) .collect(); assertThat(sorted) .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
    

5. Bilangan Perkataan

Masalah pengiraan kata adalah masalah yang biasa digunakan untuk menunjukkan kemampuan kerangka pemprosesan Big Data. Penyelesaian asas melibatkan pengiraan kejadian kata dalam input teks. Mari gunakan Flink untuk melaksanakan penyelesaian untuk masalah ini.

Sebagai langkah pertama dalam penyelesaian kami, kami membuat kelas LineSplitter yang membagi input kami menjadi token (kata), mengumpulkan untuk setiap token Tuple2 pasangan nilai-kunci. Dalam setiap tupel ini, kuncinya adalah kata yang terdapat dalam teks, dan nilainya adalah bilangan bulat (1).

This class implements the FlatMapFunction interface that takes String as an input and produces a Tuple2:

public class LineSplitter implements FlatMapFunction
    
      { @Override public void flatMap(String value, Collector
     
       out) { Stream.of(value.toLowerCase().split("\\W+")) .filter(t -> t.length() > 0) .forEach(token -> out.collect(new Tuple2(token, 1))); } }
     
    

We call the collect() method on the Collector class to push data forward in the processing pipeline.

Our next and final step is to group the tuples by their first elements (words) and then perform a sum aggregate on the second elements to produce a count of the word occurrences:

public static DataSet
    
      startWordCount( ExecutionEnvironment env, List lines) throws Exception { DataSet text = env.fromCollection(lines); return text.flatMap(new LineSplitter()) .groupBy(0) .aggregate(Aggregations.SUM, 1); }
    

We are using three types of the Flink transformations: flatMap(), groupBy(), and aggregate().

Let's write a test to assert that the word count implementation is working as expected:

List lines = Arrays.asList( "This is a first sentence", "This is a second sentence with a one word"); DataSet
    
      result = WordCount.startWordCount(env, lines); List
     
       collect = result.collect(); assertThat(collect).containsExactlyInAnyOrder( new Tuple2("a", 3), new Tuple2("sentence", 2), new Tuple2("word", 1), new Tuple2("is", 2), new Tuple2("this", 2), new Tuple2("second", 1), new Tuple2("first", 1), new Tuple2("with", 1), new Tuple2("one", 1));
     
    

6. DataStream API

6.1. Creating a DataStream

Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can create a stream of events using the executionEnvironment from a variety of sources. It could be some message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream dataStream = executionEnvironment.fromElements( "This is a first sentence", "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream like in the normal DataSet class:

SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print() that will just print the result of transformations to the standard output, following with the execute() method on the StreamExecutionEnvironment class:

upperCase.print(); env.execute();

It will produce the following output:

1> THIS IS A FIRST SENTENCE 2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Windowing of Events

When processing a stream of events in real time, you may sometimes need to group events together and apply some computation on a window of those events.

Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system, and that we can tolerate events that are out-of-order but only if they are no more than twenty seconds late.

For this example, let's first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:

SingleOutputStreamOperator
    
      windowed = env.fromElements( new Tuple2(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()), new Tuple2(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor 
     
      (Time.seconds(20)) { @Override public long extractTimestamp(Tuple2 element) { return element.f1 * 1000; } });
     
    

Next, let's define a window operation to group our events into five-second windows and apply a transformation on those events:

SingleOutputStreamOperator
    
      reduced = windowed .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .maxBy(0, true); reduced.print();
    

It will get the last element of every five-second window, so it prints out:

1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.

7. Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink's fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

Pelaksanaan semua contoh dan potongan kode ini boleh didapati di GitHub - ini adalah projek Maven, jadi mudah untuk diimport dan dijalankan sebagaimana adanya.