6.6.6 Streams Monitoring

A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to Streams. By default Kafka Streams has metrics with two recording levels: debug and info.

Note that the metrics have a 4-layer hierarchy. At the top level there are client-level metrics for each started Kafka Streams client. Each client has stream threads, with their own metrics. Each stream thread has tasks, with their own metrics. Each task has a number of processor nodes, with their own metrics. Each task also has a number of state stores and record caches, all with their own metrics.

Use the following configuration option to specify which metrics you want collected:

metrics.recording.level="info"
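As a minimal sketch, the option can be placed in the properties that are handed to the Kafka Streams client. Only `java.util.Properties` and the literal key shown above are used here; the class name `RecordingLevelConfig` and the choice of `debug` are assumptions made for illustration.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds the properties that would be
// passed to the Kafka Streams client.
class RecordingLevelConfig {

    // Returns properties that request the given recording level ("info" or "debug").
    static Properties withRecordingLevel(String level) {
        Properties props = new Properties();
        // Key taken verbatim from the configuration option above.
        props.setProperty("metrics.recording.level", level);
        return props;
    }

    public static void main(String[] args) {
        Properties props = withRecordingLevel("debug");
        System.out.println(props.getProperty("metrics.recording.level"));
    }
}
```

Choosing `debug` here would also enable the metrics that the sections below list with a recording level of debug.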
Client Metrics

All of the following metrics have a recording level of info:

| METRIC/ATTRIBUTE NAME | DESCRIPTION | MBEAN NAME |
| --- | --- | --- |
| version | The version of the Kafka Streams client. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
| commit-id | The version control commit ID of the Kafka Streams client. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
| application-id | The application ID of the Kafka Streams client. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
| topology-description | The description of the topology executed in the Kafka Streams client. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
| state | The state of the Kafka Streams client. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
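The client-level MBeans in the table above could be looked up over JMX roughly as follows. This sketch assumes the Kafka Streams client runs in the same JVM and registers its metrics with the platform MBean server; without such a client the query simply returns nothing. The class `ClientMetricsLookup` and its helper methods are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper class, not part of Kafka.
class ClientMetricsLookup {

    // Wraps the checked exception so names can be built inline.
    static ObjectName name(String value) {
        try {
            return new ObjectName(value);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // JMX pattern that matches the client-level MBean for any client-id.
    static ObjectName clientPattern() {
        return name("kafka.streams:type=stream-metrics,client-id=*");
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> names = server.queryNames(clientPattern(), null);
        for (ObjectName n : names) {
            // "state" is one of the attributes listed in the table above.
            System.out.println(n + " state=" + server.getAttribute(n, "state"));
        }
    }
}
```

The wildcard `client-id=*` is standard `ObjectName` pattern syntax and stands in for the `([-.\w]+)` placeholder used in the table.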
Thread Metrics

All of the following metrics have a recording level of info:

| METRIC/ATTRIBUTE NAME | DESCRIPTION | MBEAN NAME |
| --- | --- | --- |
| commit-latency-avg | The average execution time in ms, for committing, across all running tasks of this thread. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| commit-latency-max | The maximum execution time in ms, for committing, across all running tasks of this thread. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| poll-latency-avg | The average execution time in ms, for consumer polling. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| poll-latency-max | The maximum execution time in ms, for consumer polling. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| process-latency-avg | The average execution time in ms, for processing. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| process-latency-max | The maximum execution time in ms, for processing. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| punctuate-latency-avg | The average execution time in ms, for punctuating. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| punctuate-latency-max | The maximum execution time in ms, for punctuating. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| commit-rate | The average number of commits per second. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| commit-total | The total number of commit calls. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| poll-rate | The average number of consumer poll calls per second. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| poll-total | The total number of consumer poll calls. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| process-rate | The average number of processed records per second. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| process-total | The total number of processed records. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| punctuate-rate | The average number of punctuate calls per second. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| punctuate-total | The total number of punctuate calls. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| task-created-rate | The average number of tasks created per second. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| task-created-total | The total number of tasks created. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| task-closed-rate | The average number of tasks closed per second. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
| task-closed-total | The total number of tasks closed. | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
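The MBean name column doubles as a regular expression, so a monitoring tool could use it to pull the thread ID out of a concrete MBean name. The sketch below is based on the expression from the table above, with the dot in `kafka.streams` escaped; the class name and the sample thread ID are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class, not part of Kafka.
class ThreadMBeanNameParser {

    // Based on the MBean name column of the thread metrics table.
    private static final Pattern THREAD_MBEAN = Pattern.compile(
            "kafka\\.streams:type=stream-thread-metrics,thread-id=([-.\\w]+)");

    // Returns the captured thread-id, or null if the name does not match.
    static String threadId(String mbeanName) {
        Matcher m = THREAD_MBEAN.matcher(mbeanName);
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Sample thread id chosen only for illustration.
        System.out.println(threadId(
                "kafka.streams:type=stream-thread-metrics,thread-id=my-app-StreamThread-1"));
    }
}
```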
Task Metrics

All of the following metrics have a recording level of debug, except for dropped-records-rate and dropped-records-total, which have a recording level of info:

| METRIC/ATTRIBUTE NAME | DESCRIPTION | MBEAN NAME |
| --- | --- | --- |
| process-latency-avg | The average execution time in ns, for processing. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| process-latency-max | The maximum execution time in ns, for processing. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| process-rate | The average number of processed records per second across all source processor nodes of this task. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| process-total | The total number of processed records across all source processor nodes of this task. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| commit-latency-avg | The average execution time in ns, for committing. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| commit-latency-max | The maximum execution time in ns, for committing. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| commit-rate | The average number of commit calls per second. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| commit-total | The total number of commit calls. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| record-lateness-avg | The average observed lateness of records (stream time – record timestamp). | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| record-lateness-max | The maximum observed lateness of records (stream time – record timestamp). | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| enforced-processing-rate | The average number of enforced processings per second. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| enforced-processing-total | The total number of enforced processings. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| dropped-records-rate | The average number of records dropped per second within this task. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
| dropped-records-total | The total number of records dropped within this task. | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
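The lateness formula in the table (stream time minus record timestamp) can be made concrete with a small worked example. This is a sketch only: it assumes that stream time is the highest record timestamp seen so far, and it averages over all samples rather than over whatever sampling window Kafka Streams applies internally. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical worked example, not Kafka code.
class RecordLatenessSketch {

    // Lateness of each record: stream time minus record timestamp.
    static long[] lateness(long[] timestamps) {
        long[] result = new long[timestamps.length];
        long streamTime = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            // Assumption: stream time is the highest timestamp observed so far.
            streamTime = Math.max(streamTime, timestamps[i]);
            result[i] = streamTime - timestamps[i];
        }
        return result;
    }

    static long max(long[] values) {
        return Arrays.stream(values).max().orElse(0L);
    }

    static double avg(long[] values) {
        return Arrays.stream(values).average().orElse(0.0);
    }

    public static void main(String[] args) {
        // Timestamps in arrival order; 103 and 101 arrive out of order.
        long[] late = lateness(new long[] {100, 105, 103, 110, 101});
        System.out.println(Arrays.toString(late));
        System.out.println("max=" + max(late) + " avg=" + avg(late));
    }
}
```

In this example only the two out-of-order records contribute non-zero lateness (2 and 9), which is what the avg and max metrics are meant to surface.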
Processor Node Metrics

The following metrics are only available on certain types of nodes, i.e., process-rate and process-total are only available for source processor nodes and suppression-emit-rate and suppression-emit-total are only available for suppression operation nodes. All of the metrics have a recording level of debug:

| METRIC/ATTRIBUTE NAME | DESCRIPTION | MBEAN NAME |
| --- | --- | --- |
| process-rate | The average number of records processed by a source processor node per second. | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
| process-total | The total number of records processed by a source processor node. | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
| suppression-emit-rate | The rate at which records have been emitted downstream from suppression operation nodes. | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
| suppression-emit-total | The total number of records that have been emitted downstream from suppression operation nodes. | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
State Store Metrics

All of the following metrics have a recording level of debug. Note that the store-scope value is specified in StoreSupplier#metricsScope() for custom state stores; for built-in state stores, the current values are:

  • in-memory-state
  • in-memory-lru-state
  • in-memory-window-state
  • in-memory-suppression (for suppression buffers)
  • rocksdb-state (for RocksDB backed key-value store)
  • rocksdb-window-state (for RocksDB backed window store)
  • rocksdb-session-state (for RocksDB backed session store)

Metrics suppression-buffer-size-avg, suppression-buffer-size-max, suppression-buffer-count-avg, and suppression-buffer-count-max are only available for suppression buffers. All other metrics are not available for suppression buffers.

| METRIC/ATTRIBUTE NAME | DESCRIPTION | MBEAN NAME |
| --- | --- | --- |
| put-latency-avg | The average put execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| put-latency-max | The maximum put execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| put-if-absent-latency-avg | The average put-if-absent execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| put-if-absent-latency-max | The maximum put-if-absent execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| get-latency-avg | The average get execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| get-latency-max | The maximum get execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| delete-latency-avg | The average delete execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| delete-latency-max | The maximum delete execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| put-all-latency-avg | The average put-all execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| put-all-latency-max | The maximum put-all execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| all-latency-avg | The average all operation execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| all-latency-max | The maximum all operation execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| range-latency-avg | The average range execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| range-latency-max | The maximum range execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| flush-latency-avg | The average flush execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| flush-latency-max | The maximum flush execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| restore-latency-avg | The average restore execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| restore-latency-max | The maximum restore execution time in ns. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| put-rate | The average put rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| put-if-absent-rate | The average put-if-absent rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| get-rate | The average get rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| delete-rate | The average delete rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| put-all-rate | The average put-all rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| all-rate | The average all operation rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| range-rate | The average range rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| flush-rate | The average flush rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| restore-rate | The average restore rate for this store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| suppression-buffer-size-avg | The average total size, in bytes, of the buffered data over the sampling window. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
| suppression-buffer-size-max | The maximum total size, in bytes, of the buffered data over the sampling window. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
| suppression-buffer-count-avg | The average number of records buffered over the sampling window. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
| suppression-buffer-count-max | The maximum number of records buffered over the sampling window. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
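To illustrate how the `[store-scope]-id` placeholder expands, the sketch below assembles a concrete MBean name for a given store scope. The class name, thread ID, task ID, and store name are hypothetical values chosen for illustration; only the name layout comes from the table above.

```java
// Hypothetical helper class, not part of Kafka.
class StateStoreMBeanName {

    // Fills in the MBean name layout from the state store metrics table.
    static String mbeanName(String threadId, String taskId,
                            String storeScope, String storeName) {
        return String.format(
                "kafka.streams:type=stream-state-metrics,thread-id=%s,task-id=%s,%s-id=%s",
                threadId, taskId, storeScope, storeName);
    }

    public static void main(String[] args) {
        // All four argument values are illustrative assumptions.
        System.out.println(
                mbeanName("app-StreamThread-1", "0_0", "rocksdb-state", "counts-store"));
    }
}
```

With the `rocksdb-state` scope, the last key of the name becomes `rocksdb-state-id`, in the same way that suppression buffers use `in-memory-suppression-id`.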
RocksDB Metrics

All of the following metrics have a recording level of debug. The metrics are collected every minute from the RocksDB state stores. If a state store consists of multiple RocksDB instances, as is the case for aggregations over time and session windows, each metric reports an aggregation over the RocksDB instances of the state store. Note that the store-scope values for built-in RocksDB state stores are currently the following:

  • rocksdb-state (for RocksDB backed key-value store)
  • rocksdb-window-state (for RocksDB backed window store)
  • rocksdb-session-state (for RocksDB backed session store)

| METRIC/ATTRIBUTE NAME | DESCRIPTION | MBEAN NAME |
| --- | --- | --- |
| bytes-written-rate | The average number of bytes written per second to the RocksDB state store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| bytes-written-total | The total number of bytes written to the RocksDB state store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| bytes-read-rate | The average number of bytes read per second from the RocksDB state store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| bytes-read-total | The total number of bytes read from the RocksDB state store. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| memtable-bytes-flushed-rate | The average number of bytes flushed per second from the memtable to disk. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| memtable-bytes-flushed-total | The total number of bytes flushed from the memtable to disk. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| memtable-hit-ratio | The ratio of memtable hits relative to all lookups to the memtable. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| block-cache-data-hit-ratio | The ratio of block cache hits for data blocks relative to all lookups for data blocks to the block cache. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| block-cache-index-hit-ratio | The ratio of block cache hits for index blocks relative to all lookups for index blocks to the block cache. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| block-cache-filter-hit-ratio | The ratio of block cache hits for filter blocks relative to all lookups for filter blocks to the block cache. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| write-stall-duration-avg | The average duration of write stalls in ms. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| write-stall-duration-total | The total duration of write stalls in ms. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| bytes-read-compaction-rate | The average number of bytes read per second during compaction. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| bytes-written-compaction-rate | The average number of bytes written per second during compaction. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| number-open-files | The number of currently open files. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| number-file-errors-total | The total number of file errors that occurred. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
Record Cache Metrics

All of the following metrics have a recording level of debug:

| METRIC/ATTRIBUTE NAME | DESCRIPTION | MBEAN NAME |
| --- | --- | --- |
| hit-ratio-avg | The average cache hit ratio defined as the ratio of cache read hits over the total cache read requests. | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
| hit-ratio-min | The minimum cache hit ratio. | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
| hit-ratio-max | The maximum cache hit ratio. | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
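The hit ratio definition above (cache read hits over total cache read requests) can be worked through with a few sample intervals. This is an illustrative sketch only: the class name, the sample counts, and the choice to report 0.0 when there were no reads are assumptions, and Kafka Streams may sample and aggregate differently.

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

// Hypothetical worked example, not Kafka code.
class CacheHitRatioSketch {

    // Ratio of read hits to total read requests for one sample.
    static double hitRatio(long readHits, long totalReads) {
        // Assumption: report 0.0 when there were no read requests.
        return totalReads == 0 ? 0.0 : (double) readHits / totalReads;
    }

    // Aggregates several samples into avg, min, and max.
    static DoubleSummaryStatistics summarize(double... ratios) {
        return DoubleStream.of(ratios).summaryStatistics();
    }

    public static void main(String[] args) {
        // Three sample intervals: 3 of 4, 1 of 2, and 4 of 4 reads were hits.
        DoubleSummaryStatistics stats = summarize(
                hitRatio(3, 4), hitRatio(1, 2), hitRatio(4, 4));
        System.out.println("avg=" + stats.getAverage()
                + " min=" + stats.getMin() + " max=" + stats.getMax());
    }
}
```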