Installing and configuring Kafka and Zookeeper
PingIntelligence uses Kafka and Zookeeper for processing event streaming.
About this task
From PingIntelligence 5.2, you can configure Kafka in Secure Sockets Layer (SSL) mode only. For more information on Kafka, see the Apache Kafka documentation.
Steps
-
Create a truststore and keystore:
-
Create .crt and .key files:
# openssl req -new -x509 -keyout pi4api-kafka-key.key -out pi4api-kafka-crt.crt -days 730
-
Create a .p12 file:
# openssl pkcs12 -export -in pi4api-kafka-crt.crt -inkey pi4api-kafka-key.key -name pingidentity -out kafka.p12 -password pass:changeme
-
Create a truststore:
#keytool -keystore kafka_truststore.jks -alias pingidentity -import -file pi4api-kafka-crt.crt -storepass changeme -noprompt
-
Create a keystore:
#keytool -importkeystore -deststorepass changeme -deststoretype JKS -destkeystore kafka_keystore.jks -srckeystore kafka.p12 -srcstoretype PKCS12 -srcstorepass changeme -noprompt
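Before moving on, it can help to confirm that both stores exist and contain the expected entry. The following is a minimal sketch, not part of PingIntelligence: the `list_store` helper and the `STORE_PASS` variable are hypothetical names, and it assumes `keytool` is on the PATH.

```shell
# Hypothetical helper: list the entries of a JKS store created in the steps above.
# STORE_PASS defaults to the example password; override it for a real deployment.
STORE_PASS="${STORE_PASS:-changeme}"

list_store() {
    store="$1"
    if [ ! -f "$store" ]; then
        echo "missing: $store"
        return 1
    fi
    # keytool -list prints each alias in the store with its certificate fingerprint
    keytool -list -keystore "$store" -storepass "$STORE_PASS"
}
```

For example, `list_store kafka_truststore.jks` and `list_store kafka_keystore.jks` should each show an entry with the alias pingidentity if the commands above succeeded.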
-
-
Configure and start the Zookeeper service:
-
Customize the zookeeper.properties file for your installation.
Example:
dataDir=/home/pi-user/pingidentity/kafka/data/zookeeper
dataLogDir=/home/pi-user/pingidentity/kafka/datalog
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
admin.serverPort=9090
server.1=172.16.40.244:2888:3888
# the port at which the clients will connect
secureClientPort=2182
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
ssl.trustStore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
ssl.trustStore.password=changeme
ssl.keyStore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
ssl.keyStore.password=changeme
ssl.clientAuth=need
ssl.hostnameVerification=false
sslQuorum=true
ssl.quorum.keyStore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
ssl.quorum.keyStore.password=changeme
ssl.quorum.trustStore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
ssl.quorum.trustStore.password=changeme
ssl.quorum.hostnameVerification=false
portUnification=false
-
Start the Zookeeper service:
#./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
-
Check the Zookeeper logfile:
#tail -f logs/zookeeper.out
-
-
Configure and start the Kafka server:
-
Configure the SASL SCRAM server authentication file:
# vim /home/pi-user/pingidentity/kafka/config/sasl_server.conf
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
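If you prefer not to open an editor, the same file can be written from a script. This is a minimal sketch; the `write_sasl_server_conf` function name is hypothetical, and the file content is the JAAS entry from the step above.

```shell
# Hypothetical helper: write the SASL SCRAM server authentication file
# to the path given as the first argument, without opening an editor.
write_sasl_server_conf() {
    out="$1"
    cat > "$out" <<'EOF'
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
EOF
}
```

For example: `write_sasl_server_conf /home/pi-user/pingidentity/kafka/config/sasl_server.conf`.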
-
Export the server authentication file path as the environment variable KAFKA_OPTS in the Kafka server startup script kafka-server-start.sh.
Example:
# vim /bin/kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/pi-user/pingidentity/kafka/config/sasl_server.conf"
-
Customize the kafka/config/server.properties file for your installation.
Example:
broker.id=0
listeners=SSL://172.16.40.244:9091,SCRAM_SASL_SSL://172.16.40.244:9093
advertised.listeners=SSL://172.16.40.244:9091,SCRAM_SASL_SSL://172.16.40.244:9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/pi-user/pingidentity/kafka/data/kafka/
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Important: change the port to the Zookeeper SSL port
zookeeper.connect=172.16.40.244:2182
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
# Append the following:
ssl.keystore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
ssl.keystore.password=changeme
ssl.key.password=changeme
ssl.truststore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
ssl.truststore.password=changeme
ssl.client.auth=required
sasl.enabled.mechanisms=SCRAM-SHA-512
ssl.enabled.protocols=TLSv1.2
listener.security.protocol.map=SSL:SSL,SCRAM_SASL_SSL:SASL_SSL
delete.topic.enable=False
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
ssl.endpoint.identification.algorithm=
security.inter.broker.protocol=SSL
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.protocol=TLSv1.2
zookeeper.ssl.truststore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
zookeeper.ssl.truststore.password=changeme
zookeeper.ssl.keystore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
zookeeper.ssl.keystore.password=changeme
zookeeper.ssl.quorum.hostnameVerification=false
zookeeper.ssl.hostnameVerification=false
zookeeper.ssl.endpoint.identification.algorithm=
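Because the port in zookeeper.connect has to match the secureClientPort value in zookeeper.properties, a quick consistency check can catch a mismatch before the broker is started. The sketch below is illustrative only: `get_prop` and `check_zk_port` are hypothetical helper names, and it assumes zookeeper.connect holds a single host:port entry with no chroot suffix.

```shell
# Hypothetical helper: print the value of a key from a Java-style properties file.
get_prop() {
    awk -F= -v k="$1" '$1 == k { sub(/^[^=]*=/, ""); print; exit }' "$2"
}

# Compare Zookeeper's secure client port with the port in Kafka's zookeeper.connect.
# Usage: check_zk_port <zookeeper.properties> <server.properties>
check_zk_port() {
    zk_port=$(get_prop secureClientPort "$1")
    # Keep only the text after the last colon (assumes one host:port entry)
    connect_port=$(get_prop zookeeper.connect "$2" | sed 's/.*://')
    if [ "$zk_port" = "$connect_port" ]; then
        echo "ok: $zk_port"
    else
        echo "mismatch: zookeeper=$zk_port kafka=$connect_port"
        return 1
    fi
}
```

For example, run `check_zk_port config/zookeeper.properties config/server.properties` from the Kafka installation directory.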
-
Start the Kafka server:
#./bin/kafka-server-start.sh -daemon config/server.properties
-
Check the Kafka server logfile and server status:
# tail -f logs/kafkaServer.out
# netstat -tupln | grep -E 9093
-
-
Configure topics and access control lists (ACLs) in Kafka’s config/client.properties file.
Example:
# vim config/client.properties
security.protocol=SSL
ssl.truststore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
ssl.truststore.password=changeme
ssl.keystore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
ssl.keystore.password=changeme
ssl.key.password=changeme
ssl.enabled.protocols=TLSv1.2
ssl.truststore.type=JKS
ssl.keystore.type=JKS
enable.ssl.certificate.verification=false
ssl.endpoint.identification.algorithm=
-
Configure producer and consumer users in Zookeeper’s config/zookeeper_client.properties file.
Example:
# vim config/zookeeper_client.properties
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.protocol=TLSv1.2
#zookeeper.ssl.quorum.hostnameVerification=false
#zookeeper.ssl.hostnameVerification=false
zookeeper.ssl.truststore.location=/home/pi-user/pingidentity/kafka/kafka_truststore.jks
zookeeper.ssl.truststore.password=changeme
zookeeper.ssl.keystore.location=/home/pi-user/pingidentity/kafka/kafka_keystore.jks
zookeeper.ssl.keystore.password=changeme
zookeeper.ssl.endpoint.identification.algorithm=
zookeeper.ssl.hostnameVerification=false
-
Create topics:
Command line and parameters:
<installation path>/pingidentity/kafka/bin/kafka-topics.sh --bootstrap-server <Kafka master IP>:<Kafka SSL port> --create --topic <ABS transactions topic> --partitions <ABS topic partitions> --replication-factor <ABS replication factor> --command-config <installation path>/pingidentity/kafka/config/client.properties
-
Create the transactions topic for events related to all API traffic.
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-topics.sh --bootstrap-server 172.16.40.244:9091 --create --topic pi4api.queuing.transactions --partitions 1 --replication-factor 1 --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Create the indicators of attack (IoA) topic for IoA-related events.
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-topics.sh --bootstrap-server 172.16.40.244:9091 --create --topic pi4api.queuing.ioas --partitions 1 --replication-factor 1 --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Create the anomalies topic for anomaly-related events.
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-topics.sh --bootstrap-server 172.16.40.244:9091 --create --topic pi4api.queuing.anomalies --partitions 1 --replication-factor 1 --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Create the discovery topic for discovery-related events.
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-topics.sh --bootstrap-server 172.16.40.244:9091 --create --topic pi4api.queuing.apis --partitions 1 --replication-factor 1 --command-config /home/pi-user/pingidentity/kafka/config/client.properties
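The four topic commands differ only in the topic name, so they can be generated from one template. The following is a sketch under stated assumptions: `topic_cmd` and `print_topic_cmds` are hypothetical helpers, the defaults are the example path and address used above, and the anomalies topic is assumed to be named pi4api.queuing.anomalies, matching the other topics. The functions only print commands; nothing is executed.

```shell
# Assumed defaults taken from the examples above; override through the environment.
KAFKA_HOME="${KAFKA_HOME:-/home/pi-user/pingidentity/kafka}"
BOOTSTRAP="${BOOTSTRAP:-172.16.40.244:9091}"

# Print the kafka-topics.sh command that creates one topic.
topic_cmd() {
    echo "$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP" \
         "--create --topic $1 --partitions 1 --replication-factor 1" \
         "--command-config $KAFKA_HOME/config/client.properties"
}

# Print one command per topic: transactions, IoAs, anomalies, and discovery.
print_topic_cmds() {
    for t in transactions ioas anomalies apis; do
        topic_cmd "pi4api.queuing.$t"
    done
}
```

Run `print_topic_cmds` to review the commands, then `print_topic_cmds | sh` to execute them. Afterward, running kafka-topics.sh with `--list` in place of the `--create` options should show the four topics.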
-
-
Create users:
Command line and parameters:
<installation path>/pingidentity/kafka/bin/kafka-configs.sh --zookeeper <Kafka master IP>:<Zookeeper SSL port> --alter --add-config SCRAM-SHA-512=<user authentication password> --entity-type users --entity-name <username> --zk-tls-config-file <installation path>/pingidentity/kafka/config/zookeeper_client.properties
-
Create the ABS producer user for sending machine-learning data.
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-configs.sh --zookeeper 10.96.6.126:2182 --alter --add-config 'SCRAM-SHA-512=[iterations=8192,password=changeme]' --entity-type users --entity-name abs_producer --zk-tls-config-file /home/pi-user/pingidentity/kafka/config/zookeeper_client.properties
-
Create the ABS consumer user for consuming machine-learning data for job processing.
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-configs.sh --zookeeper 10.96.6.126:2182 --alter --add-config 'SCRAM-SHA-512=[iterations=8192,password=changeme]' --entity-type users --entity-name abs_consumer --zk-tls-config-file /home/pi-user/pingidentity/kafka/config/zookeeper_client.properties
-
Create the data engine consumer for pulling transactions, anomalies, and indicators of compromise (IOCs).
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-configs.sh --zookeeper 10.96.6.126:2182 --alter --add-config 'SCRAM-SHA-512=[iterations=8192,password=changeme]' --entity-type users --entity-name pi4api_de_user --zk-tls-config-file /home/pi-user/pingidentity/kafka/config/zookeeper_client.properties
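The user commands follow a single template, which makes it easy to supply the password from a variable instead of typing it into each command. This is a hedged sketch: `user_cmd` is a hypothetical helper, the defaults are the example values above, and the option is written in its double-dash form `--zk-tls-config-file`, as in the Apache Kafka documentation. The function only prints the command.

```shell
# Assumed defaults taken from the examples above; override through the environment.
KAFKA_HOME="${KAFKA_HOME:-/home/pi-user/pingidentity/kafka}"
ZK_ADDR="${ZK_ADDR:-10.96.6.126:2182}"

# Print the kafka-configs.sh command that creates one SCRAM-SHA-512 user.
# Usage: user_cmd <username> <password>
# The single quotes in the output keep the shell from interpreting the brackets.
user_cmd() {
    echo "$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_ADDR --alter" \
         "--add-config 'SCRAM-SHA-512=[iterations=8192,password=$2]'" \
         "--entity-type users --entity-name $1" \
         "--zk-tls-config-file $KAFKA_HOME/config/zookeeper_client.properties"
}
```

For example, `user_cmd abs_producer "$ABS_PRODUCER_PASSWORD"` prints the command for the producer user, where `ABS_PRODUCER_PASSWORD` is a variable you define yourself. Passwords containing quotes or other shell metacharacters would need additional escaping before the printed command is executed.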
-
-
Configure ACLs for users.
The following table lists the topics and operations for each user type.
| User | Allowed operations | Topics |
| --- | --- | --- |
| ABS producer | Create, Read | Transactions, IoAs, Anomalies |
| ABS producer | Describe | Discovery |
| ABS producer | Write | Transactions, IoAs, Anomalies, Discovery |
| ABS consumer | Read | Transactions, IoAs, Anomalies, Discovery |
| ABS consumer | Describe | Transactions, Discovery |
| Data engine consumer | Read | Transactions, IoAs, Anomalies, Discovery |
| Data engine consumer | Describe | Discovery |
Command line and parameters:
<installation path>/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server <Kafka master IP>:<Kafka SSL port> --add --allow-principal User:<username> --operation <operation> [--operation <operation 2>] [--operation <operation n>] --topic <topic name> --command-config <installation path>/pingidentity/kafka/config/client.properties
-
Create the ACLs for the ABS producer user.
Example:
-
Transactions topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_producer --operation Create --operation Read --operation Write --topic pi4api.queuing.transactions --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
IoAs topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_producer --operation Create --operation Read --operation Write --topic pi4api.queuing.ioas --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Anomalies topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_producer --operation Create --operation Read --operation Write --topic pi4api.queuing.anomalies --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Discovery topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_producer --operation Create --operation Read --operation Write --topic pi4api.queuing.apis --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
-
Create the ACLs for the ABS consumer user.
Example:
-
Transactions topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --operation Describe --topic pi4api.queuing.transactions --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
IoAs topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --topic pi4api.queuing.ioas --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Anomalies topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --topic pi4api.queuing.anomalies --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Discovery topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --topic pi4api.queuing.apis --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
-
Create the ACLs for the data engine consumer user.
Example:
-
Transactions topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Read --topic pi4api.queuing.transactions --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
IoAs topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Read --topic pi4api.queuing.ioas --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Anomalies topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Create --operation Read --operation Write --topic pi4api.queuing.anomalies --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Discovery topic:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Create --operation Read --operation Write --topic pi4api.queuing.apis --command-config /home/pi-user/pingidentity/kafka/config/client.properties
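Each ACL command above combines a user, a topic, and one or more operations. As an illustration of how the repeated `--operation` options are assembled, here is a small sketch; `acl_cmd` is a hypothetical helper, the defaults are the example values above, and the function only prints the command.

```shell
# Assumed defaults taken from the examples above; override through the environment.
KAFKA_HOME="${KAFKA_HOME:-/home/pi-user/pingidentity/kafka}"
BOOTSTRAP="${BOOTSTRAP:-172.16.40.244:9091}"

# Print the kafka-acls.sh command granting the listed operations on one topic.
# Usage: acl_cmd <username> <topic> <operation> [<operation> ...]
acl_cmd() {
    user="$1"; topic="$2"; shift 2
    ops=""
    for op in "$@"; do
        ops="$ops --operation $op"    # one --operation option per operation
    done
    echo "$KAFKA_HOME/bin/kafka-acls.sh --bootstrap-server $BOOTSTRAP --add" \
         "--allow-principal User:$user$ops --topic $topic" \
         "--command-config $KAFKA_HOME/config/client.properties"
}
```

For example, `acl_cmd abs_consumer pi4api.queuing.transactions Read Describe` prints the same command as the ABS consumer transactions example above.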
-
-
Add the ACLs below in Kafka if they have not already been added:
Current ACLs for resource ResourcePattern(resourceType=TOPIC, name=pi4api.queuing.anomalies, patternType=LITERAL):
    (principal=Group:pi4api.abs, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=Group:pi4api.abs, host=, operation=READ, permissionType=ALLOW)
    (principal=Group:pi4api.data-engine, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=Group:pi4api.data-engine, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=WRITE, permissionType=ALLOW)

Current ACLs for resource ResourcePattern(resourceType=GROUP, name=pi4api.abs, patternType=LITERAL):
    (principal=User:abs_consumer, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=DESCRIBE, permissionType=ALLOW)

Current ACLs for resource ResourcePattern(resourceType=TOPIC, name=pi4api.queuing.ioas, patternType=LITERAL):
    (principal=Group:pi4api.abs, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=Group:pi4api.abs, host=, operation=READ, permissionType=ALLOW)
    (principal=Group:pi4api.data-engine, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=Group:pi4api.data-engine, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=WRITE, permissionType=ALLOW)

Current ACLs for resource ResourcePattern(resourceType=TOPIC, name=pi4api.queuing.apis, patternType=LITERAL):
    (principal=User:abs_producer, host=, operation=READ, permissionType=ALLOW)
    (principal=Group:pi4api.abs, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=Group:pi4api.abs, host=, operation=READ, permissionType=ALLOW)
    (principal=Group:pi4api.data-engine, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=WRITE, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=CREATE, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=Group:pi4api.data-engine, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=WRITE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=CREATE, permissionType=ALLOW)

Current ACLs for resource ResourcePattern(resourceType=GROUP, name=pi4api.data-engine, patternType=LITERAL):
    (principal=User:pi4api_de_user, host=, operation=READ, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=DESCRIBE, permissionType=ALLOW)

Current ACLs for resource ResourcePattern(resourceType=TOPIC, name=pi4api.queuing.transactions, patternType=LITERAL):
    (principal=Group:pi4api.abs, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_consumer, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=Group:pi4api.abs, host=, operation=READ, permissionType=ALLOW)
    (principal=Group:pi4api.data-engine, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:pi4api_de_user, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=DESCRIBE, permissionType=ALLOW)
    (principal=Group:pi4api.data-engine, host=, operation=READ, permissionType=ALLOW)
    (principal=User:abs_producer, host=, operation=WRITE, permissionType=ALLOW)
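To find out whether a given entry has already been added, you can filter the output of `kafka-acls.sh --list` for it. The sketch below assumes the listing uses the entry format shown above; `has_acl` is a hypothetical helper that reads the listing from standard input.

```shell
# Hypothetical helper: succeed if the ACL listing on stdin contains an ALLOW
# entry for the given principal and operation.
# Usage: <listing command> | has_acl <principal> <OPERATION>
has_acl() {
    grep -q "(principal=$1, host=[^,]*, operation=$2, permissionType=ALLOW)"
}
```

For example, the following checks one topic for the ABS consumer's read permission:

```shell
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --list --topic pi4api.queuing.transactions --command-config /home/pi-user/pingidentity/kafka/config/client.properties | has_acl User:abs_consumer READ && echo "already present"
```

Pass `--topic` or `--group` to the listing command, because the helper does not distinguish between resources.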
-
-
Configure ACLs for groups.
Command line and parameters:
<installation path>/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server <Kafka master IP>:<Kafka SSL port> --add --allow-principal User:<username> --operation <operation> --group <group ID> --command-config <installation path>/pingidentity/kafka/config/client.properties
-
Configure permissions for the ABS consumer user belonging to the ABS consumer group to perform read operations.
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:abs_consumer --operation Read --group pi4api.abs --command-config /home/pi-user/pingidentity/kafka/config/client.properties
-
Configure permissions for the data engine consumer user belonging to the data engine consumer group to perform read operations.
Example:
/home/pi-user/pingidentity/kafka/bin/kafka-acls.sh --bootstrap-server 172.16.40.244:9091 --add --allow-principal User:pi4api_de_user --operation Read --group pi4api.data-engine --command-config /home/pi-user/pingidentity/kafka/config/client.properties
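One way to test a SCRAM user and its group ACL is to connect to the SCRAM_SASL_SSL listener with a standard Kafka client. The following is only a sketch: `write_scram_client_props` is a hypothetical helper, the store paths and passwords are the example values above, and the source does not state how the PingIntelligence components themselves are configured to connect.

```shell
# Assumed default taken from the examples above; override through the environment.
KAFKA_HOME="${KAFKA_HOME:-/home/pi-user/pingidentity/kafka}"

# Hypothetical helper: write client properties for a SCRAM user.
# Usage: write_scram_client_props <username> <password> <output file>
write_scram_client_props() {
    user="$1"; pass="$2"; out="$3"
    cat > "$out" <<EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="$user" password="$pass";
ssl.truststore.location=$KAFKA_HOME/kafka_truststore.jks
ssl.truststore.password=changeme
ssl.keystore.location=$KAFKA_HOME/kafka_keystore.jks
ssl.keystore.password=changeme
ssl.key.password=changeme
ssl.endpoint.identification.algorithm=
EOF
}
```

After writing the file, for example with `write_scram_client_props abs_consumer changeme /tmp/abs_consumer.properties`, you could start a console consumer against port 9093 using `kafka-console-consumer.sh --bootstrap-server 172.16.40.244:9093 --topic pi4api.queuing.transactions --group pi4api.abs --consumer.config /tmp/abs_consumer.properties`.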
-