Class RxIo

java.lang.Object
org.forgerock.opendj.io.rx.RxIo

public final class RxIo extends Object
Utility class for creating reactive transports and sockets.

ByteBuffer based transports

Network packets are represented using ByteBuffers. Read operations require the use of a caller-provided Supplier which MUST be used for supplying the buffer in which received data should be placed. The supplier may return null which indicates that the transport should allocate a buffer on behalf of the caller. On completion, the returned buffer's position will represent the number of bytes read. The number of bytes remaining may be non-zero if there was not enough data available to fill the buffer. The caller may then choose to repeat the read in order to fill the buffer, or flip it in order to consume the buffer.
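
The read contract above can be illustrated with plain java.nio, without any transport at all. The following is a minimal sketch, not the library's implementation: the read method, the ReadBufferSketch class and the 16-byte fallback size are hypothetical stand-ins that only mimic the documented behaviour (the supplier's buffer is used when it is non-null, position reflects the bytes read, and flip prepares the buffer for consumption).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

final class ReadBufferSketch {
    // Hypothetical stand-in for a transport read: copies as many bytes as fit from
    // `incoming` into the buffer obtained from the supplier, allocating a buffer
    // on behalf of the caller if the supplier returns null.
    static ByteBuffer read(ByteBuffer incoming, Supplier<ByteBuffer> supplier) {
        ByteBuffer target = supplier.get();
        if (target == null) {
            target = ByteBuffer.allocate(16); // arbitrary size chosen for this sketch
        }
        while (incoming.hasRemaining() && target.hasRemaining()) {
            target.put(incoming.get());
        }
        return target;
    }

    public static void main(String[] args) {
        ByteBuffer incoming = ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer callerBuffer = ByteBuffer.allocate(8);

        ByteBuffer result = read(incoming, () -> callerBuffer);
        System.out.println("bytes read = " + result.position()); // 5
        System.out.println("space left = " + result.remaining()); // 3: buffer not full

        // Either repeat the read to fill the buffer, or flip it to consume the data.
        result.flip();
        byte[] data = new byte[result.remaining()];
        result.get(data);
        System.out.println(new String(data, StandardCharsets.US_ASCII)); // hello
    }
}
```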

Write operations return a cold Completable which must be subscribed in order to write the provided data to the network. It will only complete once all of the data has been written or the write fails. In other words, on completion the provided buffer's remaining space is guaranteed to be zero.
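
The write contract can be sketched in the same spirit. This example assumes nothing about the library's types: a plain Runnable plays the role of the cold Completable (nothing happens until it is run, which stands in for subscribing), and the ColdWriteSketch class and its write method are hypothetical. It shows the documented guarantee that the buffer has no bytes remaining once the write completes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

final class ColdWriteSketch {
    // Returns a deferred write: no data is sent until run() is called, which
    // plays the role of subscribing to the cold Completable.
    static Runnable write(WritableByteChannel channel, ByteBuffer data) {
        return () -> {
            try {
                // A single channel write may be partial, so loop until drained.
                while (data.hasRemaining()) {
                    channel.write(data);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e); // the write failed
            }
        };
    }

    public static void main(String[] args) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ByteBuffer data = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});

        Runnable pendingWrite = write(Channels.newChannel(sink), data);
        // Nothing has been written yet: remaining=4, written=0.
        System.out.println("before: remaining=" + data.remaining() + ", written=" + sink.size());

        pendingWrite.run();
        // The whole buffer has been written: remaining=0, written=4.
        System.out.println("after: remaining=" + data.remaining() + ", written=" + sink.size());
    }
}
```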

  • Method Details

    • ldapTransportFromOptions

      public static RxTransport<LdapMessage,LdapSocket> ldapTransportFromOptions(Options options)
      Returns a new LDAP RxTransport configured using the provided options.

      This method supports the following options in order to select the transport:

      • CommonLdapOptions.TRANSPORT
      In addition, the options will be used to configure the transport once it has been selected. See the documentation of the individual transports for the specific options they support.
      Parameters:
      options - The options for configuring the LDAP transport.
      Returns:
      The LDAP transport.
    • asyncTcpTransport

      public static RxTransport<ByteBuffer,RxSocket<ByteBuffer>> asyncTcpTransport(Options options)
      Returns a reactive TCP transport that uses asynchronous non-blocking IO when accepting connections, sending and receiving data. This transport uses a finite number of threads and is therefore suitable for use in applications that may have many concurrent connections.

      This transport supports the following options:

      • CommonLdapOptions.SELECTOR_THREAD_COUNT
      • CommonLdapOptions.SELECTOR_THREAD_NAME

      RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) supports the following options:

      • CommonLdapOptions.WRITE_TIMEOUT
      • CommonLdapOptions.SO_REUSE_ADDRESS
      • CommonLdapOptions.SO_KEEPALIVE
      • CommonLdapOptions.TCP_NO_DELAY
      • CommonLdapOptions.PROBE_BYTES_READ
      • CommonLdapOptions.PROBE_BYTES_WRITTEN
      • CommonLdapOptions.BUFFER_SIZE
      RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options) supports the following options:
      • LdapServer.CONNECT_MAX_BACKLOG
      • CommonLdapOptions.WRITE_TIMEOUT
      • CommonLdapOptions.SO_REUSE_ADDRESS
      • CommonLdapOptions.SO_KEEPALIVE
      • CommonLdapOptions.TCP_NO_DELAY
      • CommonLdapOptions.PROBE_BYTES_READ
      • CommonLdapOptions.PROBE_BYTES_WRITTEN
      • CommonLdapOptions.BUFFER_SIZE
      Refer to the class documentation for more information describing how ByteBuffers are transferred.
      Parameters:
      options - Configuration options for the transport.
      Returns:
      An asynchronous non-blocking IO reactive TCP transport suitable for use in applications having many concurrent connections.
    • syncTcpTransport

      public static RxTransport<ByteBuffer,RxSocket<ByteBuffer>> syncTcpTransport(Options options)
      Returns a reactive TCP transport that uses synchronous blocking IO when accepting connections, sending and receiving data. This transport may yield slightly better performance than an asynchronous non-blocking transport. However, it uses a separate thread for each listener and connection, so it should not be used in applications that may have many concurrent connections.

      This transport does not currently support any configuration options.

      RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) supports the following options:

      • CommonLdapOptions.WRITE_TIMEOUT
      • CommonLdapOptions.SO_REUSE_ADDRESS
      • CommonLdapOptions.SO_KEEPALIVE
      • CommonLdapOptions.TCP_NO_DELAY
      • CommonLdapOptions.PROBE_BYTES_READ
      • CommonLdapOptions.PROBE_BYTES_WRITTEN
      • CommonLdapOptions.BUFFER_SIZE
      • CommonLdapOptions.SELECTOR_THREAD_NAME - the name of the client reader thread
      RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options) supports the following options:
      • LdapServer.CONNECT_MAX_BACKLOG
      • CommonLdapOptions.WRITE_TIMEOUT
      • CommonLdapOptions.SO_REUSE_ADDRESS
      • CommonLdapOptions.SO_KEEPALIVE
      • CommonLdapOptions.TCP_NO_DELAY
      • CommonLdapOptions.PROBE_BYTES_READ
      • CommonLdapOptions.PROBE_BYTES_WRITTEN
      • CommonLdapOptions.BUFFER_SIZE
      • CommonLdapOptions.SELECTOR_THREAD_NAME - the name of the server accept thread, and prefix for client reader threads
      Refer to the class documentation for more information describing how ByteBuffers are transferred.
      Parameters:
      options - Configuration options for the transport (currently not used).
      Returns:
      A synchronous blocking IO reactive TCP transport suitable for use in applications with few concurrent connections.
    • memoryTransport

      public static <M> RxTransport<M,RxSocket<M>> memoryTransport(Options options)
      Returns a reactive transport that uses in-memory sockets for all communication. Even though this transport does not use the network for communication, it does use the addresses passed to RxTransport.listen(InetSocketAddress, Options) for identifying servers in calls to RxTransport.connect(String, int, Options).

      This transport does not currently support any configuration options.

      The type of network packets transmitted by this transport is specified by the caller, but should generally be immutable objects. Care should be taken when using mutable objects in order to avoid potential side-effects between the client and server. Applications should avoid holding references to objects after writing them to the socket. In addition, care should be taken to ensure that written objects are in a state which is compatible with the peer reader. Note that read operations do not use the provided supplier.
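
      The risk with mutable messages can be shown with a small stand-alone sketch. InMemoryPipe below is a hypothetical stand-in for an in-memory socket, assuming only that the reader receives the same object instance the writer passed in; it is not the library's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class MemoryMessageSketch {
    // Hypothetical stand-in for an in-memory socket: messages are handed to the
    // reader by reference, with no copying or serialization.
    static final class InMemoryPipe<M> {
        private final Queue<M> queue = new ArrayDeque<>();

        void write(M message) {
            queue.add(message);
        }

        M read() {
            return queue.remove();
        }
    }

    public static void main(String[] args) {
        InMemoryPipe<List<String>> pipe = new InMemoryPipe<>();

        // Risky: the writer keeps a reference and mutates the message after writing.
        List<String> mutable = new ArrayList<>(List.of("cn=alice"));
        pipe.write(mutable);
        mutable.add("cn=mallory");
        System.out.println(pipe.read()); // [cn=alice, cn=mallory]

        // Safer: write an immutable snapshot and drop the reference.
        List<String> source = new ArrayList<>(List.of("cn=alice"));
        pipe.write(List.copyOf(source));
        source.add("cn=mallory");
        System.out.println(pipe.read()); // [cn=alice]
    }
}
```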

      Type Parameters:
      M - The type of messages transferred by the transport.
      Parameters:
      options - Configuration options for the transport (currently not used).
      Returns:
      A reactive transport that uses in-memory sockets for all communication.
    • memoryByteBufferTransport

      public static RxTransport<ByteBuffer,RxSocket<ByteBuffer>> memoryByteBufferTransport(Options options)
      Returns a reactive transport that uses in-memory sockets for all communication. Even though this transport does not use the network for communication, it does use the addresses passed to RxTransport.listen(InetSocketAddress, Options) for identifying servers in calls to RxTransport.connect(String, int, Options).

      This transport does not currently support any configuration options.

      Refer to the class documentation for more information describing how ByteBuffers are transferred.

      Parameters:
      options - Configuration options for the transport (currently not used).
      Returns:
      A reactive transport that uses in-memory sockets for all communication.
    • sslTransport

      public static RxTransport<ByteBuffer,SslRxSocket> sslTransport(RxTransport<ByteBuffer,? extends RxSocket<ByteBuffer>> transport)
      Returns a reactive transport that adds an SSL layer to an underlying transport. New sockets will be published after their SSL handshake completes or fails. More specifically, for clients, RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) will return an error if the handshake fails. For servers, however, RxServerSocket.accept() will still publish incoming connections whose handshake failed. This gives the server application the opportunity to react to the failed connection, e.g. by logging an error in the application logs.

      RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) supports the following options:

      • CommonLdapOptions.SSL_OPTIONS
      RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options) supports the following options:
      • CommonLdapOptions.SSL_OPTIONS
      Refer to the class documentation for more information describing how ByteBuffers are transferred.
      Parameters:
      transport - The underlying transport.
      Returns:
      A reactive transport adding SSL to an underlying transport.
    • sslClientSocket

      public static SslRxSocket sslClientSocket(RxSocket<ByteBuffer> socket, Options options)
      Returns a reactive socket that adds an SSL client layer to an underlying socket using an SSLEngine obtained from the CommonLdapOptions.SSL_OPTIONS of the provided options and the remote host and port of the provided socket.

      SSL client sockets support the following options:

      • CommonLdapOptions.SSL_OPTIONS - for configuring the SSLEngine.
      Refer to the class documentation for more information describing how ByteBuffers are transferred.
      Parameters:
      socket - The underlying socket.
      options - Configuration options for the SSL socket.
      Returns:
      A reactive socket that adds an SSL client layer to an underlying socket.
    • sslServerSocket

      public static SslRxSocket sslServerSocket(RxSocket<ByteBuffer> socket, Options options)
      Returns a reactive socket that adds an SSL server layer to an underlying socket using an SSLEngine obtained from the CommonLdapOptions.SSL_OPTIONS of the provided options.

      SSL server sockets support the following options:

      • CommonLdapOptions.SSL_OPTIONS - for configuring the SSLEngine.
      Refer to the class documentation for more information describing how ByteBuffers are transferred.
      Parameters:
      socket - The underlying socket.
      options - Configuration options for the SSL socket.
      Returns:
      A reactive socket that adds an SSL server layer to an underlying socket.
    • saslClientSocket

      public static SaslRxSocket saslClientSocket(RxSocket<ByteBuffer> socket, SaslClient saslClient)
      Returns a reactive socket that adds a SASL QOP client layer to an underlying socket using the provided authenticated SaslClient. Refer to the class documentation for more information describing how ByteBuffers are transferred.
      Parameters:
      socket - The underlying socket.
      saslClient - The authenticated SASL client.
      Returns:
      A reactive socket that adds a SASL QOP client layer to an underlying socket.
    • saslServerSocket

      public static SaslRxSocket saslServerSocket(RxSocket<ByteBuffer> socket, SaslServer saslServer)
      Returns a reactive socket that adds a SASL QOP server layer to an underlying socket using the provided authenticated SaslServer. Refer to the class documentation for more information describing how ByteBuffers are transferred.
      Parameters:
      socket - The underlying socket.
      saslServer - The authenticated SASL server.
      Returns:
      A reactive socket that adds a SASL QOP server layer to an underlying socket.
    • ldapTransport

      public static RxTransport<LdapMessage,LdapSocket> ldapTransport(RxTransport<ByteBuffer,? extends RxSocket<ByteBuffer>> transport)
      Returns a reactive transport that adds an LDAP layer to an underlying transport.

      RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) supports the following options:

      • CommonLdapOptions.DECODE_OPTIONS - for controlling how responses are decoded.
      • CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES - the maximum response size in bytes for incoming LDAP responses.
      RxTransport.listen(java.net.InetSocketAddress, org.forgerock.util.Options) supports the following options:
      • CommonLdapOptions.DECODE_OPTIONS - for controlling how requests are decoded.
      • CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES - the maximum request size in bytes for incoming LDAP requests.
      The LDAP transport supports the StartTLS extended operation. The transport's options will be combined with the SslOptions provided by the StartTlsExtendedRequest or StartTlsExtendedResult. Refer to sslTransport(RxTransport) for more information.

      Refer to the class documentation for more information describing how ByteBuffers are transferred.

      Parameters:
      transport - The underlying transport.
      Returns:
      A reactive transport adding LDAP to an underlying transport.
    • ldapMemoryTransport

      public static RxTransport<LdapMessage,LdapSocket> ldapMemoryTransport(Options options)
      Returns an in-memory reactive LDAP transport.

      This transport does not currently support any configuration options, nor does it support StartTLS or SASL QOP.

      Parameters:
      options - Configuration options for the transport (currently not used).
      Returns:
      An in-memory reactive LDAP transport.
    • ldapClientSocket

      public static LdapSocket ldapClientSocket(RxSocket<ByteBuffer> socket, Options options)
      Returns a reactive socket that adds an LDAP client layer to an underlying socket.

      LDAP sockets support the following options:

      • CommonLdapOptions.DECODE_OPTIONS - for controlling how LDAP messages are decoded.
      • CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES - the maximum size in bytes of incoming LDAP messages.
      The LDAP transport supports the StartTLS extended operation. The transport's options will be combined with the SslOptions provided by the StartTlsExtendedRequest or StartTlsExtendedResult. Refer to sslClientSocket(RxSocket, Options) for more information.

      Refer to the class documentation for more information describing how ByteBuffers are transferred.

      Parameters:
      socket - The underlying socket.
      options - Configuration options for the LDAP socket.
      Returns:
      A reactive socket that adds an LDAP client layer to an underlying socket.
    • ldapServerSocket

      public static LdapSocket ldapServerSocket(RxSocket<ByteBuffer> socket, Options options)
      Returns a reactive socket that adds an LDAP server layer to an underlying socket.

      LDAP sockets support the following options:

      • CommonLdapOptions.DECODE_OPTIONS - for controlling how LDAP messages are decoded.
      • CommonLdapOptions.MAX_MSG_SIZE_IN_BYTES - the maximum request size in bytes for incoming LDAP messages.
      The LDAP transport supports the StartTLS extended operation. The transport's options will be combined with the SslOptions provided by the StartTlsExtendedRequest or StartTlsExtendedResult. Refer to sslServerSocket(RxSocket, Options) for more information.

      Refer to the class documentation for more information describing how ByteBuffers are transferred.

      Parameters:
      socket - The underlying socket.
      options - Configuration options for the LDAP socket.
      Returns:
      A reactive socket that adds an LDAP server layer to an underlying socket.
    • transformTransport

      public static <D, DS extends RxSocket<D>, U, US extends RxSocket<U>> RxTransport<U,US> transformTransport(String protocol, RxTransport<D,DS> downstream, BiFunction<Single<DS>,Options,Single<US>> connectTransformer, BiFunction<Single<DS>,RxServerSocket<D,DS>,Single<US>> acceptTransformer)
      Returns a reactive transport that transforms downstream client and server reactive sockets of type D to upstream sockets of type U. This method facilitates creation of intermediate filtering transports that may perform functions like encryption, compression, or protocol encoding and decoding.
      Type Parameters:
      D - The downstream transport message type.
      DS - The type of reactive sockets exposed by the downstream transport.
      U - The upstream transport message type.
      US - The type of reactive sockets exposed by the upstream transport.
      Parameters:
      protocol - The name of the protocol implemented by the transformed transport, e.g. SSL.
      downstream - The downstream transport.
      connectTransformer - A function which transforms the singles returned by RxTransport.connect(java.lang.String, int, org.forgerock.util.Options) into their upstream form.
      acceptTransformer - A function which transforms the singles returned by RxServerSocket.accept() into their upstream form.
      Returns:
      The transformed transport.
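
      The idea of layering an upstream message type over a downstream one can be sketched without the reactive types. Everything in this example is hypothetical and greatly simplified: SimpleSocket is not RxSocket, there are no Singles or Options, and the encoder and decoder functions merely stand in for what a connect or accept transformer might install. It only illustrates how a socket of type D can be wrapped as a socket of type U.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

final class TransformSketch {
    // Hypothetical, simplified socket abstraction (not the library's RxSocket).
    interface SimpleSocket<M> {
        void write(M message);

        M read();
    }

    // Downstream socket carrying raw bytes through an in-memory loopback queue.
    static SimpleSocket<byte[]> loopbackBytes() {
        Queue<byte[]> queue = new ArrayDeque<>();
        return new SimpleSocket<byte[]>() {
            @Override
            public void write(byte[] message) {
                queue.add(message);
            }

            @Override
            public byte[] read() {
                return queue.remove();
            }
        };
    }

    // Wraps a downstream socket of type D as an upstream socket of type U.
    static <D, U> SimpleSocket<U> transform(SimpleSocket<D> downstream,
                                            Function<U, D> encoder,
                                            Function<D, U> decoder) {
        return new SimpleSocket<U>() {
            @Override
            public void write(U message) {
                downstream.write(encoder.apply(message));
            }

            @Override
            public U read() {
                return decoder.apply(downstream.read());
            }
        };
    }

    // Example layer: UTF-8 text messages over a byte-oriented socket.
    static SimpleSocket<String> utf8Text(SimpleSocket<byte[]> downstream) {
        return TransformSketch.<byte[], String>transform(
                downstream,
                text -> text.getBytes(StandardCharsets.UTF_8),
                bytes -> new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        SimpleSocket<String> upstream = utf8Text(loopbackBytes());
        upstream.write("bind request");
        System.out.println(upstream.read()); // bind request
    }
}
```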
    • handleUndeliverableRxErrors

      public static void handleUndeliverableRxErrors(Throwable t)
      This method is suitable for use as an RxJava global error handler. It logs errors that are likely to be caused by bugs. The global RxJava error handler is invoked by RxJava when an Rx operator is unable to deliver an exception downstream, usually because the subscriber has already cancelled. It can be configured using the following code:
      
       RxJavaPlugins.setErrorHandler(RxIo::handleUndeliverableRxErrors);
       
      Parameters:
      t - The undeliverable exception.
      See Also: