Pengenalan kepada RSocket

1. Pengenalan

Dalam tutorial ini, kita akan melihat RSocket pertama dan bagaimana ia membolehkan komunikasi pelayan pelanggan.

2. Apa itu RSocket ?

RSocket adalah protokol komunikasi titik-ke-titik binari yang dimaksudkan untuk digunakan dalam aplikasi yang diedarkan. Dalam erti kata itu, ia memberikan alternatif kepada protokol lain seperti HTTP.

Perbandingan lengkap antara RSocket dan protokol lain adalah di luar ruang lingkup artikel ini. Sebaliknya, kami akan memberi tumpuan kepada ciri utama RSocket: model interaksinya.

RSocket menyediakan empat model interaksi. Dengan ini, kita akan meneroka setiap satu dengan contoh.

3. Pergantungan Maven

RSocket hanya memerlukan dua pergantungan langsung untuk contoh kami:

 io.rsocket rsocket-core 0.11.13   io.rsocket rsocket-transport-netty 0.11.13 

Pergantungan rsocket-core dan rsocket-transport-netty terdapat di Maven Central.

Catatan penting adalah bahawa perpustakaan RSocket sering menggunakan aliran reaktif . Kelas Flux dan Mono digunakan sepanjang artikel ini sehingga pemahaman asas tentang mereka akan sangat membantu.

4. Persediaan Pelayan

Pertama, mari buat kelas Pelayan :

public class Server { private final Disposable server; public Server() { this.server = RSocketFactory.receive() .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) .transport(TcpServerTransport.create("localhost", TCP_PORT)) .start() .subscribe(); } public void dispose() { this.server.dispose(); } private class RSocketImpl extends AbstractRSocket {} }

Di sini kita menggunakan RSocketFactory untuk menyiapkan dan mendengar soket TCP. Kami menyampaikan RSocketImpl khusus kami untuk menangani permintaan dari pelanggan. Kami akan menambah kaedah ke RSocketImpl semasa kami pergi.

Seterusnya, untuk memulakan pelayan, kita hanya perlu memasangkannya:

Server server = new Server();

Contoh pelayan tunggal dapat menangani pelbagai sambungan . Hasilnya, hanya satu contoh pelayan yang akan menyokong semua contoh kami.

Apabila kita selesai, kaedah buang akan menghentikan pelayan dan melepaskan port TCP.

4. Model Interaksi

4.1. Permintaan / Respons

RSocket menyediakan model permintaan / respons - setiap permintaan menerima satu respons.

Untuk model ini, kami akan membuat perkhidmatan mudah yang mengembalikan mesej kepada pelanggan.

Mari mulakan dengan menambahkan kaedah untuk pelanjutan AbstractRSocket kami, RSocketImpl :

