Class ReplicationDomain

  • Direct Known Subclasses:
    LDAPReplicationDomain

    public abstract class ReplicationDomain
    extends Object
    This class should be used as a base for Replication implementations.

    Developers who need a replication mechanism are expected to subclass this class with their own implementation.

    The startup phase of the ReplicationDomain subclass should read the list of replication servers from the configuration, instantiate a ServerState, and then start the publish service by calling startPublishService(). At this point it can start calling the publish(ReplicationMsg) method if needed.

    When the startup phase reaches the point where the subclass is ready to handle updates, the Replication Domain implementation should call the startListenService() method. At this point a Listener thread is created on the Replication Service, which can then start receiving updates.

    When updates are received, the Replication Service calls the dispatchUpdateForReplay(UpdateMsg) method. The ReplicationDomain implementation should implement the appropriate code for replaying the update on the local repository. When fully done, the subclass must call the processUpdateAfterReplay(UpdateMsg, long) method. This allows the update to be processed asynchronously if necessary.

    To propagate changes to other replicas, a ReplicationDomain implementation must use the publish(ReplicationMsg) method.

    If the Full Initialization process is needed, then implementations of importBackend(InputStream) and exportBackend(OutputStream) must be provided.

    Full Initialization of a replica can be triggered by LDAP clients by creating InitializeTasks or InitializeTargetTask. Full initialization can also be triggered from the ReplicationDomain implementation using methods initializeRemote(ReplicaId, Task), initializeRemote(ReplicaId, ReplicaId, Task, int) or initializeFromRemote(ReplicaId, Task).

    At shutdown time, the suspendService() method should be called to cleanly stop the replication service.
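
    The lifecycle described above can be sketched as follows. This is a minimal, self-contained illustration and not the real API: SketchDomain, InMemoryDomain, simulateIncomingUpdate and the String-based messages are hypothetical stand-ins for ReplicationDomain, UpdateMsg and the Replication Service. Only the ordering of the calls is meant to be informative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ReplicationDomain; it only records the order of lifecycle calls. */
abstract class SketchDomain {
    final List<String> calls = new ArrayList<>();
    private boolean publishStarted;
    private boolean listenStarted;

    protected void startPublishService() {
        publishStarted = true;
        calls.add("startPublishService");
    }

    protected void startListenService() {
        // Mirrors the documented rule: listen only after the publish service has started.
        if (!publishStarted) {
            throw new IllegalStateException("startPublishService() must be called first");
        }
        listenStarted = true;
        calls.add("startListenService");
    }

    public void publish(String msg) {
        if (!publishStarted) {
            throw new IllegalStateException("publish service not started");
        }
        calls.add("publish:" + msg);
    }

    /** Implemented by the subclass to replay an update on the local repository. */
    public abstract void dispatchUpdateForReplay(String updateMsg);

    protected final void processUpdateAfterReplay(String updateMsg, long etimeNanos) {
        calls.add("processUpdateAfterReplay:" + updateMsg);
    }

    public void suspendService() {
        publishStarted = false;
        listenStarted = false;
        calls.add("suspendService");
    }

    /** Hypothetical helper simulating the Replication Service delivering an update. */
    void simulateIncomingUpdate(String updateMsg) {
        if (!listenStarted) {
            throw new IllegalStateException("listen service not started");
        }
        dispatchUpdateForReplay(updateMsg);
    }
}

/** Hypothetical subclass that replays updates into an in-memory list. */
final class InMemoryDomain extends SketchDomain {
    final List<String> repository = new ArrayList<>();

    void startup() {
        // Reading the replication server list and creating the ServerState would happen here.
        startPublishService();
        startListenService();
    }

    @Override
    public void dispatchUpdateForReplay(String updateMsg) {
        long start = System.nanoTime();
        repository.add(updateMsg); // replay on the local repository
        processUpdateAfterReplay(updateMsg, System.nanoTime() - start);
    }
}

final class LifecycleSketch {
    static List<String> run() {
        InMemoryDomain domain = new InMemoryDomain();
        domain.startup();
        domain.publish("local-change");
        domain.simulateIncomingUpdate("remote-change");
        domain.suspendService();
        return domain.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    In this sketch the replay is synchronous, so processUpdateAfterReplay is called directly from dispatchUpdateForReplay; an asynchronous implementation would call it from whichever thread finishes the replay.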

    • Field Detail

      • config

        protected volatile ReplicationDomainCfg config
        The configuration of the replication domain.
      • replicationPurgeDelayInMs

        protected volatile long replicationPurgeDelayInMs
        The replication purge delay is available as Long from the config object. Since the value is retrieved for every update, avoid unboxing performance issues by using a dedicated long field.
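
        The idea of keeping a primitive copy of a boxed configuration value can be sketched as below. PurgeDelayCache, onConfigChange and isOlderThanPurgeDelay are hypothetical names, and the purge rule shown is an assumption made for illustration only.

```java
/** Hypothetical sketch: cache a boxed configuration value in a primitive field. */
final class PurgeDelayCache {
    // Primitive copy, read on every update without unboxing.
    private volatile long purgeDelayInMs;

    PurgeDelayCache(Long configuredDelayMs) {
        onConfigChange(configuredDelayMs);
    }

    /** Unboxes once per configuration change rather than once per update. */
    void onConfigChange(Long configuredDelayMs) {
        this.purgeDelayInMs = configuredDelayMs; // single unboxing
    }

    /** Hot path (assumed rule): a change is purgeable once it is older than the delay. */
    boolean isOlderThanPurgeDelay(long changeTimeMs, long nowMs) {
        return nowMs - changeTimeMs > purgeDelayInMs;
    }
}
```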
      • broker

        protected ReplicationBroker broker
        The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService.
      • generationId

        protected volatile GenerationId generationId
        The generationId for this replication domain. It is computed as a hash of the first 1000 entries of this domain.
      • serverContext

        protected final ServerContext serverContext
        The directory server context.
      • canPersistROM

        protected volatile boolean canPersistROM
        Decides whether to include the ReplicaOfflineMsg (ROM) sent by the local replica in the persisted state.

        The ROM should not be part of the persisted state while the replica is recovering updates from the connected RS. Since the RS state is sent as part of the response to the start message, we can detect the case and only persist the ROM when the replica has finished receiving its own updates or there are no updates to receive. See OPENDJ-8062 and OPENDJ-8992 for details.

    • Constructor Detail

      • ReplicationDomain

        public ReplicationDomain​(ServerContext serverContext,
                                 ReplicationDomainCfg config,
                                 GenerationId generationId)
        Creates a ReplicationDomain with the provided parameters.
        Parameters:
        serverContext - The directory server context
        config - The configuration object for this ReplicationDomain
        generationId - the generation of this ReplicationDomain
    • Method Detail

      • getHealthStatus

        public HealthStatus getHealthStatus​(long delayThresholdMs)
        Returns the health status based on the current replication delay. If the delay is above the provided threshold, the replication domain is considered not healthy.
        Parameters:
        delayThresholdMs - The maximum replication delay in milliseconds for considering this replication domain healthy.
        Returns:
        This replication domain health status.
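
        The threshold rule can be illustrated with the sketch below. HealthSketch and its Health enum are hypothetical stand-ins for the real HealthStatus type, and the current delay is passed in explicitly because how it is measured is not described here.

```java
/** Hypothetical sketch of the delay-threshold rule; not the real HealthStatus API. */
final class HealthSketch {
    enum Health { HEALTHY, NOT_HEALTHY }

    /** A delay strictly above the threshold is reported as not healthy. */
    static Health healthFor(long currentDelayMs, long delayThresholdMs) {
        return currentDelayMs > delayThresholdMs ? Health.NOT_HEALTHY : Health.HEALTHY;
    }
}
```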
      • getCsnGenerator

        protected CSNGenerator getCsnGenerator()
        Returns the CSNGenerator that will be used to generate CSN for this domain.
        Returns:
        The CSNGenerator that will be used to generate CSN for this domain.
      • sessionInitiated

        public void sessionInitiated​(ServerStatus initStatus,
                                     ServerState rsState,
                                     ServerState newestChangelogStateOfReplica)
        Sets the initial status of the domain and performs the necessary initializations. This method is called by the ReplicationBroker each time it establishes a new session with a Replication Server. Implementations may override this method when they need to perform additional computing after session establishment. The default implementation should be sufficient for ReplicationDomains that don't need to perform additional computing.
        Parameters:
        initStatus - The status to enter the state machine with.
        rsState - The ServerState of the ReplicationServer with which the session was established.
        newestChangelogStateOfReplica - The newest known ServerState of the change-log for this replica across the contacted ReplicationServers
      • getStatus

        public ServerStatus getStatus()
        Gets the status for this domain.
        Returns:
        The status for this domain.
      • getBaseDN

        public Dn getBaseDN()
        Returns the base DN of this ReplicationDomain. All Replication Domains using this base DN will be connected through the Replication Service.
        Returns:
        The base DN of this ReplicationDomain
      • getReplicaId

        public ReplicaId getReplicaId()
        Get the replica ID which identifies this Replication Domain inside the Replication Service.
        Returns:
        The replica ID.
      • recreateRemoteReplicasFromState

        protected void recreateRemoteReplicasFromState()
        Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization.
      • updateState

        protected void updateState()
        Update the server state if necessary.
      • decodeTarget

        public ReplicaId decodeTarget​(String targetString)
                               throws LdapException
        Verifies that the given string represents a valid source from which this server can be initialized.
        Parameters:
        targetString - The string representing the source
        Returns:
        The source as a ReplicaId
        Throws:
        LdapException - if the string is not valid
      • initializeRemote

        public void initializeRemote​(ReplicaId target,
                                     Task initTask)
                              throws LdapException
        Initializes a remote server from this server.

        The exportBackend(OutputStream) will therefore be called on this server, and the importBackend(InputStream) will be called on the remote server.

        The InputStream and OutputStream given as a parameter to those methods will be connected through the replication protocol.

        Parameters:
        target - The server-id of the server that should be initialized. The target can be discovered using the getReplicaInfos() method.
        initTask - The task that triggers this initialization and that should be updated with its progress.
        Throws:
        LdapException - If it was not possible to publish the Initialization message to the Topology.
      • initializeRemote

        protected void initializeRemote​(ReplicaId replicaToInitialize,
                                        ReplicaId replicaRunningTheTask,
                                        Task initTask,
                                        int initWindow)
                                 throws LdapException
        Processes the initialization of another server in the topology, specified by the replicaToInitialize argument, on behalf of the server that requested the initialization, specified by the replicaRunningTheTask argument.
        Parameters:
        replicaToInitialize - The target replica that should be initialized.
        replicaRunningTheTask - The replica that initiated the export. It can be the replica id of this server, or the replica id of a remote replica.
        initTask - The task in this server that triggers this initialization and that should be updated with its progress. Null when the export is done following a request coming from a remote server (task is remote).
        initWindow - The value of the initialization window for flow control between the importer and the exporter.
        Throws:
        LdapException - When an error occurs. No exception raised means success.
      • getServerState

        public ServerState getServerState()
        Gets the ServerState maintained by the concrete class.
        Returns:
        The ServerState maintained by the concrete class.
      • receiveEntryBytes

        protected byte[] receiveEntryBytes()
        Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).
        Returns:
        The bytes. Null when the Done or Err message has been received
      • initializeFromRemote

        public void initializeFromRemote​(ReplicaId source,
                                         Task initTask)
                                  throws LdapException
        Asynchronously initializes this domain from a remote source server. Before returning from this call, for the provided task:
        1. the progress counters are updated during the initialization using setTotal() and setLeft().
        2. the end of the initialization is reported using updateTaskCompletionState().

        When this method is called, an initialization request is sent to the remote source server.

        Parameters:
        source - The server-id of the source from which to initialize. The source can be discovered using the getReplicaInfos() method.
        initTask - The task that launched the initialization and should be updated with its progress.
        Throws:
        LdapException - If it was not possible to publish the Initialization message to the Topology. The task state is updated.
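
        The task bookkeeping mentioned above can be sketched as follows. ProgressTask is a hypothetical stand-in for the Task passed as initTask: the method names setTotal, setLeft and updateTaskCompletionState come from the description, but their parameters here are assumptions.

```java
/** Hypothetical sketch of progress reporting during an initialization. */
final class InitProgressSketch {
    /** Stand-in for the task passed as initTask; the signatures are assumed. */
    static final class ProgressTask {
        long total;
        long left;
        boolean completed;

        void setTotal(long total) { this.total = total; }
        void setLeft(long left) { this.left = left; }
        void updateTaskCompletionState() { this.completed = true; }
    }

    /** Imports entryCount entries, updating the counters after each one. */
    static void importEntries(long entryCount, ProgressTask task) {
        task.setTotal(entryCount);
        task.setLeft(entryCount);
        for (long remaining = entryCount; remaining > 0; remaining--) {
            // One entry would be received and imported here.
            task.setLeft(remaining - 1);
        }
        task.updateTaskCompletionState();
    }
}
```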
      • toBadDataStatus

        protected void toBadDataStatus()
        Sets the status to BAD_DATA status.
      • toTooLateStatus

        protected void toTooLateStatus()
        Sets the status to too late status.
      • ieRunning

        public boolean ieRunning()
        Returns whether an import or export is currently in progress.
        Returns:
        true if an import or export is in progress, false otherwise.
      • resetGenerationId

        public void resetGenerationId​(GenerationId generationIdNewValue)
                               throws LdapException
        Reset the generationId of this domain in the whole topology. A message is sent to the Replication Servers for them to reset their change dbs.
        Parameters:
        generationIdNewValue - The new value of the generation Id.
        Throws:
        LdapException - When an error occurs
      • isConnected

        public boolean isConnected()
        Check if the domain is connected to a ReplicationServer.
        Returns:
        true if the server is connected, false if not.
      • hasConnectionError

        protected boolean hasConnectionError()
        Check if the domain has a connection error. A Connection error happens when the broker could not be created or when the broker could not find any ReplicationServer to connect to.
        Returns:
        true if the domain has a connection error.
      • replicaIsTooLate

        protected boolean replicaIsTooLate()
        Returns whether the connected RS has notified this replica that it is too late with respect to the changelog contents.
        Returns:
        whether the connected RS has notified this replica that it is too late
      • getReplicationServerPort

        public int getReplicationServerPort()
        Get the port of the replicationServer to which this domain is currently connected.
        Returns:
        the replication server port to which this domain is currently connected.
      • startPublishService

        protected void startPublishService()
        Start the publish mechanism of the Replication Service. After this method has been called, the publish service can be used by calling the publish(ReplicationMsg) method.
      • startListenService

        protected void startListenService()
        Starts the receiver side of the Replication Service.

        After this method has been called, the Replication Service will start calling the dispatchUpdateForReplay(UpdateMsg) method.

        This method must be called once, and only after startPublishService() has been called.

      • suspendService

        public void suspendService()
        Temporarily suspend the Replication Service. The Replication Service can be resumed again using resumeService().

        It can be useful to suspend the Replication Service when the repository where the replicated information is stored becomes temporarily unavailable, so that replicated updates cannot be replayed for a while. This method is not MT safe.

      • isListenerShuttingDown

        protected final boolean isListenerShuttingDown()
        Returns true if the listener thread is shutting down or has shut down.
        Returns:
        true if the listener thread is shutting down or has shut down.
      • isSuspending

        protected boolean isSuspending()
        Return true if the domain is being suspended.
        Returns:
        true if the domain is being suspended
      • resumeService

        protected void resumeService()
        Restart the Replication service after a suspendService().

        The Replication Service will restart from the point indicated by the ServerState that was given as a parameter to the startPublishService() at startup time.

        If some data have changed in the repository during the period of time when the Replication Service was suspended, this ServerState should therefore be updated by the Replication Domain subclass before calling this method. This method is not MT safe.

      • changeConfig

        protected boolean changeConfig​(ReplicationDomainCfg config)
        Change some ReplicationDomain parameters.
        Parameters:
        config - The new configuration that this domain should now use.
        Returns:
        whether a restart of the service is needed
      • exportBackend

        protected abstract void exportBackend​(OutputStream output)
                                       throws LdapException
        This method should trigger an export of the replicated data to the provided OutputStream. When finished, the OutputStream should be flushed and closed.
        Parameters:
        output - The OutputStream where the export should be produced.
        Throws:
        LdapException - When needed.
      • importBackend

        protected abstract void importBackend​(InputStream input)
                                       throws LdapException
        This method should trigger an import of the replicated data.
        Parameters:
        input - The InputStream from which the import should be reading entries.
        Throws:
        LdapException - When needed.
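
        A minimal sketch of the export and import contract is shown below, assuming a repository that is just a list of strings written one entry per line. BackendStreamSketch and roundTrip are hypothetical, the methods are static and throw IOException rather than LdapException so that the example stays self-contained, and the in-memory buffer stands in for the replication protocol that connects the two streams in practice.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: export entries to a stream and import them back. */
final class BackendStreamSketch {
    /** Writes one entry per line, then flushes and closes the stream. */
    static void exportBackend(List<String> entries, OutputStream output) throws IOException {
        try (Writer writer = new OutputStreamWriter(output, StandardCharsets.UTF_8)) {
            for (String entry : entries) {
                writer.write(entry);
                writer.write('\n');
            }
            writer.flush();
        } // try-with-resources closes the writer and the underlying stream
    }

    /** Reads entries line by line until the end of the stream. */
    static List<String> importBackend(InputStream input) throws IOException {
        List<String> entries = new ArrayList<>();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                entries.add(line);
            }
        }
        return entries;
    }

    /** Connects the two sides through an in-memory buffer. */
    static List<String> roundTrip(List<String> entries) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            exportBackend(entries, buffer);
            return importBackend(new ByteArrayInputStream(buffer.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```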
      • countEntries

        public abstract long countEntries()
                                   throws LdapException
        This method should return the total number of objects in the replicated domain. This count will be used for reporting.
        Returns:
        The number of objects in the replication domain.
        Throws:
        LdapException - when needed.
      • dispatchUpdateForReplay

        public abstract void dispatchUpdateForReplay​(UpdateMsg updateMsg)
        This method ensures this UpdateMsg received from remote replication entities will be replayed, by dispatching it to the replay threads.
        Parameters:
        updateMsg - The UpdateMsg to replay.
      • processUpdateAfterReplay

        protected final void processUpdateAfterReplay​(UpdateMsg msg,
                                                      long etimeNanos)
        This method must be called after each call to dispatchUpdateForReplay(UpdateMsg) when the processing of the update is completed.

        It is useful for implementations that need to process the update asynchronously or with several threads, but it must also be called by implementations that process updates in a synchronous, single-threaded way.

        Parameters:
        msg - The UpdateMsg whose processing was completed.
        etimeNanos - The time it took to replay the update, in nanoseconds
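
        One way to pair the two calls in an asynchronous implementation is sketched below. AsyncReplaySketch, replay, shutdownAndWait and replayAll are hypothetical, updates are plain strings instead of UpdateMsg, and the thread pool size is arbitrary. Reporting completion in a finally block is a design choice of this sketch, not something the description mandates.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: dispatch updates to replay threads and report completion. */
final class AsyncReplaySketch {
    private final ExecutorService replayThreads = Executors.newFixedThreadPool(2);
    final Queue<String> completed = new ConcurrentLinkedQueue<>();

    public void dispatchUpdateForReplay(String updateMsg) {
        replayThreads.execute(() -> {
            long start = System.nanoTime();
            try {
                replay(updateMsg);
            } finally {
                // Report completion with the elapsed replay time in nanoseconds.
                processUpdateAfterReplay(updateMsg, System.nanoTime() - start);
            }
        });
    }

    private void replay(String updateMsg) {
        // The update would be applied to the local repository here.
    }

    void processUpdateAfterReplay(String msg, long etimeNanos) {
        completed.add(msg);
    }

    boolean shutdownAndWait() throws InterruptedException {
        replayThreads.shutdown();
        return replayThreads.awaitTermination(5, TimeUnit.SECONDS);
    }

    /** Dispatches all updates and returns how many were reported as completed. */
    static int replayAll(String... updates) {
        AsyncReplaySketch domain = new AsyncReplaySketch();
        for (String update : updates) {
            domain.dispatchUpdateForReplay(update);
        }
        try {
            if (!domain.shutdownAndWait()) {
                throw new IllegalStateException("replay threads did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return domain.completed.size();
    }
}
```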
      • publish

        public void publish​(ReplicationMsg msg)
        Publish an UpdateMsg to the Replication Service.

        The Replication Service will handle the delivery of this UpdateMsg to all the participants of this Replication Domain. These members will receive this UpdateMsg through a call to the dispatchUpdateForReplay(UpdateMsg) method.

        Parameters:
        msg - The UpdateMsg that should be published.
      • publishHeartbeatMsg

        public void publishHeartbeatMsg()
        Publishes a heartbeat message if all pending changes for current replica have been sent out.
      • publishReplicaOfflineMsg

        public void publishReplicaOfflineMsg()
        Publishes a replica offline message if all pending changes for current replica have been sent out.
      • getGenerationID

        public GenerationId getGenerationID()
        This method should return the generationID to use for this ReplicationDomain. This method can be called at any time after the ReplicationDomain has been started.
        Returns:
        The GenerationID.
      • addAdditionalMonitoring

        public void addAdditionalMonitoring​(MeterRegistryHolder registry)
        Subclasses should use this method to add additional monitoring information in the ReplicationDomain.
        Parameters:
        registry - where to add additional monitoring attributes
      • getImportExportContext

        protected ReplicationDomain.ImportExportContext getImportExportContext()
        Returns the Import/Export context associated with this ReplicationDomain.
        Returns:
        the Import/Export context associated with this ReplicationDomain
      • setEclIncludes

        public boolean setEclIncludes​(ReplicaId replicaId,
                                      Set<String> includeAttributes,
                                      Set<String> includeAttributesForDeletes)
        Set the attributes configured on a server to be included in the ECL.
        Parameters:
        replicaId - Server where these attributes are configured.
        includeAttributes - Attributes to be included with all change records, may include wild-cards.
        includeAttributesForDeletes - Additional attributes to be included with delete change records, may include wild-cards.
        Returns:
        true if the set of attributes was modified.
      • getEclIncludes

        public Set<String> getEclIncludes()
        Get the attributes to include in each change for the ECL.
        Returns:
        The attributes to include in each change for the ECL.
      • getEclIncludesForDeletes

        protected Set<String> getEclIncludesForDeletes()
        Get the attributes to include in each delete change for the ECL.
        Returns:
        The attributes to include in each delete change for the ECL.
      • getLastLocalChange

        public CSN getLastLocalChange()
        Returns the CSN of the last Change that was fully processed by this ReplicationDomain.
        Returns:
        The CSN of the last Change that was fully processed by this ReplicationDomain.
      • trace

        protected void trace​(String msg,
                             Object... args)
        Logs at trace level a message for this replication domain.
        Parameters:
        msg - the message
        args - the arguments of the message
      • trace

        protected void trace​(LocalizableMessageDescriptor.Arg0 msg)
        Logs at trace level a message for this replication domain.
        Parameters:
        msg - a localizable message