Panduan untuk Aliran Akka

1. Gambaran keseluruhan

Dalam artikel ini, kita akan melihat perpustakaan akka-stream yang dibina di atas kerangka aktor Akka, yang mematuhi manifesto aliran reaktif. API Akka Streams membolehkan kita menyusun aliran transformasi data dengan mudah dari langkah bebas.

Lebih-lebih lagi, semua pemprosesan dilakukan secara reaktif, tidak menyekat dan tidak segerak.

2. Pergantungan Maven

Untuk bermula, kita perlu menambah Akka-aliran dan Akka strim-testkit perpustakaan ke dalam kami pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2   com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. API Akka Aliran

Untuk bekerjasama dengan Akka Streams, kita perlu mengetahui konsep inti API:

  • Sumber - titik masuk untuk memproses di perpustakaan aliran-akka - kita dapat membuat contoh kelas ini dari pelbagai sumber; sebagai contoh, kita dapat menggunakan kaedah tunggal () jika kita ingin membuat Sumber dari satu String , atau kita dapat membuat Sumber dariunsur yangdapat diganti
  • Flow - blok bangunan pemprosesan utama - setiapinstance Flow mempunyai satu input dan satu nilai output
  • Materializer - kita boleh menggunakannya jika kita mahu Arus kita mempunyai beberapa kesan sampingan seperti pembalakan atau menyimpan hasil ; paling biasa, kita akan menggunakan alias NotUsed sebagai Materializer untuk menandakan bahawa Aliran kita tidak seharusnya mempunyai kesan sampingan
  • Operasi tenggelam - ketika kita sedang membuat Aliran, operasi tidak akan dijalankan sehingga kita akan mendaftarkan operasi Tenggelam di atasnya - ini adalah operasi terminal yang memicu semua pengiraan di seluruh Aliran

4. Membuat Aliran dalam Aliran Akka

Mari kita mulakan dengan membina contoh mudah, di mana kita akan menunjukkan cara membuat dan menggabungkan beberapa Flow s - untuk memproses aliran bilangan bulat dan mengira tetingkap bergerak rata pasangan integer dari aliran.

Kami akan menguraikan rentetan bilangan bulat yang dibatasi titik koma sebagai input untuk membuat Sumber aliran-akka kami sebagai contoh.

4.1. Menggunakan Aliran untuk Menghuraikan Input

Pertama, mari buat kelas DataImporter yang akan mengambil contoh Sistem Aktor yang akan kami gunakan kemudian untuk membuat Aliran kami :