@Override public Mono requestResponse(Payload payload) { try { return Mono.just(payload); // reflect the payload back to the sender } catch (Exception x) { return Mono.error(x); } }

The requestResponse Cara mengembalikan hasil tunggal untuk setiap permintaan , seperti yang kita boleh lihat dengan Mono jenis tindak balas.

Muatan adalah kelas yang mengandungi kandungan dan metadata mesej . Ia digunakan oleh semua model interaksi. Kandungan muatan adalah binari, tetapi ada kaedah kemudahan yang menyokongkandungan berasaskan String .

Seterusnya, kami dapat membuat kelas pelanggan kami:

public class ReqResClient { private final RSocket socket; public ReqResClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } public String callBlocking(String string) { return socket .requestResponse(DefaultPayload.create(string)) .map(Payload::getDataUtf8) .block(); } public void dispose() { this.socket.dispose(); } }

Pelanggan menggunakan kaedah RSocketFactory.connect () untuk memulakan sambungan soket dengan pelayan. Kami menggunakan kaedah requestResponse pada soket untuk menghantar muatan ke pelayan .

Muatan kami mengandungi String yang diberikan kepada pelanggan. Apabila orang Monotindak balas tiba kita boleh menggunakan kaedah getDataUtf8 () untuk mengakses kandungan tali tali .

Akhirnya, kita dapat menjalankan ujian integrasi untuk melihat permintaan / tindak balas dalam tindakan. Kami akan menghantar String ke pelayan dan mengesahkan bahawa String yang sama dikembalikan:

@Test public void whenSendingAString_thenRevceiveTheSameString() { ReqResClient client = new ReqResClient(); String string = "Hello RSocket"; assertEquals(string, client.callBlocking(string)); client.dispose(); }

4.2. Api-dan-Lupakan

Dengan model fire-and-lupa, pelanggan tidak akan mendapat respons dari pelayan .

Dalam contoh ini, pelanggan akan menghantar simulasi pengukuran ke pelayan dalam selang 50ms. Pelayan akan menerbitkan ukuran.

Mari tambahkan pengendali api-dan-lupa ke pelayan kami di kelas RSocketImpl :

@Override public Mono fireAndForget(Payload payload) { try { dataPublisher.publish(payload); // forward the payload return Mono.empty(); } catch (Exception x) { return Mono.error(x); } }

Penangan ini kelihatan sangat mirip dengan pengendali permintaan / respons. Walau bagaimanapun, fireAndForget mengembalikan Mono dan bukannya Mono .

The dataPublisher adalah contoh org.reactivestreams.Publisher . Oleh itu, ia menjadikan muatan tersedia untuk pelanggan. Kami akan menggunakannya dalam contoh permintaan / aliran.

Seterusnya, kami akan membuat pelanggan api-dan-lupa:

public class FireNForgetClient { private final RSocket socket; private final List data; public FireNForgetClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } /** Send binary velocity (float) every 50ms */ public void sendData() { data = Collections.unmodifiableList(generateData()); Flux.interval(Duration.ofMillis(50)) .take(data.size()) .map(this::createFloatPayload) .flatMap(socket::fireAndForget) .blockLast(); } // ... }

Penyediaan soket sama seperti sebelumnya.

Kaedah sendData () menggunakan aliran Flux untuk menghantar banyak mesej. Untuk setiap mesej, kami meminta socket :: fireAndForget .

Kita perlu melanggan respons Mono untuk setiap mesej . Sekiranya kita lupa untuk melanggan maka soket :: fireAndForget tidak akan dilaksanakan.

The flatMap operator makes sure the Void responses are passed to the subscriber, while the blockLast operator acts as the subscriber.

We're going to wait until the next section to run the fire-and-forget test. At that point, we'll create a request/stream client to receive the data that was pushed by the fire-and-forget client.

4.3. Request/Stream

In the request/stream model, a single request may receive multiple responses. To see this in action we can build upon the fire-and-forget example. To do that, let's request a stream to retrieve the measurements we sent in the previous section.

As before, let's start by adding a new listener to the RSocketImpl on the server:

@Override public Flux requestStream(Payload payload) { return Flux.from(dataPublisher); }

The requestStream handler returns a Flux stream. As we recall from the previous section, the fireAndForget handler published incoming data to the dataPublisher. Now, we'll create a Flux stream using that same dataPublisher as the event source. By doing this the measurement data will flow asynchronously from our fire-and-forget client to our request/stream client.

Let's create the request/stream client next:

public class ReqStreamClient { private final RSocket socket; public ReqStreamClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } public Flux getDataStream() { return socket .requestStream(DefaultPayload.create(DATA_STREAM_NAME)) .map(Payload::getData) .map(buf -> buf.getFloat()) .onErrorReturn(null); } public void dispose() { this.socket.dispose(); } }

We connect to the server in the same way as our previous clients.

In getDataStream()we use socket.requestStream() to receive a Flux stream from the server. From that stream, we extract the Float values from the binary data. Finally, the stream is returned to the caller, allowing the caller to subscribe to it and process the results.

Now let's test. We'll verify the round trip from fire-and-forget to request/stream.

We can assert that each value is received in the same order as it was sent. Then, we can assert that we receive the same number of values that were sent:

@Test public void whenSendingStream_thenReceiveTheSameStream() { FireNForgetClient fnfClient = new FireNForgetClient(); ReqStreamClient streamClient = new ReqStreamClient(); List data = fnfClient.getData(); List dataReceived = new ArrayList(); Disposable subscription = streamClient.getDataStream() .index() .subscribe( tuple -> { assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2()); dataReceived.add(tuple.getT2()); }, err -> LOG.error(err.getMessage()) ); fnfClient.sendData(); // ... dispose client & subscription assertEquals("Wrong data count received", data.size(), dataReceived.size()); }

4.4. Channel

The channel model provides bidirectional communication. In this model, message streams flow asynchronously in both directions.

Let's create a simple game simulation to test this. In this game, each side of the channel will become a player. As the game runs, these players will send messages to the other side at random time intervals. The opposite side will react to the messages.

Firstly, we'll create the handler on the server. Like before, we add to the RSocketImpl:

@Override public Flux requestChannel(Publisher payloads) { Flux.from(payloads) .subscribe(gameController::processPayload); return Flux.from(gameController); }

The requestChannel handler has Payload streams for both input and output. The Publisher input parameter is a stream of payloads received from the client. As they arrive, these payloads are passed to the gameController::processPayload function.

In response, we return a different Flux stream back to the client. This stream is created from our gameController, which is also a Publisher.

Here is a summary of the GameController class:

public class GameController implements Publisher { @Override public void subscribe(Subscriber subscriber) { // send Payload messages to the subscriber at random intervals } public void processPayload(Payload payload) { // react to messages from the other player } }

When the GameController receives a subscriber it begins sending messages to that subscriber.

Next, let's create the client:

public class ChannelClient { private final RSocket socket; private final GameController gameController; public ChannelClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); this.gameController = new GameController("Client Player"); } public void playGame() { socket.requestChannel(Flux.from(gameController)) .doOnNext(gameController::processPayload) .blockLast(); } public void dispose() { this.socket.dispose(); } }

As we have seen in our previous examples, the client connects to the server in the same way as the other clients.

The client creates its own instance of the GameController.

We use socket.requestChannel() to send our Payload stream to the server. The server responds with a Payload stream of its own.

Sebagai muatan yang diterima dari pelayan, kami menyerahkannya ke pengendali gameController :: processPayload kami .

Dalam simulasi permainan kami, pelanggan dan pelayan adalah gambar cermin antara satu sama lain. Artinya, setiap pihak mengirimkan aliran Payload dan menerima aliran Payload dari ujung yang lain .

Aliran berjalan secara bebas, tanpa penyegerakan.

Akhirnya, mari kita jalankan simulasi dalam ujian:

@Test public void whenRunningChannelGame_thenLogTheResults() { ChannelClient client = new ChannelClient(); client.playGame(); client.dispose(); }

5. Kesimpulan

Dalam artikel pengantar ini, kami telah meneroka model interaksi yang disediakan oleh RSocket. Kod sumber lengkap contoh boleh didapati di repositori Github kami.

Pastikan untuk melihat laman web RSocket untuk perbincangan yang lebih mendalam. Khususnya, FAQ dan dokumen Motivasi memberikan latar belakang yang baik.