# Collect data from Apache Kafka

{% hint style="info" %}
See the changelog of the **Apache Kafka** Listener [here](/listeners/apache-kafka-listener.md).
{% endhint %}

{% hint style="warning" %}
The **Apache Kafka** Listener is a **Pull** Listener and therefore should not be used in environments with more than one cluster.
{% endhint %}

## Overview

Onum supports integration with [Apache Kafka](https://kafka.apache.org/).

Apache Kafka is a distributed, fault-tolerant, high-throughput, and scalable streaming platform. It's used for building real-time data pipelines and streaming applications.

Select **Apache Kafka** from the list of Listener types and click **Configuration** to start.

## Prerequisites

{% hint style="warning" %}
To use the **Apache Kafka** Listener, you must activate the following environment variable in your distributor using Docker Compose: `KAFKA_LISTENER_EXECUTION_ENABLED`
{% endhint %}

## Apache Kafka Setup

Set up a running Kafka cluster, optionally with consumer group IDs and topics.

## Onum Setup

{% stepper %}
{% step %}
Log in to your Onum tenant and click **Listeners > New listener**.
{% endstep %}

{% step %}
Double-click the **Apache Kafka** Listener.
{% endstep %}

{% step %}
Enter a **Name**<mark style="color:$primary;">**\***</mark> for the new Listener. Optionally, add a **Description** and some **Tags** to identify the Listener.
{% endstep %}

{% step %}
Enter the **Bootstrap servers**. These are the host-port pairs that act as the starting point to access the full set of alive servers in the cluster. Enter your value with format `host:port` and click **Add element** to add as many elements as required.
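
As a minimal sketch, the `host:port` format can be checked before you paste values into the field. The helper name `parse_bootstrap_server` and the broker addresses below are hypothetical and only illustrate the expected shape of each element; they are not part of Onum.

```python
def parse_bootstrap_server(entry: str) -> tuple[str, int]:
    """Split a 'host:port' string into (host, port); raise ValueError if malformed."""
    # Split on the last colon so that the port is always the final component.
    host, sep, port_text = entry.strip().rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected 'host:port', got {entry!r}")
    if not port_text.isdigit():
        raise ValueError(f"port must be numeric in {entry!r}")
    port = int(port_text)
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range in {entry!r}")
    return host, port


# Hypothetical broker addresses, used only for illustration.
servers = ["broker-1.example.com:9092", "broker-2.example.com:9092"]
parsed = [parse_bootstrap_server(s) for s in servers]
print(parsed)
```

Each entry that passes this check corresponds to one element added with **Add element**.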
{% endstep %}

{% step %}
Enter the **Group ID** string, which uniquely identifies the group of consumer processes. Find this in your Kafka Cluster at **Home > Configuration > Consumer Properties**.
{% endstep %}

{% step %}
Enter the **Topics** the Listener should connect to. To see the topics available in your cluster, run `kafka-topics --bootstrap-server :9092 --describe` and enter the topic names it returns. Click **Add element** to add as many topics as required.
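
The `--describe` output contains more than topic names, so a short sketch of extracting just the names may help. It assumes the common layout in which each topic has a summary line starting with `Topic:` and its partition lines are indented; the exact layout can vary between Kafka versions, and the sample text and the `topic_names` helper are hypothetical.

```python
import re

# Summary lines start at column 0 with "Topic:"; partition lines are indented.
SUMMARY_LINE = re.compile(r"^Topic:\s*(\S+)")


def topic_names(describe_output: str) -> list[str]:
    """Return the distinct topic names found in `kafka-topics --describe` output."""
    names = []
    for line in describe_output.splitlines():
        match = SUMMARY_LINE.match(line)
        if match and match.group(1) not in names:
            names.append(match.group(1))
    return names


# Hypothetical sample output, shortened for illustration.
sample = (
    "Topic: orders\tTopicId: abc123\tPartitionCount: 2\tReplicationFactor: 1\tConfigs: \n"
    "\tTopic: orders\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n"
    "\tTopic: orders\tPartition: 1\tLeader: 1\tReplicas: 1\tIsr: 1\n"
    "Topic: payments\tTopicId: def456\tPartitionCount: 1\tReplicationFactor: 1\tConfigs: \n"
    "\tTopic: payments\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n"
)
print(topic_names(sample))  # ['orders', 'payments']
```

Each name in the resulting list is one value to enter in the **Topics** field.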
{% endstep %}

{% step %}
Now, you must choose an **Auto offset reset policy**<mark style="color:red;">**\***</mark>. This policy defines the behavior when there are no committed positions available or when an offset is out of range. Choose between **Earliest**, **Latest**, or **None**.
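
The effect of each policy can be sketched as a small decision function. This models the behavior of Kafka's `auto.offset.reset` consumer setting, on the assumption that the Listener's options map onto it; the function name and the offsets used are hypothetical.

```python
def resolve_start_offset(policy, committed, earliest, latest):
    """Return the offset a consumer would start reading from.

    committed: last committed offset for the group, or None if there is none.
    earliest:  first offset still available in the partition.
    latest:    log-end offset of the partition (where new records will be written).
    """
    # A committed offset inside the available range is always used as-is.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # Otherwise the policy decides what happens.
    if policy == "earliest":
        return earliest
    if policy == "latest":
        return latest
    if policy == "none":
        raise LookupError("no valid committed offset and policy is 'none'")
    raise ValueError(f"unknown policy: {policy!r}")


# No committed offset yet: the policy picks the starting point.
print(resolve_start_offset("earliest", None, 100, 250))  # 100
print(resolve_start_offset("latest", None, 100, 250))    # 250
# A valid committed offset takes precedence over the policy.
print(resolve_start_offset("latest", 180, 100, 250))     # 180
```

In short, **Earliest** reads everything still retained in the topic, **Latest** reads only new events, and **None** fails instead of choosing a position.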
{% endstep %}

{% step %}
Next, you must define the **Authentication configuration**, or select **None** if no authentication is required. Choose between:

* **Plain** - Enter your **Username**<mark style="color:red;">**\***</mark> and select your **Password**<mark style="color:red;">**\***</mark> from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create a new one.
* **Scram** - Enter the required information:
  * **Username**<mark style="color:red;">**\***</mark> - Enter your username.
  * **Password**<mark style="color:red;">**\***</mark> - Select your password from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create a new one.
  * **SCRAM mechanism**<mark style="color:red;">**\***</mark> - Choose either **SHA-256** or **SHA-512**.
* **mTLS** - Enter the required information:
  * **CA Certificate**<mark style="color:red;">**\***</mark> - Select your CA certificate from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create a new one.
  * **Client certificate**<mark style="color:red;">**\***</mark> - Select your client certificate from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create a new one.
  * **Client key**<mark style="color:red;">**\***</mark> - Select your client key from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create a new one.
  * **Skip verify** - Select **true** to skip verification of the server certificate, or **false** to require it.
  * **Server name** - Enter the name of the server to connect to.
  * **Minimum TLS version** - Select the required minimum version from the menu.
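
The required fields per authentication type listed above can be summarized as a small completeness check. This is only an illustrative sketch: the key names (`username`, `ca_certificate`, and so on) and the `validate_auth` helper are hypothetical and do not reflect how Onum stores these settings.

```python
# Required fields for each authentication type, as marked with * in the list above.
REQUIRED_FIELDS = {
    "none": [],
    "plain": ["username", "password"],
    "scram": ["username", "password", "scram_mechanism"],
    "mtls": ["ca_certificate", "client_certificate", "client_key"],
}
SCRAM_MECHANISMS = {"SHA-256", "SHA-512"}


def validate_auth(mode: str, values: dict) -> list[str]:
    """Return a list of problems; an empty list means the settings look complete."""
    if mode not in REQUIRED_FIELDS:
        return [f"unknown authentication mode: {mode}"]
    problems = [
        f"missing required field: {name}"
        for name in REQUIRED_FIELDS[mode]
        if not values.get(name)
    ]
    mechanism = values.get("scram_mechanism")
    if mode == "scram" and mechanism and mechanism not in SCRAM_MECHANISMS:
        problems.append(f"unsupported SCRAM mechanism: {mechanism}")
    return problems


# Hypothetical values; secrets are referenced by name rather than stored inline.
print(validate_auth("plain", {"username": "svc-onum"}))
print(validate_auth("scram", {"username": "svc-onum", "password": "kafka-secret",
                              "scram_mechanism": "SHA-512"}))
```

The optional mTLS settings (**Skip verify**, **Server name**, **Minimum TLS version**) are deliberately left out of the check because they are not required.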
{% endstep %}

{% step %}
Finally, click **Create labels**. Optionally, you can set labels to be used for internal Onum routing of data. By default, data will be set as **Unlabelled**.

{% hint style="info" %}
Learn more about labels in [this article](/the-workspace/listeners/labels.md).
{% endhint %}
{% endstep %}

{% step %}
Click **Create listener** when you're done.
{% endstep %}
{% endstepper %}

## Output Ports <a href="#ports" id="ports"></a>

The **Apache Kafka** Listener has two output ports:

* **Default port** - Events are sent through this port if no error occurs while processing them.
* **Error port** - Events are sent through this port if an error occurs while processing them.

