Kafka Connect with JDBC (Java Database Connectivity) lets you integrate Kafka with relational databases, reading from and writing to them through JDBC drivers. This is particularly useful for loading data from databases into Kafka topics or persisting Kafka messages to relational tables.
The Kafka Connect JDBC connector provides both a source (database to Kafka) and a sink (Kafka to database). It supports a wide range of databases, such as MySQL, PostgreSQL, Oracle, and SQL Server, through their JDBC drivers.
To use Kafka Connect with JDBC, follow these steps:
Download the Kafka Connect JDBC connector from Confluent Hub. Place the JAR files in the plugins directory of your Kafka Connect installation (the location configured by plugin.path).
Create a JSON configuration file for the JDBC source connector. Here’s an example configuration to read data from a MySQL database:
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydatabase",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "table.whitelist": "mytable",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-",
    "poll.interval.ms": "10000"
  }
}
This configuration connects to a MySQL database, reads rows from the mytable table in incrementing mode (tracking the id column so only new rows are fetched), and writes them to the Kafka topic jdbc-mytable (the topic.prefix followed by the table name).
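The incrementing mode used above can be sketched in a few lines: the connector remembers the largest id it has seen and, on each poll, fetches only rows with a larger id. The rows and poll function below are illustrative, not part of the connector's API:

```python
# Sketch of the JDBC source connector's "incrementing" mode:
# remember the highest id seen so far, fetch only newer rows each poll.

def poll(rows, last_id):
    """Return rows with id greater than last_id, plus the new offset."""
    new_rows = [r for r in rows if r["id"] > last_id]
    next_id = max((r["id"] for r in new_rows), default=last_id)
    return new_rows, next_id

table = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

batch, offset = poll(table, last_id=0)   # first poll: both rows
table.append({"id": 3, "name": "c"})     # a new row arrives
batch2, offset = poll(table, offset)     # second poll: only the new row
```

This is why incrementing mode requires a strictly increasing column such as an auto-increment primary key: updates to existing rows are not detected, only inserts with higher ids.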
Create a JSON configuration file for the JDBC sink connector. Here’s an example configuration to write data to a PostgreSQL database:
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydatabase",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "topics": "jdbc-mytable",
    "auto.create": "true",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "insert.mode": "upsert"
  }
}
This configuration reads from the Kafka topic jdbc-mytable and writes the data to a PostgreSQL table. It creates the table if it doesn't exist (auto.create) and performs upserts keyed on the id field taken from the record value.
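The upsert semantics configured above (pk.mode set to record_value, pk.fields set to id) can be illustrated with a short sketch: each incoming record either inserts a new row or replaces the existing row with the same id. The in-memory table and records here are hypothetical:

```python
# Sketch of upsert behaviour: the record's id field decides
# whether the sink inserts a new row or updates an existing one.

def upsert(table, record, pk="id"):
    """Insert the record, or replace the row with the same primary key."""
    table[record[pk]] = record

rows = {}
for rec in [{"id": 1, "name": "alice"},
            {"id": 2, "name": "bob"},
            {"id": 1, "name": "alice-updated"}]:
    upsert(rows, rec)
```

After processing, the table holds two rows, with id 1 reflecting the latest record; this is what makes upsert mode safe to replay when the connector reprocesses a topic.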
Start Kafka Connect in distributed mode with the following command (connectors are then registered through the REST API using the JSON files above):
bin/connect-distributed.sh config/connect-distributed.properties
Or in standalone mode, passing each connector configuration as a .properties file (the JSON settings above rewritten as key=value pairs):
bin/connect-standalone.sh config/connect-standalone.properties config/jdbc-source-connector.properties
bin/connect-standalone.sh config/connect-standalone.properties config/jdbc-sink-connector.properties
Kafka Connect provides REST APIs to monitor and manage JDBC connectors. Use the following commands:
List all connectors:
curl -X GET http://localhost:8083/connectors
Check the status of a specific connector:
curl -X GET http://localhost:8083/connectors/jdbc-source-connector/status
Pause a connector:
curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/pause
Resume a connector:
curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/resume
Restart a connector:
curl -X POST http://localhost:8083/connectors/jdbc-source-connector/restart
Kafka Connect with JDBC provides a powerful way to integrate Kafka with relational databases. By configuring source and sink connectors, you can efficiently move data between Kafka topics and your database, facilitating real-time data processing and analysis.