Class ReplicationDomain
- Direct Known Subclasses:
LDAPReplicationDomain
It is intended that developer in need of a replication mechanism subclass this class with their own implementation.
The startup phase of the ReplicationDomain subclass, should read the list of replication servers from the
configuration, instantiate a ServerState
then start the publish service by calling
startPublishService()
. At this point it can start calling the publish(ReplicationMsg)
method if needed.
When the startup phase reach the point when the subclass is ready to handle updates the Replication Domain
implementation should call the startListenService()
method. At this point a Listener thread is created on
the Replication Service and which can start receiving updates.
When updates are received the Replication Service calls the dispatchUpdateForReplay(UpdateMsg)
method.
ReplicationDomain implementation should implement the appropriate code for replaying the update on the local
repository. When fully done the subclass must call the processUpdateAfterReplay(UpdateMsg, long)
method.
This allows to process the update asynchronously if necessary.
To propagate changes to other replica, a ReplicationDomain implementation must use the
publish(ReplicationMsg)
method.
If the Full Initialization process is needed then implementation for importBackend(InputStream)
and
exportBackend(OutputStream)
must be provided.
Full Initialization of a replica can be triggered by LDAP clients by creating InitializeTasks or
InitializeTargetTask. Full initialization can also be triggered from the ReplicationDomain implementation using
methods initializeRemote(ReplicaId, Task)
, initializeRemote(ReplicaId, ReplicaId, Task, int)
or initializeFromRemote(ReplicaId, Task)
.
At shutdown time, the suspendService()
method should be called to cleanly stop the replication service.
-
Nested Class Summary
Modifier and TypeClassDescriptionprotected static final class
This class contains the context related to an import or export launched on the domain. -
Field Summary
Modifier and TypeFieldDescriptionprotected ReplicationBroker
The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService.protected boolean
Decides whether to include theReplicaOfflineMsg
(ROM) sent by the local replica in the persisted state.protected ReplicationDomainCfg
The configuration of the replication domain.protected GenerationId
The generationId for this replication domain.protected long
The replication purge delay is available asLong
from theconfig
object.protected final ServerContext
The directory server context. -
Constructor Summary
ConstructorDescriptionReplicationDomain
(ServerContext serverContext, ReplicationDomainCfg config, GenerationId generationId) Creates a ReplicationDomain with the provided parameters. -
Method Summary
Modifier and TypeMethodDescriptionvoid
addMetricsForLocalReplica
(MeterRegistryHolder registry) Subclasses should use this method to add additional monitoring information in the ReplicationDomain.void
addMetricsForRemoteReplica
(MeterRegistryHolder registry, ReplicaId remoteReplicaId) Subclasses should use this method to add additional monitoring information in the Remote Replica Monitor.protected boolean
changeConfig
(ReplicationDomainCfg config) Change some ReplicationDomain parameters.abstract long
This method should return the total number of objects in the replicated domain.decodeTarget
(String targetString) Verifies that the given string represents a valid source from which this server can be initialized.abstract void
dispatchUpdateForReplay
(UpdateMsg updateMsg) This method ensures thisUpdateMsg
received from remote replication entities will be replayed, by dispatching it to the replay threads.protected abstract void
exportBackend
(OutputStream output) This method should trigger an export of the replicated data.Returns the base DN of this ReplicationDomain.protected CSNGenerator
Returns theCSNGenerator
that will be used to generateCSN
for this domain.Get the attributes to include in each change for the ECL.Get the attributes to include in each delete change for the ECL.This method should return the generationID to use for this ReplicationDomain.getHealthStatus
(long delayThresholdMs) Returns the health status based on the current replication delay.protected ReplicationDomain.ImportExportContext
Returns the Import/Export context associated to this ReplicationDomain.Returns the CSN of the last Change that was fully processed by this ReplicationDomain.Get the replica ID which identifies this Replication Domain inside the Replication Service.Get the replication server ID to which this domain is currently connected.int
Get the port of the replicationServer to which this domain is currently connected.Get the ServerState maintained by the Concrete class.Gets the status for this domain.boolean
Returns a boolean indicating if an import or export is currently processed.protected abstract void
importBackend
(InputStream input) This method should trigger an import of the replicated data.void
initializeFromRemote
(ReplicaId source, Task initTask) Initializes asynchronously this domain from a remote source server.void
initializeRemote
(ReplicaId target, Task initTask) Initializes a remote server from this server.protected void
initializeRemote
(ReplicaId replicaToInitialize, ReplicaId replicaRunningTheTask, Task initTask, int initWindow) Process the initialization of some other server or servers in the topology specified by the target argument when this initialization specifying the server that requests the initialization.boolean
Check if the domain is connected to a ReplicationServer.protected final boolean
Returnstrue
if the listener thread is shutting down or has shutdown.protected boolean
Returntrue
if the domain is being suspended.protected final void
processUpdateAfterReplay
(UpdateMsg msg, long etimeNanos) This method must be called after each call todispatchUpdateForReplay(UpdateMsg)
when the processing of the update is completed.void
publish
(ReplicationMsg msg) Publish anUpdateMsg
to the Replication Service.void
Publishes a heartbeat message if all pending changes for current replica have been sent out.void
Publishes a replica offline message if all pending changes for current replica have been sent out.protected byte[]
Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).protected void
Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization.void
resetGenerationId
(GenerationId generationIdNewValue) Reset the generationId of this domain in the whole topology.protected void
Restart the Replication service after asuspendService()
.void
sessionInitiated
(ServerStatus initStatus, ServerState rsState, ServerState newestChangelogStateOfReplica) Set the initial status of the domain and perform necessary initializations.boolean
setEclIncludes
(ReplicaId replicaId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes) Set the attributes configured on a server to be included in the ECL.protected void
Starts the receiver side of the Replication Service.protected void
Start the publish mechanism of the Replication Service.void
Temporarily suspend the Replication Service.protected void
Sets the status to BAD_DATA status.toString()
protected void
Update the server state if necessary.
-
Field Details
-
config
The configuration of the replication domain. -
replicationPurgeDelayInMs
protected volatile long replicationPurgeDelayInMsThe replication purge delay is available asLong
from theconfig
object. Since the value is retrieved for every update, avoid unboxing performance issues by using a dedicated long field. -
broker
The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService. -
generationId
The generationId for this replication domain. It is made of a hash of the 1000 first entries for this domain. -
serverContext
The directory server context. -
canPersistROM
protected volatile boolean canPersistROMDecides whether to include theReplicaOfflineMsg
(ROM) sent by the local replica in the persisted state.The ROM should not be part of the persisted state while the replica is recovering updates from the connected RS. Since the RS state is sent as part of the response to the start message, we can detect the case and only persist the ROM when the replica has finished receiving its own updates or there are no updates to receive. See OPENDJ-8062 and OPENDJ-8992 for details.
-
-
Constructor Details
-
ReplicationDomain
public ReplicationDomain(ServerContext serverContext, ReplicationDomainCfg config, GenerationId generationId) Creates a ReplicationDomain with the provided parameters.- Parameters:
serverContext
- The directory server contextconfig
- The configuration object for this ReplicationDomaingenerationId
- the generation of this ReplicationDomain
-
-
Method Details
-
getHealthStatus
Returns the health status based on the current replication delay. If the delay is above the provided threshold, the replication domain is considered not healthy.- Parameters:
delayThresholdMs
- The maximum replication delay in milliseconds for considering this replication domain healthy.- Returns:
- This replication domain health status.
-
getCsnGenerator
Returns theCSNGenerator
that will be used to generateCSN
for this domain.- Returns:
- The
CSNGenerator
that will be used to generateCSN
for this domain.
-
sessionInitiated
public void sessionInitiated(ServerStatus initStatus, ServerState rsState, ServerState newestChangelogStateOfReplica) Set the initial status of the domain and perform necessary initializations. This method will be called by the Broker each time the ReplicationBroker establish a new session to a Replication Server. Implementations may override this method when they need to perform additional computing after session establishment. The default implementation should be sufficient for ReplicationDomains that don't need to perform additional computing.- Parameters:
initStatus
- The status to enter the state machine with.rsState
- The ServerState of the ReplicationServer with which the session was established.newestChangelogStateOfReplica
- The newest known ServerState of the change-log for this replica across the contacted ReplicationServers
-
getStatus
Gets the status for this domain.- Returns:
- The status for this domain.
-
getBaseDN
Returns the base DN of this ReplicationDomain. All Replication Domain using this baseDN will be connected through the Replication Service.- Returns:
- The base DN of this ReplicationDomain
-
getReplicaId
Get the replica ID which identifies this Replication Domain inside the Replication Service.- Returns:
- The replica ID.
-
recreateRemoteReplicasFromState
protected void recreateRemoteReplicasFromState()Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization. -
updateState
protected void updateState()Update the server state if necessary. -
decodeTarget
Verifies that the given string represents a valid source from which this server can be initialized.- Parameters:
targetString
- The string representing the source- Returns:
- The source as a integer value
- Throws:
LdapException
- if the string is not valid
-
initializeRemote
Initializes a remote server from this server.The
exportBackend(OutputStream)
will therefore be called on this server, and theimportBackend(InputStream)
will be called on the remote server.The InputStream and OutputStream given as a parameter to those methods will be connected through the replication protocol.
- Parameters:
target
- The server-id of the server that should be initialized. The target can be discovered using thegetReplicaInfos()
method.initTask
- The task that triggers this initialization and that should be updated with its progress.- Throws:
LdapException
- If it was not possible to publish the Initialization message to the Topology.
-
initializeRemote
protected void initializeRemote(ReplicaId replicaToInitialize, ReplicaId replicaRunningTheTask, Task initTask, int initWindow) throws LdapException Process the initialization of some other server or servers in the topology specified by the target argument when this initialization specifying the server that requests the initialization.- Parameters:
replicaToInitialize
- The target replica that should be initialized.replicaRunningTheTask
- The replica that initiated the export. It can be the replica id of this server, or the replica id of a remote replica.initTask
- The task in this server that triggers this initialization and that should be updated with its progress. Null when the export is done following a request coming from a remote server (task is remote).initWindow
- The value of the initialization window for flow control between the importer and the exporter.- Throws:
LdapException
- When an error occurs. No exception raised means success.
-
getServerState
Get the ServerState maintained by the Concrete class.- Returns:
- the ServerState maintained by the Concrete class.
-
receiveEntryBytes
protected byte[] receiveEntryBytes()Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).- Returns:
- The bytes. Null when the Done or Err message has been received
-
initializeFromRemote
Initializes asynchronously this domain from a remote source server. Before returning from this call, for the provided task :- the progressing counters are updated during the initialization using setTotal() and setLeft().
- the end of the initialization using updateTaskCompletionState().
When this method is called, a request for initialization is sent to the remote source server requesting initialization.
- Parameters:
source
- The server-id of the source from which to initialize. The source can be discovered using thegetReplicaInfos()
method.initTask
- The task that launched the initialization and should be updated of its progress.- Throws:
LdapException
- If it was not possible to publish the Initialization message to the Topology. The task state is updated.
-
toBadDataStatus
protected void toBadDataStatus()Sets the status to BAD_DATA status. -
ieRunning
public boolean ieRunning()Returns a boolean indicating if an import or export is currently processed.- Returns:
- The status
-
resetGenerationId
Reset the generationId of this domain in the whole topology. A message is sent to the Replication Servers for them to reset their change dbs.- Parameters:
generationIdNewValue
- The new value of the generation Id.- Throws:
LdapException
- When an error occurs
-
isConnected
public boolean isConnected()Check if the domain is connected to a ReplicationServer.- Returns:
- true if the server is connected, false if not.
-
getReplicationServerPort
public int getReplicationServerPort()Get the port of the replicationServer to which this domain is currently connected.- Returns:
- the replication server port to which this domain is currently connected.
-
getReplicationServerId
Get the replication server ID to which this domain is currently connected.- Returns:
- the replication server ID to which this domain is currently connected.
-
startPublishService
protected void startPublishService()Start the publish mechanism of the Replication Service. After this method has been called, the publish service can be used by calling thepublish(ReplicationMsg)
method. -
startListenService
protected void startListenService()Starts the receiver side of the Replication Service.After this method has been called, the Replication Service will start calling the
dispatchUpdateForReplay(UpdateMsg)
.This method must be called once and must be called after the
startPublishService()
. -
suspendService
public void suspendService()Temporarily suspend the Replication Service. The Replication Service can be resumed again usingresumeService()
.It can be useful to suspend the Replication Service when the repository where the replicated information is stored becomes temporarily unavailable and replicated updates can therefore not be replayed during a while. This method is not MT safe.
-
isListenerShuttingDown
protected final boolean isListenerShuttingDown()Returnstrue
if the listener thread is shutting down or has shutdown.- Returns:
true
if the listener thread is shutting down or has shutdown.
-
isSuspending
protected boolean isSuspending()Returntrue
if the domain is being suspended.- Returns:
true
if the domain is being suspended
-
resumeService
protected void resumeService()Restart the Replication service after asuspendService()
.The Replication Service will restart from the point indicated by the
ServerState
that was given as a parameter to thestartPublishService()
at startup time.If some data have changed in the repository during the period of time when the Replication Service was suspended, this
ServerState
should therefore be updated by the Replication Domain subclass before calling this method. This method is not MT safe. -
changeConfig
Change some ReplicationDomain parameters.- Parameters:
config
- The new configuration that this domain should now use.- Returns:
- whether a restart of the service is needed
-
exportBackend
This method should trigger an export of the replicated data. to the provided outputStream. When finished the outputStream should be flushed and closed.- Parameters:
output
- The OutputStream where the export should be produced.- Throws:
LdapException
- When needed.
-
importBackend
This method should trigger an import of the replicated data.- Parameters:
input
- The InputStream from which the import should be reading entries.- Throws:
LdapException
- When needed.
-
countEntries
This method should return the total number of objects in the replicated domain. This count will be used for reporting.- Returns:
- The number of objects in the replication domain.
- Throws:
LdapException
- when needed.
-
dispatchUpdateForReplay
This method ensures thisUpdateMsg
received from remote replication entities will be replayed, by dispatching it to the replay threads.- Parameters:
updateMsg
- TheUpdateMsg
to replay.
-
processUpdateAfterReplay
This method must be called after each call todispatchUpdateForReplay(UpdateMsg)
when the processing of the update is completed.It is useful for implementation needing to process the update in an asynchronous way or using several threads, but must be called even by implementation doing it in a synchronous, single-threaded way.
- Parameters:
msg
- The UpdateMsg whose processing was completed.etimeNanos
- The time it took to replay the update, in nanoseconds
-
publish
Publish anUpdateMsg
to the Replication Service.The Replication Service will handle the delivery of this
UpdateMsg
to all the participants of this Replication Domain. These members will be receive thisUpdateMsg
through a call of thedispatchUpdateForReplay(UpdateMsg)
message.- Parameters:
msg
- The UpdateMsg that should be published.
-
publishHeartbeatMsg
public void publishHeartbeatMsg()Publishes a heartbeat message if all pending changes for current replica have been sent out. -
publishReplicaOfflineMsg
public void publishReplicaOfflineMsg()Publishes a replica offline message if all pending changes for current replica have been sent out. -
getGenerationID
This method should return the generationID to use for this ReplicationDomain. This method can be called at any time after the ReplicationDomain has been started.- Returns:
- The GenerationID.
-
addMetricsForLocalReplica
Subclasses should use this method to add additional monitoring information in the ReplicationDomain.- Parameters:
registry
- where to additional monitoring attributes
-
addMetricsForRemoteReplica
Subclasses should use this method to add additional monitoring information in the Remote Replica Monitor.- Parameters:
registry
- where to additional monitoring attributesremoteReplicaId
- the remote replica for which monitoring data is requested
-
getImportExportContext
Returns the Import/Export context associated to this ReplicationDomain.- Returns:
- the Import/Export context associated to this ReplicationDomain
-
setEclIncludes
public boolean setEclIncludes(ReplicaId replicaId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes) Set the attributes configured on a server to be included in the ECL.- Parameters:
replicaId
- Server where these attributes are configured.includeAttributes
- Attributes to be included with all change records, may include wild-cards.includeAttributesForDeletes
- Additional attributes to be included with delete change records, may include wild-cards.- Returns:
true
if the set of attributes was modified.
-
getEclIncludes
Get the attributes to include in each change for the ECL.- Returns:
- The attributes to include in each change for the ECL.
-
getEclIncludesForDeletes
Get the attributes to include in each delete change for the ECL.- Returns:
- The attributes to include in each delete change for the ECL.
-
getLastLocalChange
Returns the CSN of the last Change that was fully processed by this ReplicationDomain.- Returns:
- The CSN of the last Change that was fully processed by this ReplicationDomain.
-
toString
-