
Kafka Consumer connector

Consumes data from one or more Apache Kafka message brokers.


In-ports

config JSON (dynamic) — accepts dynamic configuration properties overwriting current properties specified in the component settings. See below for the Config object properties.

subscribe Boolean — triggers a subscription when a true value is received, unsubscribes on a false value. If the port is disconnected, the component will subscribe immediately after the initialization.

Out-ports

messages JSON — emits data received from a Kafka broker.

errors JSON — emits error messages in case a subscription could not be started or failed.

status JSON — emits information messages about the subscription status.

Overview

This component consumes data from one or more topics on one or more Kafka brokers.

Subscribe to brokers and topics by specifying them in the component settings. Alternatively, you can supply the component’s settings at runtime through the dedicated config port. The component starts a subscription immediately after initialization or after a configuration update, and emits the messages it receives on the messages port. To start a subscription manually, pass a true value to the subscribe port. To end an ongoing subscription, pass a false value to the subscribe port.

The status port emits SUBSCRIBED and UNSUBSCRIBED events that indicate whether the component is ready to receive data from the broker. A SUBSCRIBED status message also includes metadata with the current consumer configuration.

Errors

The errors port emits an error message when the component cannot subscribe to a topic or fails to receive messages from it. The error message contains the isExternalServiceException flag, which indicates whether the error occurred in the Kafka broker or in Kelp Data Gateway. A true value indicates an error in the Kafka broker. If the flag is not present, Kelp Data Gateway was unable to connect to the Kafka broker.
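The flag logic described above can be sketched as a small helper for a downstream consumer of the errors port. This is a minimal sketch, not part of the component: the function name classify_error and the returned labels are hypothetical, and treating a false value as "the error occurred in Kelp Data Gateway" is inferred from the description rather than stated outright.

```python
from typing import Any, Mapping


def classify_error(error: Mapping[str, Any]) -> str:
    """Return a coarse label for where an error message originated.

    The labels are illustrative names, not values emitted by the component.
    """
    if "isExternalServiceException" not in error:
        # Flag absent: the gateway could not connect to the Kafka broker.
        return "connection"
    if error["isExternalServiceException"] is True:
        # Flag true: the error occurred in the Kafka broker.
        return "kafka-broker"
    # Flag present but not true: assumed to mean the gateway itself.
    return "gateway"
```

A consumer could use the label to decide, for example, whether to retry the connection or to report a broker-side failure.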

Settings

Authentication

Configure authentication to the target Kafka environment, or leave this setting empty if no authentication or authorization is required.

Enable config port

If enabled, the config port expects an object with dynamic configuration properties. Using this port instead of the component settings enables you to set or overwrite dynamic properties at runtime.

An event with a configuration object will not cause the component to reinitialize, but some previous state of the component may be lost.

Bootstrap server address

Addresses of the Kafka brokers to use. The format is host1:port1,host2:port2. The list can be a subset of the brokers or a virtual IP (VIP) that points to a subset of the brokers.
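To illustrate the host1:port1,host2:port2 format, the following sketch splits such a string into host and port pairs and rejects malformed entries. The function name parse_bootstrap_servers is hypothetical, and the component may validate its input differently.

```python
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a comma-separated host:port list into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas and surrounding whitespace
        # Split on the last colon so the port is always the final segment.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    if not servers:
        raise ValueError("no broker addresses given")
    return servers
```

For example, parse_bootstrap_servers("host1:9092, host2:9093") yields two pairs, one per broker; the port numbers here are placeholders.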

Topic

Specify one or several Kafka topics in a comma-separated list.

Topic is pattern

Enable this setting to treat the topic name as a regular expression pattern. Use it to subscribe to a dynamic number of topics that match the pattern.
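As a rough illustration of pattern-based subscription, the sketch below filters a list of topic names with a regular expression. It uses Python's re module only to show the idea; the component's actual regex dialect and matching rules are not specified here, and the assumption that a pattern must match the whole topic name is labeled in the code. The topic names and the function name matching_topics are hypothetical.

```python
import re
from typing import Iterable, List


def matching_topics(pattern: str, topics: Iterable[str]) -> List[str]:
    """Return the topics whose full name matches the pattern."""
    compiled = re.compile(pattern)
    # Assumption: the pattern must match the entire topic name.
    return [topic for topic in topics if compiled.fullmatch(topic)]


available = ["orders.eu", "orders.us", "payments.eu"]
selected = matching_topics(r"orders\..*", available)
```

With this pattern, a topic created later under the same prefix would also qualify, which is the point of subscribing by pattern rather than by a fixed list.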

Security protocol

Protocol used to communicate with brokers. The following protocols are supported:

  • PLAINTEXT — Data is sent in plain text. No authentication is used.
  • SSL — Data is sent through an SSL-encrypted channel. No authentication is used.
  • SASL_PLAINTEXT — Data is sent in plain text. SASL authentication is required.
  • SASL_SSL — Data is sent through an SSL-encrypted channel. SASL authentication is required.
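The four protocols differ along two axes: whether the channel is encrypted and whether SASL authentication is required. The lookup below restates the list in that form. The names ProtocolTraits, PROTOCOL_TRAITS, and describe_protocol are hypothetical and exist only for this sketch.

```python
from typing import NamedTuple


class ProtocolTraits(NamedTuple):
    encrypted: bool       # data travels through an SSL-encrypted channel
    sasl_required: bool   # SASL authentication is required


PROTOCOL_TRAITS = {
    "PLAINTEXT": ProtocolTraits(encrypted=False, sasl_required=False),
    "SSL": ProtocolTraits(encrypted=True, sasl_required=False),
    "SASL_PLAINTEXT": ProtocolTraits(encrypted=False, sasl_required=True),
    "SASL_SSL": ProtocolTraits(encrypted=True, sasl_required=True),
}


def describe_protocol(name: str) -> ProtocolTraits:
    """Look up the traits of a supported security protocol."""
    try:
        return PROTOCOL_TRAITS[name]
    except KeyError:
        raise ValueError(f"unsupported security protocol: {name!r}") from None
```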

Seek to

Set where to start reading records from on startup.

  • BEGINNING — read from the beginning.
  • END — read from the end.

Auto offset reset

Specify the action to take when there is no initial offset or when an offset is out of range.

  • Earliest — automatically reset the offset to the earliest offset.
  • Latest — automatically reset the offset to the latest offset.

Client ID

The client ID is the user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

Group ID

A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group ID, multiple processes indicate that they are all part of the same consumer group.

Group instance ID

A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability (e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.

Schema registry URL

This setting is only available for the Confluent Platform (not standard Apache Kafka). It is known as schema.registry.url in the Confluent Platform documentation and sets URLs for Schema Registry instances. The format is host1:port1,host2:port2.

Config object properties

The component can be configured at runtime through the config port, provided that the Enable config port setting is on. The configuration object can set the following properties:

Property           Type     Required?  Default
brokers            String   yes
topic              String   yes
topicIsPattern     Boolean  no
securityProtocol   String   yes        "SSL"
seekTo             String   no
autoOffsetReset    String   no         "latest"
clientId           String   no
groupId            String   no
groupInstanceId    String   no
schemaRegistryURL  String   no
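As a sketch of how a configuration object might be assembled and checked before it is sent to the config port, the example below encodes the property names, types, and required flags listed above. The helper check_config and the broker addresses and topic name in the example are hypothetical, and the component's own validation may differ.

```python
import json
from typing import Any, Dict, List

# Property name -> (expected Python type, required?), taken from the list above.
SPEC = {
    "brokers": (str, True),
    "topic": (str, True),
    "topicIsPattern": (bool, False),
    "securityProtocol": (str, True),
    "seekTo": (str, False),
    "autoOffsetReset": (str, False),
    "clientId": (str, False),
    "groupId": (str, False),
    "groupInstanceId": (str, False),
    "schemaRegistryURL": (str, False),
}


def check_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a config object (empty if none)."""
    problems = []
    for name, (expected_type, required) in SPEC.items():
        if name not in config:
            if required:
                problems.append(f"missing required property: {name}")
            continue
        if not isinstance(config[name], expected_type):
            problems.append(f"{name} should be {expected_type.__name__}")
    for name in config:
        if name not in SPEC:
            problems.append(f"unknown property: {name}")
    return problems


# Hypothetical example values; replace them with your own brokers and topic.
example = {
    "brokers": "host1:9092,host2:9092",
    "topic": "orders",
    "securityProtocol": "SSL",
    "autoOffsetReset": "latest",
}
print(json.dumps(example, indent=2))
```

Checking the object before sending it keeps obvious mistakes, such as a missing brokers property, from reaching the component at runtime.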