Kafka Connect is designed to enable scalable and reliable data transfer between Kafka and external systems. It comes with a wide variety of built-in connectors for popular data sources and sinks, but in some cases, you may need to create custom connectors to meet specific requirements.
Custom Kafka Connectors are used when no out-of-the-box connectors are available for a specific data source or sink. Kafka Connect allows developers to implement their own source or sink connectors using Java by extending the Kafka Connect framework.
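To compile the classes in this guide, you need the Kafka Connect API on your classpath. A minimal Maven dependency might look like the following (the version shown is illustrative; match it to your Kafka installation):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>3.7.0</version>
    <scope>provided</scope>
</dependency>

The provided scope keeps the API out of your JAR, since the Connect runtime supplies it at deployment time.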
A custom connector typically consists of three components: a Connector class that manages configuration and creates tasks, a Task class that performs the actual data transfer, and a ConfigDef that declares the connector's configuration options.
A custom source connector reads data from an external system (e.g., a database, a REST API) and writes it into Kafka topics.
The SourceConnector class is responsible for initializing the connector, managing configuration, and creating tasks.
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.List;
import java.util.Map;

public class CustomSourceConnector extends SourceConnector {

    private Map<String, String> config;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> config) {
        this.config = config;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return CustomSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets the same configuration here; a real connector
        // could split the work across up to maxTasks task configs.
        return List.of(config);
    }

    @Override
    public void stop() {
        // Release any resources when the connector is stopped
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("some.config", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Custom Config Parameter");
    }
}
The SourceTask is responsible for reading data from the source system and pushing it to Kafka. In this example, we simulate reading data from a custom data source.
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CustomSourceTask extends SourceTask {

    private String someConfig;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> config) {
        this.someConfig = config.get("some.config");
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // poll() is called in a loop by the Connect worker, so pause briefly
        // to avoid flooding the topic in this simulated example
        Thread.sleep(1000);
        // Simulate reading data from a custom source; null schemas mean the
        // configured converter decides how to serialize the key and value
        SourceRecord record = new SourceRecord(
            null, null,            // source partition and offset (none tracked here)
            "custom-topic", null,  // target topic and Kafka partition
            null, "key",           // key schema and key
            null, "value"          // value schema and value
        );
        return Collections.singletonList(record);
    }

    @Override
    public void stop() {
        // Clean up resources when stopping the task
    }
}
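In a production connector, the first two arguments to SourceRecord (the source partition and the source offset) identify where the data came from and how far the task has read, so Connect can resume from the right position after a restart. The following sketch is hypothetical: the "file" and "position" keys, the position field, and the readNextLine() helper stand in for whatever your real source provides.

// Hypothetical variant of poll() that tracks a resumable offset
@Override
public List<SourceRecord> poll() throws InterruptedException {
    String line = readNextLine(); // hypothetical helper reading from the real source
    if (line == null) {
        Thread.sleep(500);        // nothing new yet; back off briefly
        return Collections.emptyList();
    }
    position++; // hypothetical field tracking how far we have read
    return Collections.singletonList(new SourceRecord(
        Map.of("file", "/var/data/input.log"), // source partition: which source this is
        Map.of("position", position),          // source offset: the resume point
        "custom-topic", null,
        null, "key",
        null, line
    ));
}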
A custom sink connector reads data from Kafka topics and writes it to an external system (e.g., a database, storage system).
The SinkConnector class handles the configuration and task management of the sink connector.
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.List;
import java.util.Map;

public class CustomSinkConnector extends SinkConnector {

    private Map<String, String> config;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> config) {
        this.config = config;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return CustomSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets the same configuration; Connect balances topic
        // partitions across however many tasks are created.
        return List.of(config);
    }

    @Override
    public void stop() {
        // Release any resources when the connector is stopped
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("some.config", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Custom Config Parameter");
    }
}
The SinkTask reads data from Kafka and writes it to the external system.
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.Collection;
import java.util.Map;

public class CustomSinkTask extends SinkTask {

    private String someConfig;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> config) {
        this.someConfig = config.get("some.config");
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Write data to the external system
            System.out.println("Received record: " + record.value());
        }
    }

    @Override
    public void stop() {
        // Clean up resources when stopping the task
    }
}
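Sink tasks can also override flush(), which Connect calls before committing consumer offsets; pushing buffered writes to the external system there gives you at-least-once delivery. A minimal sketch, assuming a hypothetical buffering client named externalWriter:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

// Inside CustomSinkTask: push buffered records before offsets are committed
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    externalWriter.flush(); // hypothetical client that buffers records in put()
}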
Once you’ve implemented your custom connector, package it (along with its dependencies) as a JAR file and place it in a directory listed in the plugin.path setting of your Kafka Connect worker configuration. After restarting the worker, you can configure and deploy it like any other connector.
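For example, in the worker's properties file (the path shown is illustrative):

# connect-distributed.properties (or connect-standalone.properties)
plugin.path=/opt/kafka/plugins

Place the connector JAR (or a directory containing it and its dependencies) under that path and restart the worker so it picks up the new plugin.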
It’s important to test your custom connector thoroughly to ensure it works as expected. You can use a tool like curl to call the Kafka Connect REST API to deploy and manage your custom connectors. Be sure to test both the source and sink connectors by feeding data through Kafka topics and verifying that each one reads or writes the data correctly.
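For example, assuming a worker running on the default REST port 8083, you could create an instance of the source connector like this (the connector name and configuration values are illustrative):

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-custom-source",
    "config": {
      "connector.class": "CustomSourceConnector",
      "tasks.max": "1",
      "some.config": "example-value"
    }
  }'

A GET on /connectors/my-custom-source/status then shows whether the connector and its tasks are running.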
Creating custom Kafka Connectors lets you extend Kafka’s integration capabilities to systems that the built-in connectors don’t cover. Following this guide, you can implement, package, and deploy your own source and sink connectors tailored to your specific use cases.