WebSockets dengan Play Framework dan Akka

1. Gambaran keseluruhan

Apabila kita ingin pelanggan web kita berdialog dengan pelayan kita, maka WebSockets dapat menjadi penyelesaian yang berguna. WebSockets mengekalkan sambungan dupleks penuh yang berterusan. Ini memberi kita keupayaan untuk menghantar mesej dua arah antara pelayan dan pelanggan kami.

Dalam tutorial ini, kita akan belajar bagaimana menggunakan WebSockets dengan Akka dalam Play Framework.

2. Persediaan

Mari sediakan aplikasi sembang ringkas. Pengguna akan menghantar mesej ke pelayan, dan pelayan akan bertindak balas dengan mesej dari JSONPlaceholder.

2.1. Menyiapkan Aplikasi Rangka Kerja Play

Kami akan membina aplikasi ini menggunakan Play Framework.

Mari ikuti arahan dari Introduction to Play di Java untuk mengatur dan menjalankan aplikasi Play Framework yang mudah.

2.2. Menambah Fail JavaScript yang Diperlukan

Juga, kita perlu bekerjasama dengan JavaScript untuk skrip sisi pelanggan. Ini akan membolehkan kita menerima mesej baru yang ditolak dari pelayan. Kami akan menggunakan perpustakaan jQuery untuk ini.

Mari tambahkan jQuery ke bahagian bawah fail app / views / i ndex.scala.html :

2.3. Menubuhkan Akka

Akhirnya, kami akan menggunakan Akka untuk mengendalikan sambungan WebSocket di bahagian pelayan.

Mari pergi ke fail build.sbt dan tambahkan kebergantungan.

Kita perlu menambahkan pergantungan akka -pelakon dan akka-testkit :

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

Kami memerlukannya agar dapat menggunakan dan menguji kod Kerangka Akka.

Seterusnya, kita akan menggunakan aliran Akka. Oleh itu, mari tambah pergantungan aliran-akka :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

Terakhir, kita perlu memanggil titik akhir rehat dari pelakon Akka. Untuk ini, kita memerlukan pergantungan akka-http . Apabila kita melakukannya, titik akhir akan mengembalikan data JSON yang harus kita deserialisasi, jadi kita juga perlu menambahkan ketergantungan akka-http-jackson :

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

Dan sekarang kita sudah bersedia. Mari lihat bagaimana menjadikan WebSockets berfungsi!

3. Mengendalikan WebSockets Dengan Akka Pelakon

Mekanisme pengendalian WebSocket Play dibina di sekitar aliran Akka. WebSocket dimodelkan sebagai Aliran. Jadi, mesej WebSocket yang masuk dimasukkan ke dalam aliran, dan mesej yang dihasilkan oleh aliran dikirim ke klien.

Untuk mengendalikan WebSocket menggunakan Aktor, kita memerlukan Play utiliti ActorFlow yang mengubah ActorRef menjadi aliran. Ini terutama memerlukan beberapa kod Java, dengan sedikit konfigurasi.

3.1. Kaedah Pengawal WebSocket

Pertama, kita memerlukan contoh Materializer . Materializer adalah kilang untuk mesin pelaksanaan aliran.

Kita perlu menyuntik ActorSystem dan Materializer ke dalam pengawal app / pengawal / HomeController.java :

private ActorSystem actorSystem; private Materializer materializer; @Inject public HomeController( ActorSystem actorSystem, Materializer materializer) { this.actorSystem = actorSystem; this.materializer = materializer; }

Mari sekarang tambahkan kaedah pengawal soket:

public WebSocket socket() { return WebSocket.Json .acceptOrResult(this::createActorFlow); }

Di sini kita memanggil fungsi acceptOrResult yang mengambil tajuk permintaan dan mengembalikan masa depan. Masa depan yang dikembalikan adalah aliran untuk menangani mesej WebSocket.

Sebagai gantinya, kami dapat menolak permintaan tersebut dan mengembalikan hasil penolakan.

Sekarang, mari buat aliran:

private CompletionStage
    
     > createActorFlow(Http.RequestHeader request) { return CompletableFuture.completedFuture( F.Either.Right(createFlowForActor())); }
    

The F kelas dalam Rangka Kerja Main mentakrifkan satu set pembantu gaya pengaturcaraan berfungsi. Dalam kes ini, kami menggunakan F. Entah . Betul untuk menerima sambungan dan mengembalikan aliran.

Katakan kita mahu menolak sambungan apabila pelanggan tidak disahkan.

Untuk ini, kami dapat memeriksa apakah nama pengguna ditetapkan dalam sesi. Dan jika tidak, kami memutuskan sambungan dengan HTTP 403 Forbidden:

private CompletionStage
    
     > createActorFlow2(Http.RequestHeader request) { return CompletableFuture.completedFuture( request.session() .getOptional("username") .map(username -> F.Either.
     
      Right( createFlowForActor())) .orElseGet(() -> F.Either.Left(forbidden()))); }
     
    

Kami menggunakan F.Either.Kiri untuk menolak sambungan dengan cara yang sama kerana kami menyediakan aliran dengan F.Either.Right .

Akhirnya, kami menghubungkan aliran ke pelakon yang akan menangani mesej:

private Flow createFlowForActor() { return ActorFlow.actorRef(out -> Messenger.props(out), actorSystem, materializer); }

The ActorFlow.actorRef creates a flow that is handled by the Messenger actor.

3.2. The routes File

Now, let's add the routes definitions for the controller methods in conf/routes:

GET / controllers.HomeController.index(request: Request) GET /chat controllers.HomeController.socket GET /chat/with/streams controllers.HomeController.akkaStreamsSocket GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)

These route definitions map incoming HTTP requests to controller action methods as explained in Routing in Play Applications in Java.

3.3. The Actor Implementation

The most important part of the actor class is the createReceive method which determines which messages the actor can handle:

@Override public Receive createReceive() { return receiveBuilder() .match(JsonNode.class, this::onSendMessage) .matchAny(o -> log.error("Received unknown message: {}", o.getClass())) .build(); }

The actor will forward all messages matching the JsonNode class to the onSendMessage handler method:

private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); //.. processMessage(requestDTO); }

Then the handler will respond to every message using the processMessage method:

private void processMessage(RequestDTO requestDTO) { CompletionStage responseFuture = getRandomMessage(); responseFuture.thenCompose(this::consumeHttpResponse) .thenAccept(messageDTO -> out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf())); }

3.4. Consuming Rest API with Akka HTTP

We'll send HTTP requests to the dummy message generator at JSONPlaceholder Posts. When the response arrives, we send the response to the client by writing it out.

Let's have a method that calls the endpoint with a random post id:

private CompletionStage getRandomMessage() { int postId = ThreadLocalRandom.current().nextInt(0, 100); return Http.get(getContext().getSystem()) .singleRequest(HttpRequest.create( "//jsonplaceholder.typicode.com/posts/" + postId)); }

We're also processing the HttpResponse we get from calling the service in order to get the JSON response:

private CompletionStage consumeHttpResponse( HttpResponse httpResponse) { Materializer materializer = Materializer.matFromSystem(getContext().getSystem()); return Jackson.unmarshaller(MessageDTO.class) .unmarshal(httpResponse.entity(), materializer) .thenApply(messageDTO -> { log.info("Received message: {}", messageDTO); discardEntity(httpResponse, materializer); return messageDTO; }); }

The MessageConverter class is a utility for converting between JsonNode and the DTOs:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) { ObjectMapper mapper = new ObjectMapper(); return mapper.convertValue(jsonNode, MessageDTO.class); }

Next, we need to discard the entity. The discardEntityBytes convenience method serves the purpose of easily discarding the entity if it has no purpose for us.

Let's see how to discard the bytes:

private void discardEntity( HttpResponse httpResponse, Materializer materializer) { HttpMessage.DiscardedEntity discarded = httpResponse.discardEntityBytes(materializer); discarded.completionStage() .whenComplete((done, ex) -> log.info("Entity discarded completely!")); }

Now having done the handling of the WebSocket, let's see how we can set up a client for this using HTML5 WebSockets.

4. Setting up the WebSocket Client

For our client, let's build a simple web-based chat application.

4.1. The Controller Action

We need to define a controller action that renders the index page. We'll put this in the controller class app.controllers.HomeController:

public Result index(Http.Request request) { String url = routes.HomeController.socket() .webSocketURL(request); return ok(views.html.index.render(url)); } 

4.2. The Template Page

Now, let's head over to the app/views/ndex.scala.html page and add a container for the received messages and a form to capture a new message:

 F   Send 

We'll also need to pass in the URL for the WebSocket controller action by declaring this parameter at the top of the app/views/index.scala.htmlpage:

@(url: String)

4.3. WebSocket Event Handlers in JavaScript

And now, we can add the JavaScript to handle the WebSocket events. For simplicity, we'll add the JavaScript functions at the bottom of the app/views/index.scala.html page.

Let's declare the event handlers:

var webSocket; var messageInput; function init() { initWebSocket(); } function initWebSocket() { webSocket = new WebSocket("@url"); webSocket.onopen = onOpen; webSocket.onclose = onClose; webSocket.onmessage = onMessage; webSocket.onerror = onError; }

Let's add the handlers themselves:

function onOpen(evt) { writeToScreen("CONNECTED"); } function onClose(evt) { writeToScreen("DISCONNECTED"); } function onError(evt) { writeToScreen("ERROR: " + JSON.stringify(evt)); } function onMessage(evt) { var receivedData = JSON.parse(evt.data); appendMessageToView("Server", receivedData.body); }

