Class ReplicationDomain
- java.lang.Object
-
- org.opends.server.replication.service.ReplicationDomain
-
- Direct Known Subclasses:
LDAPReplicationDomain
public abstract class ReplicationDomain extends Object
This class should be used as a base for Replication implementations.It is intended that developer in need of a replication mechanism subclass this class with their own implementation.
The startup phase of the ReplicationDomain subclass, should read the list of replication servers from the configuration, instantiate a
ServerState
then start the publish service by callingstartPublishService()
. At this point it can start calling thepublish(ReplicationMsg)
method if needed.When the startup phase reach the point when the subclass is ready to handle updates the Replication Domain implementation should call the
startListenService()
method. At this point a Listener thread is created on the Replication Service and which can start receiving updates.When updates are received the Replication Service calls the
dispatchUpdateForReplay(UpdateMsg)
method. ReplicationDomain implementation should implement the appropriate code for replaying the update on the local repository. When fully done the subclass must call theprocessUpdateAfterReplay(UpdateMsg)
method. This allows to process the update asynchronously if necessary.To propagate changes to other replica, a ReplicationDomain implementation must use the
publish(ReplicationMsg)
method.If the Full Initialization process is needed then implementation for
importBackend(InputStream)
andexportBackend(OutputStream)
must be provided.Full Initialization of a replica can be triggered by LDAP clients by creating InitializeTasks or InitializeTargetTask. Full initialization can also be triggered from the ReplicationDomain implementation using methods
initializeRemote(ReplicaId, Task)
,initializeRemote(ReplicaId, ReplicaId, Task, int)
orinitializeFromRemote(ReplicaId, Task)
.At shutdown time, the
suspendService()
method should be called to cleanly stop the replication service.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected static class
ReplicationDomain.ImportExportContext
This class contains the context related to an import or export launched on the domain.
-
Field Summary
Fields Modifier and Type Field Description protected ReplicationBroker
broker
The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService.protected boolean
canPersistROM
Decides whether to include theReplicaOfflineMsg
(ROM) sent by the local replica in the persisted state.protected ReplicationDomainCfg
config
The configuration of the replication domain.protected long
generationId
The generationId for this replication domain.protected long
replicationPurgeDelayInMs
The replication purge delay is available asLong
from theconfig
object.protected ServerContext
serverContext
The directory server context.
-
Constructor Summary
Constructors Constructor Description ReplicationDomain(ServerContext serverContext, ReplicationDomainCfg config, long generationId)
Creates a ReplicationDomain with the provided parameters.
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description void
addAdditionalMonitoring(MeterRegistryHolder registry)
Subclasses should use this method to add additional monitoring information in the ReplicationDomain.protected boolean
changeConfig(ReplicationDomainCfg config)
Change some ReplicationDomain parameters.abstract long
countEntries()
This method should return the total number of objects in the replicated domain.ReplicaId
decodeTarget(String targetString)
Verifies that the given string represents a valid source from which this server can be initialized.abstract void
dispatchUpdateForReplay(UpdateMsg updateMsg)
This method ensures thisUpdateMsg
received from remote replication entities will be replayed, by dispatching it to the replay threads.protected abstract void
exportBackend(OutputStream output)
This method should trigger an export of the replicated data.Dn
getBaseDN()
Returns the base DN of this ReplicationDomain.protected CSNGenerator
getCsnGenerator()
Returns theCSNGenerator
that will be used to generateCSN
for this domain.Set<String>
getEclIncludes()
Get the attributes to include in each change for the ECL.protected Set<String>
getEclIncludesForDeletes()
Get the attributes to include in each delete change for the ECL.long
getGenerationID()
This method should return the generationID to use for this ReplicationDomain.HealthStatus
getHealthStatus(long delayThresholdMs)
Returns the health status based on the current replication delay.protected ReplicationDomain.ImportExportContext
getImportExportContext()
Returns the Import/Export context associated to this ReplicationDomain.CSN
getLastLocalChange()
Returns the CSN of the last Change that was fully processed by this ReplicationDomain.ReplicaId
getReplicaId()
Get the replica ID which identifies this Replication Domain inside the Replication Service.int
getReplicationServerPort()
Get the port of the replicationServer to which this domain is currently connected.ServerState
getServerState()
Get the ServerState maintained by the Concrete class.ServerStatus
getStatus()
Gets the status for this domain.protected boolean
hasConnectionError()
Check if the domain has a connection error.boolean
ieRunning()
Returns a boolean indicating if an import or export is currently processed.protected abstract void
importBackend(InputStream input)
This method should trigger an import of the replicated data.void
initializeFromRemote(ReplicaId source, Task initTask)
Initializes asynchronously this domain from a remote source server.void
initializeRemote(ReplicaId target, Task initTask)
Initializes a remote server from this server.protected void
initializeRemote(ReplicaId replicaToInitialize, ReplicaId replicaRunningTheTask, Task initTask, int initWindow)
Process the initialization of some other server or servers in the topology specified by the target argument when this initialization specifying the server that requests the initialization.boolean
isConnected()
Check if the domain is connected to a ReplicationServer.protected boolean
isListenerShuttingDown()
Returnstrue
if the listener thread is shutting down or has shutdown.protected boolean
isSuspending()
Returntrue
if the domain is being suspended.protected void
processUpdateAfterReplay(UpdateMsg msg)
This method must be called after each call todispatchUpdateForReplay(UpdateMsg)
when the processing of the update is completed.void
publish(ReplicationMsg msg)
Publish anUpdateMsg
to the Replication Service.void
publishHeartbeatMsg()
Publishes a heartbeat message if all pending changes for current replica have been sent out.void
publishReplicaOfflineMsg()
Publishes a replica offline message if all pending changes for current replica have been sent out.protected byte[]
receiveEntryBytes()
Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).protected void
recreateRemoteReplicasFromState()
Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization.protected boolean
replicaIsLate()
Returns whether the connected RS has notified this replica to be late with respect to changelog contents.void
resetGenerationId(long generationIdNewValue)
Reset the generationId of this domain in the whole topology.protected void
resumeService()
Restart the Replication service after asuspendService()
.void
sessionInitiated(ServerStatus initStatus, ServerState rsState, ServerState newestChangelogStateOfReplica)
Set the initial status of the domain and perform necessary initializations.boolean
setEclIncludes(ReplicaId replicaId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
Set the attributes configured on a server to be included in the ECL.protected void
startListenService()
Starts the receiver side of the Replication Service.protected void
startPublishService()
Start the publish mechanism of the Replication Service.void
suspendService()
Temporarily suspend the Replication Service.protected void
toBadGenerationIdStatus()
Sets the status to bad generation ID.String
toString()
protected void
trace(String msg, Object... args)
Logs at trace level a message for this replication domain.protected void
trace(LocalizableMessageDescriptor.Arg0 msg)
Logs at trace level a message for this replication domain.protected void
updateState()
Update the server state if necessary.
-
-
-
Field Detail
-
config
protected volatile ReplicationDomainCfg config
The configuration of the replication domain.
-
replicationPurgeDelayInMs
protected volatile long replicationPurgeDelayInMs
The replication purge delay is available asLong
from theconfig
object. Since the value is retrieved for every update, avoid unboxing performance issues by using a dedicated long field.
-
broker
protected ReplicationBroker broker
The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService.
-
generationId
protected volatile long generationId
The generationId for this replication domain. It is made of a hash of the 1000 first entries for this domain.
-
serverContext
protected final ServerContext serverContext
The directory server context.
-
canPersistROM
protected volatile boolean canPersistROM
Decides whether to include theReplicaOfflineMsg
(ROM) sent by the local replica in the persisted state.The ROM should not be part of the persisted state while the replica is recovering updates from the connected RS. Since the RS state is sent as part of the response to the start message, we can detect the case and only persist the ROM when the replica has finished receiving its own updates or there are no updates to receive. See OPENDJ-8062 and OPENDJ-8992 for details.
-
-
Constructor Detail
-
ReplicationDomain
public ReplicationDomain(ServerContext serverContext, ReplicationDomainCfg config, long generationId)
Creates a ReplicationDomain with the provided parameters.- Parameters:
serverContext
- The directory server contextconfig
- The configuration object for this ReplicationDomaingenerationId
- the generation of this ReplicationDomain
-
-
Method Detail
-
getHealthStatus
public HealthStatus getHealthStatus(long delayThresholdMs)
Returns the health status based on the current replication delay. If the delay is above the provided threshold, the replication domain is considered not healthy.- Parameters:
delayThresholdMs
- The maximum replication delay in milliseconds for considering this replication domain healthy.- Returns:
- This replication domain health status.
-
getCsnGenerator
protected CSNGenerator getCsnGenerator()
Returns theCSNGenerator
that will be used to generateCSN
for this domain.- Returns:
- The
CSNGenerator
that will be used to generateCSN
for this domain.
-
sessionInitiated
public void sessionInitiated(ServerStatus initStatus, ServerState rsState, ServerState newestChangelogStateOfReplica)
Set the initial status of the domain and perform necessary initializations. This method will be called by the Broker each time the ReplicationBroker establish a new session to a Replication Server. Implementations may override this method when they need to perform additional computing after session establishment. The default implementation should be sufficient for ReplicationDomains that don't need to perform additional computing.- Parameters:
initStatus
- The status to enter the state machine with.rsState
- The ServerState of the ReplicationServer with which the session was established.newestChangelogStateOfReplica
- The newest known ServerState of the change-log for this replica across the contacted ReplicationServers
-
getStatus
public ServerStatus getStatus()
Gets the status for this domain.- Returns:
- The status for this domain.
-
getBaseDN
public Dn getBaseDN()
Returns the base DN of this ReplicationDomain. All Replication Domain using this baseDN will be connected through the Replication Service.- Returns:
- The base DN of this ReplicationDomain
-
getReplicaId
public ReplicaId getReplicaId()
Get the replica ID which identifies this Replication Domain inside the Replication Service.- Returns:
- The replica ID.
-
recreateRemoteReplicasFromState
protected void recreateRemoteReplicasFromState()
Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization.
-
updateState
protected void updateState()
Update the server state if necessary.
-
decodeTarget
public ReplicaId decodeTarget(String targetString) throws LdapException
Verifies that the given string represents a valid source from which this server can be initialized.- Parameters:
targetString
- The string representing the source- Returns:
- The source as a integer value
- Throws:
LdapException
- if the string is not valid
-
initializeRemote
public void initializeRemote(ReplicaId target, Task initTask) throws LdapException
Initializes a remote server from this server.The
exportBackend(OutputStream)
will therefore be called on this server, and theimportBackend(InputStream)
will be called on the remote server.The InputStream and OutputStream given as a parameter to those methods will be connected through the replication protocol.
- Parameters:
target
- The server-id of the server that should be initialized. The target can be discovered using thegetReplicaInfos()
method.initTask
- The task that triggers this initialization and that should be updated with its progress.- Throws:
LdapException
- If it was not possible to publish the Initialization message to the Topology.
-
initializeRemote
protected void initializeRemote(ReplicaId replicaToInitialize, ReplicaId replicaRunningTheTask, Task initTask, int initWindow) throws LdapException
Process the initialization of some other server or servers in the topology specified by the target argument when this initialization specifying the server that requests the initialization.- Parameters:
replicaToInitialize
- The target replica that should be initialized.replicaRunningTheTask
- The replica that initiated the export. It can be the replica id of this server, or the replica id of a remote replica.initTask
- The task in this server that triggers this initialization and that should be updated with its progress. Null when the export is done following a request coming from a remote server (task is remote).initWindow
- The value of the initialization window for flow control between the importer and the exporter.- Throws:
LdapException
- When an error occurs. No exception raised means success.
-
getServerState
public ServerState getServerState()
Get the ServerState maintained by the Concrete class.- Returns:
- the ServerState maintained by the Concrete class.
-
receiveEntryBytes
protected byte[] receiveEntryBytes()
Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).- Returns:
- The bytes. Null when the Done or Err message has been received
-
initializeFromRemote
public void initializeFromRemote(ReplicaId source, Task initTask) throws LdapException
Initializes asynchronously this domain from a remote source server. Before returning from this call, for the provided task :- the progressing counters are updated during the initialization using setTotal() and setLeft().
- the end of the initialization using updateTaskCompletionState().
When this method is called, a request for initialization is sent to the remote source server requesting initialization.
- Parameters:
source
- The server-id of the source from which to initialize. The source can be discovered using thegetReplicaInfos()
method.initTask
- The task that launched the initialization and should be updated of its progress.- Throws:
LdapException
- If it was not possible to publish the Initialization message to the Topology. The task state is updated.
-
toBadGenerationIdStatus
protected void toBadGenerationIdStatus()
Sets the status to bad generation ID.
-
ieRunning
public boolean ieRunning()
Returns a boolean indicating if an import or export is currently processed.- Returns:
- The status
-
resetGenerationId
public void resetGenerationId(long generationIdNewValue) throws LdapException
Reset the generationId of this domain in the whole topology. A message is sent to the Replication Servers for them to reset their change dbs.- Parameters:
generationIdNewValue
- The new value of the generation Id.- Throws:
LdapException
- When an error occurs
-
isConnected
public boolean isConnected()
Check if the domain is connected to a ReplicationServer.- Returns:
- true if the server is connected, false if not.
-
hasConnectionError
protected boolean hasConnectionError()
Check if the domain has a connection error. A Connection error happens when the broker could not be created or when the broker could not find any ReplicationServer to connect to.- Returns:
- true if the domain has a connection error.
-
replicaIsLate
protected boolean replicaIsLate()
Returns whether the connected RS has notified this replica to be late with respect to changelog contents.- Returns:
- whether the connected RS has notified this replica to be late
-
getReplicationServerPort
public int getReplicationServerPort()
Get the port of the replicationServer to which this domain is currently connected.- Returns:
- the replication server port to which this domain is currently connected.
-
startPublishService
protected void startPublishService()
Start the publish mechanism of the Replication Service. After this method has been called, the publish service can be used by calling thepublish(ReplicationMsg)
method.
-
startListenService
protected void startListenService()
Starts the receiver side of the Replication Service.After this method has been called, the Replication Service will start calling the
dispatchUpdateForReplay(UpdateMsg)
.This method must be called once and must be called after the
startPublishService()
.
-
suspendService
public void suspendService()
Temporarily suspend the Replication Service. The Replication Service can be resumed again usingresumeService()
.It can be useful to suspend the Replication Service when the repository where the replicated information is stored becomes temporarily unavailable and replicated updates can therefore not be replayed during a while. This method is not MT safe.
-
isListenerShuttingDown
protected final boolean isListenerShuttingDown()
Returnstrue
if the listener thread is shutting down or has shutdown.- Returns:
true
if the listener thread is shutting down or has shutdown.
-
isSuspending
protected boolean isSuspending()
Returntrue
if the domain is being suspended.- Returns:
true
if the domain is being suspended
-
resumeService
protected void resumeService()
Restart the Replication service after asuspendService()
.The Replication Service will restart from the point indicated by the
ServerState
that was given as a parameter to thestartPublishService()
at startup time.If some data have changed in the repository during the period of time when the Replication Service was suspended, this
ServerState
should therefore be updated by the Replication Domain subclass before calling this method. This method is not MT safe.
-
changeConfig
protected boolean changeConfig(ReplicationDomainCfg config)
Change some ReplicationDomain parameters.- Parameters:
config
- The new configuration that this domain should now use.- Returns:
- whether a restart of the service is needed
-
exportBackend
protected abstract void exportBackend(OutputStream output) throws LdapException
This method should trigger an export of the replicated data. to the provided outputStream. When finished the outputStream should be flushed and closed.- Parameters:
output
- The OutputStream where the export should be produced.- Throws:
LdapException
- When needed.
-
importBackend
protected abstract void importBackend(InputStream input) throws LdapException
This method should trigger an import of the replicated data.- Parameters:
input
- The InputStream from which the import should be reading entries.- Throws:
LdapException
- When needed.
-
countEntries
public abstract long countEntries() throws LdapException
This method should return the total number of objects in the replicated domain. This count will be used for reporting.- Returns:
- The number of objects in the replication domain.
- Throws:
LdapException
- when needed.
-
dispatchUpdateForReplay
public abstract void dispatchUpdateForReplay(UpdateMsg updateMsg)
This method ensures thisUpdateMsg
received from remote replication entities will be replayed, by dispatching it to the replay threads.- Parameters:
updateMsg
- TheUpdateMsg
to replay.
-
processUpdateAfterReplay
protected final void processUpdateAfterReplay(UpdateMsg msg)
This method must be called after each call todispatchUpdateForReplay(UpdateMsg)
when the processing of the update is completed.It is useful for implementation needing to process the update in an asynchronous way or using several threads, but must be called even by implementation doing it in a synchronous, single-threaded way.
- Parameters:
msg
- The UpdateMsg whose processing was completed.
-
publish
public void publish(ReplicationMsg msg)
Publish anUpdateMsg
to the Replication Service.The Replication Service will handle the delivery of this
UpdateMsg
to all the participants of this Replication Domain. These members will be receive thisUpdateMsg
through a call of thedispatchUpdateForReplay(UpdateMsg)
message.- Parameters:
msg
- The UpdateMsg that should be published.
-
publishHeartbeatMsg
public void publishHeartbeatMsg()
Publishes a heartbeat message if all pending changes for current replica have been sent out.
-
publishReplicaOfflineMsg
public void publishReplicaOfflineMsg()
Publishes a replica offline message if all pending changes for current replica have been sent out.
-
getGenerationID
public long getGenerationID()
This method should return the generationID to use for this ReplicationDomain. This method can be called at any time after the ReplicationDomain has been started.- Returns:
- The GenerationID.
-
addAdditionalMonitoring
public void addAdditionalMonitoring(MeterRegistryHolder registry)
Subclasses should use this method to add additional monitoring information in the ReplicationDomain.- Parameters:
registry
- where to additional monitoring attributes
-
getImportExportContext
protected ReplicationDomain.ImportExportContext getImportExportContext()
Returns the Import/Export context associated to this ReplicationDomain.- Returns:
- the Import/Export context associated to this ReplicationDomain
-
setEclIncludes
public boolean setEclIncludes(ReplicaId replicaId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
Set the attributes configured on a server to be included in the ECL.- Parameters:
replicaId
- Server where these attributes are configured.includeAttributes
- Attributes to be included with all change records, may include wild-cards.includeAttributesForDeletes
- Additional attributes to be included with delete change records, may include wild-cards.- Returns:
true
if the set of attributes was modified.
-
getEclIncludes
public Set<String> getEclIncludes()
Get the attributes to include in each change for the ECL.- Returns:
- The attributes to include in each change for the ECL.
-
getEclIncludesForDeletes
protected Set<String> getEclIncludesForDeletes()
Get the attributes to include in each delete change for the ECL.- Returns:
- The attributes to include in each delete change for the ECL.
-
getLastLocalChange
public CSN getLastLocalChange()
Returns the CSN of the last Change that was fully processed by this ReplicationDomain.- Returns:
- The CSN of the last Change that was fully processed by this ReplicationDomain.
-
trace
protected void trace(String msg, Object... args)
Logs at trace level a message for this replication domain.- Parameters:
msg
- the messageargs
- the arguments of the message
-
trace
protected void trace(LocalizableMessageDescriptor.Arg0 msg)
Logs at trace level a message for this replication domain.- Parameters:
msg
- a localizable message
-
-