RSocket — Getting Started

Need For RSocket:

RSocket vs gRPC:

RSocket With Java:

<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>1.0.1</version>
</dependency>

RSocket — Interaction Models:

private static class SimpleRSocket implements RSocket {
@Override
public Mono<Void> fireAndForget(Payload payload) {
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
}

@Override
public Flux<Payload> requestStream(Payload payload) {
}
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
// just print the received string
var str = payload.getDataUtf8();
System.out.println("Received :: " + str);
return Mono.empty();
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
// just convert to upper case
var str = payload.getDataUtf8();
return Mono.just(DefaultPayload.create(str.toUpperCase()));
}
@Override
public Flux<Payload> requestStream(Payload payload) {
// convert the given str to char array and return
var str = payload.getDataUtf8();
return Flux.fromStream(str.chars().mapToObj(i -> (char) i))
.map(Object::toString)
.map(DefaultPayload::create);
}

Socket Acceptor Implementation:

public class SimpleRSocketAcceptor implements SocketAcceptor {

@Override
public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
return Mono.just(new SimpleRSocket());
}

}

RSocket With Java — Server Setup:

public class Server {

private static Disposable disposable;

public static void start() {
RSocketServer rSocketServer = RSocketServer.create();
rSocketServer.acceptor(new SimpleRSocketAcceptor());
rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY);
disposable = rSocketServer.bind(TcpServerTransport.create(6565))
.subscribe();
}

public static void stop(){
disposable.dispose();
}

}

RSocket With Java — Client Setup:

private static RSocket rSocket;

@BeforeClass
public static void setUpClient(){
Server.start();
rSocket = RSocketConnector.connectWith(TcpClientTransport.create(6565))
.block();
}
private Flux<Payload> getRequestPayload(){
return Flux.just("hi", "hello", "how", "are", "you")
.delayElements(Duration.ofSeconds(1))
.map(DefaultPayload::create);
}
@Test
public void fireAndForget(){
this.getRequestPayload()
.flatMap(payload -> rSocket.fireAndForget(payload))
.blockLast(Duration.ofMinutes(1));
}
// output

Received :: hi
Received :: hello
Received :: how
Received :: are
Received :: you
@Test
public void requestAndResponse(){
this.getRequestPayload()
.flatMap(payload -> rSocket.requestResponse(payload))
.doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: HI
Response from server :: HELLO
Response from server :: HOW
Response from server :: ARE
Response from server :: YOU
@Test
public void requestAndResponseStream(){
this.getRequestPayload()
.flatMap(payload -> rSocket.requestStream(payload))
.doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: h
Response from server :: i
Response from server :: h
Response from server :: e
Response from server :: l
Response from server :: l
Response from server :: o
Response from server :: h
Response from server :: o
Response from server :: w
Response from server :: a
Response from server :: r
Response from server :: e
Response from server :: y
Response from server :: o
Response from server :: u

Summary:

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store