Configure a Kafka sync source

This feature is provided as a Preview, which means that it isn’t supported and should not be used in production environments. Learn more in Feature statuses.

A Kafka sync source reads change events from a Kafka topic and applies them to a sync destination. This lets you use Kafka as a change feed for propagating updates from an upstream system into PingDataSync. For example, an upstream application can publish change events to a Kafka topic, and PingDataSync can consume those events and apply the changes to a PingDirectory server or another sync destination.

The following objects are required to configure a Kafka sync source:

  • Kafka cluster external server: Defines the connection parameters for the Kafka cluster. This is the same configuration object used by the Kafka sync destination, and can be shared between both.

  • Kafka sync source: References the Kafka cluster external server and defines how PingDataSync consumes messages from the topic.

Kafka cluster external server

The only required property is bootstrap-server, which identifies one or more Kafka brokers in the cluster.

When use-ssl is set to true, you must also configure a trust-manager-provider and a key-manager-provider. Learn more in SSL configuration.

Steps

  • Create the Kafka cluster external server.

    Example:

    $ bin/dsconfig create-external-server \
      --server-name kafkaServerName \
      --type kafka-cluster \
      --set bootstrap-server:kafkaHost:9092 \
      --set use-ssl:true \
      --set trust-manager-provider:Kafka \
      --set key-manager-provider:Kafka \
      --applyChangeTo server-group

    If you already created a Kafka cluster external server for a Kafka sync destination, you can reference the same object from your Kafka sync source.

Passing consumer properties

Use the consumer-property configuration parameter to pass standard Kafka consumer configuration properties to PingDataSync. For sensitive values such as passwords or keys, use sensitive-consumer-property instead to prevent storing those values in plain text.

Create a sensitive-consumer-property using the following required arguments:

--property-name

Specifies the name of the sensitive Kafka consumer property.

--set sensitive-consumer-key:<key>

Specifies the key of the Kafka consumer property whose value is sensitive. This must be a valid Kafka consumer property key.

--set sensitive-consumer-value:<value>

Specifies the sensitive value associated with the consumer key.

Steps

  • Create one or more sensitive Kafka consumer properties using dsconfig create-sensitive-kafka-consumer-property.

    Example:

    $ bin/dsconfig create-sensitive-kafka-consumer-property \
      --property-name saslConfig \
      --set "sensitive-consumer-key:sasl.jaas.config" \
      --set 'sensitive-consumer-value:org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'

  • (Optional) Delete one or more sensitive Kafka consumer properties using dsconfig delete-sensitive-kafka-consumer-property.

    Example:

    $ bin/dsconfig delete-sensitive-kafka-consumer-property \
      --property-name saslConfig
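
The sasl.jaas.config value contains spaces, double quotes, and a trailing semicolon, so it must reach dsconfig as a single shell word. The following Python sketch shows one way to build that value and quote it for the command line. The helper names scram_jaas_config and set_argument are hypothetical and are not part of PingDataSync. The sketch rejects credentials that contain a double quote or backslash rather than guessing at escaping rules.

```python
import shlex


def scram_jaas_config(username: str, password: str) -> str:
    """Build a SCRAM sasl.jaas.config value as one string ending in ';'."""
    for field in (username, password):
        # Assumption: escaping inside the quoted values is out of scope here,
        # so characters that would need escaping are refused.
        if '"' in field or "\\" in field:
            raise ValueError("credentials with quotes or backslashes are not handled")
    return (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )


def set_argument(prop: str, value: str) -> str:
    """Render a '--set prop:value' pair with the pair quoted as one shell word."""
    return "--set " + shlex.quote(f"{prop}:{value}")


jaas = scram_jaas_config("username", "password")
print(set_argument("sensitive-consumer-value", jaas))
```

Because the value contains double quotes but no single quotes, shlex.quote wraps the whole pair in single quotes, which keeps the inner double quotes and the semicolon from being interpreted by the shell.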

Kafka sync source properties

Use the following properties to configure the Kafka sync source object:

topic

The Kafka topic to read messages from.

cluster

The Kafka cluster external server that defines the connection to the Kafka cluster.

consumer-group-id

Identifies the consumer group that this sync source belongs to. Kafka delivers each message to one consumer within a given consumer group. Multiple instances of the same sync source should share a consumer group so that a new instance can take over where a failed instance left off.

max-poll-records

The maximum number of records to return in a single poll request. Larger values can improve throughput but increase memory usage.
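
To make the consumer-group-id and max-poll-records behavior concrete, the following Python sketch simulates a single-partition topic in memory. It is a toy model and not a Kafka client: the ToyBroker class and the consume function are hypothetical, and real Kafka tracks offsets per partition and handles rebalancing, which this sketch ignores.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for one single-partition topic (not a Kafka client)."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = defaultdict(int)  # group id -> next offset to read

    def poll(self, group_id, max_poll_records):
        """Return the start offset and at most max_poll_records messages."""
        start = self.committed[group_id]
        return start, self.messages[start:start + max_poll_records]

    def commit(self, group_id, next_offset):
        self.committed[group_id] = next_offset


def consume(broker, group_id, max_poll_records, batches):
    """Poll a fixed number of batches, committing after each one."""
    seen = []
    for _ in range(batches):
        start, records = broker.poll(group_id, max_poll_records)
        seen.extend(records)
        broker.commit(group_id, start + len(records))
    return seen


broker = ToyBroker(["c1", "c2", "c3", "c4", "c5"])
# The primary instance handles one batch of two records, then fails.
primary = consume(broker, "consumerGroupId", max_poll_records=2, batches=1)
# A standby with the same group id resumes from the committed offset.
standby = consume(broker, "consumerGroupId", max_poll_records=2, batches=2)
# A different group id has its own offsets and reads from the beginning.
other = consume(broker, "otherGroup", max_poll_records=5, batches=1)
print(primary, standby, other)
```

In this model, the standby picks up at c3 because it shares the group's committed offset, while the unrelated group reads all five messages again.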

Message format semantics for sync sources

The Kafka sync source uses the same JSON message format as the Kafka sync destination. Learn more in Message format.

The following additional semantics apply when reading messages as a sync source:

  • For modify operations, the current field can be a partial entry containing only the modified attributes. Attributes omitted from current are left unchanged on the destination entry.

  • To delete an attribute value in a modify operation, set the attribute to null (single-valued) or an empty array (multivalued) in the current field.

  • For add operations, the current field must be a complete entry.
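
The modify semantics above can be sketched as a small merge function. This is an illustration of the rules as described, not PingDataSync's implementation. The apply_modify function and the attribute names are hypothetical, and the sketch treats null or an empty array as removing the attribute entirely.

```python
def apply_modify(existing: dict, current: dict) -> dict:
    """Apply a partial 'current' entry to an existing entry.

    - Attributes absent from 'current' are left unchanged.
    - None (JSON null) or an empty list removes the attribute.
    - Any other value replaces the attribute's value.
    """
    result = dict(existing)
    for attr, value in current.items():
        if value is None or value == []:
            result.pop(attr, None)
        else:
            result[attr] = value
    return result


entry = {
    "uid": "jdoe",
    "cn": "John Doe",
    "mail": ["jdoe@example.com"],
    "title": "Engineer",
}
# Partial entry: replace cn, delete title (single-valued) and mail (multivalued).
current = {"cn": "Jane Doe", "title": None, "mail": []}
updated = apply_modify(entry, current)
print(updated)
```

The uid attribute survives because it is omitted from current, which is why a modify message only needs to carry the attributes that changed.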

JSON sync operation mapping

If your Kafka messages use a different format, you can configure a JSON sync operation mapping object to map your message fields to the fields that PingDataSync requires.

The JSON sync operation mapping uses the existing constructed value syntax to assign values from the incoming JSON message to each required sync operation field.

The following properties are available:

JSON sync operation mapping properties
dn-pattern

(Required) A constructed value pattern that resolves to a DN identifying the entry. If the source doesn’t use LDAP DNs, you can construct a synthetic DN such as uid=<userId>,ou=people,dc=example,dc=com and map it to a destination DN using the existing DN map configuration.

change-id-pattern

(Required) A constructed value pattern that resolves to a unique identifier for the change.

source-entry-pattern

(Required) A constructed value pattern that resolves to a JSON object representing the entry. For modify operations, this can be a partial entry containing only the modified attributes.

op-type-pattern

(Required) A constructed value pattern that resolves to the operation type being performed.

modified-attributes-pattern

(Optional) A constructed value pattern that lists the attributes modified by the event. If unspecified, all attributes are assumed to be modified.

create-op-type-value

(Optional) The value in your message that represents a create operation. Defaults to create.

delete-op-type-value

(Optional) The value in your message that represents a delete operation. Defaults to delete.

modify-op-type-value

(Optional) The value in your message that represents a modify operation. Defaults to modify.
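
As a conceptual illustration of what the mapping achieves, the following Python sketch converts a message in a custom format into the fields listed above. It does not use the constructed value syntax and is not how PingDataSync evaluates patterns. The incoming field names (eventId, action, userId, payload, changed) and the custom operation values are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OpTypeValues:
    """Message values that stand for each operation; defaults match the documented ones."""
    create: str = "create"
    delete: str = "delete"
    modify: str = "modify"


def map_message(msg: dict, op_values: OpTypeValues = OpTypeValues()) -> dict:
    """Resolve the sync operation fields from a hypothetical custom message."""
    op_by_value = {
        op_values.create: "create",
        op_values.delete: "delete",
        op_values.modify: "modify",
    }
    raw_op = msg["action"]
    if raw_op not in op_by_value:
        raise ValueError(f"unrecognized operation value: {raw_op!r}")
    entry = msg["payload"]
    return {
        # Synthetic DN built from a non-LDAP identifier.
        "dn": f"uid={msg['userId']},ou=people,dc=example,dc=com",
        "change_id": str(msg["eventId"]),
        "op_type": op_by_value[raw_op],
        "source_entry": entry,
        # With no explicit list, every attribute in the entry counts as modified.
        "modified_attributes": msg.get("changed", sorted(entry)),
    }


message = {
    "eventId": 42,
    "action": "UPDATE",
    "userId": "jdoe",
    "payload": {"mail": "jdoe@example.com", "title": "Engineer"},
}
custom_values = OpTypeValues(create="INSERT", delete="REMOVE", modify="UPDATE")
operation = map_message(message, custom_values)
print(operation)
```

The OpTypeValues defaults mirror create, delete, and modify, so only upstream systems that use other vocabulary, such as UPDATE here, need to override them.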

Dead letter queue

When PingDataSync can’t process a Kafka message (for example, because of a JSON parsing error or a mapping failure), it logs an error and moves on to the next message. To preserve failed messages for later inspection or reprocessing, you can configure a dead letter queue (DLQ).

When the DLQ is enabled, failed messages are published to a separate Kafka topic. The original message data is preserved, and error metadata is added to the Kafka record headers.
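
The general dead letter pattern described here can be sketched in Python as follows. This is a generic illustration, not PingDataSync's code. The function name process_or_dead_letter and the header names error.class and error.message are hypothetical, because the actual header names are not specified here.

```python
import json


def process_or_dead_letter(raw: bytes, handle):
    """Process one message; return None on success or a DLQ record on failure.

    The DLQ record keeps the original bytes untouched and carries the
    error details in headers, as (name, bytes) pairs.
    """
    try:
        handle(json.loads(raw))
        return None
    except (ValueError, KeyError) as err:  # JSON parse errors are ValueErrors
        return {
            "value": raw,
            "headers": [
                ("error.class", type(err).__name__.encode()),
                ("error.message", str(err).encode()),
            ],
        }


def require_dn(message: dict) -> None:
    """Stand-in for mapping: fails with KeyError when 'dn' is missing."""
    message["dn"]


ok = process_or_dead_letter(b'{"dn": "uid=jdoe,ou=people,dc=example,dc=com"}', require_dn)
bad_json = process_or_dead_letter(b'{"dn": ', require_dn)
bad_mapping = process_or_dead_letter(b'{"id": 1}', require_dn)
print(ok, bad_json, bad_mapping)
```

Keeping the payload byte-for-byte identical means a failed message can be replayed from the DLQ topic once the underlying problem is fixed.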

The following properties configure the DLQ:

dlq-enabled

Disabled by default. Set to true to enable the DLQ. When the DLQ is enabled, you must also specify the dlq-topic.

dlq-topic

The name of the Kafka topic where failed messages are published. Kafka restricts topic names to alphanumeric characters, periods, underscores, and hyphens, with a maximum length of 249 characters.
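
A topic name can be checked against these restrictions before it is set as the dlq-topic. The following Python sketch is a hypothetical pre-check, not part of PingDataSync. In addition to the rules above, it rejects the names "." and "..", which Kafka also disallows.

```python
import re

# Allowed characters and length: ASCII letters, digits, '.', '_', '-'; 1 to 249 characters.
_TOPIC_NAME = re.compile(r"[A-Za-z0-9._-]{1,249}")


def is_valid_topic_name(name: str) -> bool:
    """Return True if the name satisfies Kafka's topic naming restrictions."""
    if name in (".", ".."):
        return False
    return _TOPIC_NAME.fullmatch(name) is not None


for candidate in ("sync-source.dlq", "bad topic", "a" * 250, ".."):
    print(repr(candidate[:20]), is_valid_topic_name(candidate))
```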

Configuring a Kafka sync source

The following steps create a basic Kafka sync source and connect it to a sync destination through a sync pipe.

Steps

  1. Create the Kafka cluster external server.

  2. Create the Kafka sync source.

    Example:

    $ bin/dsconfig create-sync-source \
      --source-name kafkaSyncSource \
      --type kafka \
      --set topic:topicName \
      --set cluster:kafkaServerName \
      --set consumer-group-id:consumerGroupId \
      --applyChangeTo server-group

    The consumer-group-id should be consistent across all PingDataSync instances in a server group so that a standby instance resumes from the last committed offset if the primary fails.

  3. Create the sync destination.

    Example:

    $ bin/dsconfig create-sync-destination \
      --destination-name syncDestinationName \
      --type ping-directory \
      --set server:destinationServer \
      --applyChangeTo server-group

    The sync destination can be any supported type, including PingDirectory.

  4. Create the sync pipe.

    Example:

    $ bin/dsconfig create-sync-pipe \
      --pipe-name kafkaPipeName \
      --set num-worker-threads:10 \
      --set change-detection-polling-interval:"500 ms" \
      --set sync-source:kafkaSyncSource \
      --set sync-destination:syncDestinationName \
      --applyChangeTo server-group

  5. Create the sync class.

    Example:

    $ bin/dsconfig create-sync-class \
      --pipe-name kafkaPipeName \
      --class-name kafkaSyncClass \
      --set auto-mapped-source-attribute:all \
      --applyChangeTo server-group

  6. Start the sync pipe.

    Example:

    $ bin/dsconfig set-sync-pipe-prop \
      --pipe-name kafkaPipeName \
      --set started:true \
      --applyChangeTo server-group