EDP

8.2.5 REST API

Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. The REST API server can be configured using the listeners configuration option. This option takes a comma-separated list of listeners in the format protocol://host:port,protocol2://host2:port2. The currently supported protocols are http and https. For example:

        listeners=http://localhost:8080,https://localhost:8443
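To make the listeners format concrete, here is a minimal Python sketch that splits a listeners value into (protocol, host, port) tuples. It is not Kafka's own parser. The function name parse_listeners is hypothetical, and rejecting entries without a port is a choice made for this sketch.

```python
from urllib.parse import urlsplit

# The protocols this section lists as supported.
SUPPORTED_PROTOCOLS = {"http", "https"}


def parse_listeners(value):
    """Split a listeners value into (protocol, host, port) tuples."""
    listeners = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # ignore empty items, e.g. from a trailing comma
        parts = urlsplit(entry)
        protocol = parts.scheme.lower()
        if protocol not in SUPPORTED_PROTOCOLS:
            raise ValueError(f"unsupported listener protocol: {entry!r}")
        if parts.port is None:
            raise ValueError(f"listener is missing a port: {entry!r}")
        # hostname is None when the host part is empty, e.g. "http://:8083"
        listeners.append((protocol, parts.hostname or "", parts.port))
    return listeners


print(parse_listeners("http://localhost:8080,https://localhost:8443"))
# [('http', 'localhost', 8080), ('https', 'localhost', 8443)]
```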

By default, if no listeners are specified, the REST server runs on port 8083 using the HTTP protocol. When using HTTPS, the configuration has to include the SSL configuration. By default, the REST server uses the ssl.* settings. If the REST API needs a different configuration than the one used for connecting to Kafka brokers, the fields can be prefixed with listeners.https. When the prefix is used, only the prefixed options are used and the unprefixed ssl.* options are ignored. The following fields can be used to configure HTTPS for the REST API:

  • ssl.keystore.location
  • ssl.keystore.password
  • ssl.keystore.type
  • ssl.key.password
  • ssl.truststore.location
  • ssl.truststore.password
  • ssl.truststore.type
  • ssl.enabled.protocols
  • ssl.provider
  • ssl.protocol
  • ssl.cipher.suites
  • ssl.keymanager.algorithm
  • ssl.secure.random.implementation
  • ssl.trustmanager.algorithm
  • ssl.endpoint.identification.algorithm
  • ssl.client.auth
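The all-or-nothing prefix rule described above can be modeled in a few lines. This Python sketch takes a worker configuration as a plain dict and returns the ssl.* settings the REST server would use. The function name and the example paths and password are hypothetical; it mirrors the rule as stated in this section, not Kafka's source code.

```python
# Prefix described in this section for REST-specific SSL options.
REST_PREFIX = "listeners.https."


def rest_ssl_settings(worker_config):
    """Return the ssl.* settings the REST server would use under the prefix rule."""
    prefixed = {
        key[len(REST_PREFIX):]: value
        for key, value in worker_config.items()
        if key.startswith(REST_PREFIX + "ssl.")
    }
    if prefixed:
        # At least one prefixed option is set, so every unprefixed ssl.* option is ignored.
        return prefixed
    # No prefixed options: fall back to the shared ssl.* settings.
    return {key: value for key, value in worker_config.items() if key.startswith("ssl.")}


worker_config = {
    "ssl.keystore.location": "/var/private/ssl/kafka.keystore.jks",  # hypothetical path
    "ssl.keystore.password": "kafka-secret",                          # hypothetical value
    "listeners.https.ssl.keystore.location": "/var/private/ssl/rest.keystore.jks",
}
print(rest_ssl_settings(worker_config))
# {'ssl.keystore.location': '/var/private/ssl/rest.keystore.jks'}
```

The unprefixed keystore password is not inherited here. Once any listeners.https option is used, every option the REST listener needs has to be repeated with the prefix.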

The REST API is used not only by users to monitor and manage Kafka Connect; it is also used for communication between the workers of a Kafka Connect cluster. Requests received on a follower node's REST API are forwarded to the leader node's REST API. If the URI under which a given host is reachable differs from the URI it listens on, the configuration options rest.advertised.host.name, rest.advertised.port and rest.advertised.listener can be used to change the URI that the follower nodes use to connect to the leader. When both HTTP and HTTPS listeners are used, the rest.advertised.listener option can also define which listener is used for this inter-worker communication. When HTTPS is used for communication between nodes, the same ssl.* or listeners.https options are used to configure the HTTPS client.
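The following Python sketch shows how the three rest.advertised.* options combine into the URL that other workers would use. It takes listeners as (protocol, host, port) tuples. The function name and example host names are hypothetical. When rest.advertised.listener is unset, the sketch simply picks the first listener. That is a simplifying assumption, and the real worker's fallback may differ.

```python
def advertised_url(listeners, config):
    """Build the URL other workers would use to reach this worker.

    listeners: list of (protocol, host, port) tuples, one per configured listener.
    config: worker settings as strings, possibly containing rest.advertised.* keys.
    """
    wanted = config.get("rest.advertised.listener")
    if wanted:
        matches = [entry for entry in listeners if entry[0] == wanted.lower()]
        if not matches:
            raise ValueError(f"no {wanted} listener is configured")
        protocol, host, port = matches[0]
    else:
        # Simplifying assumption for this sketch: take the first listener.
        protocol, host, port = listeners[0]
    # Advertised host and port, when set, replace what the listener binds to.
    host = config.get("rest.advertised.host.name") or host
    port = int(config.get("rest.advertised.port") or port)
    return f"{protocol}://{host}:{port}"


listeners = [("http", "0.0.0.0", 8083), ("https", "0.0.0.0", 8443)]
print(advertised_url(listeners, {
    "rest.advertised.listener": "https",
    "rest.advertised.host.name": "connect-1.example.com",
}))
# https://connect-1.example.com:8443
```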

The following are the currently supported REST API endpoints:

  • GET /connectors – return a list of active connectors
  • POST /connectors – create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
  • GET /connectors/{name} – get information about a specific connector
  • GET /connectors/{name}/config – get the configuration parameters for a specific connector
  • PUT /connectors/{name}/config – update the configuration parameters for a specific connector
  • GET /connectors/{name}/status – get the current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
  • GET /connectors/{name}/tasks – get a list of tasks currently running for a connector
  • GET /connectors/{name}/tasks/{taskid}/status – get the current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
  • PUT /connectors/{name}/pause – pause the connector and its tasks, which stops message processing until the connector is resumed
  • PUT /connectors/{name}/resume – resume a paused connector (or do nothing if the connector is not paused)
  • POST /connectors/{name}/restart – restart a connector (typically because it has failed)
  • POST /connectors/{name}/tasks/{taskid}/restart – restart an individual task (typically because it has failed)
  • DELETE /connectors/{name} – delete a connector, halting all tasks and deleting its configuration
  • GET /connectors/{name}/topics – get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
  • PUT /connectors/{name}/topics/reset – send a request to empty the set of active topics of a connector
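To show how the endpoints above map onto HTTP calls, here is a small Python client sketch built on the standard library's urllib. It only builds urllib.request.Request objects, and send() performs the call. BASE_URL assumes a worker on the default port 8083 on localhost. The helper names and the example connector settings are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import Request, urlopen

# Assumption: a worker reachable on the default port described above.
BASE_URL = "http://localhost:8083"


def _url(*segments):
    # Percent-encode each path segment (e.g. spaces in a connector name).
    return BASE_URL + "/" + "/".join(quote(str(s), safe="") for s in segments)


def _json_request(method, url, body=None):
    data = None if body is None else json.dumps(body).encode("utf-8")
    headers = {"Accept": "application/json"}
    if data is not None:
        headers["Content-Type"] = "application/json"
    return Request(url, data=data, headers=headers, method=method)


def create_connector(name, config):
    # POST /connectors: a string "name" plus an object "config"
    return _json_request("POST", _url("connectors"), {"name": name, "config": config})


def connector_status(name):
    return _json_request("GET", _url("connectors", name, "status"))


def pause_connector(name):
    return _json_request("PUT", _url("connectors", name, "pause"))


def resume_connector(name):
    return _json_request("PUT", _url("connectors", name, "resume"))


def restart_task(name, task_id):
    return _json_request("POST", _url("connectors", name, "tasks", task_id, "restart"))


def delete_connector(name):
    return _json_request("DELETE", _url("connectors", name))


def send(request):
    """Send a request; return the decoded JSON body, or None if the body is empty."""
    with urlopen(request) as response:
        payload = response.read()
    return json.loads(payload) if payload else None


# Hypothetical connector name and settings, for illustration only.
request = create_connector("my-file-source", {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/input.txt",
    "topic": "my-topic",
})
print(request.get_method(), request.full_url)
# POST http://localhost:8083/connectors
# send(request)  # requires a running Connect worker at BASE_URL
```

Keeping request construction separate from send() makes it possible to inspect or log each call without a running cluster.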

Kafka Connect also provides a REST API for getting information about connector plugins:

  • GET /connector-plugins – return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars.
  • PUT /connector-plugins/{connector-type}/config/validate – validate the provided configuration values against the configuration definition. This API performs per-config validation and returns suggested values and any validation error messages for each config.
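A validation call can be sketched in Python as follows. It builds the PUT request and extracts per-config errors from a response. The response shape used here is an assumption and should be checked against a real worker: a "configs" list whose items each hold a "value" object with "name" and "errors" fields. The sample response is hand-written and abridged, not real output, and the function names are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def validate_request(base_url, connector_type, config):
    """Build PUT /connector-plugins/{connector-type}/config/validate."""
    url = f"{base_url}/connector-plugins/{quote(connector_type, safe='')}/config/validate"
    body = json.dumps(config).encode("utf-8")
    return Request(url, data=body, headers={"Content-Type": "application/json"}, method="PUT")


def validation_errors(response):
    """Map each config name to its error messages, skipping configs that passed."""
    errors = {}
    for item in response.get("configs", []):
        value = item.get("value", {})
        if value.get("errors"):
            errors[value["name"]] = value["errors"]
    return errors


# Hand-written, abridged response in the assumed shape (not real output).
sample_response = {
    "name": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "error_count": 1,
    "configs": [
        {"value": {"name": "tasks.max", "value": "1", "errors": []}},
        {"value": {"name": "topic", "value": None, "errors": ["<error message>"]}},
    ],
}
print(validation_errors(sample_response))
# {'topic': ['<error message>']}
```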

The following is a supported REST request at the top-level (root) endpoint:

  • GET / – return basic information about the Kafka Connect cluster, such as the version of the Connect worker that serves the REST request (including the git commit ID of the source code) and the ID of the Kafka cluster it is connected to.
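A minimal Python sketch of reading the root endpoint follows. It assumes the JSON body carries the fields version, commit and kafka_cluster_id. The function names and default URL are hypothetical, and the example values are placeholders rather than output from a real cluster.

```python
import json
from urllib.request import urlopen


def describe_worker(body):
    """Summarize the JSON body returned by GET / on a Connect worker."""
    info = json.loads(body)
    return (f"Connect {info['version']} (commit {info['commit']}), "
            f"Kafka cluster {info['kafka_cluster_id']}")


def fetch_worker_info(base_url="http://localhost:8083"):
    # Requires a running worker; the default URL is an assumption.
    with urlopen(base_url + "/") as response:
        return describe_worker(response.read())


# Placeholder values, not output from a real cluster.
print(describe_worker('{"version": "X.Y.Z", "commit": "abc123", "kafka_cluster_id": "cluster-1"}'))
# Connect X.Y.Z (commit abc123), Kafka cluster cluster-1
```

Because each worker answers for itself, calling this against every worker is one way to spot version mismatches during a rolling upgrade.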

Error Reporting in Connect

Kafka Connect provides error reporting to handle errors encountered at various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable “dead letter queue” (DLQ) Kafka topic.

To report errors within a connector’s converter, transforms, or within the sink connector itself to the log, set errors.log.enable=true in the connector configuration to log details of each error and problem record’s topic, partition, and offset. For additional debugging purposes, set errors.log.include.messages=true to also log the problem record key, value, and headers to the log (note this may log sensitive information).

To report errors within a connector’s converter, transforms, or within the sink connector itself to a dead letter queue topic, set errors.deadletterqueue.topic.name, and optionally errors.deadletterqueue.context.headers.enable=true.
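When errors.deadletterqueue.context.headers.enable=true, records in the dead letter queue carry headers describing each failure. The following Python sketch collects those headers from one consumed DLQ record. It assumes the header keys share the prefix __connect.errors., which should be verified against the Kafka version in use. It also assumes headers arrive as (key, bytes) pairs, the shape common Python Kafka clients use. The function name and sample headers are hypothetical.

```python
# Assumption: the key prefix Connect uses for DLQ error-context headers.
ERROR_HEADER_PREFIX = "__connect.errors."


def error_context(headers):
    """Collect a DLQ record's error-context headers into a dict keyed by suffix.

    headers: list of (key, value) pairs with bytes values (or None).
    """
    context = {}
    for key, value in headers:
        if key.startswith(ERROR_HEADER_PREFIX):
            suffix = key[len(ERROR_HEADER_PREFIX):]
            context[suffix] = None if value is None else value.decode("utf-8", errors="replace")
    return context


# Hypothetical headers of one DLQ record; only the prefixed ones are kept.
record_headers = [
    ("__connect.errors.topic", b"orders"),
    ("__connect.errors.partition", b"0"),
    ("trace-id", b"f00d"),
]
print(error_context(record_headers))
# {'topic': 'orders', 'partition': '0'}
```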

By default, connectors exhibit “fail fast” behavior, failing immediately upon an error or exception. This is equivalent to adding the following configuration properties, with their default values, to a connector configuration:

        # disable retries on failure
        errors.retry.timeout=0

        # do not log errors and their contexts
        errors.log.enable=false

        # do not record errors in a dead letter queue topic
        errors.deadletterqueue.topic.name=

        # Fail on first error
        errors.tolerance=none

These and other related connector configuration properties can be changed to provide different behavior. For example, the following configuration properties can be added to a connector configuration to set up error handling with multiple retries, logging to the application logs and to the my-connector-errors Kafka topic, and tolerating all errors by reporting them rather than failing the connector task:

        # retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
        errors.retry.timeout=600000
        errors.retry.delay.max.ms=30000

        # log error context along with application logs, but do not include configs and messages
        errors.log.enable=true
        errors.log.include.messages=false

        # produce error context into the Kafka topic
        errors.deadletterqueue.topic.name=my-connector-errors

        # Tolerate all errors.
        errors.tolerance=all
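One way to apply the settings above to an existing connector is through PUT /connectors/{name}/config, whose body is the flat configuration map. Because that call replaces the whole configuration, this Python sketch merges the error-handling properties into the connector's current config first, for example as fetched with GET /connectors/{name}/config. It is a sketch only. The function names, the connector name my-sink and the base URL are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import Request

# The error-handling properties from the example above, as REST string values.
ERROR_HANDLING = {
    "errors.retry.timeout": "600000",
    "errors.retry.delay.max.ms": "30000",
    "errors.log.enable": "true",
    "errors.log.include.messages": "false",
    "errors.deadletterqueue.topic.name": "my-connector-errors",
    "errors.tolerance": "all",
}


def with_error_handling(current_config):
    """Return a copy of a connector config with the error-handling settings merged in."""
    merged = dict(current_config)  # leave the caller's dict untouched
    merged.update(ERROR_HANDLING)
    return merged


def update_config_request(base_url, name, config):
    """Build PUT /connectors/{name}/config; the body is the flat config map."""
    url = f"{base_url}/connectors/{quote(name, safe='')}/config"
    body = json.dumps(config).encode("utf-8")
    return Request(url, data=body, headers={"Content-Type": "application/json"}, method="PUT")


# Hypothetical current config, e.g. as returned by GET /connectors/my-sink/config.
current = {"connector.class": "com.example.MySinkConnector", "topics": "orders"}
request = update_config_request("http://localhost:8083", "my-sink", with_error_handling(current))
print(request.get_method(), request.full_url)
# PUT http://localhost:8083/connectors/my-sink/config
```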