Class RxIo


  • public final class RxIo
    extends Object
    Utility class for creating reactive transports and sockets.

    ByteBuffer based transports

    Network packets are represented using ByteBuffers. Read operations take a caller-provided Supplier, which the transport MUST use to obtain the buffer in which received data is placed. The supplier may return null, which indicates that the transport should allocate a buffer on behalf of the caller. On completion, the returned buffer's position will represent the number of bytes read. The number of bytes remaining may be non-zero if there was not enough data available to fill the buffer. The caller may then either repeat the read in order to fill the buffer, or flip the buffer in order to consume it.
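
    The buffer handling described above can be sketched with plain JDK types. This is only an illustration of the position, remaining, and flip bookkeeping: the read method below is a hypothetical stand-in that copies bytes from an in-memory source, and it is not the RxSocket API, whose actual signatures are not shown in this section.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

final class BufferReadSketch {

    /**
     * Hypothetical stand-in for a transport read (an assumption, not the SDK API).
     * Copies as many bytes from 'available' as fit into the supplied buffer.
     */
    static ByteBuffer read(ByteBuffer available, Supplier<ByteBuffer> supplier) {
        ByteBuffer target = supplier.get();
        if (target == null) {
            // A null result asks the "transport" to allocate on the caller's behalf.
            target = ByteBuffer.allocate(available.remaining());
        }
        int n = Math.min(available.remaining(), target.remaining());
        ByteBuffer slice = available.duplicate();
        slice.limit(slice.position() + n);
        target.put(slice);                                 // advances target's position by n
        available.position(available.position() + n);      // mark the bytes as consumed
        return target;
    }

    static String demo() {
        ByteBuffer callerBuffer = ByteBuffer.allocate(8);

        // First read: only 5 bytes are available, so 3 bytes of space remain.
        read(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)), () -> callerBuffer);

        // The caller chooses to repeat the read in order to fill the buffer.
        read(ByteBuffer.wrap(" world".getBytes(StandardCharsets.US_ASCII)), () -> callerBuffer);

        // The buffer is now full: flip it in order to consume the received bytes.
        callerBuffer.flip();
        return StandardCharsets.US_ASCII.decode(callerBuffer).toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello wo"
    }
}
```

    Returning the same buffer from the supplier on each call is what lets a partially filled buffer be topped up by a later read.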

    Write operations return a cold Completable which must be subscribed in order to write the provided data to the network. It only terminates once all of the data has been written or the write fails. In other words, on successful completion the provided buffer is guaranteed to have no bytes remaining.
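
    The completion guarantee for writes can be illustrated with plain JDK channels. The sketch below does not use RxJava and does not model cold subscription; it only shows the "write until nothing remains" contract. TrickleChannel and writeFully are hypothetical names introduced for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class WriteFullySketch {

    /** Hypothetical channel accepting at most 3 bytes per call, to mimic partial network writes. */
    static final class TrickleChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        int calls;

        @Override
        public int write(ByteBuffer src) {
            calls++;
            int n = Math.min(3, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
            // Nothing to release in this in-memory sketch.
        }
    }

    /** Returns normally only once every byte has been written, so data.remaining() == 0 afterwards. */
    static void writeFully(WritableByteChannel channel, ByteBuffer data) {
        try {
            while (data.hasRemaining()) {
                channel.write(data);
            }
        } catch (IOException e) {
            // A failed write surfaces as an error instead of a normal return.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TrickleChannel channel = new TrickleChannel();
        ByteBuffer data = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7});
        writeFully(channel, data);
        System.out.println("remaining=" + data.remaining() + " calls=" + channel.calls);
    }
}
```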

    • Method Detail

      • ldapTransportFromOptions

        public static RxTransport<LdapMessage,​LdapSocket> ldapTransportFromOptions​(Options options)
        Returns a new LDAP RxTransport configured using the provided options.

        This method supports the following options in order to select the transport:

        • CommonLdapOptions.TRANSPORT
        In addition, the options will be used to configure the transport once it has been selected. See the documentation of the individual transports for the specific options they support.
        Parameters:
        options - The options for configuring the LDAP transport.
        Returns:
        The LDAP transport.
      • asyncTcpTransport

        public static RxTransport<ByteBuffer,​RxSocket<ByteBuffer>> asyncTcpTransport​(Options options)
        Returns a reactive TCP transport that uses asynchronous non-blocking IO when accepting connections and when sending and receiving data. This transport uses a finite number of threads and is therefore suitable for use in applications that may have many concurrent connections.

        This transport supports the following options:

        • CommonLdapOptions.SELECTOR_THREAD_COUNT
        • CommonLdapOptions.SELECTOR_THREAD_NAME

        RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) supports the following options:

        • CommonLdapOptions.WRITE_TIMEOUT
        • CommonLdapOptions.SO_REUSE_ADDRESS
        • CommonLdapOptions.SO_KEEPALIVE
        • CommonLdapOptions.TCP_NO_DELAY
        • CommonLdapOptions.PROBE_BYTES_READ
        • CommonLdapOptions.PROBE_BYTES_WRITTEN
        • CommonLdapOptions.BUFFER_SIZE
        RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options) supports the following options:
        • LdapServer.CONNECT_MAX_BACKLOG
        • CommonLdapOptions.WRITE_TIMEOUT
        • CommonLdapOptions.SO_REUSE_ADDRESS
        • CommonLdapOptions.SO_KEEPALIVE
        • CommonLdapOptions.TCP_NO_DELAY
        • CommonLdapOptions.PROBE_BYTES_READ
        • CommonLdapOptions.PROBE_BYTES_WRITTEN
        • CommonLdapOptions.BUFFER_SIZE
        Refer to the class documentation for more information describing how ByteBuffers are transferred.
        Parameters:
        options - Configuration options for the transport.
        Returns:
        An asynchronous non-blocking IO reactive TCP transport suitable for use in applications having many concurrent connections.
      • syncTcpTransport

        public static RxTransport<ByteBuffer,​RxSocket<ByteBuffer>> syncTcpTransport​(Options options)
        Returns a reactive TCP transport that uses synchronous blocking IO when accepting connections and when sending and receiving data. This transport may yield slightly better performance than an asynchronous non-blocking transport. However, it uses a separate thread for each listener and connection, so it should not be used in applications that may have many concurrent connections.

        This transport does not currently support any configuration options.

        RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) supports the following options:

        • CommonLdapOptions.WRITE_TIMEOUT
        • CommonLdapOptions.SO_REUSE_ADDRESS
        • CommonLdapOptions.SO_KEEPALIVE
        • CommonLdapOptions.TCP_NO_DELAY
        • CommonLdapOptions.PROBE_BYTES_READ
        • CommonLdapOptions.PROBE_BYTES_WRITTEN
        • CommonLdapOptions.BUFFER_SIZE
        • CommonLdapOptions.SELECTOR_THREAD_NAME - the name of the client reader thread
        RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options) supports the following options:
        • LdapServer.CONNECT_MAX_BACKLOG
        • CommonLdapOptions.WRITE_TIMEOUT
        • CommonLdapOptions.SO_REUSE_ADDRESS
        • CommonLdapOptions.SO_KEEPALIVE
        • CommonLdapOptions.TCP_NO_DELAY
        • CommonLdapOptions.PROBE_BYTES_READ
        • CommonLdapOptions.PROBE_BYTES_WRITTEN
        • CommonLdapOptions.BUFFER_SIZE
        • CommonLdapOptions.SELECTOR_THREAD_NAME - the name of the server accept thread, and prefix for client reader threads
        Refer to the class documentation for more information describing how ByteBuffers are transferred.
        Parameters:
        options - Configuration options for the transport (currently not used).
        Returns:
        A synchronous blocking IO reactive TCP transport suitable for use in applications with few concurrent connections.
      • memoryTransport

        public static <M> RxTransport<M,​RxSocket<M>> memoryTransport​(Options options)
        Returns a reactive transport that uses in-memory sockets for all communication. Even though this transport does not use the network for communication, it does use the addresses passed to RxTransport.listen(InetSocketAddress, Options) for identifying servers in calls to RxTransport.connect(String, int, Options).

        This transport does not currently support any configuration options.

        The type of network packets transmitted by this transport is specified by the caller, but packets should generally be immutable objects. Care should be taken when using mutable objects in order to avoid potential side effects between the client and server. Applications should avoid holding references to objects after writing them to the socket. In addition, care should be taken to ensure that written objects are in a state which is compatible with the peer reader. Note that read operations do not use the provided supplier.

        Type Parameters:
        M - The type of messages transferred by the transport.
        Parameters:
        options - Configuration options for the transport (currently not used).
        Returns:
        A reactive transport that uses in-memory sockets for all communication.
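
        The advice about immutable messages can be illustrated without the transport itself. In the sketch below an ArrayDeque is a hypothetical stand-in for an in-memory socket (objects are passed by reference, with no serialization), and Message is an illustrative class rather than an SDK type.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class InMemoryMessageSketch {

    /** Immutable message: takes a defensive copy on construction and exposes an unmodifiable list. */
    static final class Message {
        private final List<String> values;

        Message(List<String> values) {
            this.values = List.copyOf(values);
        }

        List<String> values() {
            return values;
        }
    }

    static List<String> demo() {
        // Hypothetical stand-in for an in-memory socket: the reader receives the same object reference.
        Queue<Message> socket = new ArrayDeque<>();

        List<String> scratch = new ArrayList<>(List.of("cn=alice"));
        socket.add(new Message(scratch));

        // The writer keeps using its scratch list after writing; the message is unaffected.
        scratch.add("cn=mallory");

        return socket.remove().values();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[cn=alice]"
    }
}
```

        Had the scratch list itself been written to the queue, the reader would have observed the later modification, which is the kind of side effect the text warns about.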
      • sslClientSocket

        public static SslRxSocket sslClientSocket​(RxSocket<ByteBuffer> socket,
                                                  Options options)
        Returns a reactive socket that adds an SSL client layer to an underlying socket using an SSLEngine obtained from the CommonLdapOptions.SSL_OPTIONS of the provided options and the remote host and port of the provided socket.

        SSL client sockets support the following options:

        • CommonLdapOptions.SSL_OPTIONS - for configuring the SSLEngine.
        Refer to the class documentation for more information describing how ByteBuffers are transferred.
        Parameters:
        socket - The underlying socket.
        options - Configuration options for the SSL socket.
        Returns:
        A reactive socket that adds an SSL client layer to an underlying socket.
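
        For orientation, the following sketch shows what obtaining a client-mode SSLEngine for a given remote host and port looks like with plain JSSE. How CommonLdapOptions.SSL_OPTIONS builds its engine is not described in this section, so the use of the default SSLContext and of the "LDAPS" endpoint identification algorithm are assumptions made for the example.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

final class ClientEngineSketch {

    /** Creates a client-mode engine bound to the peer's host and port (plain JSSE, not the SDK). */
    static SSLEngine clientEngine(String host, int port) {
        try {
            // Assumption: the JVM default context; a real configuration would supply its own.
            SSLEngine engine = SSLContext.getDefault().createSSLEngine(host, port);
            engine.setUseClientMode(true);

            // The peer host is what host name verification checks the server certificate against.
            SSLParameters parameters = engine.getSSLParameters();
            parameters.setEndpointIdentificationAlgorithm("LDAPS");
            engine.setSSLParameters(parameters);
            return engine;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }

    public static void main(String[] args) {
        SSLEngine engine = clientEngine("ldap.example.com", 636);
        System.out.println(engine.getPeerHost() + ":" + engine.getPeerPort());
    }
}
```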
      • sslServerSocket

        public static SslRxSocket sslServerSocket​(RxSocket<ByteBuffer> socket,
                                                  Options options)
        Returns a reactive socket that adds an SSL server layer to an underlying socket using an SSLEngine obtained from the CommonLdapOptions.SSL_OPTIONS of the provided options.

        SSL server sockets support the following options:

        • CommonLdapOptions.SSL_OPTIONS - for configuring the SSLEngine.
        Refer to the class documentation for more information describing how ByteBuffers are transferred.
        Parameters:
        socket - The underlying socket.
        options - Configuration options for the SSL socket.
        Returns:
        A reactive socket that adds an SSL server layer to an underlying socket.
      • saslClientSocket

        public static SaslRxSocket saslClientSocket​(RxSocket<ByteBuffer> socket,
                                                    SaslClient saslClient)
        Returns a reactive socket that adds a SASL QOP client layer to an underlying socket using the provided authenticated SaslClient. Refer to the class documentation for more information describing how ByteBuffers are transferred.
        Parameters:
        socket - The underlying socket.
        saslClient - The authenticated SASL client.
        Returns:
        A reactive socket that adds a SASL QOP client layer to an underlying socket.
      • saslServerSocket

        public static SaslRxSocket saslServerSocket​(RxSocket<ByteBuffer> socket,
                                                    SaslServer saslServer)
        Returns a reactive socket that adds a SASL QOP server layer to an underlying socket using the provided authenticated SaslServer. Refer to the class documentation for more information describing how ByteBuffers are transferred.
        Parameters:
        socket - The underlying socket.
        saslServer - The authenticated SASL server.
        Returns:
        A reactive socket that adds a SASL QOP server layer to an underlying socket.
      • ldapMemoryTransport

        public static RxTransport<LdapMessage,​LdapSocket> ldapMemoryTransport​(Options options)
        Returns an in-memory reactive LDAP transport.

        This transport does not currently support any configuration options, nor does it support StartTLS or SASL QOP.

        Parameters:
        options - Configuration options for the transport (currently not used).
        Returns:
        An in-memory reactive LDAP transport.
      • ldapClientSocket

        public static LdapSocket ldapClientSocket​(RxSocket<ByteBuffer> socket,
                                                  Options options)
        Returns a reactive socket that adds an LDAP client layer to an underlying socket.

        LDAP sockets support the following options:

        • CommonLdapOptions.DECODE_OPTIONS - for controlling how LDAP messages are decoded.
        • CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES - the maximum request size in bytes for incoming LDAP messages.
        The LDAP transport supports the StartTLS extended operation. The transport's options will be combined with the SslOptions provided by the StartTlsExtendedRequest or StartTlsExtendedResult. Refer to sslClientSocket(RxSocket, Options) for more information.

        Refer to the class documentation for more information describing how ByteBuffers are transferred.

        Parameters:
        socket - The underlying socket.
        options - Configuration options for the LDAP socket.
        Returns:
        A reactive socket that adds an LDAP client layer to an underlying socket.
      • ldapServerSocket

        public static LdapSocket ldapServerSocket​(RxSocket<ByteBuffer> socket,
                                                  Options options)
        Returns a reactive socket that adds an LDAP server layer to an underlying socket.

        LDAP sockets support the following options:

        • CommonLdapOptions.DECODE_OPTIONS - for controlling how LDAP messages are decoded.
        • CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES - the maximum request size in bytes for incoming LDAP messages.
        The LDAP transport supports the StartTLS extended operation. The transport's options will be combined with the SslOptions provided by the StartTlsExtendedRequest or StartTlsExtendedResult. Refer to sslServerSocket(RxSocket, Options) for more information.

        Refer to the class documentation for more information describing how ByteBuffers are transferred.

        Parameters:
        socket - The underlying socket.
        options - Configuration options for the LDAP socket.
        Returns:
        A reactive socket that adds an LDAP server layer to an underlying socket.
      • transformTransport

        public static <D, DS extends RxSocket<D>, U, US extends RxSocket<U>> RxTransport<U, US> transformTransport(
                String protocol,
                RxTransport<D, DS> downstream,
                BiFunction<io.reactivex.rxjava3.core.Single<DS>, Options, io.reactivex.rxjava3.core.Single<US>> connectTransformer,
                BiFunction<io.reactivex.rxjava3.core.Single<DS>, RxServerSocket<D, DS>, io.reactivex.rxjava3.core.Single<US>> acceptTransformer)
        Returns a reactive transport that transforms downstream client and server reactive sockets of type D to upstream sockets of type U. This method facilitates creation of intermediate filtering transports that may perform functions like encryption, compression, or protocol encoding and decoding.
        Type Parameters:
        D - The downstream transport message type.
        DS - The type of reactive sockets exposed by the downstream transport.
        U - The upstream transport message type.
        US - The type of reactive sockets exposed by the upstream transport.
        Parameters:
        protocol - The name of the protocol implemented by the transformed transport, e.g. SSL.
        downstream - The downstream transport.
        connectTransformer - A function which transforms the singles returned by RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) into their upstream form.
        acceptTransformer - A function which transforms the singles returned by RxServerSocket.accept() into their upstream form.
        Returns:
        The transformed transport.
      • handleUndeliverableRxErrors

        public static void handleUndeliverableRxErrors​(Throwable t)
        This method is suitable for use as an RxJava global error handler. It logs errors that are likely to be caused by bugs. RxJava invokes the global error handler when an operator is unable to deliver an exception downstream, usually because the subscriber has already cancelled. It can be configured using the following code:
        
         RxJavaPlugins.setErrorHandler(RxIo::handleUndeliverableRxErrors);
         
        Parameters:
        t - The undeliverable exception.
        See Also:
        RxJavaPlugins.setErrorHandler(io.reactivex.rxjava3.functions.Consumer<? super java.lang.Throwable>), RxJavaPlugins.onError(java.lang.Throwable)