Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. The REST API server can be configured using the listeners configuration option. This field should contain a list of listeners in the following format: protocol://host:port,protocol2://host2:port2. Currently supported protocols are http and https. For example:
listeners=http://localhost:8080,https://localhost:8443
By default, if no listeners are specified, the REST server runs on port 8083 using the HTTP protocol. When using HTTPS, the configuration has to include the SSL configuration. By default, it will use the ssl.* settings. If the REST API should use a different SSL configuration than the one used for connecting to Kafka brokers, the fields can be prefixed with listeners.https. When using the prefix, only the prefixed options will be used, and the ssl.* options without the prefix will be ignored. The following fields can be used to configure HTTPS for the REST API:
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.key.password
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
ssl.enabled.protocols
ssl.provider
ssl.protocol
ssl.cipher.suites
ssl.keymanager.algorithm
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.endpoint.identification.algorithm
ssl.client.auth
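For example, a worker might dedicate a separate keystore and truststore to the REST API by prefixing the options. The following is a minimal sketch; the paths and passwords are hypothetical placeholders:
listeners=https://localhost:8443
# SSL settings used only by the REST API; unprefixed ssl.* options are ignored here
listeners.https.ssl.keystore.location=/var/private/ssl/rest.keystore.jks
listeners.https.ssl.keystore.password=keystore-password
listeners.https.ssl.key.password=key-password
listeners.https.ssl.truststore.location=/var/private/ssl/rest.truststore.jks
listeners.https.ssl.truststore.password=truststore-password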
The REST API is used not only by users to monitor / manage Kafka Connect. It is also used for the Kafka Connect cross-cluster communication. Requests received on the follower nodes' REST API will be forwarded to the leader node's REST API. In case the URI under which a given host is reachable differs from the URI on which it listens, the configuration options rest.advertised.host.name, rest.advertised.port and rest.advertised.listener can be used to change the URI which will be used by the follower nodes to connect with the leader. When both HTTP and HTTPS listeners are used, the rest.advertised.listener option can also be used to define which listener will be used for the cross-cluster communication. When using HTTPS for communication between nodes, the same ssl.* or listeners.https options will be used to configure the HTTPS client.
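For example, a worker that other workers can only reach through an external address could advertise that address instead of the one it binds to; the hostname below is hypothetical:
# address other workers should use when forwarding requests to this worker
rest.advertised.host.name=connect-1.example.com
rest.advertised.port=8443
# with both HTTP and HTTPS listeners enabled, forward over the HTTPS listener
rest.advertised.listener=https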
The following are the currently supported REST API endpoints:
GET /connectors – return a list of active connectors
POST /connectors – create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters (see the example request body after this list)
GET /connectors/{name} – get information about a specific connector
GET /connectors/{name}/config – get the configuration parameters for a specific connector
PUT /connectors/{name}/config – update the configuration parameters for a specific connector
GET /connectors/{name}/status – get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
GET /connectors/{name}/tasks – get a list of tasks currently running for a connector
GET /connectors/{name}/tasks/{taskid}/status – get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
PUT /connectors/{name}/pause – pause the connector and its tasks, which stops message processing until the connector is resumed
PUT /connectors/{name}/resume – resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart – restart a connector (typically because it has failed)
POST /connectors/{name}/tasks/{taskid}/restart – restart an individual task (typically because it has failed)
DELETE /connectors/{name} – delete a connector, halting all tasks and deleting its configuration
GET /connectors/{name}/topics – get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
PUT /connectors/{name}/topics/reset – send a request to empty the set of active topics of a connector
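As an example, a POST /connectors request body that creates one of the file sink connectors shipped with Kafka could look as follows; the connector name, topic, and file path are illustrative:
{
  "name": "my-file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
    "file": "/tmp/my-file-sink.txt"
  }
}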
Kafka Connect also provides a REST API for getting information about connector plugins:
GET /connector-plugins – return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
PUT /connector-plugins/{connector-type}/config/validate – validate the provided configuration values against the configuration definition. This API performs per-config validation, returning suggested values and error messages during validation (an example request body follows this list)
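The body of a validate request is a JSON map of the configuration values to check. A sketch, reusing the hypothetical file sink configuration from the earlier example:
{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "tasks.max": "1",
  "topics": "my-topic",
  "file": "/tmp/my-file-sink.txt"
}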
The following is a supported REST request at the top-level (root) endpoint:
GET / – return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including the git commit ID of the source code) and the ID of the Kafka cluster it is connected to (see the sample response below)
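The response has roughly the following shape; the values here are placeholders:
{
  "version": "3.7.0",
  "commit": "0123456789abcdef",
  "kafka_cluster_id": "I4ZmrWqfT2e-upky_4fdPA"
}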
Error Reporting in Connect
Kafka Connect provides error reporting to handle errors encountered along various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable “dead letter queue” (DLQ) Kafka topic.
To report errors within a connector’s converter, transforms, or within the sink connector itself to the log, set errors.log.enable=true in the connector configuration to log details of each error and problem record’s topic, partition, and offset. For additional debugging purposes, set errors.log.include.messages=true to also log the problem record key, value, and headers to the log (note this may log sensitive information).
To report errors within a connector’s converter, transforms, or within the sink connector itself to a dead letter queue topic, set errors.deadletterqueue.topic.name, and optionally errors.deadletterqueue.context.headers.enable=true.
By default, connectors exhibit “fail fast” behavior immediately upon an error or exception. This is equivalent to adding the following configuration properties with their defaults to a connector configuration:
# disable retries on failure
errors.retry.timeout=0
# do not log errors or their contexts
errors.log.enable=false
# do not record errors in a dead letter queue topic
errors.deadletterqueue.topic.name=
# Fail on first error
errors.tolerance=none
These and other related connector configuration properties can be changed to provide different behavior. For example, the following configuration properties can be added to a connector configuration to set up error handling with multiple retries, logging to the application logs and the my-connector-errors Kafka topic, and tolerating all errors by reporting them rather than failing the connector task:
# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
errors.retry.timeout=600000
errors.retry.delay.max.ms=30000
# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.messages=false
# produce error context into the Kafka topic
errors.deadletterqueue.topic.name=my-connector-errors
# Tolerate all errors.
errors.tolerance=all