public class DataImporter { private ActorSystem actorSystem; // standard constructors, getters... }

Seterusnya, mari buat kaedah parseLine yang akan menghasilkan List of Integer dari String input yang kita hadkan . Perlu diingat bahawa kami menggunakan Java Stream API di sini hanya untuk menguraikan:

private List parseLine(String line) { String[] fields = line.split(";"); return Arrays.stream(fields) .map(Integer::parseInt) .collect(Collectors.toList()); }

Aliran awal kami akan menggunakan parseLine pada input kami untuk membuat Aliran dengan jenis input String dan jenis output Integer :

private Flow parseContent() { return Flow.of(String.class) .mapConcat(this::parseLine); }

Apabila kita memanggil kaedah parseLine () , penyusun tahu bahawa argumen untuk fungsi lambda itu akan menjadi String - sama dengan jenis input ke Aliran kita .

Perhatikan bahawa kita menggunakan mapConcat () kaedah - bersamaan dengan Java 8 flatMap () kaedah - kerana kita mahu meleperkan Senarai daripada Integer dikembalikan oleh parseLine () ke dalam Flow of Integer supaya langkah-langkah seterusnya dalam pemprosesan kami tidak perlu untuk menangani Senarai .

4.2. Menggunakan Aliran untuk Melakukan Pengiraan

Pada ketika ini, kita mempunyai aliran bilangan bulat yang dihuraikan. Sekarang, kita perlu menerapkan logik yang akan mengumpulkan semua elemen input menjadi pasangan dan mengira rata-rata pasangan tersebut .

Sekarang, kita akan membuat Flow of Integer s dan kumpulan mereka menggunakan kumpulan () kaedah .

Seterusnya, kami ingin mengira purata.

Oleh kerana kita tidak berminat dalam perintah itu di mana mereka ialah akan diproses, kita boleh mempunyai purata dikira selari menggunakan benang pelbagai dengan menggunakan mapAsyncUnordered () kaedah , lulus bilangan benang sebagai hujah untuk kaedah ini.

Tindakan yang akan dilalui sebagai lambda ke Aliran perlu mengembalikan CompletableFuture kerana tindakan itu akan dikira secara serentak dalam urutan terpisah:

private Flow computeAverage() { return Flow.of(Integer.class) .grouped(2) .mapAsyncUnordered(8, integers -> CompletableFuture.supplyAsync(() -> integers.stream() .mapToDouble(v -> v) .average() .orElse(-1.0))); }

Kami mengira purata dalam lapan utas selari. Perhatikan bahawa kami menggunakan Java 8 Stream API untuk menghitung rata-rata.

4.3. Menyusun pelbagai aliran menjadi satu aliran

The Flow API adalah abstraksi fasih yang membolehkan kita untuk mengarang pelbagai Flow keadaan untuk mencapai matlamat pemprosesan akhir kami . Kita boleh mempunyai aliran terperinci di mana satu, misalnya, menguraikan JSON, yang lain melakukan transformasi, dan yang lain mengumpulkan beberapa statistik.

Perincian seperti itu akan membantu kita membuat kod yang lebih boleh diuji kerana kita dapat menguji setiap langkah pemprosesan secara bebas.

Kami membuat dua aliran di atas yang boleh berfungsi secara bebas antara satu sama lain. Sekarang, kami mahu menyusunnya bersama.

Pertama, kami ingin menguraikan input String kami , dan seterusnya, kami ingin mengira rata-rata pada aliran elemen.

Kami dapat menyusun aliran kami menggunakan kaedah via () :

Flow calculateAverage() { return Flow.of(String.class) .via(parseContent()) .via(computeAverage()); }

Kami membuat Aliran yang mempunyai input jenis String dan dua aliran lain selepasnya. The parseContent () Aliran mengambil String input dan mengembalikan Integer sebagai output. Aliran computeAverage () menggunakan Integer itu dan mengira rata-rata Double yang mengembalikan sebagai jenis output.

5. Menambah Tenggelam ke Aliran

Seperti yang kami sebutkan, hingga saat ini keseluruhan Aliran belum dilaksanakan kerana malas. Untuk memulakan pelaksanaan Aliran, kita perlu menentukan Sink . Yang tenggelam operasi boleh, sebagai contoh, menyimpan data ke dalam pangkalan data, atau keputusan hantar ke beberapa perkhidmatan web luar.

Katakan kita mempunyai kelas AverageRepository dengan kaedah save () berikut yang menulis hasil ke pangkalan data kami:

CompletionStage save(Double average) { return CompletableFuture.supplyAsync(() -> { // write to database return average; }); }

Sekarang, kami ingin membuat operasi Sink yang menggunakan kaedah ini untuk menyimpan hasil pemprosesan Flow kami . Untuk membuat Sink kita , pertama-tama kita perlu membuat Flow yang menghasilkan hasil pemprosesan kita sebagai jenis input . Seterusnya, kami ingin menyimpan semua hasil kami ke pangkalan data.

Sekali lagi, kami tidak peduli dengan susunan elemen, jadi kami dapat melakukan operasi simpan () secara selari menggunakan kaedah mapAsyncUnordered () .

Untuk membuat Sink dari Aliran kita perlu memanggil toMat () dengan Sink.ignore () sebagai argumen pertama dan Keep.right () sebagai yang kedua kerana kami ingin mengembalikan status pemprosesan:

private Sink
    
      storeAverages() { return Flow.of(Double.class) .mapAsyncUnordered(4, averageRepository::save) .toMat(Sink.ignore(), Keep.right()); }
    

6. Menentukan Sumber untuk Aliran

Perkara terakhir yang perlu kita lakukan ialah membuat Sumber dari input String . Kita boleh memohon calculateAverage () Aliran sumber ini menggunakan melalui () kaedah.

Kemudian, untuk menambahkan Sink ke pemprosesan, kita perlu memanggil kaedah runWith () dan lulus sinkAstore () yang baru kita buat:

CompletionStage calculateAverageForContent(String content) { return Source.single(content) .via(calculateAverage()) .runWith(storeAverages(), ActorMaterializer.create(actorSystem)) .whenComplete((d, e) -> { if (d != null) { System.out.println("Import finished "); } else { e.printStackTrace(); } }); }

Note that when the processing is finished we are adding the whenComplete() callback, in which we can perform some action depending on the outcome of the processing.

7. Testing Akka Streams

We can test our processing using the akka-stream-testkit.

The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.

In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:

@Test public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() { // given Flow tested = new DataImporter(actorSystem).calculateAverage(); String input = "1;9;11;0"; // when Source flow = Source.single(input).via(tested); // then flow .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem)) .request(4) .expectNextUnordered(5d, 5.5); }

We are checking that we are expecting four input arguments, and two results that are averages can arrive in any order because our processing is done in the asynchronous and parallel way.

8. Conclusion

In this article, we were looking at the akka-stream library.

We defined a process that combines multiple Flows to calculate moving average of elements. Then, we defined a Source that is an entry point of the stream processing and a Sink that triggers the actual processing.

Finally, we wrote a test for our processing using the akka-stream-testkit.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.