Bermula dengan Pemprosesan Aliran dengan Aliran Data Awan Musim Semi

1. Pengenalan

Spring Cloud Data Flow adalah model pengaturcaraan dan operasi asli cloud untuk perkhidmatan mikro data yang dapat digabungkan.

Dengan Spring Cloud Data Flow , pembangun dapat membuat dan mengatur saluran data untuk kes penggunaan umum seperti pengambilan data, analisis masa nyata, dan import / eksport data.

Saluran data ini hadir dalam dua perisa, aliran dan aliran data kumpulan.

Dalam kes pertama, jumlah data tanpa batas digunakan atau dihasilkan melalui perisian tengah pesanan. Sementara dalam kes kedua, tugas jangka pendek memproses satu set data yang terbatas dan kemudian dihentikan.

Artikel ini akan memberi tumpuan kepada pemprosesan penstriman.

2. Gambaran Keseluruhan Senibina

Komponen utama jenis seni bina ini adalah Aplikasi , Data Flow Server , dan jangka masa sasaran.

Selain komponen utama ini, kami juga biasanya memiliki Data Flow Shell dan broker pesanan dalam seni bina.

Mari lihat semua komponen ini dengan lebih terperinci.

2.1. Permohonan

Biasanya, saluran data streaming merangkumi memakan peristiwa dari sistem luaran, pemprosesan data, dan ketekunan polyglot. Fasa-fasa ini biasanya disebut sebagai sumber , Pemproses , dan istilah Tenggelam dalam Spring Cloud :

  • Sumber: adalah aplikasi yang memakan peristiwa
  • Pemproses: menggunakan data dari Sumber , melakukan pemprosesan di atasnya, dan memancarkan data yang diproses ke aplikasi berikutnya dalam perancangan
  • Tenggelam: sama ada menggunakan dari Sumber atau Pemproses dan menulis data ke lapisan ketekunan yang diinginkan

Aplikasi ini boleh dikemas dalam dua cara:

  • Spring Boot uber-jar yang dihoskan di repositori, file, http atau pelaksanaan sumber Spring yang lain (kaedah ini akan digunakan dalam artikel ini)
  • Docker

Banyak sumber, pemproses, dan aplikasi sink untuk kes penggunaan umum (misalnya jdbc, hdfs, http, router) sudah disediakan dan siap digunakan oleh pasukan Spring Cloud Data Flow .

2.2. Masa Jalan

Juga, runtime diperlukan agar aplikasi ini dapat dijalankan. Waktu berjalan yang disokong adalah:

  • Cloud Foundry
  • Apache YARN
  • Kubernetes
  • Mesos Apache
  • Pelayan Tempatan untuk pembangunan (yang akan digunakan dalam artikel ini)

2.3. Pelayan Aliran Data

Komponen yang bertanggungjawab menyebarkan aplikasi ke runtime adalah Data Flow Server . Terdapat balang pelayan Data Flow Server yang disediakan untuk setiap jangka masa sasaran.

The Data Flow Server bertanggungjawab untuk mentafsir:

  • Aliran DSL yang menerangkan aliran data logik melalui pelbagai aplikasi.
  • Manifes penerapan yang menjelaskan pemetaan aplikasi ke waktu proses.

2.4. Shell Aliran Data

Data Flow Shell adalah pelanggan untuk Data Flow Server. Shell memungkinkan kita untuk melaksanakan perintah DSL yang diperlukan untuk berinteraksi dengan pelayan.

Sebagai contoh, DSL untuk menggambarkan aliran data dari sumber http ke sinki jdbc akan ditulis sebagai "http | jdbc ”. Nama-nama ini dalam DSL didaftarkan dengan Data Flow Server dan memetakan ke artifak aplikasi yang boleh dihoskan di repositori Maven atau Docker.

Spring juga menawarkan antara muka grafik, bernama Flo , untuk membuat dan memantau aliran data saluran. Walau bagaimanapun, penggunaannya berada di luar perbincangan artikel ini.

2.5. Broker Mesej

Seperti yang telah kita lihat dalam contoh bahagian sebelumnya, kita telah menggunakan simbol paip ke dalam definisi aliran data. Simbol paip mewakili komunikasi antara kedua-dua aplikasi melalui alat tengah pesanan.

Ini bermaksud bahawa kita memerlukan broker mesej dan berjalan di lingkungan sasaran.

Dua broker middleware pesanan yang disokong adalah:

  • Apache Kafka
  • ArnabMQ

Oleh itu, sekarang kita mempunyai gambaran keseluruhan komponen seni bina - sudah waktunya untuk membina saluran pemprosesan aliran pertama kami.

3. Pasang Broker Mesej

Seperti yang telah kita lihat, aplikasi dalam perancangan memerlukan alat tengah pesanan untuk berkomunikasi. Untuk tujuan artikel ini, kami akan menggunakan RabbitMQ .

Untuk maklumat lengkap pemasangan, anda boleh mengikuti arahan di laman web rasmi.

4. Pelayan Aliran Data Tempatan

Untuk mempercepat proses menghasilkan aplikasi kami, kami akan menggunakan Spring Initializr; dengan bantuannya, kami dapat memperoleh aplikasi Spring Boot kami dalam beberapa minit.

Setelah melayari laman web, cukup pilih nama Kumpulan dan Artifak .

Setelah ini selesai, klik pada butang Hasilkan Projek untuk memulakan muat turun artifak Maven.

Setelah muat turun selesai, buka zip projek dan import sebagai projek Maven di IDE pilihan anda.

Mari tambahkan kebergantungan Maven ke projek. Oleh kerana kita memerlukan perpustakaan Pelayan Tempatan Dataflow , mari tambahkan pergantungan spring-cloud-starter-dataflow-server-local:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Sekarang kita perlu memberi anotasi kelas utama Spring Boot dengan anotasi @EnableDataFlowServer :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } } 

Itu sahaja. Pelayan Aliran Data Tempatan kami siap dijalankan:

mvn spring-boot:run

Aplikasi akan dimuat di port 9393.

5. Shell Aliran Data

Sekali lagi, pergi ke Spring Initializr dan pilih nama Kumpulan dan Artifak .

Sebaik sahaja kami memuat turun dan mengimport projek, mari tambah pergantungan spring-cloud-dataflow-shell:

 org.springframework.cloud spring-cloud-dataflow-shell 

Sekarang kita perlu menambahkan anotasi @EnableDataFlowShell ke kelas utama Spring Boot :

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } } 

Kita sekarang boleh menjalankan shell:

mvn spring-boot:run

After the shell is running, we can type the help command in the prompt to see a complete list of command that we can perform.

6. The Source Application

Similarly, on Initializr, we'll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

We'll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:

@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }

Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).

In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.

The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:

@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource timeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); } 

Our data source is ready.

7. The Processor Application

Next- we'll create an application and add a Stream Rabbit dependency.

We'll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:

@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }

Next, we need to define a method to process the data that coming from the source application.

To define a transformer, we need to annotate this method with @Transformer annotation:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }

It converts a timestamp from the ‘input' channel to a formatted date which will be sent to the ‘output' channel.

8. The Sink Application

The last application to create is the Sink application.

Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project let's add a Stream Rabbit dependency.

Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:

@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }

Now we need a method to intercept the messages coming from the processor application.

To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:

@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }

The method simply prints the timestamp transformed in a formatted date to a log file.

9. Register a Stream App

The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.

When providing a URI with the maven scheme, the format should conform to the following:

maven://:[:[:]]:

To register the Source, Processor and Sink applications previously created , go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:

app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT 

10. Create and Deploy the Stream

To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:

stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink'.

Then to deploy the stream execute the following shell command:

stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.

If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:

2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result

In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.

The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion

In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

Juga, kami melihat peranan aplikasi Source , Processor dan Sink di dalam aliran dan cara memasang dan mengikat modul ini di dalam Data Flow Server melalui penggunaan Data Flow Shell .

Contoh kod boleh didapati dalam projek GitHub.