RSocket — Getting Started

Need For RSocket:

  • HTTP/1.1 is textual & Heavy. Sometimes microservices exchange information by using huge payload.
  • HTTP is stateless. So additional information is sent via headers which are not compressed.
  • HTTP/1.1 is unary — that is — you send a request and get a response. You can not send another request until you receive the response.
  • HTTP request requires a 3 way message exchange to set up a TCP connection first which is time consuming. This all can affect the overall performance of the microservices design.

RSocket vs gRPC:

  • An action oriented modern RPC framework
  • Works on top of HTTP2. Works on layer 7 of OSI model.
  • Not a protocol
  • Connection oriented, message passing protocol
  • Works on layer 5/6. It is a little bit low level compared to gRPC. So performance should be better.

RSocket With Java:

<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>1.0.1</version>
</dependency>

RSocket — Interaction Models:

  • fireAndForget
  • requestResponse
  • requestStream
private static class SimpleRSocket implements RSocket {
@Override
public Mono<Void> fireAndForget(Payload payload) {
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
}

@Override
public Flux<Payload> requestStream(Payload payload) {
}
}

fireAndForget:

  • Server receives the request and prints the information on the console. It does not have to response back to the client.
@Override
public Mono<Void> fireAndForget(Payload payload) {
// just print the received string
var str = payload.getDataUtf8();
System.out.println("Received :: " + str);
return Mono.empty();
}

requestResponse:

  • Server receives the request and processes and responds back. In this case, we simply convert the string to upperCase.
@Override
public Mono<Payload> requestResponse(Payload payload) {
// just convert to upper case
var str = payload.getDataUtf8();
return Mono.just(DefaultPayload.create(str.toUpperCase()));
}

requestStream:

  • Server receives the request, processes and sends multiple response back to the client. A stream of response for a single request.
  • In our case, we split the input into char array and respond.
@Override
public Flux<Payload> requestStream(Payload payload) {
// convert the given str to char array and return
var str = payload.getDataUtf8();
return Flux.fromStream(str.chars().mapToObj(i -> (char) i))
.map(Object::toString)
.map(DefaultPayload::create);
}

Socket Acceptor Implementation:

public class SimpleRSocketAcceptor implements SocketAcceptor {

@Override
public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
return Mono.just(new SimpleRSocket());
}

}

RSocket With Java — Server Setup:

public class Server {

private static Disposable disposable;

public static void start() {
RSocketServer rSocketServer = RSocketServer.create();
rSocketServer.acceptor(new SimpleRSocketAcceptor());
rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY);
disposable = rSocketServer.bind(TcpServerTransport.create(6565))
.subscribe();
}

public static void stop(){
disposable.dispose();
}

}

RSocket With Java — Client Setup:

private static RSocket rSocket;

@BeforeClass
public static void setUpClient(){
Server.start();
rSocket = RSocketConnector.connectWith(TcpClientTransport.create(6565))
.block();
}
  • This method provides us the test data
private Flux<Payload> getRequestPayload(){
return Flux.just("hi", "hello", "how", "are", "you")
.delayElements(Duration.ofSeconds(1))
.map(DefaultPayload::create);
}

fireAndForget

@Test
public void fireAndForget(){
this.getRequestPayload()
.flatMap(payload -> rSocket.fireAndForget(payload))
.blockLast(Duration.ofMinutes(1));
}
// output

Received :: hi
Received :: hello
Received :: how
Received :: are
Received :: you

requestResponse

@Test
public void requestAndResponse(){
this.getRequestPayload()
.flatMap(payload -> rSocket.requestResponse(payload))
.doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: HI
Response from server :: HELLO
Response from server :: HOW
Response from server :: ARE
Response from server :: YOU

requestAndResponseStream

@Test
public void requestAndResponseStream(){
this.getRequestPayload()
.flatMap(payload -> rSocket.requestStream(payload))
.doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: h
Response from server :: i
Response from server :: h
Response from server :: e
Response from server :: l
Response from server :: l
Response from server :: o
Response from server :: h
Response from server :: o
Response from server :: w
Response from server :: a
Response from server :: r
Response from server :: e
Response from server :: y
Response from server :: o
Response from server :: u

Summary:

--

--

--

Principal Software Engineer — passionate about software architectural design, microservices.

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Boto3 with Jupyter Notebook

5 Simple Steps to Learn to Code (Quickly)

Follow these 5 simple steps if you want to learn to code quickly and easily.

4 Steps To A Minimalist Mac Desktop

Building Navigation tree without Graph traversal.

Anchor Navigation Tree

Indexing Strings in Rust and TypeScript: A Case Study of Strings

Day #22 of Python

I Wish I Knew Unreal Engine Better.

Are you Serialize Clark?

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Vinoth Selvaraj

Vinoth Selvaraj

Principal Software Engineer — passionate about software architectural design, microservices.

More from Medium

Highlighting — Java API Client

TIL 0624 Head First Java Ch.5 — Extra-Strength Methods

Springboot — Test DAO/Repository layer with no main class in the library using Junit5.

RESTful API Evolution With HATEOAS