Kafka Intermediate: Kafka Connect Transformations

Kafka Connect provides a simple and flexible way to move data between Kafka and other systems. One powerful feature in Kafka Connect is the ability to apply transformations to the data as it flows through the connectors.

This guide will explore how transformations work in Kafka Connect and provide examples of common transformations that can be applied to source and sink connectors.

1. What Are Kafka Connect Transformations?

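Kafka Connect transformations, formally called Single Message Transforms (SMTs), modify records one at a time. In a source connector they run before a record is written to Kafka; in a sink connector they run after a record is read from Kafka and before it reaches the sink system. They act as a lightweight, per-record processing step within the connector pipeline. Work that spans multiple records, such as joins or aggregations, is outside their scope.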

Transformations can be used for a variety of purposes, such as adding fields, masking sensitive values, and filtering out unwanted records.

2. How Kafka Connect Transformations Work

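Transformations are defined in the connector configuration using the transforms property, which takes a comma-separated list of aliases. You can chain multiple transformations together to create a pipeline of data manipulation steps; they run in the order the aliases are listed.

Each transformation requires an alias (its name in the transforms list) and a class that implements the transformation logic, set through the transforms.<alias>.type property. Some transformations also have additional configuration options, which use the same transforms.<alias>. prefix.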
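To make the naming scheme concrete, here is a minimal Python sketch that groups the per-transformation properties of a flat connector configuration by alias, in chain order. It is an illustration only, not how Kafka Connect itself is implemented; the function name transform_chain is hypothetical.

```python
def transform_chain(config):
    """Return [(alias, options)] in the order listed in the transforms property."""
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    chain = []
    for alias in aliases:
        prefix = "transforms." + alias + "."
        # Collect every property that belongs to this alias, minus the prefix.
        options = {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}
        if "type" not in options:
            raise ValueError("missing " + prefix + "type")
        chain.append((alias, options))
    return chain


config = {
    "transforms": "InsertSource",
    "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertSource.static.field": "source",
    "transforms.InsertSource.static.value": "mysql",
}
for alias, options in transform_chain(config):
    print(alias, options["type"])
```

Every alias listed in transforms must have a matching type property, which is why the sketch raises an error when one is missing.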

3. Example: Insert Field Transformation

The InsertField transformation allows you to add a static field to every record. For example, you can add a field that indicates the source of the data.


{
  "name": "jdbc-source-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "1",
  "connection.url": "jdbc:mysql://localhost:3306/mydb",
  "connection.user": "dbuser",
  "connection.password": "dbpassword",
  "table.whitelist": "my_table",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "topic.prefix": "jdbc-",
  "poll.interval.ms": "10000",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "transforms": "InsertSource",
  "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertSource.static.field": "source",
  "transforms.InsertSource.static.value": "mysql"
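}

In this example, InsertField$Value adds a new field called source with the value mysql to the value of every record that flows through the connector. The InsertField$Key variant would add the field to the record key instead.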
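The effect on a single record can be pictured with a short Python sketch. It models the record value as a dictionary; the sample row and the function name insert_static_field are hypothetical, and the real transformation runs inside Kafka Connect rather than in Python.

```python
def insert_static_field(value, field, static_value):
    """Return a copy of the record value with one extra field added."""
    updated = dict(value)  # copy so the input record is left untouched
    updated[field] = static_value
    return updated


row = {"id": 1, "name": "Alice"}
print(insert_static_field(row, "source", "mysql"))
# prints {'id': 1, 'name': 'Alice', 'source': 'mysql'}
```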

4. Example: Mask Field Transformation

The MaskField transformation allows you to mask sensitive information in records, such as credit card numbers or passwords.


{
  "name": "jdbc-source-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "1",
  "connection.url": "jdbc:mysql://localhost:3306/mydb",
  "connection.user": "dbuser",
  "connection.password": "dbpassword",
  "table.whitelist": "my_table",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "topic.prefix": "jdbc-",
  "poll.interval.ms": "10000",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "transforms": "MaskSensitive",
  "transforms.MaskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.MaskSensitive.fields": "credit_card_number"
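}

This example masks the credit_card_number field in all records. MaskField replaces the value with the empty equivalent for its type (an empty string for a string field, 0 for a numeric field), so the original number is never written to Kafka.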
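MaskField replaces a field's value with an empty value of the same type. The Python sketch below models that rule for a few basic types; it is a simplified illustration with hypothetical function names and sample data, not the transformation's actual code.

```python
def mask_value(value):
    """Return the empty equivalent for the value's type."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return False
    if isinstance(value, int):
        return 0
    if isinstance(value, float):
        return 0.0
    if isinstance(value, str):
        return ""
    raise TypeError(f"cannot mask values of type {type(value).__name__}")


def mask_fields(record, fields):
    """Return a copy of the record with the named fields masked."""
    return {k: mask_value(v) if k in fields else v for k, v in record.items()}


row = {"id": 7, "credit_card_number": "4111111111111111"}
print(mask_fields(row, {"credit_card_number"}))
# prints {'id': 7, 'credit_card_number': ''}
```

The field stays in the record with its original type, so downstream consumers that expect it keep working; only the sensitive value is gone.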

5. Example: Filter Transformation

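The Filter transformation drops records so that only specific records pass through the connector. On its own it drops every record, so it is normally paired with a predicate: records that match the predicate are dropped, or, with negate set to true, records that do not match are dropped. Filter and predicates are available in Apache Kafka 2.6 and later. The class name is org.apache.kafka.connect.transforms.Filter, with no $Key or $Value suffix, and each predicate must be declared through the predicates property.

Kafka's built-in predicates (TopicNameMatches, HasHeaderKey, and RecordIsTombstone) do not inspect field values, so filtering on an isActive field needs a custom predicate. The class com.example.connect.predicates.IsActive below is a hypothetical placeholder for such a plugin; it is not part of Kafka.

{
  "name": "jdbc-source-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "1",
  "connection.url": "jdbc:mysql://localhost:3306/mydb",
  "connection.user": "dbuser",
  "connection.password": "dbpassword",
  "table.whitelist": "my_table",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "topic.prefix": "jdbc-",
  "poll.interval.ms": "10000",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "transforms": "FilterRecords",
  "transforms.FilterRecords.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.FilterRecords.predicate": "isActive",
  "transforms.FilterRecords.negate": "true",
  "predicates": "isActive",
  "predicates.isActive.type": "com.example.connect.predicates.IsActive"
}

Here, the predicate matches records whose isActive field is true. Because negate is set to true, Filter drops the records that do not match, so only active records pass through the connector.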
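The interaction between the predicate and negate is easy to get backwards: Filter drops the records for which the predicate holds, and negate inverts that choice. The Python sketch below models this logic under that assumption; apply_filter, is_active, and the sample rows are hypothetical names used only for illustration.

```python
def apply_filter(records, predicate, negate=False):
    """Model of Filter: drop each record for which the condition holds."""
    kept = []
    for record in records:
        matches = predicate(record)
        drop = (not matches) if negate else matches
        if not drop:
            kept.append(record)
    return kept


def is_active(record):
    return record.get("isActive") is True


rows = [{"id": 1, "isActive": True}, {"id": 2, "isActive": False}]

print(apply_filter(rows, is_active, negate=True))   # keeps only the active row
print(apply_filter(rows, is_active, negate=False))  # keeps only the inactive row
```

To keep only the records that match a predicate, negate therefore has to be true.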

6. Chaining Multiple Transformations

Multiple transformations can be applied to a single connector by listing their aliases, separated by commas, in the transforms property. The transformations run in the order the aliases appear in that list, not in the order their individual properties are written.


{
  "name": "jdbc-source-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "1",
  "connection.url": "jdbc:mysql://localhost:3306/mydb",
  "connection.user": "dbuser",
  "connection.password": "dbpassword",
  "table.whitelist": "my_table",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "topic.prefix": "jdbc-",
  "poll.interval.ms": "10000",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "transforms": "InsertSource, MaskSensitive",
  "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertSource.static.field": "source",
  "transforms.InsertSource.static.value": "mysql",
  "transforms.MaskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.MaskSensitive.fields": "credit_card_number"
}

In this example, the InsertSource and MaskSensitive transformations are applied in sequence. First, the source field is added, and then the credit card number is masked.
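The sequencing can be sketched in Python as a loop that feeds each transformation's output into the next. This is a simplified model with hypothetical function names and sample data; the one behavior it borrows from Kafka Connect is that a transformation returning no record (None here) drops the record and ends the chain.

```python
def insert_source(record):
    """Stand-in for InsertSource: add the static source field."""
    return {**record, "source": "mysql"}


def mask_card(record):
    """Stand-in for MaskSensitive: blank out the card number."""
    return {**record, "credit_card_number": ""}


def run_chain(record, transforms):
    """Apply each transform in list order; a None result drops the record."""
    for transform in transforms:
        record = transform(record)
        if record is None:
            return None
    return record


row = {"id": 1, "credit_card_number": "4111111111111111"}
print(run_chain(row, [insert_source, mask_card]))
# prints {'id': 1, 'credit_card_number': '', 'source': 'mysql'}
```

For these two steps the order does not change the final record, but it matters whenever a later transformation depends on a field that an earlier one adds, renames, or removes.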

7. Conclusion

Kafka Connect transformations provide a powerful way to manipulate data as it flows between Kafka and external systems. Whether you need to filter records, mask sensitive data, or reformat fields, transformations can help you customize the data pipeline to fit your needs.