Then, to present the output, we'll use the functions appendMessageToView and writeToScreen:

function appendMessageToView(title, message) { $("#messageContent").append("

" + title + ": " + message + "

"); } function writeToScreen(message) { console.log("New message: ", message); }

4.4. Running and Testing the Application

We're ready to test the application, so let's run it:

cd websockets sbt run

With the application running, we can chat with the server by visiting //localhost:9000:

Every time we type a message and hit Send the server will immediately respond with some lorem ipsum from the JSON Placeholder service.

5. Handling WebSockets Directly with Akka Streams

If we are processing a stream of events from a source and sending these to the client, then we can model this around Akka streams.

Let's see how we can use Akka streams in an example where the server sends messages every two seconds.

We'll start with the WebSocket action in the HomeController:

public WebSocket akkaStreamsSocket() { return WebSocket.Json.accept(request -> { Sink in = Sink.foreach(System.out::println); MessageDTO messageDTO = new MessageDTO("1", "1", "Title", "Test Body"); Source out = Source.tick( Duration.ofSeconds(2), Duration.ofSeconds(2), MessageConverter.messageToJsonNode(messageDTO) ); return Flow.fromSinkAndSource(in, out); }); }

The Source#tick method takes three parameters. The first is the initial delay before the first tick is processed, and the second is the interval between successive ticks. We've set both values to two seconds in the above snippet. The third parameter is an object that should be returned on each tick.

To see this in action, we need to modify the URL in the index action and make it point to the akkaStreamsSocket endpoint:

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

And now refreshing the page, we'll see a new entry every two seconds:

6. Terminating the Actor

At some point, we'll need to shut down the chat, either through a user request or through a timeout.

6.1. Handling Actor Termination

How do we detect when a WebSocket has been closed?

Play will automatically close the WebSocket when the actor that handles the WebSocket terminates. So we can handle this scenario by implementing the Actor#postStop method:

@Override public void postStop() throws Exception { log.info("Messenger actor stopped at {}", OffsetDateTime.now() .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); }

6.2. Manually Terminating the Actor

Further, if we must stop the actor, we can send a PoisonPill to the actor. In our example application, we should be able to handle a “stop” request.

Let's see how to do this in the onSendMessage method:

private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); if("stop".equals(message)) { MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor"); out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()); self().tell(PoisonPill.getInstance(), getSelf()); } else { log.info("Actor received. {}", requestDTO); processMessage(requestDTO); } }

When we receive a message, we check if it's a stop request. If it is, we send the PoisonPill. Otherwise, we process the request.

7. Configuration Options

We can configure several options in terms of how the WebSocket should be handled. Let's look at a few.

7.1. WebSocket Frame Length

WebSocket communication involves the exchange of data frames.

The WebSocket frame length is configurable. We have the option to adjust the frame length to our application requirements.

Configuring a shorter frame length may help reduce denial of service attacks that use long data frames. We can change the frame length for the application by specifying the max length in application.conf:

play.server.websocket.frame.maxLength = 64k

We can also set this configuration option by specifying the max length as a command-line parameter:

sbt -Dwebsocket.frame.maxLength=64k run

7.2. Connection Idle Timeout

By default, the actor we use to handle the WebSocket is terminated after one minute. This is because the Play server in which our application is running has a default idle timeout of 60 seconds. This means that all connections that do not receive a request in sixty seconds are closed automatically.

We can change this through configuration options. Let's head over to our application.conf and change the server to have no idle timeout:

play.server.http.idleTimeout = "infinite"

Or we can pass in the option as command-line arguments:

sbt -Dhttp.idleTimeout=infinite run

We can also configure this by specifying devSettings in build.sbt.

Config options specified in build.sbt are only used in development, they will be ignored in production:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

Sekiranya kita menjalankan semula aplikasi, pelakon tidak akan berhenti.

Kita boleh menukar nilai menjadi beberapa saat:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

Kami dapat mengetahui lebih lanjut mengenai pilihan konfigurasi yang tersedia dalam dokumentasi Play Framework.

8. Kesimpulannya

Dalam tutorial ini, kami menerapkan WebSockets dalam Play Framework dengan pelakon Akka dan Akka Streams.

Kami kemudian terus melihat cara menggunakan pelakon Akka secara langsung dan kemudian melihat bagaimana Akka Stream dapat disiapkan untuk mengendalikan sambungan WebSocket.

Di pihak klien, kami menggunakan JavaScript untuk menangani acara WebSocket kami.

Akhirnya, kami melihat beberapa pilihan konfigurasi yang boleh kami gunakan.

Seperti biasa, kod sumber untuk tutorial ini boleh didapati di GitHub.