Skip to content

Commit 20990e6

Browse files
committed
chore: add rxjs example without rsocket-messaging
Signed-off-by: Kevin Viglucci <[email protected]>
1 parent 09f237e commit 20990e6

File tree

2 files changed

+230
-0
lines changed

2 files changed

+230
-0
lines changed

packages/rsocket-examples/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"start-client-server-request-response-tcp": "ts-node -r tsconfig-paths/register src/ClientServerRequestResponseExampleTcp.ts",
1717
"start-client-server-request-response-websocket": "ts-node -r tsconfig-paths/register src/ClientServerRequestResponseExampleWebSocket.ts",
1818
"start-client-server-composite-metadata-route": "ts-node -r tsconfig-paths/register src/ClientServerCompositeMetadataRouteExample.ts",
19+
"start-client-server-rxjs-requester-responder": "ts-node -r tsconfig-paths/register src/rxjs/RxjsRequesterResponderExample.ts",
1920
"start-client-server-rxjs-messaging-composite-metadata-route": "ts-node -r tsconfig-paths/register src/rxjs/RxjsMessagingCompositeMetadataRouteExample.ts",
2021
"start-client-apollo-graphql": "ts-node -r tsconfig-paths/register src/graphql/apollo/client/example.ts",
2122
"start-client-server-apollo-graphql": "ts-node -r tsconfig-paths/register src/graphql/apollo/client-server/example.ts"
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright 2021-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { RSocket, RSocketConnector, RSocketServer } from "rsocket-core";
18+
import { Codec } from "rsocket-messaging";
19+
import { RxRequestersFactory, RxRespondersFactory } from "rsocket-adapter-rxjs";
20+
import { TcpClientTransport } from "rsocket-tcp-client";
21+
import { TcpServerTransport } from "rsocket-tcp-server";
22+
import { exit } from "process";
23+
import {
24+
firstValueFrom,
25+
interval,
26+
map,
27+
Observable,
28+
of,
29+
take,
30+
tap,
31+
timer,
32+
} from "rxjs";
33+
import Logger from "../shared/logger";
34+
35+
let serverCloseable;
36+
37+
class EchoService {
38+
handleEchoRequestResponse(data: string): Observable<string> {
39+
return timer(1000).pipe(map(() => `Echo: ${data}`));
40+
}
41+
42+
handleEchoRequestStream(data: string): Observable<string> {
43+
return interval(1000).pipe(
44+
map(() => `RxEchoService Echo: ${data}`),
45+
tap((value) => {
46+
Logger.info(`[server] sending: ${value}`);
47+
})
48+
);
49+
}
50+
51+
handleEchoRequestChannel(datas: Observable<string>): Observable<string> {
52+
datas
53+
.pipe(
54+
tap((value) => {
55+
Logger.info(`[server] receiving: ${value}`);
56+
})
57+
)
58+
.subscribe();
59+
return interval(200).pipe(
60+
map((data) => `RxEchoService Echo: ${data}`),
61+
tap((value) => {
62+
Logger.info(`[server] sending: ${value}`);
63+
})
64+
);
65+
}
66+
67+
handleLogFireAndForget(data: string): Observable<void> {
68+
Logger.info(`[server] received: ${data}`);
69+
return of(null);
70+
}
71+
}
72+
73+
function makeServer() {
74+
const stringCodec = new StringCodec();
75+
return new RSocketServer({
76+
transport: new TcpServerTransport({
77+
listenOptions: {
78+
port: 9090,
79+
host: "127.0.0.1",
80+
},
81+
}),
82+
acceptor: {
83+
accept: async () => {
84+
const echoService = new EchoService();
85+
return {
86+
fireAndForget: RxRespondersFactory.fireAndForget(
87+
echoService.handleLogFireAndForget,
88+
stringCodec
89+
),
90+
requestResponse: RxRespondersFactory.requestResponse(
91+
echoService.handleEchoRequestResponse,
92+
{ inputCodec: stringCodec, outputCodec: stringCodec }
93+
),
94+
requestStream: RxRespondersFactory.requestStream(
95+
echoService.handleEchoRequestStream,
96+
{ inputCodec: stringCodec, outputCodec: stringCodec }
97+
),
98+
requestChannel: RxRespondersFactory.requestChannel(
99+
echoService.handleEchoRequestChannel,
100+
{ inputCodec: stringCodec, outputCodec: stringCodec },
101+
4
102+
),
103+
};
104+
},
105+
},
106+
});
107+
}
108+
109+
function makeConnector() {
110+
return new RSocketConnector({
111+
transport: new TcpClientTransport({
112+
connectionOptions: {
113+
host: "127.0.0.1",
114+
port: 9090,
115+
},
116+
}),
117+
});
118+
}
119+
120+
async function fireAndForget(rsocket: RSocket, route: string = "") {
121+
return new Promise((resolve, reject) => {
122+
const request = RxRequestersFactory.fireAndForget(
123+
"Hello World",
124+
stringCodec
125+
);
126+
request(rsocket).subscribe({
127+
complete() {
128+
// give server a chance to handle before continuing
129+
setTimeout(() => {
130+
resolve(null);
131+
}, 100);
132+
},
133+
error(err) {
134+
reject(err);
135+
},
136+
});
137+
});
138+
}
139+
140+
async function requestResponse(rsocket: RSocket) {
141+
const request = RxRequestersFactory.requestResponse(
142+
"Hello World",
143+
stringCodec,
144+
stringCodec
145+
);
146+
return firstValueFrom(
147+
request(rsocket).pipe(tap((data) => Logger.info(`payload[data: ${data};]`)))
148+
);
149+
}
150+
151+
async function requestStream(rsocket: RSocket) {
152+
const request = RxRequestersFactory.requestStream(
153+
"Hello World",
154+
stringCodec,
155+
stringCodec,
156+
5
157+
);
158+
return request(rsocket)
159+
.pipe(
160+
tap((data) => {
161+
Logger.info(`[client] received[data: ${data}]`);
162+
}),
163+
take(10)
164+
)
165+
.toPromise();
166+
}
167+
168+
async function requestChannel(rsocket: RSocket) {
169+
const request = RxRequestersFactory.requestChannel(
170+
interval(1000).pipe(
171+
map((i) => `Hello World ${i}`),
172+
tap((data) => {
173+
Logger.info(`[client] produced[data: ${data}]`);
174+
})
175+
),
176+
stringCodec,
177+
stringCodec,
178+
5
179+
);
180+
return request(rsocket)
181+
.pipe(
182+
tap((data) => {
183+
Logger.info(`[client] received[data: ${data}]`);
184+
}),
185+
take(25)
186+
)
187+
.toPromise();
188+
}
189+
190+
class StringCodec implements Codec<string> {
191+
readonly mimeType: string = "text/plain";
192+
193+
decode(buffer: Buffer): string {
194+
return buffer.toString();
195+
}
196+
197+
encode(entity: string): Buffer {
198+
return Buffer.from(entity);
199+
}
200+
}
201+
202+
const stringCodec = new StringCodec();
203+
204+
async function main() {
205+
const server = makeServer();
206+
const connector = makeConnector();
207+
208+
serverCloseable = await server.bind();
209+
const rsocket = await connector.connect();
210+
211+
Logger.info(`----- Fire And Forget -----`);
212+
await fireAndForget(rsocket);
213+
214+
Logger.info(`----- Request Response -----`);
215+
await requestResponse(rsocket);
216+
217+
Logger.info(`----- Request Stream -----`);
218+
await requestStream(rsocket);
219+
220+
Logger.info(`----- Request Channel -----`);
221+
await requestChannel(rsocket);
222+
}
223+
224+
main()
225+
.then(() => exit())
226+
.catch((error: Error) => {
227+
console.error(error);
228+
exit(1);
229+
});

0 commit comments

Comments
 (0)