Class RxIo
- java.lang.Object
-
- org.forgerock.opendj.io.rx.RxIo
-
public final class RxIo extends Object
Utility class for creating reactive transports and sockets.ByteBuffer
based transportsNetwork packets are represented using
ByteBuffer
s.Read
operations require the use of a caller-providedSupplier
which MUST be used for supplying the buffer in which received data should be placed. The supplier may returnnull
which indicates that the transport should allocate a buffer on behalf of the caller. On completion, the returned buffer'sposition
will represent the number of bytes read. The number of bytesremaining
may be non-zero if there was not enough data available to fill the buffer. The caller may then choose to repeat the read in order to fill the buffer, orflip
it in order to consume the buffer.Write
operations return a coldCompletable
which must be subscribed in order to write the provided data to the network. It will only complete once all of the data has been written or the write fails. In other words, on completion the provided buffer'sremaining
space is guaranteed to be zero.
-
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static RxTransport<ByteBuffer,RxSocket<ByteBuffer>>
asyncTcpTransport(Options options)
Returns a reactive TCP transport that uses asynchronous non-blocking IO when accepting connections, sending and receiving data.static void
handleUndeliverableRxErrors(Throwable t)
This method is suitable for use as an RX Java global error handler.static LdapSocket
ldapClientSocket(RxSocket<ByteBuffer> socket, Options options)
Returns a reactive socket that adds an LDAP client layer to an underlying socket.static RxTransport<LdapMessage,LdapSocket>
ldapMemoryTransport(Options options)
Returns an in-memory reactive LDAP transport.static LdapSocket
ldapServerSocket(RxSocket<ByteBuffer> socket, Options options)
Returns a reactive socket that adds an LDAP server layer to an underlying socket.static RxTransport<LdapMessage,LdapSocket>
ldapTransport(RxTransport<ByteBuffer,? extends RxSocket<ByteBuffer>> transport)
Returns a reactive transport that adds an LDAP layer to an underlying transport.static RxTransport<LdapMessage,LdapSocket>
ldapTransportFromOptions(Options options)
Returns a new LDAPRxTransport
configured using the provided options.static RxTransport<ByteBuffer,RxSocket<ByteBuffer>>
memoryByteBufferTransport(Options options)
Returns a reactive transport that uses in-memory sockets for all communication.static <M> RxTransport<M,RxSocket<M>>
memoryTransport(Options options)
Returns a reactive transport that uses in-memory sockets for all communication.static SaslRxSocket
saslClientSocket(RxSocket<ByteBuffer> socket, SaslClient saslClient)
Returns a reactive socket that adds a SASL QOP client layer to an underlying socket using the provided authenticatedSaslClient
.static SaslRxSocket
saslServerSocket(RxSocket<ByteBuffer> socket, SaslServer saslServer)
Returns a reactive socket that adds a SASL QOP server layer to an underlying socket using the provided authenticatedSaslServer
.static SslRxSocket
sslClientSocket(RxSocket<ByteBuffer> socket, Options options)
Returns a reactive socket that adds an SSL client layer to an underlying socket using anSSLEngine
obtained from theCommonLdapOptions.SSL_OPTIONS
of the provided options and the remote host and port of the provided socket.static SslRxSocket
sslServerSocket(RxSocket<ByteBuffer> socket, Options options)
Returns a reactive socket that adds an SSL server layer to an underlying socket using anSSLEngine
obtained from theCommonLdapOptions.SSL_OPTIONS
of the provided options.static RxTransport<ByteBuffer,SslRxSocket>
sslTransport(RxTransport<ByteBuffer,? extends RxSocket<ByteBuffer>> transport)
Returns a reactive transport that adds an SSL layer to an underlying transport.static RxTransport<ByteBuffer,RxSocket<ByteBuffer>>
syncTcpTransport(Options options)
Returns a reactive TCP transport that uses synchronous blocking IO when accepting connections, sending and receiving data.static <D,DS extends RxSocket<D>,U,US extends RxSocket<U>>
RxTransport<U,US>transformTransport(String protocol, RxTransport<D,DS> downstream, BiFunction<io.reactivex.rxjava3.core.Single<DS>,Options,io.reactivex.rxjava3.core.Single<US>> connectTransformer, BiFunction<io.reactivex.rxjava3.core.Single<DS>,RxServerSocket<D,DS>,io.reactivex.rxjava3.core.Single<US>> acceptTransformer)
Returns a reactive transport that transforms downstream client and server reactive sockets of typeD
to upstream sockets of typeU
.
-
-
-
Method Detail
-
ldapTransportFromOptions
public static RxTransport<LdapMessage,LdapSocket> ldapTransportFromOptions(Options options)
Returns a new LDAPRxTransport
configured using the provided options.This method supports the following options in order to select the transport:
CommonLdapOptions.TRANSPORT
- Parameters:
options
- The options for configuring the LDAP transport.- Returns:
- The LDAP transport.
-
asyncTcpTransport
public static RxTransport<ByteBuffer,RxSocket<ByteBuffer>> asyncTcpTransport(Options options)
Returns a reactive TCP transport that uses asynchronous non-blocking IO when accepting connections, sending and receiving data. This transport uses a finite number of threads and is therefore suitable for use in applications that may have many concurrent connections.This transport supports the following options:
CommonLdapOptions.SELECTOR_THREAD_COUNT
CommonLdapOptions.SELECTOR_THREAD_NAME
RxTransport.connect(java.lang.String, int, org.forgerock.util.Options)
supports the following options:CommonLdapOptions.WRITE_TIMEOUT
CommonLdapOptions.SO_REUSE_ADDRESS
CommonLdapOptions.SO_KEEPALIVE
CommonLdapOptions.TCP_NO_DELAY
CommonLdapOptions.PROBE_BYTES_READ
CommonLdapOptions.PROBE_BYTES_WRITTEN
CommonLdapOptions.BUFFER_SIZE
RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options)
supports the following options:LdapServer.CONNECT_MAX_BACKLOG
CommonLdapOptions.WRITE_TIMEOUT
CommonLdapOptions.SO_REUSE_ADDRESS
CommonLdapOptions.SO_KEEPALIVE
CommonLdapOptions.TCP_NO_DELAY
CommonLdapOptions.PROBE_BYTES_READ
CommonLdapOptions.PROBE_BYTES_WRITTEN
CommonLdapOptions.BUFFER_SIZE
ByteBuffers
are transferred.- Parameters:
options
- Configuration options for the transport.- Returns:
- An asynchronous non-blocking IO reactive TCP transport suitable for use in applications having many concurrent connections.
-
syncTcpTransport
public static RxTransport<ByteBuffer,RxSocket<ByteBuffer>> syncTcpTransport(Options options)
Returns a reactive TCP transport that uses synchronous blocking IO when accepting connections, sending and receiving data. This transport may yield slightly better performance than an asynchronous non-blocking transport. However, it uses a separate thread for each listener and connection, so it should not be used in applications that may have many concurrent connections.This transport does not currently support any configuration options.
RxTransport.connect(java.lang.String, int, org.forgerock.util.Options)
supports the following options:CommonLdapOptions.WRITE_TIMEOUT
CommonLdapOptions.SO_REUSE_ADDRESS
CommonLdapOptions.SO_KEEPALIVE
CommonLdapOptions.TCP_NO_DELAY
CommonLdapOptions.PROBE_BYTES_READ
CommonLdapOptions.PROBE_BYTES_WRITTEN
CommonLdapOptions.BUFFER_SIZE
CommonLdapOptions.SELECTOR_THREAD_NAME
- the name of the client reader thread
RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options)
supports the following options:LdapServer.CONNECT_MAX_BACKLOG
CommonLdapOptions.WRITE_TIMEOUT
CommonLdapOptions.SO_REUSE_ADDRESS
CommonLdapOptions.SO_KEEPALIVE
CommonLdapOptions.TCP_NO_DELAY
CommonLdapOptions.PROBE_BYTES_READ
CommonLdapOptions.PROBE_BYTES_WRITTEN
CommonLdapOptions.BUFFER_SIZE
CommonLdapOptions.SELECTOR_THREAD_NAME
- the name of the server accept thread, and prefix for client reader threads
ByteBuffers
are transferred.- Parameters:
options
- Configuration options for the transport (currently not used).- Returns:
- A synchronous blocking IO reactive TCP transport suitable for use in applications with few concurrent connections.
-
memoryTransport
public static <M> RxTransport<M,RxSocket<M>> memoryTransport(Options options)
Returns a reactive transport that uses in-memory sockets for all communication. Even though this transport does not use the network for communication, it does use the addresses passed toRxTransport.listen(InetSocketAddress, Options)
for identifying servers in calls toRxTransport.connect(String, int, Options)
.This transport does not currently support any configuration options.
The type of network packets transmitted by this transport is specified by the caller, but should generally be immutable objects. Care should be taken when using mutable objects in order to avoid potential side-effects between the client and server. Applications should avoid holding references to objects after writing them to the socket. In addition, care should be taken to ensure that written objects are in a state which is compatible with the peer reader. Note that
Read
operations do not use the provided supplier.- Type Parameters:
M
- The type of messages transferred by the transport.- Parameters:
options
- Configuration options for the transport (currently not used).- Returns:
- A reactive transport that uses in-memory sockets for all communication.
-
memoryByteBufferTransport
public static RxTransport<ByteBuffer,RxSocket<ByteBuffer>> memoryByteBufferTransport(Options options)
Returns a reactive transport that uses in-memory sockets for all communication. Even though this transport does not use the network for communication, it does use the addresses passed toRxTransport.listen(InetSocketAddress, Options)
for identifying servers in calls toRxTransport.connect(String, int, Options)
.This transport does not currently support any configuration options.
Refer to the class documentation for more information describing how
ByteBuffers
are transferred.- Parameters:
options
- Configuration options for the transport (currently not used).- Returns:
- A reactive transport that uses in-memory sockets for all communication.
-
sslTransport
public static RxTransport<ByteBuffer,SslRxSocket> sslTransport(RxTransport<ByteBuffer,? extends RxSocket<ByteBuffer>> transport)
Returns a reactive transport that adds an SSL layer to an underlying transport. New sockets will be published after their SSL handshake completes or fails. More specifically, for clients,RxTransport.connect(java.lang.String, int, org.forgerock.util.Options)
will return an error if the handshake fails. However, for servers,RxServerSocket.accept()
will publish failed incoming connections even if the handshake failed. This gives the server application the opportunity to react to the failed connection, e.g. by logging an error in the application logs.RxTransport.connect(java.lang.String, int, org.forgerock.util.Options)
supports the following options:CommonLdapOptions.SSL_OPTIONS
RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options)
supports the following options:CommonLdapOptions.SSL_OPTIONS
ByteBuffers
are transferred.- Parameters:
transport
- The underlying transport.- Returns:
- A reactive transport adding SSL to an underlying transport.
-
sslClientSocket
public static SslRxSocket sslClientSocket(RxSocket<ByteBuffer> socket, Options options)
Returns a reactive socket that adds an SSL client layer to an underlying socket using anSSLEngine
obtained from theCommonLdapOptions.SSL_OPTIONS
of the provided options and the remote host and port of the provided socket.SSL client sockets support the following options:
CommonLdapOptions.SSL_OPTIONS
- for configuring theSSLEngine
.
ByteBuffers
are transferred.- Parameters:
socket
- The underlying socket.options
- Configuration options for the SSL socket.- Returns:
- A reactive socket that adds an SSL client layer to an underlying socket.
-
sslServerSocket
public static SslRxSocket sslServerSocket(RxSocket<ByteBuffer> socket, Options options)
Returns a reactive socket that adds an SSL server layer to an underlying socket using anSSLEngine
obtained from theCommonLdapOptions.SSL_OPTIONS
of the provided options.SSL server sockets support the following options:
CommonLdapOptions.SSL_OPTIONS
- for configuring theSSLEngine
.
ByteBuffers
are transferred.- Parameters:
socket
- The underlying socket.options
- Configuration options for the SSL socket.- Returns:
- A reactive socket that adds an SSL server layer to an underlying socket.
-
saslClientSocket
public static SaslRxSocket saslClientSocket(RxSocket<ByteBuffer> socket, SaslClient saslClient)
Returns a reactive socket that adds a SASL QOP client layer to an underlying socket using the provided authenticatedSaslClient
. Refer to the class documentation for more information describing howByteBuffers
are transferred.- Parameters:
socket
- The underlying socket.saslClient
- The authenticated SASL client.- Returns:
- A reactive socket that adds a SASL QOP client layer to an underlying socket.
-
saslServerSocket
public static SaslRxSocket saslServerSocket(RxSocket<ByteBuffer> socket, SaslServer saslServer)
Returns a reactive socket that adds a SASL QOP server layer to an underlying socket using the provided authenticatedSaslServer
. Refer to the class documentation for more information describing howByteBuffers
are transferred.- Parameters:
socket
- The underlying socket.saslServer
- The authenticated SASL server.- Returns:
- A reactive socket that adds a SASL QOP server layer to an underlying socket.
-
ldapTransport
public static RxTransport<LdapMessage,LdapSocket> ldapTransport(RxTransport<ByteBuffer,? extends RxSocket<ByteBuffer>> transport)
Returns a reactive transport that adds an LDAP layer to an underlying transport.RxTransport.connect(java.lang.String, int, org.forgerock.util.Options)
supports the following options:CommonLdapOptions.DECODE_OPTIONS
- for controlling how responses are decoded.CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES
- the maximum response size in bytes for incoming LDAP responses.
RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options)
supports the following options:CommonLdapOptions.DECODE_OPTIONS
- for controlling how requests are decoded.CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES
- the maximum request size in bytes for incoming LDAP requests.
SslOptions
provided by theStartTlsExtendedRequest
orStartTlsExtendedResult
. Refer tosslTransport(RxTransport)
for more information.Refer to the class documentation for more information describing how
ByteBuffers
are transferred.- Parameters:
transport
- The underlying transport.- Returns:
- A reactive transport adding LDAP to an underlying transport.
-
ldapMemoryTransport
public static RxTransport<LdapMessage,LdapSocket> ldapMemoryTransport(Options options)
Returns an in-memory reactive LDAP transport.This transport does not currently support any configuration options, nor does it support StartTLS or SASL QOP.
- Parameters:
options
- Configuration options for the transport (currently not used).- Returns:
- An in-memory reactive LDAP transport.
-
ldapClientSocket
public static LdapSocket ldapClientSocket(RxSocket<ByteBuffer> socket, Options options)
Returns a reactive socket that adds an LDAP client layer to an underlying socket.LDAP sockets support the following options:
CommonLdapOptions.DECODE_OPTIONS
- for controlling how LDAP messages are decoded.CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES
- the maximum request size in bytes for incoming LDAP messages.
SslOptions
provided by theStartTlsExtendedRequest
orStartTlsExtendedResult
. Refer tosslClientSocket(RxSocket, Options)
for more information.Refer to the class documentation for more information describing how
ByteBuffers
are transferred.- Parameters:
socket
- The underlying socket.options
- Configuration options for the LDAP socket.- Returns:
- A reactive socket that adds an LDAP client layer to an underlying socket.
-
ldapServerSocket
public static LdapSocket ldapServerSocket(RxSocket<ByteBuffer> socket, Options options)
Returns a reactive socket that adds an LDAP server layer to an underlying socket.LDAP sockets support the following options:
CommonLdapOptions.DECODE_OPTIONS
- for controlling how LDAP messages are decoded.CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES
- the maximum request size in bytes for incoming LDAP messages.
SslOptions
provided by theStartTlsExtendedRequest
orStartTlsExtendedResult
. Refer tosslServerSocket(RxSocket, Options)
for more information.Refer to the class documentation for more information describing how
ByteBuffers
are transferred.- Parameters:
socket
- The underlying socket.options
- Configuration options for the LDAP socket.- Returns:
- A reactive socket that adds an LDAP server layer to an underlying socket.
-
transformTransport
public static <D,DS extends RxSocket<D>,U,US extends RxSocket<U>> RxTransport<U,US> transformTransport(String protocol, RxTransport<D,DS> downstream, BiFunction<io.reactivex.rxjava3.core.Single<DS>,Options,io.reactivex.rxjava3.core.Single<US>> connectTransformer, BiFunction<io.reactivex.rxjava3.core.Single<DS>,RxServerSocket<D,DS>,io.reactivex.rxjava3.core.Single<US>> acceptTransformer)
Returns a reactive transport that transforms downstream client and server reactive sockets of typeD
to upstream sockets of typeU
. This method facilitates creation of intermediate filtering transports that may perform functions like encryption, compression, or protocol encoding and decoding.- Type Parameters:
D
- The downstream transport message type.DS
- The type of reactive sockets exposed by the downstream transport.U
- The upstream transport message type.US
- The type of reactive sockets exposed by the upstream transport.- Parameters:
protocol
- The name of the protocol implemented by the transformed transport, e.g. SSL.downstream
- The downstream transport.connectTransformer
- A function which transforms the singles returned byRxTransport.connect(java.lang.String, int, org.forgerock.util.Options)
into their upstream form.acceptTransformer
- A function which transforms the singles returned byRxServerSocket.accept()
into their upstream form.- Returns:
- The transformed transport.
-
handleUndeliverableRxErrors
public static void handleUndeliverableRxErrors(Throwable t)
This method is suitable for use as an RX Java global error handler. It logs errors that are likely to be caused by bugs. The global RX Java error handler is invoked by RX Java when an RX operator is unable to deliver an exception downstream, usually because the subscriber has already cancelled. It can be configured using the following code:RxJavaPlugins.setErrorHandler(RxIo::handleUndeliverableRxExceptionsErrors);
- Parameters:
t
- The undeliverable exception.- See Also:
RxJavaPlugins.setErrorHandler(io.reactivex.rxjava3.functions.Consumer<? super java.lang.Throwable>)
,RxJavaPlugins.onError(java.lang.Throwable)
-
-