---
title: Part A - Install and configure Kafka and Zookeeper
description: PingIntelligence uses Kafka and Zookeeper for processing event streaming.
component: pingintelligence
version: 5.1
page_id: pingintelligence:pingintelligence_production_deployment:pingintelligence_install_configure_kafka_zookeeper
canonical_url: https://docs.pingidentity.com/pingintelligence/5.1/pingintelligence_production_deployment/pingintelligence_install_configure_kafka_zookeeper.html
revdate: April 1, 2024
section_ids:
  about-this-task: About this task
  steps: Steps
---

# Part A - Install and configure Kafka and Zookeeper

PingIntelligence uses Kafka and Zookeeper for processing event streaming.

## About this task

|   |                                                                                                                                                                                                                                                                |
| - | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|   | From PingIntelligence 5.1, you can configure Kafka in SSL mode only.For further information on Kafka, refer to the Kafka documentation:- <https://kafka.apache.org/documentation/#introduction>

- <https://kafka.apache.org/documentation/#security_overview> |

## Steps

1. Create a truststore and keystore:

   1. Create `.crt` and `.key` files:

      ```
      #openssl req -new -x509 -keyout pi4api-kafka-key.key -out pi4api-kafka-crt.crt -days 730
      ```

   2. Create a `.p12` file:

      ```
      #openssl pkcs12 -export -in pi4api-kafka-crt.crt -inkey pi4api-kafka-key.key -name pingidentity -out kafka.p12 -password pass:changeme
      ```

   3. Create a truststore:

      ```
      #keytool -keystore kafka_truststore.jks -alias pingidentity -import -file pi4api-kafka-crt.crt -storepass changeme -noprompt
      ```

   4. Create a keystore:

      ```
      #keytool -importkeystore -deststorepass changeme -deststoretype JKS -destkeystore kafka_keystore.jks -srckeystore kafka.p12 -srcstoretype PKCS12 -srcstorepass changeme -noprompt
      ```

2. Configure and start the Zookeeper service:

   1. Customize the `zookeeper.properties` file for your installation.

      For example:

      ```
      dataDir=/home/pi-user/pingidentity/kafka/data/zookeeper
      dataLogDir=/home/pi-user/pingidentity/kafka/datalog
      tickTime=2000
      initLimit=5
      syncLimit=2
      autopurge.snapRetainCount=3
      autopurge.purgeInterval=0
      maxClientCnxns=60
      standaloneEnabled=true
      admin.enableServer=true
      admin.serverPort=9090
      server.1=172.16.40.244:2888:3888
      # the port at which the clients will connect
      secureClientPort=2182

      authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
      serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
      ssl.trustStore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
      ssl.trustStore.password=changeme
      ssl.keyStore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
      ssl.keyStore.password=changeme
      ssl.clientAuth=need
      ssl.hostnameVerification=false
      sslQuorum=true
      ssl.quorum.keyStore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
      ssl.quorum.keyStore.password=changeme
      ssl.quorum.trustStore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
      ssl.quorum.trustStore.password=changeme
      ssl.quorum.hostnameVerification=false
      portUnification=false
      ```

   2. Start the Zookeeper service:

      ```
      #./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
      ```

   3. Check the Zookeeper logfile:

      ```
      #tail -f logs/zookeeper.out
      ```

3. Configure and start the Kafka server:

   1. Configure the SASL SCRAM server authentication file:

      ```
      vim /home/pi-user/pingidentity/kafka/config/sasl_server.conf

      KafkaServer {
              org.apache.kafka.common.security.scram.ScramLoginModule required;
      };
      ```

   2. Export the server authentication filepath as the environment variable `KAFKA_OPTS` in the Kafka server startup script `kafka-server-start.sh`.

      For example:

      ```
      #vim /bin/kafka-server-start.sh

      export KAFKA_OPTS="-Djava.security.auth.login.config=/home/pi-user/pingidentity/kafka/config/sasl_server.conf"
      ```

   3. Customize the `kafka/config/server.properties` file for your installation.

      For example:

      ```
      broker.id=0
      listeners=SSL://172.16.40.244:9091,SCRAM_SASL_SSL://172.16.40.244:9093
      advertised.listeners=SSL://172.16.40.244:9091,SCRAM_SASL_SSL://172.16.40.244:9093
      num.network.threads=3
      num.io.threads=8
      socket.send.buffer.bytes=102400
      socket.receive.buffer.bytes=102400
      socket.request.max.bytes=104857600

      log.dirs=/home/pi-user/pingidentity/kafka/data/kafka/

      num.partitions=1

      num.recovery.threads.per.data.dir=1
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      log.retention.hours=168
      log.segment.bytes=1073741824
      log.retention.check.interval.ms=300000
      zookeeper.connect=172.16.40.244:2182 (Important to change the SSL port)
      zookeeper.connection.timeout.ms=18000
      group.initial.rebalance.delay.ms=0

      Appending the following

      ssl.keystore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
      ssl.keystore.password=changeme
      ssl.key.password=changeme
      ssl.truststore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
      ssl.truststore.password=changeme
      ssl.client.auth=required
      sasl.enabled.mechanisms=SCRAM-SHA-512
      ssl.enabled.protocols=TLSv1.2
      listener.security.protocol.map= SSL:SSL,SCRAM_SASL_SSL:SASL_SSL
      delete.topic.enable=False
      authorizer.class.name=kafka.security.authorizer.AclAuthorizer
      allow.everyone.if.no.acl.found=true
      ssl.endpoint.identification.algorithm=
      security.inter.broker.protocol=SSL
      zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
      zookeeper.ssl.client.enable=true
      zookeeper.ssl.protocol=TLSv1.2
      zookeeper.ssl.truststore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
      zookeeper.ssl.truststore.password=changeme
      zookeeper.ssl.keystore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
      zookeeper.ssl.keystore.password=changeme
      zookeeper.ssl.quorum.hostnameVerification=false
      zookeeper.ssl.hostnameVerification=false
      zookeeper.ssl.endpoint.identification.algorithm=
      ```

   4. Start the Kafka server:

      ```
      #./bin/kafka-server-start.sh -daemon config/server.properties
      ```

   5. Check the Kafka server logfile and server status:

      ```
      # tail -f logs/kafkaServer.out
      #netstat -tupln | grep -E 9093
      ```

4. Configure topics and ACLs in Kafka's `config/client.properties` file.

   For example:

   ```
   # vim config/client.properties

   security.protocol=SSL
   ssl.truststore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
   ssl.truststore.password=changeme
   ssl.keystore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
   ssl.keystore.password=changeme
   ssl.key.password=changeme
   ssl.enabled.protocols=TLSv1.2
   ssl.truststore.type=JKS
   ssl.keystore.type=JKS
   enable.ssl.certificate.verification=false
   ssl.endpoint.identification.algorithm=
   ```

5. Configure producer and consumer users in Zookeeper's `config/zookeeper_client.properties` file.

   For example:

   ```
   # vim config/zookeeper_client.properties

   zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
   zookeeper.ssl.client.enable=true
   zookeeper.ssl.protocol=TLSv1.2

   #zookeeper.ssl.quorum.hostnameVerification=false
   #zookeeper.ssl.hostnameVerification=false
   zookeeper.ssl.truststore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
   zookeeper.ssl.truststore.password=changeme
   zookeeper.ssl.keystore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
   zookeeper.ssl.keystore.password=changeme
   zookeeper.ssl.endpoint.identification.algorithm=
   zookeeper.ssl.hostnameVerification=false
   ```

6. Create topics:

   Command line and parameters:

   ```
   <installation path>/pingidentity/kafka/bin/kafka-topics.sh
   --bootstrap-server <Kafka master IP>:<Kafka SSL port>
   --create
     --topic <ABS transactions topic>
     --partitions <ABS topic partitions>
     --replication-factor <ABS replication factor>
     --command-config <installation path>/pingidentity/kafka/config/client.properties
   ```

   1. Create the transactions topic for events related to all API traffic.

      For example:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-topics.sh --bootstrap-server 172.16.40.244:9091 --create --topic pi4api.queuing.transactions --partitions 1 --replication-factor 1 --command-config /home/pi-user/pingidentity/kafka/config/client.properties
      ```

   2. Create the indicators of attack (IoA) topic for IoA-related events.

      For example:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-topics.sh --bootstrap-server 172.16.40.244:9091 --create --topic pi4api.queuing.ioas --partitions 1 --replication-factor 1 --command-config /home/pi-user/pingidentity/kafka/config/client.properties
      ```

   3. Create the anomalies topic for anomaly-related events.

      For example:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-topics.sh --bootstrap-server 172.16.40.244:9091 --create --topic epi4api.queuing.anomalies --partitions 1 --replication-factor 1 --command-config /home/pi-user/pingidentity/kafka/config/client.properties
      ```

7. Create users:

   Command line and parameters:

   ```
   <installation path>/pingidentity/kafka/bin/kafka-configs.sh
   --zookeeper <Kafka master IP>:<Zookeeper.ssl_port>
   --alter
     --add-config SCRAM-SHA-512=[iterations=8192,password=<user authentication password>
     --entity-type users
     --entity-name <username> -zk-tls-config-file <installation path>/pingidentity/kafka/config/zookeeper_client.properties
   ```

   1. Create the ABS producer user for sending machine learning data.

      For example:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-configs.sh --zookeeper 10.96.6.126:2182 --alter --add-config SCRAM-SHA-512=[iterations=8192,password=changeme]] --entity-type users --entity-name abs_producer -zk-tls-config-file /home/pi-user/pingidentity/kafka/config/zookeeper_client.properties
      ```

   2. Create the ABS consumer user for consuming machine language data for job processing.

      For example:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-configs.sh --zookeeper 10.96.6.126:2182 --alter --add-config SCRAM-SHA-512=[iterations=8192,password=changeme]] --entity-type users --entity-name abs_consumer -zk-tls-config-file /home/pi-user/pingidentity/kafka/config/zookeeper_client.properties
      ```

   3. Create the data engine consumer for pulling transactions, anomalies and indicators of compromise (IOCs).

      For example:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-configs.sh --zookeeper 10.96.6.126:2182 --alter --add-config SCRAM-SHA-512=[iterations=8192,password=changeme]] --entity-type users --entity-name pi4api_de_user -zk-tls-config-file /home/pi-user/pingidentity/kafka/config/zookeeper_client.properties
      ```

8. Configure ACLs for users.

   The following table lists the topics and operations permitted on them, per user.

   | User                 | Allowed operations        | Topics                              |
   | -------------------- | ------------------------- | ----------------------------------- |
   | ABS producer         | * Create

   * Write

   * Read | - Transactions

   - IoAs

   - Anomalies |
   | ABS consumer         | Read                      | * Transactions

   * IoAs

   * Anomalies |
   |                      | Describe                  | Transactions                        |
   | Data engine consumer | Read                      | - Transactions

   - IoAs

   - Anomalies |

   Command line and parameters:

   ```
   <installation path>/pingidentity/kafka/bin/kafka-acls.sh
   --bootstrap-server<Kafka master IP>:<Kafka SSL port>
   --add
   --allow-principal User:<username>
   --operation <operation> [--operation <operation 2>] [--operation <operation n>]
   --topic <topic name>
   --command-config <installation path>/pingidentity/kafka/config/client.properties
   ```

   1. Create the ACLs for the ABS producer user.

      For example:

      1. Transactions topic:

         ```
         /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_producer --operation Create --operation Read --operation Write --topic pi4api.queuing.transactions --command-config /home/pi-user/pingidentity/kafka/config/client.properties
         ```

      2. IoAs topic:

         ```
         /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_producer --operation Create --operation Read --operation Write --topic pi4api.queuing.ioas --command-config /home/pi-user/pingidentity/kafka/config/client.properties
         ```

      3. Anomalies topic:

         ```
         /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_producer --operation Create --operation Read --operation Write --topic epi4api.queuing.anomalies --command-config /home/pi-user/pingidentity/kafka/config/client.properties
         ```

   2. Create the ACLs for the ABS consumer user.

      For example:

      1. Transactions topic:

         ```
         /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --operation Describe --topic pi4api.queuing.transactions --command-config /home/pi-user/pingidentity/kafka/config/client.properties
         ```

      2. IoAs topic:

         ```
         /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --topic pi4api.queuing.ioas --command-config /home/pi-user/pingidentity/kafka/config/client.properties
         ```

      3. Anomalies topic:

         ```
         /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --topic epi4api.queuing.anomalies --command-config /home/pi-user/pingidentity/kafka/config/client.properties
         ```

   3. Create the ACLs for the data engine consumer user.

   For example:

   1. Transactions topic:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Read --topic pi4api.queuing.transactions --command-config /home/pi-user/pingidentity/kafka/config/client.properties
      ```

   2. IoAs topic:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Read --topic pi4api.queuing.ioas --command-config /home/pi-user/pingidentity/kafka/config/client.properties
      ```

   3. Anomalies topic:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Create --operation Read --operation Write --topic epi4api.queuing.anomalies --command-config /home/pi-user/pingidentity/kafka/config/client.properties
      ```

9. Configure ACLs for groups.

   Command line and parameters:

   ```
   <installation path>/pingidentity/kafka/bin/kafka-acls.sh
   --bootstrap-server <Kafka master IP>:<Kafka SSL port>
   --add --allow-principal User:<username>
   --operation <operation>
   --group <group ID>
   --command-config <installation path>/pingidentity/kafka/config/client.properties
   ```

   1. Configure permissions for the ABS consumer user belonging to the ABS consumer group to perform read operations.

      For example:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --group pi4api.abs --command-config /home/pi-user/pingidentity/kafka/config/client.properties
      ```

   2. Configure permissions for the data engine consumer user belonging to the data engine consumer group to perform read operations.

      For example:

      ```
      /home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Read --group pi4api.data-engine --command-config /home/pi-user/pingidentity/kafka/config/client.properties
      ```
