Simple Quickstart
Client Example
The client sends a request/response
message carrying the desired date-time format to the server about once per second,
and exits after five seconds have elapsed.
# client.py
import asyncio
import logging
import sys
from reactivestreams.subscriber import DefaultSubscriber
from rsocket.helpers import single_transport_provider
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP
class StreamSubscriber(DefaultSubscriber):
    """Subscriber that logs every item it receives and then asks for one more."""

    def on_next(self, value, is_complete=False):
        """Log ``value`` at INFO level, then request a single further item.

        ``is_complete`` is accepted but not inspected by this implementation.
        """
        message = 'RS: {}'.format(value)
        logging.info(message)
        # Pull-based flow: one more item is requested per item handled.
        self.subscription.request(1)
async def main(server_port):
    """Connect to the server on localhost and poll it with request/response calls.

    Opens a TCP connection to ``localhost:server_port``, wraps it in an
    ``RSocketClient``, and starts a background task that repeatedly sends the
    date-time format payload, logs the response data and sleeps one second.
    After five seconds the task is cancelled and awaited before the client
    context is left.
    """
    logging.info('Connecting to server at localhost:%s', server_port)
    reader, writer = await asyncio.open_connection('localhost', server_port)
    transport = TransportTCP(reader, writer)

    async with RSocketClient(single_transport_provider(transport)) as client:
        date_time_format = Payload(b'%Y-%m-%d %H:%M:%S')

        async def poll_server():
            # Cancellation is the expected way out of this loop, so it is
            # absorbed here and the awaiting caller sees a normal finish.
            try:
                while True:
                    response = await client.request_response(date_time_format)
                    logging.info('Response: {}'.format(response.data))
                    await asyncio.sleep(1)
            except asyncio.CancelledError:
                return

        polling = asyncio.create_task(poll_server())
        await asyncio.sleep(5)
        polling.cancel()
        await polling
if __name__ == '__main__':
    # The port is taken from the first command-line argument when one is
    # given (passed through as the raw string); otherwise 6565 is used.
    if len(sys.argv) > 1:
        port = sys.argv[1]
    else:
        port = 6565
    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main(port))
Server Example
The server responds to request/response
messages with the current date-time, formatted according to the format string sent in the request.
# server.py
import asyncio
import logging
import sys
from datetime import datetime
from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP
class Handler(BaseRequestHandler):
    """Request handler that answers request/response calls with the current date-time."""

    async def request_response(self, payload: Payload) -> asyncio.Future:
        """Format the current time using the pattern carried in ``payload``.

        The payload data is decoded as UTF-8 and used as a ``strftime``
        pattern for ``datetime.now()``. The formatted text is UTF-8 encoded,
        wrapped in a ``Payload`` and returned through ``create_future``.
        """
        await asyncio.sleep(0.1)  # Simulate not immediate process
        pattern = payload.data.decode('utf-8')
        now_text = datetime.now().strftime(pattern)
        response = Payload(now_text.encode('utf-8'))
        return create_future(response)
async def run_server(server_port):
    """Listen on ``localhost:server_port`` and serve RSocket connections until stopped.

    Each accepted TCP connection is wrapped in a ``TransportTCP`` and handed
    to a new ``RSocketServer`` that uses ``Handler`` as its handler factory.
    """
    logging.info('Starting server at localhost:%s', server_port)

    def handle_connection(reader, writer):
        # One RSocketServer is created per accepted connection; the instance
        # is not kept by this callback.
        transport = TransportTCP(reader, writer)
        RSocketServer(transport, handler_factory=Handler)

    async with await asyncio.start_server(handle_connection, 'localhost', server_port) as listener:
        await listener.serve_forever()
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    # Use the first command-line argument as the port when present (kept as
    # the raw string); fall back to 6565 otherwise.
    port = 6565 if len(sys.argv) <= 1 else sys.argv[1]
    asyncio.run(run_server(port))
More Examples
Browse the following repositories for more rsocket-py
examples: