RSocket — Getting Started

Vinoth Selvaraj
5 min read · Feb 7, 2021

RSocket is a binary, message-passing protocol for client-server applications and for communication between microservices. It can run over TCP, WebSocket, and Aeron (UDP) transports.

It supports the following interaction models: fire-and-forget, request-response, request-stream, and request-channel.

Need For RSocket:

Microservices are a popular way to design distributed systems. A big monolithic application is broken down into multiple independent microservices, which have a few advantages over a traditional monolith: they are easier to deploy, scale, and reuse. Microservices never come alone; there is always more than one service. They communicate with each other mostly through REST, exchanging JSON over HTTP/1.1.

If a user sends a request to order-service to order a product, order-service might internally send multiple requests to other services to fulfill it. A single user request can therefore trigger multiple internal requests among the microservices in an application.

REST is simple and very easy to use. It works great with browsers, and the APIs are easy to test, so developers love it. However, most current implementations use HTTP/1.1, which has the following issues.

  • HTTP/1.1 is textual and heavy. Microservices sometimes exchange huge payloads.
  • HTTP is stateless, so additional information is sent with every request in headers, which are not compressed.
  • HTTP/1.1 is unary: you send a request and get a response. On a given connection, you cannot send another request until you receive the response.
  • An HTTP request first needs a TCP connection, and setting one up takes a three-way handshake, which is time consuming. All of this can affect the overall performance of a microservices design.
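
To make the "textual and heavy" point concrete, here is a minimal sketch that encodes the same record once as JSON text and once as fixed-width binary fields. The record (orderId, quantity, price) and the class name are made up for illustration, and the binary layout is not RSocket's own wire format; it only shows why a binary encoding tends to be smaller.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadSizeDemo {

    // Text encoding: field names and digits are spelled out as characters.
    static byte[] asJson(long orderId, int quantity, double price) {
        String json = "{\"orderId\":" + orderId
                + ",\"quantity\":" + quantity
                + ",\"price\":" + price + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Binary encoding: fixed-width fields, no field names on the wire.
    static byte[] asBinary(long orderId, int quantity, double price) {
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES + Double.BYTES)
                .putLong(orderId)    // 8 bytes
                .putInt(quantity)    // 4 bytes
                .putDouble(price)    // 8 bytes
                .array();
    }

    public static void main(String[] args) {
        System.out.println("JSON bytes   : " + asJson(1234567890123L, 3, 19.99).length);
        System.out.println("Binary bytes : " + asBinary(1234567890123L, 3, 19.99).length);
    }
}
```

The binary form is always 20 bytes here, while the JSON form grows with the length of the field names and the number of digits.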

REST is good between the browser and the back end. But for communication between microservices, we need something better than REST to avoid the issues mentioned above.

RSocket vs gRPC:

We have already discussed gRPC, another replacement for REST-based communication, on this site. So you might want to compare RSocket with gRPC.

gRPC

  • An action-oriented, modern RPC framework
  • Works on top of HTTP/2, at layer 7 of the OSI model
  • Not a protocol

RSocket

  • Connection-oriented, message-passing protocol
  • Works at layer 5/6, a little lower level than gRPC, so performance should, in principle, be better.
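
To give a feel for what "binary, message-passing" means on the wire, the sketch below packs a frame header the way the RSocket protocol specification lays it out, as far as I read it: a 4-byte stream ID, followed by 16 bits that hold a 6-bit frame type and 10 flag bits, followed by the data. The class and method names are hypothetical, and this is not the rsocket-java encoder: it ignores metadata, fragmentation, and the length prefix used on TCP.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameHeaderSketch {

    // Frame type value for REQUEST_RESPONSE, taken from the RSocket spec.
    static final int REQUEST_RESPONSE = 0x04;

    // Layout (big-endian): [stream id: 4 bytes][type: 6 bits | flags: 10 bits][data]
    static byte[] encode(int streamId, int frameType, int flags, String data) {
        byte[] body = data.getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(6 + body.length);
        frame.putInt(streamId & 0x7FFFFFFF);                           // top bit is reserved
        frame.putShort((short) ((frameType << 10) | (flags & 0x3FF))); // type + flags
        frame.put(body);
        return frame.array();
    }

    // Reads the stream id back from the first four bytes.
    static int streamId(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(0) & 0x7FFFFFFF;
    }

    // Reads the 6-bit frame type back from bytes 4 and 5.
    static int frameType(byte[] frame) {
        int typeAndFlags = ByteBuffer.wrap(frame).getShort(4) & 0xFFFF;
        return typeAndFlags >>> 10;
    }

    public static void main(String[] args) {
        byte[] frame = encode(1, REQUEST_RESPONSE, 0, "hi");
        System.out.println("frame length : " + frame.length);
        System.out.println("stream id    : " + streamId(frame));
        System.out.println("frame type   : " + frameType(frame));
    }
}
```

The whole header is six bytes, and many logical streams can share one connection because every frame carries its own stream ID.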

RSocket With Java:

Include the Maven dependencies below to get started with RSocket.

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>1.0.1</version>
</dependency>

In this article, we create the server and clients directly, without any framework integration. In the following articles, we will integrate with Spring Boot to simplify RSocket development.

RSocket — Interaction Models:

We will take a look at the following method implementations of the RSocket interface.

  • fireAndForget
  • requestResponse
  • requestStream
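
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

private static class SimpleRSocket implements RSocket {

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        // body shown in the fireAndForget section
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        // body shown in the requestResponse section
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        // body shown in the requestStream section
    }
}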
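
The three return types differ mainly in how many results the caller gets back: none, exactly one, or a stream of them. As a rough analogy using only the JDK (not Reactor, and not how RSocket itself is implemented), the sketch below mirrors those shapes with CompletableFuture and Stream. The class name, the LOG list, and the method bodies are hypothetical placeholders. Unlike Mono and Flux, these JDK types are not lazy, so only the cardinality carries over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InteractionShapes {

    // Hypothetical in-memory log standing in for a server-side side effect.
    static final List<String> LOG = new ArrayList<>();

    // Zero results: the caller only learns that the call was handed over (like Mono<Void>).
    static CompletableFuture<Void> fireAndForget(String request) {
        LOG.add(request);
        return CompletableFuture.completedFuture(null);
    }

    // Exactly one result (like Mono<Payload>).
    static CompletableFuture<String> requestResponse(String request) {
        return CompletableFuture.completedFuture("echo:" + request);
    }

    // Many results for a single request (like Flux<Payload>).
    static Stream<String> requestStream(String request) {
        return Stream.of(request + "-1", request + "-2", request + "-3");
    }

    public static void main(String[] args) {
        fireAndForget("ping").join();
        System.out.println(LOG);
        System.out.println(requestResponse("ping").join());
        System.out.println(requestStream("ping").collect(Collectors.toList()));
    }
}
```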

Note that RSocket is a binary protocol: the request and response payloads are carried as bytes (ByteBuffer). To keep things simple, we use String in this article. We can use complex data types once we integrate with Spring Boot.
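
Using String on top of a binary payload simply means encoding the text to UTF-8 bytes on one side and decoding it on the other. Here is a minimal JDK-only sketch of that round trip. The class and method names are hypothetical, and the code does not use the RSocket Payload type; it only illustrates the conversion that helpers such as getDataUtf8() are expected to perform.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Utf8RoundTrip {

    // String -> bytes, as a sender would do before putting text on the wire.
    static ByteBuffer encode(String text) {
        return StandardCharsets.UTF_8.encode(text);
    }

    // bytes -> String; duplicate() keeps the caller's buffer position untouched.
    static String decode(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer data = encode("hello");
        System.out.println("bytes on the wire : " + data.remaining());
        System.out.println("decoded text      : " + decode(data));
    }
}
```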

fireAndForget:

  • The server receives the request and prints the information on the console. It does not have to respond to the client.

@Override
public Mono<Void> fireAndForget(Payload payload) {
    // just print the received string
    var str = payload.getDataUtf8();
    // release the buffer: our server uses PayloadDecoder.ZERO_COPY
    payload.release();
    System.out.println("Received :: " + str);
    return Mono.empty();
}

requestResponse:

  • The server receives the request, processes it, and responds. In this case, we simply convert the string to upper case.

@Override
public Mono<Payload> requestResponse(Payload payload) {
    // just convert to upper case
    var str = payload.getDataUtf8();
    // release the buffer: our server uses PayloadDecoder.ZERO_COPY
    payload.release();
    return Mono.just(DefaultPayload.create(str.toUpperCase()));
}

requestStream:

  • The server receives the request, processes it, and sends multiple responses back to the client: a stream of responses for a single request.
  • In our case, we split the input into characters and send each one as a separate response.

@Override
public Flux<Payload> requestStream(Payload payload) {
    // split the given string into characters and return them one by one
    var str = payload.getDataUtf8();
    // release the buffer: our server uses PayloadDecoder.ZERO_COPY
    payload.release();
    return Flux.fromStream(str.chars().mapToObj(i -> (char) i))
            .map(Object::toString)
            .map(DefaultPayload::create);
}
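
The character-splitting step can be tried on its own with plain JDK streams. The sketch below applies the same chars() / mapToObj / toString chain and collects the result into a list instead of a Flux; the class and method names are hypothetical. Note that chars() yields UTF-16 code units, so a character outside the Basic Multilingual Plane would be split into two elements.

```java
import java.util.List;
import java.util.stream.Collectors;

public class CharSplit {

    // Same mapping chain as in requestStream, collected into a List for inspection.
    static List<String> split(String input) {
        return input.chars()
                .mapToObj(i -> (char) i)   // int code unit -> Character
                .map(Object::toString)     // Character -> one-character String
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(split("hi"));
        System.out.println(split("hello").size());
    }
}
```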

Socket Acceptor Implementation:

Create a SocketAcceptor implementation as shown here with our RSocket implementation.

public class SimpleRSocketAcceptor implements SocketAcceptor {

    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        return Mono.just(new SimpleRSocket());
    }

}

RSocket With Java — Server Setup:

Our RSocket server is very simple. It listens on port 6565. We add the socket acceptor instance, so that our server will know how to process the requests.

import io.rsocket.transport.netty.server.CloseableChannel;

public class Server {

    private static CloseableChannel channel;

    public static void start() {
        RSocketServer rSocketServer = RSocketServer.create();
        rSocketServer.acceptor(new SimpleRSocketAcceptor());
        rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY);
        // block until the server is bound, so that clients can connect
        // as soon as start() returns
        channel = rSocketServer.bind(TcpServerTransport.create(6565))
                .block();
    }

    public static void stop() {
        // close the server channel
        channel.dispose();
    }

}

RSocket With Java — Client Setup:

Our server is ready! Let's create a JUnit test class to test our RSocket method implementations.

private static RSocket rSocket;

@BeforeClass
public static void setUpClient() {
    Server.start();
    rSocket = RSocketConnector.connectWith(TcpClientTransport.create(6565))
            .block();
}

@AfterClass
public static void tearDown() {
    // close the client connection, then stop the server
    rSocket.dispose();
    Server.stop();
}

  • This method provides the test data.

private Flux<Payload> getRequestPayload() {
    return Flux.just("hi", "hello", "how", "are", "you")
            .delayElements(Duration.ofSeconds(1))
            .map(DefaultPayload::create);
}

fireAndForget

@Test
public void fireAndForget() {
    this.getRequestPayload()
            .flatMap(payload -> rSocket.fireAndForget(payload))
            .blockLast(Duration.ofMinutes(1));
}
// output

Received :: hi
Received :: hello
Received :: how
Received :: are
Received :: you

requestResponse

@Test
public void requestAndResponse() {
    this.getRequestPayload()
            .flatMap(payload -> rSocket.requestResponse(payload))
            .doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
            .blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: HI
Response from server :: HELLO
Response from server :: HOW
Response from server :: ARE
Response from server :: YOU

requestStream

@Test
public void requestAndResponseStream() {
    this.getRequestPayload()
            .flatMap(payload -> rSocket.requestStream(payload))
            .doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
            .blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: h
Response from server :: i
Response from server :: h
Response from server :: e
Response from server :: l
Response from server :: l
Response from server :: o
Response from server :: h
Response from server :: o
Response from server :: w
Response from server :: a
Response from server :: r
Response from server :: e
Response from server :: y
Response from server :: o
Response from server :: u

Summary:

We were able to successfully demonstrate RSocket with Java. In the following articles, let's look at Spring Boot integration, a performance comparison between RSocket and gRPC, and more.

Learn more about RSocket.

The complete source code is available here.

Happy learning :)

Vinoth Selvaraj

Principal Software Engineer — passionate about software architectural design, microservices.