Pengenalan kepada Spring Cloud Stream

1. Gambaran keseluruhan

Spring Cloud Stream adalah kerangka yang dibina di atas Spring Boot dan Spring Integration yang membantu dalam mewujudkan perkhidmatan mikro berdasarkan acara atau mesej .

Dalam artikel ini, kami akan memperkenalkan konsep dan konstruksi Spring Cloud Stream dengan beberapa contoh mudah.

2. Pergantungan Maven

Untuk memulakan, kita perlu menambahkan Spring Cloud Starter Stream dengan pergantungan broker RabbitMQ Maven sebagai perisian pemesejan ke pom.xml kami :

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

Dan kami akan menambahkan kebergantungan modul dari Maven Central untuk membolehkan sokongan JUnit juga:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE test 

3. Konsep Utama

Senibina perkhidmatan mikro mengikut prinsip "titik akhir pintar dan paip bodoh". Komunikasi antara titik akhir didorong oleh pemesejan-pihak tengah seperti RabbitMQ atau Apache Kafka. Perkhidmatan berkomunikasi dengan menerbitkan acara domain melalui titik akhir atau saluran ini .

Mari kita teliti konsep yang membentuk kerangka Spring Cloud Stream, bersama dengan paradigma penting yang mesti kita perhatikan untuk membina perkhidmatan berdasarkan mesej.

3.1. Konstruk

Mari lihat perkhidmatan sederhana di Spring Cloud Stream yang mendengar input mengikat dan menghantar respons terhadap output yang mengikat:

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }

Anotasi @EnableBinding mengkonfigurasi aplikasi untuk mengikat saluran INPUT dan OUTPUT yang ditentukan dalam prosesor antara muka . Kedua-dua saluran adalah pengikat yang dapat dikonfigurasi untuk menggunakan alat tengah atau pengikat pesanan konkrit.

Mari kita lihat definisi semua konsep ini:

  • Bindings - kumpulan antara muka yang mengenal pasti saluran input dan output secara deklaratif
  • Binder - pelaksanaan pemesejan-middleware seperti Kafka atau RabbitMQ
  • Saluran - mewakili saluran komunikasi antara pemesejan-middleware dan aplikasi
  • StreamListeners - kaedah pengendalian mesej dalam kacang yang akan dipanggil secara automatik pada mesej dari saluran setelah MessageConverter melakukan serialisasi / deserialisasi antara peristiwa khusus middleware dan jenis objek domain / POJO
  • Mes sage Schemas - digunakan untuk serialisasi dan deserialisasi mesej, skema ini dapat dibaca secara statistik dari lokasi atau dimuat secara dinamis, mendukung evolusi jenis objek domain

3.2. Corak Komunikasi

Mesej yang ditentukan ke destinasi dihantar dengan corak pesanan Publish-Subscribe . Penerbit mengkategorikan mesej ke dalam topik, masing-masing dikenali dengan nama. Pelanggan menyatakan minat dalam satu atau lebih topik. Alat tengah menyaring mesej, menyampaikan topik-topik menarik kepada pelanggan.

Kini, pelanggan dapat dikelompokkan. A kumpulan pengguna adalah satu set pelanggan atau pengguna, pasti melalui id kumpulan , di mana pesanan dari topik atau partition topik yang disampaikan dengan cara beban seimbang.

4. Model Pengaturcaraan

Bahagian ini menerangkan asas-asas pembuatan aplikasi Spring Cloud Stream.

4.1. Ujian Berfungsi

Sokongan ujian adalah pelaksanaan pengikat yang memungkinkan berinteraksi dengan saluran dan memeriksa mesej.

Mari kita hantar mesej ke perkhidmatan richLogMessage di atas dan periksa sama ada respons mengandungi teks "[1]:" pada awal mesej:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = MyLoggerServiceApplication.class) @DirtiesContext public class MyLoggerApplicationTests { @Autowired private Processor pipe; @Autowired private MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText() { pipe.input() .send(MessageBuilder.withPayload(new LogMessage("This is my message")) .build()); Object payload = messageCollector.forChannel(pipe.output()) .poll() .getPayload(); assertEquals("[1]: This is my message", payload.toString()); } }

4.2. Saluran Tersuai

Dalam contoh di atas, kami menggunakan antara muka Pemproses yang disediakan oleh Spring Cloud, yang hanya mempunyai satu input dan satu saluran output.

Sekiranya kita memerlukan sesuatu yang berbeza, seperti satu input dan dua saluran output, kita dapat membuat pemproses tersuai:

public interface MyProcessor { String INPUT = "myInput"; @Input SubscribableChannel myInput(); @Output("myOutput") MessageChannel anOutput(); @Output MessageChannel anotherOutput(); }

