Kafka Connect is a framework for integrating Kafka with other systems in a scalable and reliable way. It supports both source connectors (reading data from external systems into Kafka) and sink connectors (writing data from Kafka to external systems).
In this guide, we will discuss how to configure Kafka connectors, including key properties and example configurations for both source and sink connectors.
Each connector requires a set of common configuration properties, regardless of its type: name (a unique name for the connector instance), connector.class (the Java class that implements the connector), tasks.max (the maximum number of tasks the connector may create for parallelism), and, optionally, key.converter and value.converter (per-connector overrides for the converters that serialize record keys and values; the worker defaults are used if these are omitted).
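As a minimal sketch (the connector name and class below are placeholders, not a real connector), a connector definition submitted to the Connect REST API wraps these common properties, plus any connector-specific ones, in a small JSON payload:
{
  "name": "my-connector",
  "config": {
    "connector.class": "com.example.SomeConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}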
Below is an example configuration for a Kafka source connector that reads data from a JDBC database:
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "connection.user": "dbuser",
    "connection.password": "dbpassword",
    "table.whitelist": "my_table",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-",
    "poll.interval.ms": "10000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
This configuration defines a source connector that reads data from the MySQL table my_table, uses the id column to track new records, and writes them to a Kafka topic prefixed with jdbc-.
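With topic.prefix set to jdbc- and table.whitelist set to my_table, the records land on the topic jdbc-my_table. As a quick sanity check (a sketch, assuming a local broker on localhost:9092 and the console scripts that ship with Kafka), you can read a few records from that topic:
# Read a few records from the topic created by the JDBC source connector
# (localhost:9092 is an assumed broker address -- adjust for your cluster)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic jdbc-my_table --from-beginning --max-messages 5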
Below is an example configuration for a Kafka sink connector that writes data from a Kafka topic to an HDFS system:
{
  "name": "hdfs-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "2",
    "topics": "my_kafka_topic",
    "hdfs.url": "hdfs://namenode:9000",
    "flush.size": "1000",
    "rotate.interval.ms": "600000",
    "topics.dir": "/kafka/topics",
    "logs.dir": "/kafka/logs",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
This configuration defines a sink connector that reads messages from the Kafka topic my_kafka_topic and writes them to HDFS. It flushes data to HDFS after every 1,000 messages or after a 10-minute interval.
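Once a few batches have been committed, the files appear in HDFS under topics.dir, grouped by topic (the exact partition subdirectories depend on the connector's partitioner). A rough way to confirm data is arriving, assuming an HDFS client configured against the same namenode, is:
# List the files the HDFS sink connector has written for this topic
hdfs dfs -ls -R /kafka/topics/my_kafka_topic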
Once the connector configuration is created, it can be submitted to a Kafka Connect cluster through the REST API (the usual approach in distributed mode), or the connector can be run on a single standalone worker by passing its properties file to connect-standalone.sh:
# Using REST API
curl -X POST -H "Content-Type: application/json" --data @source-connector.json http://localhost:8083/connectors
# Using a standalone worker
bin/connect-standalone.sh config/connect-standalone.properties config/source-connector.properties
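Note that the two paths expect different input formats: the REST API takes the JSON payload shown above, while connect-standalone.sh takes Java properties files for the worker and the connector. As a sketch, config/source-connector.properties for the JDBC example would carry the same settings in properties syntax, and the connector's health can then be checked through the REST API's status endpoint:
# config/source-connector.properties -- the JDBC source example in properties form
name=jdbc-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb
connection.user=dbuser
connection.password=dbpassword
table.whitelist=my_table
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbc-
poll.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Check that the connector and its tasks are RUNNING
curl http://localhost:8083/connectors/jdbc-source-connector/status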
Kafka Connect simplifies integrating Kafka with various external systems by providing connectors for both source and sink operations. By properly configuring connectors, you can stream data efficiently between Kafka and other systems such as databases, file systems, or messaging services.