EDP

  1. Home
  2. Docs
  3. EDP
  4. 5. IMPLEMENTATION
  5. 5.3 Message format

5.3 Message format

Messages (aka Records) are always written in batches. The technical term for a batch of messages is a record batch, and a record batch contains one or more records. In the degenerate case, we could have a record batch containing a single record. Record batches and records have their own headers. The format of each is described below.

Messages (aka Records) are always written in batches. The technical term for a batch of messages is a record batch, and a record batch contains one or more records. In the degenerate case, we could have a record batch containing a single record. Record batches and records have their own headers. The format of each is described below. 

5.3.1 Record Batch

The following is the on-disk format of a RecordBatch. 

		baseOffset: int64
		batchLength: int32
		partitionLeaderEpoch: int32
		magic: int8 (current magic value is 2)
		crc: int32
		attributes: int16
			bit 0~2:
				0: no compression
				1: gzip
				2: snappy
				3: lz4
				4: zstd
			bit 3: timestampType
			bit 4: isTransactional (0 means not transactional)
			bit 5: isControlBatch (0 means not a control batch)
			bit 6~15: unused
		lastOffsetDelta: int32
		firstTimestamp: int64
		maxTimestamp: int64
		producerId: int64
		producerEpoch: int16
		baseSequence: int32
		records: [Record]

Note that when compression is enabled, the compressed record data is serialized directly following the count of the number of records. 

The CRC covers the data from the attributes to the end of the batch (i.e. all the bytes that follow the CRC). It is located after the magic byte, which means that clients must parse the magic byte before deciding how to interpret the bytes between the batch length and the magic byte. The partition leader epoch field is not included in the CRC computation to avoid the need to recompute the CRC when this field is assigned for every batch that is received by the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.

On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence numbers from the original batch when the log is cleaned. This is required in order to be able to restore the producer’s state when the log is reloaded. If we did not retain the last sequence number, for example, then after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying that the first and last sequence numbers of the incoming batch match the last from that producer). As a result, it is possible to have empty batches in the log when all the records in the batch are cleaned but batch is still retained in order to preserve a producer’s last sequence number. One oddity here is that the firstTimestamp field is not preserved during compaction, so it will change if the first record in the batch is compacted away.

5.3.1.1 Control Batches

A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.

The key of a control record conforms to the following schema: 

       version: int16 (current version is 0)
       type: int16 (0 indicates an abort marker, 1 indicates a commit)

The schema for the value of a control record is dependent on the type. The value is opaque to clients.

5.3.2 Record

Record level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below. 

		length: varint
		attributes: int8
			bit 0~7: unused
		timestampDelta: varint
		offsetDelta: varint
		keyLength: varint
		key: byte[]
		valueLen: varint
		value: byte[]
		Headers => [Header]
5.3.2.1 Record Header
		headerKeyLength: varint
		headerKey: String
		headerValueLength: varint
		Value: byte[]

We use the same varint encoding as Protobuf. More information on the latter can be found here. The count of headers in a record is also encoded as a varint.

5.3.3 Old Message Format

Prior to Kafka 0.11, messages were transferred and stored in message sets. In a message set, each message has its own metadata. Note that although message sets are represented as an array, they are not preceded by an int32 array size like other array elements in the protocol.Message Set:

    MessageSet (Version: 0) => [offset message_size message]
        offset => INT64
        message_size => INT32
        message => crc magic_byte attributes key value
            crc => INT32
            magic_byte => INT8
            attributes => INT8
                bit 0~2:
                    0: no compression
                    1: gzip
                    2: snappy
                bit 3~7: unused
            key => BYTES
            value => BYTES
    MessageSet (Version: 1) => [offset message_size message]
        offset => INT64
        message_size => INT32
        message => crc magic_byte attributes timestamp key value
            crc => INT32
            magic_byte => INT8
            attributes => INT8
                bit 0~2:
                    0: no compression
                    1: gzip
                    2: snappy
                    3: lz4
                bit 3: timestampType
                    0: create time
                    1: log append time
                bit 4~7: unused
            timestamp => INT64
            key => BYTES
            value => BYTES

In versions prior to Kafka 0.10, the only supported message format version (which is indicated in the magic value) was 0. Message format version 1 was introduced with timestamp support in version 0.10.

  • Similarly to version 2 above, the lowest bits of attributes represent the compression type.
  • In version 1, the producer should always set the timestamp type bit to 0. If the topic is configured to use log append time, (through either broker level config log.message.timestamp.type = LogAppendTime or topic level config message.timestamp.type = LogAppendTime), the broker will overwrite the timestamp type and the timestamp in the message set.
  • The highest bits of attributes must be set to 0.

In message format versions 0 and 1 Kafka supports recursive messages to enable compression. In this case the message’s attributes must be set to indicate one of the compression types and the value field will contain a message set compressed with that type. We often refer to the nested messages as “inner messages” and the wrapping message as the “outer message.” Note that the key should be null for the outer message and its offset will be the offset of the last inner message.

When receiving recursive version 0 messages, the broker decompresses them and each inner message is assigned an offset individually. In version 1, to avoid server side re-compression, only the wrapper message will be assigned an offset. The inner messages will have relative offsets. The absolute offset can be computed using the offset from the outer message, which corresponds to the offset assigned to the last inner message.

The crc field contains the CRC32 (and not CRC-32C) of the subsequent message bytes (i.e. from magic byte to the value).