8.2.4 Transformations

Connectors can be configured with transformations to make lightweight message-at-a-time modifications. They can be convenient for data massaging and event routing.

A transformation chain can be specified in the connector configuration.

  • transforms – List of aliases for the transformations, specifying the order in which the transformations will be applied.
  • transforms.$alias.type – Fully qualified class name for the transformation.
  • transforms.$alias.$transformationSpecificConfig – Configuration properties for the transformation.

For example, let’s take the built-in file source connector and use a transformation to add a static field.

Throughout the example we’ll use the schemaless JSON data format. To enable it, we changed the following two lines in connect-standalone.properties from true to false:

        key.converter.schemas.enable
        value.converter.schemas.enable
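
For context, assuming the worker uses Kafka’s default JsonConverter for keys and values (as the stock connect-standalone.properties does), the resulting converter settings read:

        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false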

The file source connector reads each line as a String. We will wrap each line in a Map and then add a second field to identify the origin of the event. To do this, we use two transformations:

  • HoistField to place the input line inside a Map
  • InsertField to add the static field. In this example we’ll indicate that the record came from a file connector

After adding the transformations, the connect-file-source.properties file looks as follows:

        name=local-file-source
        connector.class=FileStreamSource
        tasks.max=1
        file=test.txt
        topic=connect-test
        transforms=MakeMap, InsertSource
        transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
        transforms.MakeMap.field=line
        transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
        transforms.InsertSource.static.field=data_source
        transforms.InsertSource.static.value=test-file-source

All the lines starting with transforms were added for the transformations. “MakeMap” and “InsertSource” are the aliases we chose to give the two transformations. The transformation types come from the list of built-in transformations shown below. Each transformation type has additional configuration: HoistField requires a configuration called “field”, which names the field in the map that will hold the original String from the file; InsertField lets us specify the field name and the value that we are adding.

When we ran the file source connector on our sample file without the transformations and read the events using kafka-console-consumer.sh, the results were:

        "foo"
        "bar"
        "hello world"

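For reference, such a run can be reproduced with the standard standalone scripts, assuming a broker listening on localhost:9092 and the configuration files named as above:

        bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
        bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
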
We then created a new file connector, after adding the transformations to the configuration file. This time, the results were:

        {"line":"foo","data_source":"test-file-source"}
        {"line":"bar","data_source":"test-file-source"}
        {"line":"hello world","data_source":"test-file-source"}

You can see that the lines we’ve read are now part of a JSON map, and there is an extra field with the static value we specified. This is just one example of what you can do with transformations.

Included transformations

Several widely-applicable data and routing transformations are included with Kafka Connect:

  • InsertField – Add a field using either static data or record metadata
  • ReplaceField – Filter or rename fields
  • MaskField – Replace field with valid null value for the type (0, empty string, etc) or custom replacement (non-empty string or numeric value only)
  • ValueToKey – Replace the record key with a new key formed from a subset of fields in the record value
  • HoistField – Wrap the entire event as a single field inside a Struct or a Map
  • ExtractField – Extract a specific field from Struct and Map and include only this field in results
  • SetSchemaMetadata – Modify the schema name or version
  • TimestampRouter – Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps
  • RegexRouter – Modify the topic of a record based on original topic, replacement string and a regular expression
  • Flatten – Flatten a nested data structure into field names joined by a configurable delimiter
  • Cast – Cast fields or the entire key or value to a specific primitive type
  • TimestampConverter – Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types
  • Filter – Removes messages from all further processing. This is used with a predicate to selectively filter certain messages.

Details on how to configure each transformation are listed below:

org.apache.kafka.connect.transforms.InsertField

Insert field(s) using attributes from the record metadata or a configured static value. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).

  • offset.field – Field name for Kafka offset; only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default). (Type: string; Default: null; Importance: medium)
  • partition.field – Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default). (Type: string; Default: null; Importance: medium)
  • static.field – Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default). (Type: string; Default: null; Importance: medium)
  • static.value – Static field value, if field name configured. (Type: string; Default: null; Importance: medium)
  • timestamp.field – Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default). (Type: string; Default: null; Importance: medium)
  • topic.field – Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default). (Type: string; Default: null; Importance: medium)
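
As a sketch, a value-side InsertField that stamps each record with its origin topic and timestamp could be configured as follows (the alias AddMeta and the field names are ours):

        transforms=AddMeta
        transforms.AddMeta.type=org.apache.kafka.connect.transforms.InsertField$Value
        # record metadata to copy into the value; both fields remain optional (the default)
        transforms.AddMeta.topic.field=origin_topic
        transforms.AddMeta.timestamp.field=event_ts
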
org.apache.kafka.connect.transforms.ReplaceField

Filter or rename fields. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ReplaceField$Key) or value (org.apache.kafka.connect.transforms.ReplaceField$Value).

  • blacklist – Fields to exclude. This takes precedence over the whitelist. (Type: list; Default: “”; Importance: medium)
  • renames – Field rename mappings. (Type: list; Default: “”; Valid Values: list of colon-delimited pairs, e.g. foo:bar,abc:xyz; Importance: medium)
  • whitelist – Fields to include. If specified, only these fields will be used. (Type: list; Default: “”; Importance: medium)
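
For illustration, a value-side ReplaceField that drops one field and renames another might look like this (the alias and field names are hypothetical):

        transforms=Prune
        transforms.Prune.type=org.apache.kafka.connect.transforms.ReplaceField$Value
        # exclude the ssn field entirely
        transforms.Prune.blacklist=ssn
        # rename old_name to new_name
        transforms.Prune.renames=old_name:new_name
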
org.apache.kafka.connect.transforms.MaskField

Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on). For numeric and string fields, an optional replacement value can be specified that is converted to the correct type. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.MaskField$Key) or value (org.apache.kafka.connect.transforms.MaskField$Value).

  • fields – Names of fields to mask. (Type: list; Valid Values: non-empty list; Importance: high)
  • replacement – Custom value replacement that will be applied to all ‘fields’ values (numeric or non-empty string values only). (Type: string; Default: null; Valid Values: non-empty string; Importance: low)
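
A minimal sketch masking two string fields in the value with a custom replacement (the field names are ours):

        transforms=Mask
        transforms.Mask.type=org.apache.kafka.connect.transforms.MaskField$Value
        transforms.Mask.fields=email,phone
        # omit replacement to mask with the type's null-equivalent (empty string, 0, ...)
        transforms.Mask.replacement=REDACTED
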
org.apache.kafka.connect.transforms.ValueToKey

Replace the record key with a new key formed from a subset of fields in the record value.

  • fields – Field names on the record value to extract as the record key. (Type: list; Valid Values: non-empty list; Importance: high)
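
For example, keying each record by an id field taken from its value (the field name is hypothetical; note that ValueToKey is configured directly, without $Key/$Value variants):

        transforms=Key
        transforms.Key.type=org.apache.kafka.connect.transforms.ValueToKey
        transforms.Key.fields=user_id
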
org.apache.kafka.connect.transforms.HoistField

Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HoistField$Key) or value (org.apache.kafka.connect.transforms.HoistField$Value). This is the transformation used as MakeMap in the example at the top of this section.

  • field – Field name for the single field that will be created in the resulting Struct or Map. (Type: string; Importance: medium)

org.apache.kafka.connect.transforms.ExtractField

Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. Any null values are passed through unmodified. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).

  • field – Field name to extract. (Type: string; Importance: medium)
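
As a sketch, promoting a single field of the value to become the entire record value (the field name is hypothetical):

        transforms=Unwrap
        transforms.Unwrap.type=org.apache.kafka.connect.transforms.ExtractField$Value
        transforms.Unwrap.field=payload
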
org.apache.kafka.connect.transforms.SetSchemaMetadata

Set the schema name, version or both on the record’s key (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key) or value (org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) schema.

  • schema.name – Schema name to set. (Type: string; Default: null; Importance: high)
  • schema.version – Schema version to set. (Type: int; Default: null; Importance: high)
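
A brief sketch setting both the name and version of the value schema (the values shown are hypothetical):

        transforms=SetSchema
        transforms.SetSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
        transforms.SetSchema.schema.name=com.example.Order
        transforms.SetSchema.schema.version=2
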
org.apache.kafka.connect.transforms.TimestampRouter

Update the record’s topic field as a function of the original topic value and the record timestamp. This is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system (e.g. database table or search index name).

  • timestamp.format – Format string for the timestamp that is compatible with java.text.SimpleDateFormat. (Type: string; Default: yyyyMMdd; Importance: high)
  • topic.format – Format string which can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively. (Type: string; Default: ${topic}-${timestamp}; Importance: high)
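
With the defaults spelled out explicitly, a record on topic orders timestamped 2024-01-15 would be routed to orders-20240115 (the topic name and date are illustrative):

        transforms=Route
        transforms.Route.type=org.apache.kafka.connect.transforms.TimestampRouter
        # orders -> orders-20240115 for a record timestamped 2024-01-15
        transforms.Route.topic.format=${topic}-${timestamp}
        transforms.Route.timestamp.format=yyyyMMdd
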
org.apache.kafka.connect.transforms.RegexRouter

Update the record topic using the configured regular expression and replacement string. Under the hood, the regex is compiled to a java.util.regex.Pattern. If the pattern matches the input topic, java.util.regex.Matcher#replaceFirst() is used with the replacement string to obtain the new topic.

  • regex – Regular expression to use for matching. (Type: string; Valid Values: valid regex; Importance: high)
  • replacement – Replacement string. (Type: string; Importance: high)
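
For instance, stripping a -raw suffix from topic names (the naming convention is hypothetical):

        transforms=Rename
        transforms.Rename.type=org.apache.kafka.connect.transforms.RegexRouter
        # orders-raw -> orders
        transforms.Rename.regex=(.*)-raw
        transforms.Rename.replacement=$1
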
org.apache.kafka.connect.transforms.Flatten

Flatten a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character. Applies to Struct when schema present, or a Map in the case of schemaless data. The default delimiter is ‘.’. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Flatten$Key) or value (org.apache.kafka.connect.transforms.Flatten$Value).

  • delimiter – Delimiter to insert between field names from the input record when generating field names for the output record. (Type: string; Default: .; Importance: medium)
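
As a sketch, flattening the record value with an underscore delimiter (the nested structure shown in the comment is hypothetical):

        transforms=Flat
        transforms.Flat.type=org.apache.kafka.connect.transforms.Flatten$Value
        # {"address":{"city":"Berlin"}} -> {"address_city":"Berlin"}
        transforms.Flat.delimiter=_
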
org.apache.kafka.connect.transforms.Cast

Cast fields or the entire key or value to a specific type, e.g. to force an integer field to a smaller width. Only simple primitive types are supported — integers, floats, boolean, and string. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Cast$Key) or value (org.apache.kafka.connect.transforms.Cast$Value).

  • spec – List of fields and the type to cast them to, of the form field1:type,field2:type, to cast fields of Maps or Structs; or a single type to cast the entire value. Valid types are int8, int16, int32, int64, float32, float64, boolean, and string. (Type: list; Valid Values: list of colon-delimited pairs, e.g. foo:bar,abc:xyz; Importance: high)
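
For illustration, casting two value fields at once (the field names are hypothetical):

        transforms=Cast
        transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
        # narrow count to a 32-bit integer and render price as a string
        transforms.Cast.spec=count:int32,price:string
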
org.apache.kafka.connect.transforms.TimestampConverter

Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types. Applies to individual fields or to the entire value. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.TimestampConverter$Key) or value (org.apache.kafka.connect.transforms.TimestampConverter$Value).

  • target.type – The desired timestamp representation: string, unix, Date, Time, or Timestamp. (Type: string; Importance: high)
  • field – The field containing the timestamp, or empty if the entire value is a timestamp. (Type: string; Default: “”; Importance: high)
  • format – A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string, or used to parse the input if the input is a string. (Type: string; Default: “”; Importance: medium)
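
A minimal sketch converting a timestamp field in the value to a formatted string (the field name is ours):

        transforms=Ts
        transforms.Ts.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
        transforms.Ts.field=event_ts
        transforms.Ts.target.type=string
        transforms.Ts.format=yyyy-MM-dd HH:mm:ss
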
org.apache.kafka.connect.transforms.Filter

Drops all records, filtering them from subsequent transformations in the chain. This is intended to be used conditionally to filter out records matching (or not matching) a particular Predicate.

Predicates

Transformations can be configured with predicates so that the transformation is applied only to messages which satisfy some condition. In particular, when combined with the Filter transformation, predicates can be used to selectively filter out certain messages.

Predicates are specified in the connector configuration.

  • predicates – Set of aliases for the predicates to be applied to some of the transformations.
  • predicates.$alias.type – Fully qualified class name for the predicate.
  • predicates.$alias.$predicateSpecificConfig – Configuration properties for the predicate.

All transformations have the implicit config properties predicate and negate. A particular predicate is associated with a transformation by setting the transformation’s predicate config to the predicate’s alias. The predicate’s value can be reversed using the negate configuration property.

For example, suppose you have a source connector which produces messages to many different topics and you want to:

  • filter out the messages in the ‘foo’ topic entirely
  • apply the ExtractField transformation with the field name ‘other_field’ to records in all topics except the topic ‘bar’

To do this we first need to filter out the records destined for the topic ‘foo’. The Filter transformation removes records from further processing, and can use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. TopicNameMatches’s only configuration property is pattern, which is a Java regular expression for matching against the topic name. The configuration would look like this:

        transforms=Filter
        transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
        transforms.Filter.predicate=IsFoo

        predicates=IsFoo
        predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
        predicates.IsFoo.pattern=foo

Next we need to apply ExtractField only when the topic name of the record is not ‘bar’. We can’t just use TopicNameMatches directly, because that would apply the transformation to matching topic names, not topic names which do not match. The transformation’s implicit negate config property allows us to invert the set of records which a predicate matches. Adding the configuration for this to the previous example we arrive at:

        transforms=Filter,Extract
        transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
        transforms.Filter.predicate=IsFoo

        transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
        transforms.Extract.field=other_field
        transforms.Extract.predicate=IsBar
        transforms.Extract.negate=true

        predicates=IsFoo,IsBar
        predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
        predicates.IsFoo.pattern=foo

        predicates.IsBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
        predicates.IsBar.pattern=bar

Kafka Connect includes the following predicates:

  • TopicNameMatches – matches records in a topic with a name matching a particular Java regular expression.
  • HasHeaderKey – matches records which have a header with the given key.
  • RecordIsTombstone – matches tombstone records, that is records with a null value.

Details on how to configure each predicate are listed below:

org.apache.kafka.connect.transforms.predicates.HasHeaderKey

A predicate which is true for records with at least one header with the configured name.

  • name – The header name. (Type: string; Valid Values: non-empty string; Importance: medium)

org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

A predicate which is true for records which are tombstones (i.e. have null value).
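
As a sketch, combining RecordIsTombstone with the Filter transformation drops tombstone records before they reach a sink (the aliases are ours):

        transforms=DropNulls
        transforms.DropNulls.type=org.apache.kafka.connect.transforms.Filter
        transforms.DropNulls.predicate=IsTombstone
        predicates=IsTombstone
        predicates.IsTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone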

org.apache.kafka.connect.transforms.predicates.TopicNameMatches

A predicate which is true for records with a topic name that matches the configured regular expression.

  • pattern – A Java regular expression for matching against the name of a record’s topic. (Type: string; Valid Values: non-empty string, valid regex; Importance: medium)