Class ReplicationDomain

java.lang.Object
org.opends.server.replication.service.ReplicationDomain
Direct Known Subclasses:
LDAPReplicationDomain

public abstract class ReplicationDomain extends Object
This class should be used as a base for Replication implementations.

It is intended that developer in need of a replication mechanism subclass this class with their own implementation.

The startup phase of the ReplicationDomain subclass, should read the list of replication servers from the configuration, instantiate a ServerState then start the publish service by calling startPublishService(). At this point it can start calling the publish(ReplicationMsg) method if needed.

When the startup phase reach the point when the subclass is ready to handle updates the Replication Domain implementation should call the startListenService() method. At this point a Listener thread is created on the Replication Service and which can start receiving updates.

When updates are received the Replication Service calls the dispatchUpdateForReplay(UpdateMsg) method. ReplicationDomain implementation should implement the appropriate code for replaying the update on the local repository. When fully done the subclass must call the processUpdateAfterReplay(UpdateMsg, long) method. This allows to process the update asynchronously if necessary.

To propagate changes to other replica, a ReplicationDomain implementation must use the publish(ReplicationMsg) method.

If the Full Initialization process is needed then implementation for importBackend(InputStream) and exportBackend(OutputStream) must be provided.

Full Initialization of a replica can be triggered by LDAP clients by creating InitializeTasks or InitializeTargetTask. Full initialization can also be triggered from the ReplicationDomain implementation using methods initializeRemote(ReplicaId, Task), initializeRemote(ReplicaId, ReplicaId, Task, int) or initializeFromRemote(ReplicaId, Task).

At shutdown time, the suspendService() method should be called to cleanly stop the replication service.

  • Field Details

    • config

      protected volatile ReplicationDomainCfg config
      The configuration of the replication domain.
    • replicationPurgeDelayInMs

      protected volatile long replicationPurgeDelayInMs
      The replication purge delay is available as Long from the config object. Since the value is retrieved for every update, avoid unboxing performance issues by using a dedicated long field.
    • broker

      protected ReplicationBroker broker
      The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService.
    • generationId

      protected volatile GenerationId generationId
      The generationId for this replication domain. It is made of a hash of the 1000 first entries for this domain.
    • serverContext

      protected final ServerContext serverContext
      The directory server context.
    • canPersistROM

      protected volatile boolean canPersistROM
      Decides whether to include the ReplicaOfflineMsg (ROM) sent by the local replica in the persisted state.

      The ROM should not be part of the persisted state while the replica is recovering updates from the connected RS. Since the RS state is sent as part of the response to the start message, we can detect the case and only persist the ROM when the replica has finished receiving its own updates or there are no updates to receive. See OPENDJ-8062 and OPENDJ-8992 for details.

  • Constructor Details

    • ReplicationDomain

      public ReplicationDomain(ServerContext serverContext, ReplicationDomainCfg config, GenerationId generationId)
      Creates a ReplicationDomain with the provided parameters.
      Parameters:
      serverContext - The directory server context
      config - The configuration object for this ReplicationDomain
      generationId - the generation of this ReplicationDomain
  • Method Details

    • getHealthStatus

      public HealthStatus getHealthStatus(long delayThresholdMs)
      Returns the health status based on the current replication delay. If the delay is above the provided threshold, the replication domain is considered not healthy.
      Parameters:
      delayThresholdMs - The maximum replication delay in milliseconds for considering this replication domain healthy.
      Returns:
      This replication domain health status.
    • getCsnGenerator

      protected CSNGenerator getCsnGenerator()
      Returns the CSNGenerator that will be used to generate CSN for this domain.
      Returns:
      The CSNGenerator that will be used to generate CSN for this domain.
    • sessionInitiated

      public void sessionInitiated(ServerStatus initStatus, ServerState rsState, ServerState newestChangelogStateOfReplica)
      Set the initial status of the domain and perform necessary initializations. This method will be called by the Broker each time the ReplicationBroker establish a new session to a Replication Server. Implementations may override this method when they need to perform additional computing after session establishment. The default implementation should be sufficient for ReplicationDomains that don't need to perform additional computing.
      Parameters:
      initStatus - The status to enter the state machine with.
      rsState - The ServerState of the ReplicationServer with which the session was established.
      newestChangelogStateOfReplica - The newest known ServerState of the change-log for this replica across the contacted ReplicationServers
    • getStatus

      public ServerStatus getStatus()
      Gets the status for this domain.
      Returns:
      The status for this domain.
    • getBaseDN

      public Dn getBaseDN()
      Returns the base DN of this ReplicationDomain. All Replication Domain using this baseDN will be connected through the Replication Service.
      Returns:
      The base DN of this ReplicationDomain
    • getReplicaId

      public ReplicaId getReplicaId()
      Get the replica ID which identifies this Replication Domain inside the Replication Service.
      Returns:
      The replica ID.
    • recreateRemoteReplicasFromState

      protected void recreateRemoteReplicasFromState()
      Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization.
    • updateState

      protected void updateState()
      Update the server state if necessary.
    • decodeTarget

      public ReplicaId decodeTarget(String targetString) throws LdapException
      Verifies that the given string represents a valid source from which this server can be initialized.
      Parameters:
      targetString - The string representing the source
      Returns:
      The source as a integer value
      Throws:
      LdapException - if the string is not valid
    • initializeRemote

      public void initializeRemote(ReplicaId target, Task initTask) throws LdapException
      Initializes a remote server from this server.

      The exportBackend(OutputStream) will therefore be called on this server, and the importBackend(InputStream) will be called on the remote server.

      The InputStream and OutputStream given as a parameter to those methods will be connected through the replication protocol.

      Parameters:
      target - The server-id of the server that should be initialized. The target can be discovered using the getReplicaInfos() method.
      initTask - The task that triggers this initialization and that should be updated with its progress.
      Throws:
      LdapException - If it was not possible to publish the Initialization message to the Topology.
    • initializeRemote

      protected void initializeRemote(ReplicaId replicaToInitialize, ReplicaId replicaRunningTheTask, Task initTask, int initWindow) throws LdapException
      Process the initialization of some other server or servers in the topology specified by the target argument when this initialization specifying the server that requests the initialization.
      Parameters:
      replicaToInitialize - The target replica that should be initialized.
      replicaRunningTheTask - The replica that initiated the export. It can be the replica id of this server, or the replica id of a remote replica.
      initTask - The task in this server that triggers this initialization and that should be updated with its progress. Null when the export is done following a request coming from a remote server (task is remote).
      initWindow - The value of the initialization window for flow control between the importer and the exporter.
      Throws:
      LdapException - When an error occurs. No exception raised means success.
    • getServerState

      public ServerState getServerState()
      Get the ServerState maintained by the Concrete class.
      Returns:
      the ServerState maintained by the Concrete class.
    • receiveEntryBytes

      protected byte[] receiveEntryBytes()
      Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).
      Returns:
      The bytes. Null when the Done or Err message has been received
    • initializeFromRemote

      public void initializeFromRemote(ReplicaId source, Task initTask) throws LdapException
      Initializes asynchronously this domain from a remote source server. Before returning from this call, for the provided task :
      1. the progressing counters are updated during the initialization using setTotal() and setLeft().
      2. the end of the initialization using updateTaskCompletionState().

      When this method is called, a request for initialization is sent to the remote source server requesting initialization.

      Parameters:
      source - The server-id of the source from which to initialize. The source can be discovered using the getReplicaInfos() method.
      initTask - The task that launched the initialization and should be updated of its progress.
      Throws:
      LdapException - If it was not possible to publish the Initialization message to the Topology. The task state is updated.
    • toBadDataStatus

      protected void toBadDataStatus()
      Sets the status to BAD_DATA status.
    • ieRunning

      public boolean ieRunning()
      Returns a boolean indicating if an import or export is currently processed.
      Returns:
      The status
    • resetGenerationId

      public void resetGenerationId(GenerationId generationIdNewValue) throws LdapException
      Reset the generationId of this domain in the whole topology. A message is sent to the Replication Servers for them to reset their change dbs.
      Parameters:
      generationIdNewValue - The new value of the generation Id.
      Throws:
      LdapException - When an error occurs
    • isConnected

      public boolean isConnected()
      Check if the domain is connected to a ReplicationServer.
      Returns:
      true if the server is connected, false if not.
    • getReplicationServerPort

      public int getReplicationServerPort()
      Get the port of the replicationServer to which this domain is currently connected.
      Returns:
      the replication server port to which this domain is currently connected.
    • getReplicationServerId

      public ReplicationServerId getReplicationServerId()
      Get the replication server ID to which this domain is currently connected.
      Returns:
      the replication server ID to which this domain is currently connected.
    • startPublishService

      protected void startPublishService()
      Start the publish mechanism of the Replication Service. After this method has been called, the publish service can be used by calling the publish(ReplicationMsg) method.
    • startListenService

      protected void startListenService()
      Starts the receiver side of the Replication Service.

      After this method has been called, the Replication Service will start calling the dispatchUpdateForReplay(UpdateMsg).

      This method must be called once and must be called after the startPublishService().

    • suspendService

      public void suspendService()
      Temporarily suspend the Replication Service. The Replication Service can be resumed again using resumeService().

      It can be useful to suspend the Replication Service when the repository where the replicated information is stored becomes temporarily unavailable and replicated updates can therefore not be replayed during a while. This method is not MT safe.

    • isListenerShuttingDown

      protected final boolean isListenerShuttingDown()
      Returns true if the listener thread is shutting down or has shutdown.
      Returns:
      true if the listener thread is shutting down or has shutdown.
    • isSuspending

      protected boolean isSuspending()
      Return true if the domain is being suspended.
      Returns:
      true if the domain is being suspended
    • resumeService

      protected void resumeService()
      Restart the Replication service after a suspendService().

      The Replication Service will restart from the point indicated by the ServerState that was given as a parameter to the startPublishService() at startup time.

      If some data have changed in the repository during the period of time when the Replication Service was suspended, this ServerState should therefore be updated by the Replication Domain subclass before calling this method. This method is not MT safe.

    • changeConfig

      protected boolean changeConfig(ReplicationDomainCfg config)
      Change some ReplicationDomain parameters.
      Parameters:
      config - The new configuration that this domain should now use.
      Returns:
      whether a restart of the service is needed
    • exportBackend

      protected abstract void exportBackend(OutputStream output) throws LdapException
      This method should trigger an export of the replicated data. to the provided outputStream. When finished the outputStream should be flushed and closed.
      Parameters:
      output - The OutputStream where the export should be produced.
      Throws:
      LdapException - When needed.
    • importBackend

      protected abstract void importBackend(InputStream input) throws LdapException
      This method should trigger an import of the replicated data.
      Parameters:
      input - The InputStream from which the import should be reading entries.
      Throws:
      LdapException - When needed.
    • countEntries

      public abstract long countEntries() throws LdapException
      This method should return the total number of objects in the replicated domain. This count will be used for reporting.
      Returns:
      The number of objects in the replication domain.
      Throws:
      LdapException - when needed.
    • dispatchUpdateForReplay

      public abstract void dispatchUpdateForReplay(UpdateMsg updateMsg)
      This method ensures this UpdateMsg received from remote replication entities will be replayed, by dispatching it to the replay threads.
      Parameters:
      updateMsg - The UpdateMsg to replay.
    • processUpdateAfterReplay

      protected final void processUpdateAfterReplay(UpdateMsg msg, long etimeNanos)
      This method must be called after each call to dispatchUpdateForReplay(UpdateMsg) when the processing of the update is completed.

      It is useful for implementation needing to process the update in an asynchronous way or using several threads, but must be called even by implementation doing it in a synchronous, single-threaded way.

      Parameters:
      msg - The UpdateMsg whose processing was completed.
      etimeNanos - The time it took to replay the update, in nanoseconds
    • publish

      public void publish(ReplicationMsg msg)
      Publish an UpdateMsg to the Replication Service.

      The Replication Service will handle the delivery of this UpdateMsg to all the participants of this Replication Domain. These members will be receive this UpdateMsg through a call of the dispatchUpdateForReplay(UpdateMsg) message.

      Parameters:
      msg - The UpdateMsg that should be published.
    • publishHeartbeatMsg

      public void publishHeartbeatMsg()
      Publishes a heartbeat message if all pending changes for current replica have been sent out.
    • publishReplicaOfflineMsg

      public void publishReplicaOfflineMsg()
      Publishes a replica offline message if all pending changes for current replica have been sent out.
    • getGenerationID

      public GenerationId getGenerationID()
      This method should return the generationID to use for this ReplicationDomain. This method can be called at any time after the ReplicationDomain has been started.
      Returns:
      The GenerationID.
    • addMetricsForLocalReplica

      public void addMetricsForLocalReplica(MeterRegistryHolder registry)
      Subclasses should use this method to add additional monitoring information in the ReplicationDomain.
      Parameters:
      registry - where to additional monitoring attributes
    • addMetricsForRemoteReplica

      public void addMetricsForRemoteReplica(MeterRegistryHolder registry, ReplicaId remoteReplicaId)
      Subclasses should use this method to add additional monitoring information in the Remote Replica Monitor.
      Parameters:
      registry - where to additional monitoring attributes
      remoteReplicaId - the remote replica for which monitoring data is requested
    • getImportExportContext

      protected ReplicationDomain.ImportExportContext getImportExportContext()
      Returns the Import/Export context associated to this ReplicationDomain.
      Returns:
      the Import/Export context associated to this ReplicationDomain
    • setEclIncludes

      public boolean setEclIncludes(ReplicaId replicaId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
      Set the attributes configured on a server to be included in the ECL.
      Parameters:
      replicaId - Server where these attributes are configured.
      includeAttributes - Attributes to be included with all change records, may include wild-cards.
      includeAttributesForDeletes - Additional attributes to be included with delete change records, may include wild-cards.
      Returns:
      true if the set of attributes was modified.
    • getEclIncludes

      public Set<String> getEclIncludes()
      Get the attributes to include in each change for the ECL.
      Returns:
      The attributes to include in each change for the ECL.
    • getEclIncludesForDeletes

      protected Set<String> getEclIncludesForDeletes()
      Get the attributes to include in each delete change for the ECL.
      Returns:
      The attributes to include in each delete change for the ECL.
    • getLastLocalChange

      public CSN getLastLocalChange()
      Returns the CSN of the last Change that was fully processed by this ReplicationDomain.
      Returns:
      The CSN of the last Change that was fully processed by this ReplicationDomain.
    • toString

      public String toString()
      Overrides:
      toString in class Object