Spring akan menyediakan pelaksanaan antara muka ini dengan betul untuk kita. Nama saluran dapat ditetapkan menggunakan anotasi seperti di @Output ("myOutput") .

Jika tidak, Spring akan menggunakan nama kaedah sebagai nama saluran. Oleh itu, kami mempunyai tiga saluran bernama myInput , myOutput , danOutput lain .

Sekarang, mari kita bayangkan kita ingin merutekan mesej ke satu output jika nilainya kurang dari 10 dan ke output lain nilainya lebih besar daripada atau sama dengan 10:

@Autowired private MyProcessor processor; @StreamListener(MyProcessor.INPUT) public void routeValues(Integer val) { if (val < 10) { processor.anOutput().send(message(val)); } else { processor.anotherOutput().send(message(val)); } } private static final  Message message(T val) { return MessageBuilder.withPayload(val).build(); }

4.3. Penghantaran Bersyarat

Dengan menggunakan anotasi @StreamListener , kami juga dapat menyaring mesej yang kami harapkan pada pengguna menggunakan syarat yang kami tentukan dengan ekspresi SpEL.

Sebagai contoh, kita dapat menggunakan pengiriman bersyarat sebagai pendekatan lain untuk menghantar mesej ke output yang berbeza:

@Autowired private MyProcessor processor; @StreamListener( target = MyProcessor.INPUT, condition = "payload = 10") public void routeValuesToAnotherOutput(Integer val) { processor.anotherOutput().send(message(val)); }

Satu-satunya batasan pendekatan ini adalah bahawa kaedah ini tidak boleh mengembalikan nilai.

5. Persediaan

Let's set up the application that will process the message from the RabbitMQ broker.

5.1. Binder Configuration

We can configure our application to use the default binder implementation via META-INF/spring.binders:

rabbit:\ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Or we can add the binder library for RabbitMQ to the classpath by including this dependency:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

If no binder implementation is provided, Spring will use direct message communication between the channels.

5.2. RabbitMQ Configuration

To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host:  port: 5672 username:  password:  virtual-host: /

The input binding will use the exchange called queue.log.messages, and the output binding will use the exchange queue.pretty.log.messages. Both bindings will use the binder called local_rabbit.

Note that we don't need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.

To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange queue.log.messages, we need to enter the request in JSON format.

5.3. Customizing Message Conversion

Spring Cloud Stream allows us to apply message conversion for specific content types. In the above example, instead of using JSON format, we want to provide plain text.

To do this, we'll to apply a custom transformation to LogMessage using a MessageConverter:

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { //... @Bean public MessageConverter providesTextPlainMessageConverter() { return new TextPlainMessageConverter(); } //... }
public class TextPlainMessageConverter extends AbstractMessageConverter { public TextPlainMessageConverter() { super(new MimeType("text", "plain")); } @Override protected boolean supports(Class clazz) { return (LogMessage.class == clazz); } @Override protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { Object payload = message.getPayload(); String text = payload instanceof String ? (String) payload : new String((byte[]) payload); return new LogMessage(text); } }

After applying these changes, going back to the Publish Message panel, if we set the header “contentTypes” to “text/plain” and the payload to “Hello World“, it should work as before.

5.4. Consumer Groups

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings..group property to specify a group name:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit group: logMessageConsumers ...

6. Message-Driven Microservices

In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.

6.1. Scaling Up

When multiple applications are running, it's important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:

  • spring.cloud.stream.instanceCount — number of running applications
  • spring.cloud.stream.instanceIndex — index of the current application

For example, if we've deployed two instances of the above MyLoggerServiceApplication application, the property spring.cloud.stream.instanceCount should be 2 for both applications, and the property spring.cloud.stream.instanceIndex should be 0 and 1 respectively.

These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Data Flow as described in this article.

6.2. Partitioning

The domain events could be Partitioned messages. This helps when we are scaling up the storage and improving application performance.

The domain event usually has a partition key so that it ends up in the same partition with related messages.

Let's say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.

There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — the expression to partition the payloads
  • spring.cloud.stream.bindings.output.producer.partitionCount — the number of groups

Sometimes the expression to partition is too complex to write it in only one line. For these cases, we can write our custom partition strategy using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Health Indicator

In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.

When running the application, we can query the health status at //:/health.

7. Conclusion

Dalam tutorial ini, kami membentangkan konsep utama Spring Cloud Stream dan menunjukkan cara menggunakannya melalui beberapa contoh mudah melalui RabbitMQ. Maklumat lanjut mengenai Spring Cloud Stream boleh didapati di sini.

Kod sumber untuk artikel ini boleh didapati di GitHub.