RSocket — Getting Started

Need For RSocket:

Microservices are the popular way to design distributed systems. A big monolith application is broken down into multiple independent Microservices. These Microservices have few advantages compared to traditional monolith applications — like easy to deploy, scale, reusable etc. Microservices do not come alone. There will be always more than 1 service. These Microservices communicate with each other mostly with REST using HTTP/1.1 protocol by exchanging JSON.

  • HTTP/1.1 is textual & Heavy. Sometimes microservices exchange information by using huge payload.
  • HTTP is stateless. So additional information is sent via headers which are not compressed.
  • HTTP/1.1 is unary — that is — you send a request and get a response. You can not send another request until you receive the response.
  • HTTP request requires a 3 way message exchange to set up a TCP connection first which is time consuming. This all can affect the overall performance of the microservices design.

RSocket vs gRPC:

We had already discussed gRPC which is another replacement for REST based communication in this site here. So, you might want to compare RSocket vs gRPC.

  • An action oriented modern RPC framework
  • Works on top of HTTP2. Works on layer 7 of OSI model.
  • Not a protocol
  • Connection oriented, message passing protocol
  • Works on layer 5/6. It is a little bit low level compared to gRPC. So performance should be better.

RSocket With Java:

Include the below maven dependencies to get started with rsocket.

<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>1.0.1</version>
</dependency>

RSocket — Interaction Models:

We would be taking a look at following method implementations of RSocket interface.

  • fireAndForget
  • requestResponse
  • requestStream
private static class SimpleRSocket implements RSocket {
@Override
public Mono<Void> fireAndForget(Payload payload) {
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
}

@Override
public Flux<Payload> requestStream(Payload payload) {
}
}

fireAndForget:

  • Server receives the request and prints the information on the console. It does not have to response back to the client.
@Override
public Mono<Void> fireAndForget(Payload payload) {
// just print the received string
var str = payload.getDataUtf8();
System.out.println("Received :: " + str);
return Mono.empty();
}

requestResponse:

  • Server receives the request and processes and responds back. In this case, we simply convert the string to upperCase.
@Override
public Mono<Payload> requestResponse(Payload payload) {
// just convert to upper case
var str = payload.getDataUtf8();
return Mono.just(DefaultPayload.create(str.toUpperCase()));
}

requestStream:

  • Server receives the request, processes and sends multiple response back to the client. A stream of response for a single request.
  • In our case, we split the input into char array and respond.
@Override
public Flux<Payload> requestStream(Payload payload) {
// convert the given str to char array and return
var str = payload.getDataUtf8();
return Flux.fromStream(str.chars().mapToObj(i -> (char) i))
.map(Object::toString)
.map(DefaultPayload::create);
}

Socket Acceptor Implementation:

Create a SocketAcceptor implementation as shown here with our RSocket implementation.

public class SimpleRSocketAcceptor implements SocketAcceptor {

@Override
public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
return Mono.just(new SimpleRSocket());
}

}

RSocket With Java — Server Setup:

Our RSocket server is very simple. It listens on port 6565. We add the socket acceptor instance, so that our server will know how to process the requests.

public class Server {

private static Disposable disposable;

public static void start() {
RSocketServer rSocketServer = RSocketServer.create();
rSocketServer.acceptor(new SimpleRSocketAcceptor());
rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY);
disposable = rSocketServer.bind(TcpServerTransport.create(6565))
.subscribe();
}

public static void stop(){
disposable.dispose();
}

}

RSocket With Java — Client Setup:

Our server is ready! Lets create JUnit test class to test our RSocket method implementations.

private static RSocket rSocket;

@BeforeClass
public static void setUpClient(){
Server.start();
rSocket = RSocketConnector.connectWith(TcpClientTransport.create(6565))
.block();
}
  • This method provides us the test data
private Flux<Payload> getRequestPayload(){
return Flux.just("hi", "hello", "how", "are", "you")
.delayElements(Duration.ofSeconds(1))
.map(DefaultPayload::create);
}

fireAndForget

@Test
public void fireAndForget(){
this.getRequestPayload()
.flatMap(payload -> rSocket.fireAndForget(payload))
.blockLast(Duration.ofMinutes(1));
}
// output

Received :: hi
Received :: hello
Received :: how
Received :: are
Received :: you

requestResponse

@Test
public void requestAndResponse(){
this.getRequestPayload()
.flatMap(payload -> rSocket.requestResponse(payload))
.doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: HI
Response from server :: HELLO
Response from server :: HOW
Response from server :: ARE
Response from server :: YOU

requestAndResponseStream

@Test
public void requestAndResponseStream(){
this.getRequestPayload()
.flatMap(payload -> rSocket.requestStream(payload))
.doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: h
Response from server :: i
Response from server :: h
Response from server :: e
Response from server :: l
Response from server :: l
Response from server :: o
Response from server :: h
Response from server :: o
Response from server :: w
Response from server :: a
Response from server :: r
Response from server :: e
Response from server :: y
Response from server :: o
Response from server :: u

Summary:

We were able to successfully demonstrate RSocket with Java. In the following articles, Lets see Spring boot integration, performance comparison between RSocket vs gRPC etc.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Vinoth Selvaraj

Vinoth Selvaraj

Principal Software Engineer — passionate about software architectural design, microservices.