
Kafka Consumer connector

Consumes data from one or more Apache Kafka message brokers.


In-ports

subscribe Boolean — triggers a subscription when a true value is received and unsubscribes on a false value. If the port is disconnected, the component subscribes immediately after initialization.

config JSON (dynamic) — accepts a JSON object with configuration properties that can be set at runtime.

Out-ports

messages JSON — emits data received from a Kafka broker.

errors JSON — emits error messages in case a subscription could not be started or failed.

status JSON — emits information messages about the subscription status.

Overview

This component consumes data from Kafka topics. Multiple brokers and topics are supported.

Subscribe to brokers and topics by specifying them in the component settings. Alternatively, you can specify the component’s settings at runtime through the dedicated config port. The component starts a subscription immediately after initialization or a configuration update. The messages it receives are emitted on the messages port. To start a subscription manually, pass a true value to the subscribe port. To end an ongoing subscription, pass a false value to the subscribe port.
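For example, assuming the config port accepts the setting identifiers listed under Settings below as property keys, a runtime configuration object might look like this (broker addresses, topic names, and the group ID are illustrative):

  {
    "brokers": "kafka-1.example.com:9092,kafka-2.example.com:9092",
    "topic": "orders,payments",
    "groupId": "order-processing",
    "autoOffsetReset": "earliest"
  }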

The status port emits SUBSCRIBED/UNSUBSCRIBED events indicating whether the component is ready to receive data from the broker. A SUBSCRIBED status message also includes metadata with the current consumer configuration.
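As an illustration only, a status event might look like the following. The exact payload shape is an assumption; only the SUBSCRIBED/UNSUBSCRIBED values and the presence of consumer-configuration metadata are documented here:

  {
    "status": "SUBSCRIBED",
    "metadata": {
      "brokers": "kafka-1.example.com:9092",
      "topic": "orders",
      "groupId": "order-processing"
    }
  }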

Errors

The errors port emits error messages when the component cannot subscribe to a topic or fails to receive messages from it. The error message contains the isExternalServiceException flag, which indicates whether the error occurred in the Kafka broker or in Kelp Data Gateway. A true value indicates an error in the Kafka broker. If the flag is not present, Kelp Data Gateway was unable to connect to the Kafka broker.
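As an illustration, an error event might look like the following. Only the isExternalServiceException flag is documented; the remaining fields are assumptions:

  {
    "message": "Failed to subscribe to topic orders",
    "isExternalServiceException": true
  }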

Settings

Authentication

Configure authentication to the target Kafka environment, or leave this empty if no authentication or authorization is required.

Enable realtime config port

If enabled, the config port expects an object with dynamic configuration properties. Using this port instead of the component settings enables you to set or overwrite dynamic properties at runtime.

Receiving a configuration object does not cause the component to reinitialize, but some of the component’s previous state may be lost.

Bootstrap server address (brokers)

Addresses of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.

Type: String
Required: Yes
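For example, a value listing two brokers of the same cluster (hostnames are illustrative):

  kafka-1.example.com:9092,kafka-2.example.com:9092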

Topic (topic)

Specify one or several Kafka topics in a comma-separated list.

Type: String
Required: Yes
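For example, to consume from two topics (names are illustrative):

  orders,payments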

Topic is pattern (topicIsPattern)

Enable if you want to set the topic name as a regular expression pattern. This can be used to subscribe to a dynamic number of topics matching the pattern.

Type: Boolean
Required: No
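For example, with this option enabled, the following topic value (illustrative) matches every topic whose name starts with orders., such as orders.created and orders.updated:

  orders\..*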

Security protocol (securityProtocol)

Protocol used to communicate with brokers. The following protocols are supported:

  • PLAINTEXT (PLAINTEXT) — Data is sent in plain text. No authentication is used.
  • SSL (SSL) — Data is sent through SSL-encrypted channel. No authentication is used.
  • SASL_PLAINTEXT (SASL_PLAINTEXT) — Data is sent in plain text. SASL authentication is required.
  • SASL_SSL (SASL_SSL) — Data is sent through SSL-encrypted channel. SASL authentication is required.

Type: String
Default: SSL
Required: Yes
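For example, to connect to a cluster that requires SASL authentication over an SSL-encrypted channel, set the property as follows (shown as a config-port fragment, assuming the setting identifier is used as the key):

  { "securityProtocol": "SASL_SSL" }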

Seek to (seekTo)

Set where to start reading records from on startup.

  • Beginning (beginning) — read from the beginning.
  • End (end) — read from the end.

Type: String
Required: No
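For example, to re-read all retained records each time the component starts (as a config-port fragment, same key assumption as above):

  { "seekTo": "beginning" }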

Auto offset reset (autoOffsetReset)

Specify action when there is no initial offset or if an offset is out of range.

  • Earliest (earliest) — automatically reset the offset to the earliest offset.
  • Latest (latest) — automatically reset the offset to the latest offset.

Type: String
Default: latest
Required: No

Client ID (clientId)

The client ID is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

Type: String
Required: No

Group ID (groupId)

A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group ID, multiple processes indicate that they are all part of the same consumer group.

Type: String
Required: No

Group instance ID (groupInstanceId)

A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability (e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.

Type: String
Required: No
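For example, assuming the setting identifiers are used as config keys, two replicas of the same logical consumer could join as static members with fragments like the following; each instance must use a distinct groupInstanceId (IDs are illustrative):

  { "groupId": "order-processing", "groupInstanceId": "order-processing-1" }
  { "groupId": "order-processing", "groupInstanceId": "order-processing-2" }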

Schema registry URL (schemaRegistryURL)

This setting is only available for the Confluent Platform (not standard Apache Kafka). It is known as schema.registry.url in the Confluent Platform documentation and sets URLs for Schema Registry instances. The format is host1:port1,host2:port2.

Type: String
Required: No
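For example, a value listing two Schema Registry instances (hostnames are illustrative; 8081 is the Confluent default port):

  schema-registry-1.example.com:8081,schema-registry-2.example.com:8081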

Connections reuse strategy (connectionsReuseStrategy)

This setting lets you manage how connections to the Kafka broker are reused between multiple Kafka Consumer components in your application. Connections are uniquely identified by application ID, user ID, and component configuration object. The strategy is used by Kelp Data Gateway (DGW) to decide whether an existing connection should be reused or a new one created. The following strategies are supported:

  • Always new (alwaysNew) — This strategy ensures that upstream server connections are never reused. Each time a subscription request is made, DGW will try to create a new connection to the upstream server. This approach can be useful when there are security concerns or data isolation requirements.
  • Reuse within application (reuseWithinApp) — With this strategy, an upstream server connection is created once per user and application, then reused for any subsequent subscription requests made from any user device. The connection is kept open while the user has the application running on any device and is reused for any new subscription requests made during that time. This approach can improve performance by reducing the overhead of creating a new connection for each request.

Type: String
Default: reuseWithinApp
Required: No
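For example, if each subscription must get its own broker connection for data-isolation reasons, the strategy can be overridden at runtime (as a config-port fragment, same key assumption as above):

  { "connectionsReuseStrategy": "alwaysNew" }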