Kafka: Using Kafka Connect with JDBC

Kafka Connect with JDBC (Java Database Connectivity) allows you to integrate Kafka with relational databases by reading from and writing to databases using JDBC drivers. This is particularly useful for loading data from databases into Kafka topics or persisting Kafka messages to relational databases.

1. Overview of Kafka Connect JDBC

The Kafka Connect JDBC connector provides a way to connect Kafka with databases. It supports:

The connector supports various databases, such as MySQL, PostgreSQL, Oracle, and SQL Server, through JDBC drivers.

2. Setting Up Kafka Connect JDBC

To use Kafka Connect with JDBC, follow these steps:

2.1. Download and Install the JDBC Connector

Download the Kafka Connect JDBC connector from the Confluent Hub or the Apache Kafka website. Place the JAR files in the plugins directory of your Kafka Connect installation.

2.2. Configure the JDBC Source Connector

Create a JSON configuration file for the JDBC source connector. Here’s an example configuration to read data from a MySQL database:


{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydatabase",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "table.whitelist": "mytable",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-",
    "poll.interval.ms": "10000"
  }
}
    

This configuration connects to a MySQL database, reads data from the mytable table, and writes it to a Kafka topic prefixed with jdbc-.

2.3. Configure the JDBC Sink Connector

Create a JSON configuration file for the JDBC sink connector. Here’s an example configuration to write data to a PostgreSQL database:


{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydatabase",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "topics": "jdbc-mytable",
    "auto.create": "true",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "insert.mode": "upsert"
  }
}
    

This configuration reads from the Kafka topic jdbc-mytable and writes the data to a PostgreSQL table. It will create the table if it doesn’t exist and perform upserts based on the id field.

2.4. Start Kafka Connect with JDBC Connectors

Start Kafka Connect with the JDBC connectors by using the following command in distributed mode:


bin/connect-distributed.sh config/connect-distributed.properties
    

Or in standalone mode:


bin/connect-standalone.sh config/connect-standalone.properties config/jdbc-source-connector.properties
bin/connect-standalone.sh config/connect-standalone.properties config/jdbc-sink-connector.properties
    

3. Monitoring and Managing JDBC Connectors

Kafka Connect provides REST APIs to monitor and manage JDBC connectors. Use the following commands:

3.1. List Connectors

List all connectors:


curl -X GET http://localhost:8083/connectors
    

3.2. View Connector Status

Check the status of a specific connector:


curl -X GET http://localhost:8083/connectors/jdbc-source-connector/status
    

3.3. Pause and Resume Connectors

Pause a connector:


curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/pause
    

Resume a connector:


curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/resume
    

3.4. Restart Connectors

Restart a connector:


curl -X POST http://localhost:8083/connectors/jdbc-source-connector/restart
    

4. Conclusion

Kafka Connect with JDBC provides a powerful way to integrate Kafka with relational databases. By configuring source and sink connectors, you can efficiently move data between Kafka topics and your database, facilitating real-time data processing and analysis.