Managing Apache Kafka Topics
Instaclustr has several different tools that you can use to manage Topics in Apache Kafka. These include a CLI tool, the Instaclustr API, and the Instaclustr Terraform Provider.
Kafka Topic Management via CLI
Kafka comes with its own topic management tools, which are part of the standard Apache Kafka download. For our offering of Kafka 2.6.1 and newer, you can use the kafka-topics.sh tool included with Kafka. For our older offerings, you will need to use ic-kafka-topics.
In the following instructions, we assume the use of ic-kafka-topics.
Introduction to Kafka Topic Management
ic-kafka-topics is a tool developed by Instaclustr for managing Kafka topics over a connection to a Kafka broker. It is based on the standard kafka-topics tool but, unlike kafka-topics, it does not require a ZooKeeper connection to work.
The core “actions” supported by ic-kafka-topics include:
- list – list the topics available on the cluster
- create – create a topic
- describe – provide details of one or more topics
- alter – change the properties of one or more topics
- delete – delete one or more topics
Installing ic-kafka-topics
ic-kafka-topics is available as a tarfile, downloadable from the Instaclustr Console cluster connection page. To install:
- Download the tarfile from the Instaclustr console
- Unpack the tarfile:
> tar -xf kafka_2.11-2.1.2.tgz
- Go to the bin directory of the Kafka installation:
> cd kafka_2.11-2.1.2/bin
- Run ic-kafka-topics.sh:
> ./ic-kafka-topics.sh
This should display help text for ic-kafka-topics.sh:
ic-kafka-topics: Create, delete, describe, or change a topic.
Option                              Description
------                              -----------
--alter                             Alter the number of partitions,
                                      replica assignment, and/or
                                      configuration for the topic.
--bootstrap-server <String: host>   REQUIRED: The connection string for
                                      the Kafka connection in the form
                                      host:port.
--config <String: name=value>       A topic configuration override for the
                                      topic being created or altered.The
                                      following is a list of valid
                                      configurations:
                                        cleanup.policy
                                        ....
                                        ....
Using ic-kafka-topics
Once you have downloaded and installed ic-kafka-topics, you can use it to manage topics on your cluster.
To enable ic-kafka-topics to connect to your cluster, you will always need to provide two options to the tool:
Option | Description |
--bootstrap-server | provides the details of one of the nodes (brokers) in your Kafka cluster in the form host:port. These details are available on the Instaclustr console in the cluster connections page. |
--properties-file | a path to a file which contains details of your cluster username and password. These details are also available from the customer support site. |
A minimal properties file requires your user ID and password in the following format:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="ickafka" \
  password="123abc123abc123abc123abc123abc123abc123abc123abc123abc123abc";
(carefully note the placement of the backslashes and semicolon at the end of the lines)
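One way to create this file from a shell is with a here-document. The following is a minimal sketch rather than an official procedure: the variable names and the password value are placeholders, the file name mycluster.properties is simply the one used in the examples on this page, and the umask line is an added precaution that the tool itself does not require.

```shell
# Placeholder credentials for illustration; use the values for your cluster.
KAFKA_USER="ickafka"
KAFKA_PASSWORD="replace-with-your-password"
PROPS_FILE="mycluster.properties"

# Create the file readable only by the current user, since it holds a password.
umask 077

# In an unquoted here-document, "\\" produces a single literal backslash,
# which is the line-continuation character the properties format expects.
cat > "$PROPS_FILE" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\
  username="$KAFKA_USER" \\
  password="$KAFKA_PASSWORD";
EOF
```

The resulting file can then be passed to the tool with the --properties-file option.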
If your Kafka cluster has client ⇆ broker encryption enabled, your properties file should look like this, ensuring the password and truststore location are correct:
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location=truststore.jks
ssl.truststore.password=instaclustr
ssl.protocol=TLS
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="ickafka" \
  password="361d4871ff1a5ef58deaf3b887b489029faee9690e62c549078a1f51f18f755";
You can find out more about using certificates with Kafka clients including where to find them here.
Every invocation of the ic-kafka-topics command must specify these details. For example, to list the topics on your cluster, run:
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --list
The following sections describe each of the actions supported by the tool. You can get help at any time at the command line by running ic-kafka-topics.sh with no arguments.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
List
The List action provides a listing of the topics that are available on the cluster. To use the List action, include the --list option.
Option | Description |
--list | the list action |
Example:
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --list
device-metrics-topic
device-log-topic
device-activation-topic
customer-change-topic
Create
The “create” action is used to create a new topic.
Simple topic creation
To create a topic and let Kafka assign its partitions to brokers, you must provide the following details:
Option | Description |
--create | the create action |
--topic <topic-name> | the name of the topic you wish to create |
--partitions <num-of-partitions> | the number of partitions you want the topic to have |
--replication-factor <replication-factor> | the replication factor for the topic. This may not be greater than the number of brokers |
--config <key>=<value> (Optional) | a custom config value to set for this topic. This can be omitted to use the default configuration settings. You can set more than one config by using multiple --config options |
Example:
The following example will create a topic called “mytopic” with 9 partitions and a replication factor of 3.
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --create --topic mytopic --partitions 9 --replication-factor 3
Kafka will automatically assign partitions to available brokers.
Creating topics with a replica-assignment
If you want more control over the assignment of the new topic’s partitions to existing brokers, you may provide a manual partition-to-broker assignment using the --replica-assignment option.
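As an illustration, the sketch below builds an assignment string and wraps the create command in a function. It assumes ic-kafka-topics accepts the same value format as the standard kafka-topics tool, where partitions are separated by commas and the broker IDs for each partition are separated by colons; this page does not confirm that, so check the tool's help text first. The broker IDs, helper names, address and file name are example values only.

```shell
# Assumed format (from the standard kafka-topics tool): one comma-separated
# entry per partition, each entry a colon-separated list of broker IDs.
# The first broker listed for a partition is its preferred leader.
ASSIGNMENT="1:2:3,2:3:1,3:1:2"   # 3 partitions, replication factor 3

# Print the number of partitions described by an assignment string.
partition_count() {
  printf '%s\n' "$1" | tr ',' '\n' | wc -l | tr -d ' '
}

# Hypothetical wrapper; defined here but not called, because it needs a
# reachable cluster. With the standard tool, --partitions and
# --replication-factor are omitted when --replica-assignment is given.
create_with_assignment() {
  ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 \
    --properties-file mycluster.properties \
    --create --topic "$1" --replica-assignment "$2"
}

echo "partitions: $(partition_count "$ASSIGNMENT")"
```

Calling create_with_assignment mytopic "$ASSIGNMENT" would then request a three-partition topic whose replicas are spread across brokers 1, 2 and 3.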
Describe
The “describe” action is used to provide a detailed description of a topic. You need to provide the following details:
Option | Description |
--describe | the describe action |
--topic <TopicName> | the name of the topic to be described. If this is not specified, all topics will be described. A (Java-style) regular expression can also be specified to select multiple topics. Regular expressions must be enclosed in double quotes. |
Only configurations that are not set to their defaults will be displayed. This includes values inherited from broker default configs as well as values that have been set on the topic itself.
For more details, see the description of the output format in the documentation for the standard kafka-topics tool.
Example:
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.45:9092 --properties-file mycluster.properties --describe --topic customer
Topic:customer PartitionCount:12 ReplicationFactor:3 Configs:min.insync.replicas=2
Topic: customer Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: customer Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: customer Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: customer Partition: 3 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: customer Partition: 4 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: customer Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: customer Partition: 6 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: customer Partition: 7 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: customer Partition: 8 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: customer Partition: 9 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: customer Partition: 10 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: customer Partition: 11 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
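In this output, Replicas lists the brokers assigned to a partition and Isr lists the replicas currently in sync. A partition whose Isr list is shorter than its Replicas list is under-replicated. The sketch below shows one possible way to pick such partitions out of the describe output with awk. The function name is made up for this example, and the second input line is invented sample data, not output from a real cluster.

```shell
# Read describe output on stdin and print the number of every partition
# whose in-sync replica list is shorter than its assigned replica list.
under_replicated() {
  awk '
    /Partition: / {
      p = ""; r = 0; s = 0
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") p = $(i + 1)
        if ($i == "Replicas:")  r = split($(i + 1), a, ",")
        if ($i == "Isr:")       s = split($(i + 1), b, ",")
      }
      if (s < r) print p
    }'
}

# Sample input: partition 0 is fully in sync, partition 1 (invented line)
# is missing broker 3 from its Isr list.
printf '%s\n' \
  'Topic: customer Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1' \
  'Topic: customer Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2' |
  under_replicated
```

Against a live cluster, the describe command shown in the example would be piped into under_replicated in the same way.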
Alter
The “alter” action is used to set or remove custom configurations on a topic.
To set a configuration on a topic, you must specify the following options:
Option | Description |
--alter | the alter action |
--topic <topic-name> | the name of the topic to be altered. If this is not specified, all topics will be modified. A (Java-style) regular expression can also be specified to select multiple topics. Regular expressions must be enclosed in double quotes. |
--config <key>=<value> | a config value to change. You can specify more than one config to be changed using multiple --config options |
--delete-config <key> | a config value to remove. You may specify more than one config to be removed using multiple --delete-config options |
Examples
The following command will alter the retention.ms configuration on the topic called “mytopic”.
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --alter --topic mytopic --config retention.ms=50000000
The following command will alter two values (retention.ms and compression.type) on the topic called “mytopic”.
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --alter --topic mytopic --config retention.ms=50000000 --config compression.type=gzip
The following command will set retention.ms to 50000000 on all topics starting with the string “test”.
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --alter --topic "test.*" --config retention.ms=50000000
The following command will change the min.insync.replicas value for the topic called “mytopic”, overriding the broker's default min.insync.replicas value.
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --alter --topic mytopic --config min.insync.replicas=2
Deleting a configuration
To remove a custom configuration from a topic, use --alter combined with the --delete-config option. The --delete-config option should specify the name of the configuration to remove. Removing a custom configuration will cause the value to revert to the default value.
Example:
The following command will remove the retention.ms configuration, reverting it to the default value.
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --alter --topic mytopic --delete-config retention.ms
Delete (Topic Deletion)
The Delete action is used to delete a topic. To delete a topic, you must specify the following options:
Option | Description |
--delete | the delete action |
--topic <topic-name> | the name of the topic to be deleted. A (Java-style) regular expression can also be specified to select multiple topics. Regular expressions must be enclosed in double quotes. |
Examples
The following command will delete the topic called “mytopic”.
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --delete --topic mytopic
Topic mytopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
The following command will delete all topics starting with “dev”.
> ./ic-kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --properties-file mycluster.properties --delete --topic "dev.*"
Topic dev-customer is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic dev-device is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
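Because a regular expression can match more topics than intended, it can be worth previewing the matches before deleting. The sketch below filters a topic list with grep. It rests on two assumptions: that the tool matches the pattern against the whole topic name, and that the pattern is simple enough for grep's extended syntax to behave like a Java regular expression. The function name and the sample topic names are illustrative only.

```shell
# Read topic names on stdin (one per line) and print those that the given
# pattern matches in full. grep -E only approximates Java regex syntax.
preview_matches() {
  grep -E -x -e "$1"
}

# Sample topic list standing in for the output of the --list action.
printf '%s\n' dev-customer dev-device customer-change-topic |
  preview_matches 'dev.*'
```

With a live cluster, the output of the --list action would be piped into preview_matches in place of the sample list.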
Kafka Topic Management via Instaclustr API
Instaclustr provides a Topic Management API for Kafka clusters to help you manage topics. The Topic Management API supports the following functions:
- List Topics
- Create a Topic
- Delete a Topic
- Get Topic information (number of partitions and replication factor)
- Get Topic configuration
- Modify Topic configuration
These examples show how to use the features of the Instaclustr Topic Management API.
For each endpoint listed below, all requests must include basic authentication details:
Username: <your Instaclustr account username>
Password: <your Instaclustr provisioning API key>
List Topics
- To retrieve a list of topics currently available in the Kafka cluster, make a GET request to:
https://api.instaclustr.com/provisioning/v1/<cluster_id>/kafka/topics
- On a successful request, the API should respond with a 200 status code and a JSON document containing the list of topics. E.g.:
{
  "topics": [
    "test-topic"
  ]
}
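As a sketch of how this request might be issued from a shell, the function below uses curl with basic authentication. IC_USER and IC_API_KEY are assumed environment variable names chosen for this example (they are not defined by Instaclustr), and the helper names are likewise hypothetical.

```shell
API_BASE="https://api.instaclustr.com/provisioning/v1"

# Build the topics URL for a given cluster ID.
topics_url() {
  printf '%s/%s/kafka/topics\n' "$API_BASE" "$1"
}

# Send the GET request. Defined but not called here, because it needs a
# real cluster ID and credentials in IC_USER and IC_API_KEY.
list_topics() {
  curl -sS -u "${IC_USER}:${IC_API_KEY}" "$(topics_url "$1")"
}

# Show the URL that would be requested for a placeholder cluster ID.
topics_url "<cluster_id>"
```

Running list_topics with a real cluster ID should print the JSON topic list described above.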
Create a Topic
- To create a new topic in the Kafka cluster, make a POST request to:
https://api.instaclustr.com/provisioning/v1/<cluster_id>/kafka/topics
A request body should be included, specifying the topic name, number of partitions, and replication factor. E.g.:
{
  "topic": "test-topic",
  "partitions": 3,
  "replicationFactor": 3
}
The number of partitions and the replication factor should be non-zero integers. The replication factor is limited by the number of available brokers in the cluster. Be wary of creating a topic with too many partitions, as this can slow down the cluster.
- On successful topic creation, the API should respond with a 201 status code and a short message. E.g.:
{
  "message": "Kafka topic test-topic created."
}
Once this message is received, the topic should be immediately available in the Kafka cluster.
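The request body can be assembled in a shell script before it is sent. The sketch below does this with printf and posts it with curl. The helper names and the IC_USER and IC_API_KEY variables are assumptions made for this example, as is the Content-Type header, which this page does not state to be required.

```shell
API_BASE="https://api.instaclustr.com/provisioning/v1"

# Build the JSON body from a topic name, partition count and replication
# factor. Kafka topic names contain only letters, digits, ".", "_" and "-",
# so no JSON escaping is attempted here.
create_body() {
  printf '{"topic": "%s", "partitions": %d, "replicationFactor": %d}\n' "$1" "$2" "$3"
}

# Arguments: cluster ID, topic name, partitions, replication factor.
# Defined but not called here, because it needs real credentials.
create_topic() {
  curl -sS -u "${IC_USER}:${IC_API_KEY}" \
    -X POST -H "Content-Type: application/json" \
    -d "$(create_body "$2" "$3" "$4")" \
    "${API_BASE}/$1/kafka/topics"
}

# Show the body that would be sent for the example topic.
create_body test-topic 3 3
```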
Delete a Topic
- To delete an existing topic from the Kafka cluster, make a DELETE request to:
https://api.instaclustr.com/provisioning/v1/<cluster_id>/kafka/topics/<topic-name>
No request body is required. The topic to be deleted is specified in the URL.
- When the topic is successfully deleted, the API should respond with a 200 status code and a short message. E.g.:
{
  "message": "Kafka topic test-topic has been deleted."
}
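A script that deletes topics will usually want to confirm the 200 status code rather than read the message. The sketch below captures only the status code from curl. As before, the function names and the IC_USER and IC_API_KEY variables are assumptions for illustration.

```shell
API_BASE="https://api.instaclustr.com/provisioning/v1"

# Build the URL of a single topic from a cluster ID and topic name.
topic_url() {
  printf '%s/%s/kafka/topics/%s\n' "$API_BASE" "$1" "$2"
}

# Send the DELETE request, discard the response body and print only the
# HTTP status code. Defined but not called here.
delete_topic_status() {
  curl -sS -o /dev/null -w '%{http_code}' \
    -u "${IC_USER}:${IC_API_KEY}" \
    -X DELETE "$(topic_url "$1" "$2")"
}

# Succeed only if the given status code is the documented success code.
deleted_ok() {
  [ "$1" = "200" ]
}

# Show the URL that would be requested for a placeholder cluster ID.
topic_url "<cluster_id>" "test-topic"
```

A caller could combine the two, for example: status=$(delete_topic_status "$cluster" test-topic) followed by deleted_ok "$status".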
Get Topic Information
- To retrieve the information for a particular topic in the Kafka cluster, make a GET request to:
https://api.instaclustr.com/provisioning/v1/<cluster_id>/kafka/topics/<topic-name>
- The API should respond with a 200 status code and a JSON document containing the information for the requested topic. E.g.:
{
  "topic": "test-topic",
  "partitions": 3,
  "replicationFactor": 3
}
Get Topic Configuration
- To retrieve the configuration of a particular topic in the Kafka cluster, make a GET request to:
https://api.instaclustr.com/provisioning/v1/<cluster_id>/kafka/topics/<topic-name>/config
- On a successful request, the API should respond with a 200 status code and a JSON document containing the configuration of the requested topic. E.g.:
{
  "topic": "test-topic",
  "config": {
    "compression.type": "producer",
    "leader.replication.throttled.replicas": "",
    "remote.storage.enable": "false",
    "message.downconversion.enable": "true",
    "min.insync.replicas": "2",
    "segment.jitter.ms": "0",
    "local.retention.ms": "-2",
    "cleanup.policy": "delete",
    "flush.ms": "9223372036854775807",
    "follower.replication.throttled.replicas": "*",
    "segment.bytes": "1073741824",
    "retention.ms": "604800000",
    "flush.messages": "9223372036854775807",
    "message.format.version": "3.0-IV1",
    "max.compaction.lag.ms": "9223372036854775807",
    "file.delete.delay.ms": "60000",
    "max.message.bytes": "1048588",
    "min.compaction.lag.ms": "0",
    "message.timestamp.type": "CreateTime",
    "local.retention.bytes": "-2",
    "preallocate": "false",
    "min.cleanable.dirty.ratio": "0.5",
    "index.interval.bytes": "4096",
    "unclean.leader.election.enable": "false",
    "retention.bytes": "-1",
    "delete.retention.ms": "86400000",
    "segment.ms": "604800000",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "segment.index.bytes": "10485760"
  }
}
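The topic information and topic configuration endpoints differ only in the trailing /config path segment, so one small script can cover both. The sketch below also includes a naive helper that pulls a single value out of the configuration response with sed. It relies on the response quoting each value as a string, as in the example above. A JSON-aware tool would be more robust. All function names, and the IC_USER and IC_API_KEY variables, are assumptions for this example.

```shell
API_BASE="https://api.instaclustr.com/provisioning/v1"

# Arguments for both functions: cluster ID, topic name.
# Neither is called here, because they need real credentials.
get_topic_info() {
  curl -sS -u "${IC_USER}:${IC_API_KEY}" "${API_BASE}/$1/kafka/topics/$2"
}

get_topic_config() {
  curl -sS -u "${IC_USER}:${IC_API_KEY}" "${API_BASE}/$1/kafka/topics/$2/config"
}

# Read a configuration response on stdin and print the value of one key.
# Dots in the key are escaped so that they match literally.
config_value() {
  key_re=$(printf '%s' "$1" | sed 's/\./\\./g')
  sed -n "s/.*\"${key_re}\": *\"\([^\"]*\)\".*/\1/p"
}

# Demonstration on a shortened copy of the example response.
printf '%s\n' '{"topic": "test-topic","config": {"local.retention.ms": "-2","retention.ms": "604800000"}}' |
  config_value retention.ms
```

With real credentials, get_topic_config "$cluster" test-topic | config_value retention.ms would print the topic's current retention setting.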
Modify a Topic Configuration
- To modify the configuration of a topic in the Kafka cluster, make a PUT request to:
https://api.instaclustr.com/provisioning/v1/<cluster_id>/kafka/topics/<topic-name>/config
A request body should be included, specifying the configuration that you want to change. E.g.:
{
  "config": {
    "min.insync.replicas": 2
  }
}
You do not need to provide all of the available keys in the request body. Only the keys you want to change are required.
For available configuration keys and values, consult https://kafka.apache.org/23/documentation.html#topicconfigs. Alternatively, the available configuration keys can be inferred from the output of the Get Topic Configuration endpoint.
Note: if you are using a Kafka version newer than 2.3, the API currently cannot change message.format.version to a version newer than 2.3. Please use the Kafka CLI tools (kafka-configs.sh) to modify this configuration.
- The API should respond with a 200 status code and a short message. E.g.:
{
  "message": "Kafka topic test-topic has been modified."
}
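A sketch of the same request from a shell follows. The body builder inserts the value as given, so numbers can be passed bare, while string values would need to be passed with their own double quotes. The helper names, the IC_USER and IC_API_KEY variables and the Content-Type header are assumptions made for this example.

```shell
API_BASE="https://api.instaclustr.com/provisioning/v1"

# Build a request body that changes a single configuration key.
# The value is inserted verbatim, so it must already be valid JSON.
config_body() {
  printf '{"config": {"%s": %s}}\n' "$1" "$2"
}

# Arguments: cluster ID, topic name, config key, config value.
# Defined but not called here, because it needs real credentials.
modify_topic_config() {
  curl -sS -u "${IC_USER}:${IC_API_KEY}" \
    -X PUT -H "Content-Type: application/json" \
    -d "$(config_body "$3" "$4")" \
    "${API_BASE}/$1/kafka/topics/$2/config"
}

# Show the body that would be sent for the example change.
config_body min.insync.replicas 2
```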
Kafka Topic Management via Instaclustr Terraform Provider
Instaclustr also provides a way to manage Kafka topics using Terraform. For more information and examples, have a look at:
https://github.com/instaclustr/terraform-provider-instaclustr
The documentation for the topic resource is located in docs/resources/kafka_topic.md, and the documentation for the topic data source is located in docs/data-sources/kafka_topic.md.