Kafka Connect provides a simple and flexible way to move data between Kafka and other systems. One powerful feature of Kafka Connect is the ability to apply single message transforms (SMTs) to records as they flow through a connector.
This guide will explore how transformations work in Kafka Connect and provide examples of common transformations that can be applied to source and sink connectors.
Kafka Connect transformations allow you to modify records before they are sent from a source system to Kafka, or from Kafka to a sink system. They act as a lightweight stream processing layer within the connector pipeline.
Transformations can be used for a variety of purposes, such as adding metadata fields, masking sensitive values, dropping unwanted records, and renaming or reformatting fields.
Transformations are defined in the connector configuration using the transforms property. You can chain multiple transformations together to create a pipeline of data manipulation steps. Each transformation requires a name and a class that implements the transformation logic; some transformations also take additional configuration options.
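The configuration keys follow a transforms.<name>.<option> pattern. As a minimal sketch using the built-in TimestampRouter transformation (the name RouteByDay is arbitrary), the relevant properties look like this:
"transforms": "RouteByDay",
"transforms.RouteByDay.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.RouteByDay.topic.format": "${topic}-${timestamp}",
"transforms.RouteByDay.timestamp.format": "yyyyMMdd"
This would route each record to a per-day topic derived from its timestamp.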
The InsertField transformation allows you to add a static field to every record. For example, you can add a field that indicates the source of the data.
{
"name": "jdbc-source-connector",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "dbuser",
"connection.password": "dbpassword",
"table.whitelist": "my_table",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "jdbc-",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "InsertSource",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "source",
"transforms.InsertSource.static.value": "mysql"
}
In this example, every record that flows through the connector gains a new field called source with the value mysql. The $Value suffix on the class name means the transformation applies to the record's value; a corresponding InsertField$Key variant operates on the key.
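For a hypothetical row (field names are illustrative), the record value changes roughly like this:
Before: { "id": 42, "name": "Alice" }
After: { "id": 42, "name": "Alice", "source": "mysql" }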
The MaskField transformation allows you to mask sensitive information in records, such as credit card numbers or passwords.
{
"name": "jdbc-source-connector",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "dbuser",
"connection.password": "dbpassword",
"table.whitelist": "my_table",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "jdbc-",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "MaskSensitive",
"transforms.MaskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskSensitive.fields": "credit_card_number"
}
This example masks the credit_card_number field in all records, ensuring that sensitive data is not exposed when written to Kafka. By default, MaskField replaces each listed field with a null-equivalent value for its type, such as an empty string for strings or 0 for numbers.
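If a fixed placeholder is preferable to an empty value, newer Kafka versions also support a replacement option, which applies to every field listed (the placeholder below is just an example):
"transforms.MaskSensitive.replacement": "XXXX-XXXX-XXXX-XXXX"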
The Filter transformation drops records that match a predicate, so that only the remaining records pass through the connector. Unlike the transformations above, it works together with named predicates, which are defined separately in the connector configuration and referenced by the transformation.
{
"name": "jdbc-source-connector",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "dbuser",
"connection.password": "dbpassword",
"table.whitelist": "my_table",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "jdbc-",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "FilterRecords",
"transforms.FilterRecords.type": "org.apache.kafka.connect.transforms.Filter$Value",
"transforms.FilterRecords.predicate": "isActive",
"transforms.FilterRecords.negate": "false"
}
Here, any record whose value is null (a tombstone) is dropped before it is written to Kafka. Kafka ships three built-in predicates: TopicNameMatches, HasHeaderKey, and RecordIsTombstone. Filtering on a field value, such as keeping only records where an isActive column is true, is not covered by the built-ins; it requires a custom predicate class or a third-party filtering transformation.
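Setting negate to true inverts the predicate, turning Filter into an allow-list. As another sketch (the topic pattern is hypothetical), only records on matching topics survive:
"transforms": "KeepOrders",
"transforms.KeepOrders.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.KeepOrders.predicate": "IsOrdersTopic",
"transforms.KeepOrders.negate": "true",
"predicates": "IsOrdersTopic",
"predicates.IsOrdersTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsOrdersTopic.pattern": "jdbc-orders.*"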
Multiple transformations can be applied to a single connector by chaining them in the transforms property. Each transformation is applied in the order it is listed.
{
"name": "jdbc-source-connector",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "dbuser",
"connection.password": "dbpassword",
"table.whitelist": "my_table",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "jdbc-",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "InsertSource, MaskSensitive",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "source",
"transforms.InsertSource.static.value": "mysql",
"transforms.MaskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskSensitive.fields": "credit_card_number"
}
In this example, the InsertSource and MaskSensitive transformations are applied in sequence: first the source field is added, then the credit card number is masked.
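Order matters whenever one step touches a field another step produces. As a sketch (field names are illustrative), the chain below adds a field and then masks it; reversing the two names in transforms would leave the inserted value unmasked, because the masking step would run before the field exists:
"transforms": "AddNote,MaskNote",
"transforms.AddNote.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddNote.static.field": "note",
"transforms.AddNote.static.value": "internal-only",
"transforms.MaskNote.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskNote.fields": "note"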
Kafka Connect transformations provide a powerful way to manipulate data as it flows between Kafka and external systems. Whether you need to filter records, mask sensitive data, or reformat fields, transformations can help you customize the data pipeline to fit your needs.