Support RSocket
Skip to main content

RxPy Integration

The rsocket-py implementation doesn't use RxPy by default. A wrapper class RxRSocketClient can be used to interact with RxPy (>= 3.2.0) entities (Observable, Observer)

Getting started

To use Rx with the rsocket client instantiate an RxRSocket with an existing client or server instance:

from rsocket.rx_support.rx_rsocket import RxRSocket

import asyncio
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP
from rsocket.helpers import single_transport_provider

async def main():
connection = await asyncio.open_connection('localhost', 6565)

async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
rx_client = RxRSocket(client)
... # Execute requests

if __name__ == '__main__':
asyncio.run(main())

Examples

RxRSocket can be used as a context manager with a client which is not yet connected. It will close the underlying client when exiting the context. Example code:

from rsocket.rx_support.rx_rsocket import RxRSocket

import asyncio
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP
from rsocket.helpers import single_transport_provider

async def main():
connection = await asyncio.open_connection('localhost', 6565)
client = RSocketClient(single_transport_provider(TransportTCP(*connection)))

async with RxRSocket(client) as rx_client:
... # Execute requests

if __name__ == '__main__':
asyncio.run(main())

Request Response

from rsocket.payload import Payload
from rx import operators

received_message = await rx_client.request_response(
Payload(b'request text')
).pipe(
operators.map(lambda payload: payload.data),
operators.single()
)

Request Stream

from rsocket.payload import Payload
from rx import operators

received_messages = await rx_client.request_stream(
Payload(b'request text'),
request_limit=2
).pipe(
operators.map(lambda payload: payload.data),
operators.to_list()
)

Request Channel

from rsocket.payload import Payload
import rx
from rx import operators

sent_payloads = [Payload(data) for data in [b'1', b'2', b'3']]

received_messages = await rx_client.request_channel(
Payload(b'request text'),
observable=rx.from_list(sent_payloads),
request_limit=2
).pipe(
operators.map(lambda payload: payload.data),
operators.to_list()
)