Skip to content

feat: quarkus websocket structure #1605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions outDython/client.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//////////////////////////////////////////////////////////////////////
///
/// Postman Echo WebSocket Client - 1.0.0
/// Protocol: wss
/// Host: ws.postman-echo.com
/// Path: /raw
///
//////////////////////////////////////////////////////////////////////

import 'dart:convert';
import 'package:web_socket_channel/web_socket_channel.dart';

class PostmanEchoWebSocketClient {

final String _url;
WebSocketChannel? _channel;
final List<void Function(String)> _messageHandlers = [];
final List<void Function(Object)> _errorHandlers = [];

/// Constructor to initialize the WebSocket client
///
/// [url] - The WebSocket server URL. Use it if the server URL is different from the default one taken from the AsyncAPI document.
PostmanEchoWebSocketClient({String? url})
: _url = url ?? 'wss://ws.postman-echo.com/raw';


/// Method to establish a WebSocket connection
Future<void> connect() async {
if (_channel != null) {
print('Already connected to Postman Echo WebSocket Client server');
return;
}
try {
final wsUrl = Uri.parse(_url);
_channel = WebSocketChannel.connect(wsUrl);
print('Connected to Postman Echo WebSocket Client server');

/// Listen to the incoming message stream
_channel?.stream.listen(
(message) {
if (_messageHandlers.isNotEmpty) {
for (var handler in _messageHandlers) {
_handleMessage(message, handler);
}
} else {
print('Message received: $message');
}
},
onError: (error) {
if (_errorHandlers.isNotEmpty) {
for (var handler in _errorHandlers) {
handler(error);
}
} else {
print('WebSocket Error: $error');
}
},
onDone: () {
_channel = null;
print('Disconnected from Postman Echo WebSocket Client server');
},
);
} catch (error) {
print('Connection failed: $error');
rethrow;
}
}

/// Method to register custom message handlers
void registerMessageHandler(void Function(String) handler) {
_messageHandlers.add(handler);
}

/// Method to register custom error handlers
void registerErrorHandler(void Function(Object) handler) {
_errorHandlers.add(handler);
}

/// Method to handle message with callback
void _handleMessage(dynamic message, void Function(String) cb) {
cb(message is String ? message : message.toString());
}

/// Method to send an echo message to the server
void sendEchoMessage(dynamic message) {
if (_channel == null) {
print('Error: WebSocket is not connected.');
return;
}
final payload = message is String ? message : jsonEncode(message);
_channel!.sink.add(payload);
print('Sent message to echo server: $payload');
}

/// Method to close the WebSocket connection
void close() {
_channel?.sink.close();
_channel = null;
print('WebSocket connection closed.');
}
}

5 changes: 5 additions & 0 deletions outDython/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
name: wsclient
environment:
sdk: '>=3.0.0 <4.0.0'
dependencies:
web_socket_channel: ^3.0.2
172 changes: 172 additions & 0 deletions outPython/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# //////////////////////////////////////////////////////////////////////////
#
# Postman Echo WebSocket Client - 1.0.0
# Protocol: wss
# Host: ws.postman-echo.com
# Path: /raw
#
# //////////////////////////////////////////////////////////////////////////

import json
import certifi
import threading
import websocket



class PostmanEchoWebSocketClient:

def __init__(self, url: str = "wss://ws.postman-echo.com/raw"):

"""
Constructor to initialize the WebSocket client.

Args:
url (str, optional): The WebSocket server URL. Use it if the server URL is
different from the default one taken from the AsyncAPI document.
"""
self.ws_app = None # Instance of WebSocketApp
self.message_handlers = [] # Callables for incoming messages
self.error_handlers = [] # Callables for errors
self.outgoing_processors = [] # Callables to process outgoing messages
self._stop_event = threading.Event()




self.url = url

def on_open(self, ws):
print("Connected to Postman Echo WebSocket Client server")

def on_message(self, ws, message):
self.handle_message(message)

def on_error(self, ws, error):
print("WebSocket Error:", error)
self.handle_error(error)

def on_close(self, ws, close_status_code, close_msg):
print("Disconnected from Postman Echo WebSocket Client", close_status_code, close_msg)

def connect(self):
"""Establish the connection and start the run_forever loop in a background thread."""
ssl_opts = {"ca_certs": certifi.where()}
self.ws_app = websocket.WebSocketApp(
self.url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# Run the WebSocketApp's run_forever in a separate thread with multithreading enabled.
def run():

retry = 0
max_retries = 5

while not self._stop_event.is_set() and retry < max_retries:
try:
retry += 1
print("Starting WebSocket thread...")
self.ws_app.run_forever(sslopt=ssl_opts)
except Exception as e:
print(f"Exception in WebSocket thread: {e}") # Print full error details

thread = threading.Thread(target=run, daemon=True)
thread.start()

def register_message_handler(self, handler):
"""Register a callable to process incoming messages."""
if callable(handler):
self.message_handlers.append(handler)
else:
print("Message handler must be callable")

def register_error_handler(self, handler):
"""Register a callable to process errors."""
if callable(handler):
self.error_handlers.append(handler)
else:
print("Error handler must be callable")

def register_outgoing_processor(self, processor):
"""
Register a callable that processes outgoing messages automatically.
These processors run in sequence before each message is sent.
"""
if callable(processor):
self.outgoing_processors.append(processor)
else:
print("Outgoing processor must be callable")

def handle_message(self, message):
"""Pass the incoming message to all registered message handlers. """
if len(self.message_handlers) == 0:
print("\033[94mReceived raw message:\033[0m", message)
else:
for handler in self.message_handlers:
handler(message)

def handle_error(self, error):
"""Pass the error to all registered error handlers. Generic log message is printed if no handlers are registered."""
if len(self.error_handlers) == 0:
print("\033[91mError occurred:\033[0m", error)
else:
# Call custom error handlers
for handler in self.error_handlers:
handler(error)

async def send_echo_message(self, message):
"""
Send a send_echo_message message using the WebSocket connection attached to this instance.

Args:
message (dict or str): The message to send. Will be serialized to JSON if it's a dictionary.

Raises:
Exception: If sending fails or the socket is not connected.
"""
await self._send(message, self.ws_app)

@staticmethod
async def send_echo_message_static(message, socket):
"""
Send a send_echo_message message using a provided WebSocket connection, without needing an instance.

Args:
message (dict or str): The message to send.
socket (websockets.WebSocketCommonProtocol): The WebSocket to send through.

Raises:
Exception: If sending fails or the socket is not connected.
"""
await PostmanEchoWebSocketClient._send(message, socket)


@staticmethod
async def _send(message, socket):
"""
Internal helper to handle the actual sending logic.

Args:
message (dict or str): The message to send.
socket (websockets.WebSocketCommonProtocol): The WebSocket to send through.

Notes:
If message is a dictionary, it will be automatically converted to JSON.
"""
try:
if isinstance(message, dict):
message = json.dumps(message)
await socket.send(message)
except Exception as e:
print("Error sending:", e)

def close(self):
"""Cleanly close the WebSocket connection."""
self._stop_event.set()
if self.ws_app:
self.ws_app.close()
print("WebSocket connection closed.")

3 changes: 3 additions & 0 deletions outPython/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
websocket-client==1.8.0
certifi==2025.1.31
requests==2.32.3
5 changes: 5 additions & 0 deletions outputTest/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*
!target/*-runner
!target/*-runner.jar
!target/lib/*
!target/quarkus-app/*
45 changes: 45 additions & 0 deletions outputTest/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#Maven
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
release.properties
.flattened-pom.xml

# Eclipse
.project
.classpath
.settings/
bin/

# IntelliJ
.idea
*.ipr
*.iml
*.iws

# NetBeans
nb-configuration.xml

# Visual Studio Code
.vscode
.factorypath

# OSX
.DS_Store

# Vim
*.swp
*.swo

# patch
*.orig
*.rej

# Local environment
.env

# Plugin directory
/.quarkus/cli/plugins/
# TLS Certificates
.certs/
1 change: 1 addition & 0 deletions outputTest/.mvn/wrapper/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
maven-wrapper.jar
Loading