# Send data to Apache Kafka

{% hint style="info" %}
See the changelog of the **Apache Kafka** Data sink type [here](/data-sinks/apache-kafka-data-sink.md).
{% endhint %}

## Overview

Onum supports integration with [Apache Kafka](https://kafka.apache.org/).

The **Apache Kafka** Data sink lets you send data messages to a Kafka topic, using information from events to construct messages. This component supports various configurations, including Kafka brokers, topic partitions, authentication methods, and message compression, providing a flexible and scalable approach for integrating with Kafka.

## Prerequisites

You will need a running Kafka cluster and the topic that the Data sink will send messages to.

## Onum Setup

{% stepper %}
{% step %}
Log in to your Onum tenant and click **Data Sinks > New Data sink**.
{% endstep %}

{% step %}
Double-click the **Apache Kafka** Sink.
{% endstep %}

{% step %}
Enter a **Name** for the new Data Sink. Optionally, add a **Description** and some **Tags** to identify the Sink.
{% endstep %}

{% step %}
Decide whether or not to include this Data sink info in the metrics and graphs of the [**Home**](/the-workspace/home.md) area.
{% endstep %}

{% step %}
**Kafka Brokers**

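A broker is a Kafka server that hosts topic partitions and handles incoming requests. You can find your broker details in the **Instances** area. For each broker, enter the hostname of the machine where Kafka is installed and the port it listens on (for example, `kafka-01.example.com` and `9092`).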

* **Host**<mark style="color:red;">**\***</mark>
* **Port**<mark style="color:red;">**\***</mark>

Click **Add element** to add the required number of brokers.
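
Kafka clients usually receive their brokers as one comma-separated `host:port` list (the `bootstrap.servers` setting). The following minimal Python sketch shows how **Host** and **Port** entries could be combined into that form, assuming Onum joins the broker entries this way. The `Broker` class and the `bootstrap_servers` function are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Broker:
    """One broker entry, as typed into the Host and Port fields (hypothetical model)."""
    host: str
    port: int


def bootstrap_servers(brokers: list[Broker]) -> str:
    """Join broker entries into a Kafka-style `host1:port1,host2:port2` string."""
    if not brokers:
        raise ValueError("at least one broker is required")
    entries = []
    for broker in brokers:
        if not broker.host:
            raise ValueError("broker host must not be empty")
        # TCP ports range from 1 to 65535.
        if not 1 <= broker.port <= 65535:
            raise ValueError(f"invalid port {broker.port} for host {broker.host}")
        entries.append(f"{broker.host}:{broker.port}")
    return ",".join(entries)
```

For example, `bootstrap_servers([Broker("kafka-01.example.com", 9092), Broker("kafka-02.example.com", 9092)])` returns `"kafka-01.example.com:9092,kafka-02.example.com:9092"`. Listing more than one broker lets the client still connect when one of them is unavailable.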
{% endstep %}

{% step %}
In the **Authentication** section, select the method the Data sink uses to authenticate with the brokers (**SASL** or **SSL/TLS**) and enter its parameters. If your cluster doesn't require authentication, select **None**.

<details>

<summary><strong>SSL/TLS</strong></summary>

* **Minimum TLS version** - Choose your minimum TLS version.
* **Certificate** - Add your TLS certificate from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create one.
* **Private Key** - Add your private key from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create one.
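* **Skip TLS Verification** - Toggle on to skip verification of the broker's certificate. Use this only for testing, as it leaves the connection open to man-in-the-middle attacks.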
* **CA Chain** - Add your CA chain from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create one.

</details>

<details>

<summary><strong>SASL</strong></summary>

* **Mechanism**<mark style="color:red;">**\***</mark> - SASL provides different mechanisms (methods) for authentication. Choose between **PLAIN**, **SCRAM-SHA-256**, **SCRAM-SHA-512**, **OAUTHBEARER**, and **GSSAPI**.
* **saslUsername** - Enter your SASL username.
* **saslPassword** - Add your password from your [Secrets](/administration/global-settings/organization-settings/secrets-management.md) or create one.

</details>
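
To make these fields more concrete, the Python sketch below maps them to the property names used by the librdkafka Kafka client (`security.protocol`, `sasl.mechanism`, `sasl.username`, `ssl.ca.pem`, and so on). This page does not state which Kafka client Onum uses internally, so treat the mapping as an assumption. `security_properties` is a hypothetical helper, and **Minimum TLS version** is omitted for brevity.

```python
# Mechanisms offered in the SASL Mechanism field.
SASL_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER", "GSSAPI"}
# Mechanisms that authenticate with saslUsername and saslPassword.
PASSWORD_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def security_properties(mode: str, *, mechanism: str = "", username: str = "",
                        password: str = "", cert_pem: str = "", key_pem: str = "",
                        ca_pem: str = "", skip_verification: bool = False) -> dict:
    """Map the Authentication fields to librdkafka-style properties (assumed mapping)."""
    if mode == "None":
        return {"security.protocol": "plaintext"}
    if mode == "SSL/TLS":
        props = {
            "security.protocol": "ssl",
            # Skip TLS Verification turns off checks of the broker's certificate.
            "enable.ssl.certificate.verification": "false" if skip_verification else "true",
        }
        # Certificate, Private Key and CA Chain are passed as PEM-encoded strings.
        for name, value in (("ssl.certificate.pem", cert_pem),
                            ("ssl.key.pem", key_pem),
                            ("ssl.ca.pem", ca_pem)):
            if value:
                props[name] = value
        return props
    if mode == "SASL":
        if mechanism not in SASL_MECHANISMS:
            raise ValueError(f"unsupported SASL mechanism: {mechanism!r}")
        # Assumption: SASL runs over TLS; a plaintext SASL listener would use "sasl_plaintext".
        props = {"security.protocol": "sasl_ssl", "sasl.mechanism": mechanism}
        if mechanism in PASSWORD_MECHANISMS:
            if not username or not password:
                raise ValueError(f"{mechanism} requires a username and a password")
            props["sasl.username"] = username
            props["sasl.password"] = password
        # OAUTHBEARER and GSSAPI need extra mechanism-specific settings, not shown here.
        return props
    raise ValueError(f"unknown authentication mode: {mode!r}")
```

In practice, the password and private key values would come from the secrets you select in these fields rather than from literal strings.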
{% endstep %}

{% step %}
If you need to create a new secret for any of the fields above, click **New secret**:

* Give the secret a **Name**.
* Turn off the **Expiration date** option.
* Click **Add new value** and paste the secret value, for example your SASL password or your TLS private key.
* Click **Save**.

<figure><picture><source srcset="/files/NeeWsSQzoChVxRIY76Nt" media="(prefers-color-scheme: dark)"><img src="/files/1oTccyPmgZJ1laY7IhZH" alt=""></picture><figcaption></figcaption></figure>

{% hint style="info" %}
Learn more about secrets in Onum in [this article](/administration/global-settings/organization-settings/secrets-management.md).
{% endhint %}
{% endstep %}

{% step %}
Configure your **Topic & Partition:**

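* **Kafka topic**<mark style="color:red;">**\***</mark>

  Enter the name of the topic to send messages to. To list the topics in your cluster, run `kafka-topics --bootstrap-server <host>:9092 --list`. To see the partitions of a topic, run `kafka-topics --bootstrap-server <host>:9092 --describe --topic <topic>`.
* **Partition** - The partition of the topic to write messages to. Partitions subdivide a topic to allow parallelism, scalability, and fault tolerance in message processing, and each partition is hosted on a broker. Enter `-1` to let the producer select the partition automatically. For example, messages distributed across a topic's partitions might look like this:
  * `0` - \[msg1, msg4, msg7]
  * `1` - \[msg2, msg5, msg8]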
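
The distribution in the example above matches a round-robin spread over three partitions. The Python sketch below reproduces it and contrasts it with a fixed partition. It models automatic selection (`-1`) as simple round-robin, which is an assumption for illustration: real Kafka producers usually choose the partition from the message key when one is set. `assign_partitions` is a hypothetical name.

```python
def assign_partitions(messages: list, num_partitions: int, partition: int = -1) -> dict:
    """Distribute messages over partitions: a fixed partition, or round-robin for -1."""
    if num_partitions < 1:
        raise ValueError("a topic has at least one partition")
    assignment = {p: [] for p in range(num_partitions)}
    if partition == -1:
        # Automatic selection, modelled here as simple round-robin.
        for i, msg in enumerate(messages):
            assignment[i % num_partitions].append(msg)
    elif 0 <= partition < num_partitions:
        # A fixed partition receives every message.
        assignment[partition].extend(messages)
    else:
        raise ValueError(f"partition {partition} does not exist")
    return assignment
```

With eight messages and three partitions, `assign_partitions([f"msg{i}" for i in range(1, 9)], 3)` returns `{0: ['msg1', 'msg4', 'msg7'], 1: ['msg2', 'msg5', 'msg8'], 2: ['msg3', 'msg6']}`. A fixed partition keeps ordering simple but sends all the load to one broker.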
    {% endstep %}

{% step %}
The **Acknowledgment** (acks) setting controls how the producer confirms with the brokers that each message was received, which determines message durability. It defines how many replicas must acknowledge a write before the producer considers it successful.

* **All** - Full replication. The leader waits for all in-sync replicas (ISR) to acknowledge the write before confirming it.
* **-1** - Same as **All**.
* **0** - No acknowledgment. The producer sends data but does **not wait** for confirmation. This is the fastest option, but messages can be lost without notice.
* **1** - Leader acknowledgment. The leader broker confirms receipt **without waiting** for replicas.
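
The options can be summarized as the number of broker acknowledgments the producer waits for. The Python sketch below is a simplified model of the list above; it ignores broker-side settings such as `min.insync.replicas`, and `acks_required` is a hypothetical name.

```python
def acks_required(acks: str, in_sync_replicas: int) -> int:
    """Broker acknowledgments the producer waits for under each acks setting (simplified)."""
    if in_sync_replicas < 1:
        raise ValueError("a partition has at least one in-sync replica (the leader)")
    setting = acks.strip().lower()
    if setting in ("all", "-1"):
        return in_sync_replicas  # the leader plus every in-sync follower
    if setting == "1":
        return 1  # the leader only
    if setting == "0":
        return 0  # fire-and-forget: no confirmation at all
    raise ValueError(f"unknown acks value: {acks!r}")
```

For a partition with three in-sync replicas, **All** and **-1** both wait for `3` acknowledgments, **1** waits for `1`, and **0** waits for none, trading durability for latency.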
  {% endstep %}

{% step %}
**Batch configuration**

Toggle this switch on to configure batching. The fields are pre-filled with default values, which you can change if needed.

<table><thead><tr><th width="169.2265625">Parameter</th><th>Description</th></tr></thead><tbody><tr><td><strong>Batch size</strong></td><td>The maximum size of a message batch, in bytes. The default value is <code>1000000</code>.</td></tr><tr><td><strong>Batch number of messages</strong></td><td>The maximum number of messages in a single batch. The default value is <code>10000</code>.</td></tr><tr><td><strong>Queue max number of messages</strong></td><td>The maximum number of messages the producer queue can hold before it is considered full. The default value is <code>100000</code>.</td></tr><tr><td><strong>Queue max size</strong></td><td>The maximum total size of the messages, in KB, that the producer queue can hold before it is considered full. The default value is <code>102400</code>.</td></tr><tr><td><strong>Linger</strong></td><td>The number of seconds to wait for more messages before considering the batch complete and sending it. The default value is <code>5</code>.</td></tr><tr><td><strong>Max in-flight requests per connection</strong></td><td>The maximum number of unacknowledged produce requests that can be sent at the same time over a single TCP connection between the producer and a broker. The default value is <code>1000000</code>.</td></tr><tr><td><strong>Delivery timeout</strong></td><td>The maximum time, in milliseconds, to wait for a message to be successfully sent before it is marked as failed. The default value is <code>300000</code>.</td></tr><tr><td><strong>Request timeout</strong></td><td>The number of milliseconds to wait for a response to a produce request before considering it timed out. The default value is <code>30000</code>.</td></tr></tbody></table>
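
A batch is sent as soon as it reaches **Batch size**, reaches **Batch number of messages**, or has waited for the **Linger** time. The Python sketch below models only those three triggers, with defaults taken from the table; queue limits, in-flight requests, and timeouts are not modelled. The `Batcher` class is hypothetical and is not how Onum implements batching.

```python
class Batcher:
    """Simplified producer batching: a batch is sent when full or after lingering long enough."""

    def __init__(self, max_bytes: int = 1_000_000, max_messages: int = 10_000,
                 linger: float = 5.0):
        self.max_bytes = max_bytes        # Batch size
        self.max_messages = max_messages  # Batch number of messages
        self.linger = linger              # Linger, in the same time unit as `now`
        self._batch: list = []
        self._size = 0
        self._started = 0.0

    def add(self, message: bytes, now: float) -> list:
        """Add a message and return any batches that became ready to send."""
        ready = []
        # Close the open batch first if this message would push it past the byte limit.
        if self._batch and self._size + len(message) > self.max_bytes:
            ready.append(self._take())
        if not self._batch:
            self._started = now
        self._batch.append(message)
        self._size += len(message)
        if len(self._batch) >= self.max_messages or self._size >= self.max_bytes:
            ready.append(self._take())
        return ready

    def poll(self, now: float) -> list:
        """Return the open batch if it has waited at least `linger` time units."""
        if self._batch and now - self._started >= self.linger:
            return [self._take()]
        return []

    def _take(self) -> list:
        batch = self._batch
        self._batch, self._size = [], 0
        return batch
```

Larger limits and a longer linger produce fewer, bigger requests (higher throughput), while smaller values send messages sooner (lower latency).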
{% endstep %}

{% step %}
**Use compression**

Toggle this switch on to compress your data, then choose the compression type: **Gzip**, **Snappy**, **LZ4**, or **Zstd**.
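
Kafka producers compress whole batches rather than individual messages, so batches of similar events usually shrink considerably. The Python sketch below illustrates this with hypothetical, repetitive JSON events. It uses Gzip only because it ships with the Python standard library (Snappy, LZ4, and Zstd need third-party packages), and the ratio it prints depends entirely on the sample data, not on Onum.

```python
import gzip
import json


def compress_batch(events: list) -> tuple:
    """Serialize events as one newline-delimited batch and gzip it; return (raw, compressed)."""
    raw = b"\n".join(json.dumps(event, sort_keys=True).encode("utf-8") for event in events)
    return raw, gzip.compress(raw)


# Hypothetical, highly repetitive log events.
sample = [{"host": "web-01", "status": 200, "path": "/index.html", "seq": i} for i in range(1000)]
raw, packed = compress_batch(sample)
print(f"raw: {len(raw)} bytes, gzip: {len(packed)} bytes")
```

Compression trades some CPU time on the producer and consumers for less network traffic and disk usage on the brokers.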
{% endstep %}

{% step %}
Enter the number of times to retry sending a failed request before it is considered failed. The default value is `3`, and the minimum value is `0`.
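
A retry count of `N` means one initial attempt plus up to `N` retries. The Python sketch below shows that counting under this assumption. `RetriableError` and `send_with_retries` are hypothetical names, and real producers also wait between retries and stop once the **Delivery timeout** expires, which the sketch leaves out.

```python
class RetriableError(Exception):
    """Hypothetical error type for failures worth retrying, such as a broker timeout."""


def send_with_retries(send, retries: int = 3):
    """Call `send()` once, then retry up to `retries` more times on RetriableError."""
    if retries < 0:
        raise ValueError("retries must be 0 or greater")
    for attempt in range(retries + 1):
        try:
            return send()
        except RetriableError:
            if attempt == retries:
                raise  # out of retries: report the failure
```

With the default of `3`, a send that fails twice and then succeeds is delivered on the third attempt; with `0`, the first failure is reported immediately.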
{% endstep %}
{% endstepper %}

Click **Create data sink** when complete.

Your new Data sink will appear in the **Data sinks** area list.

## Pipeline configuration

When you use this Data sink in a [Pipeline](/the-workspace/pipelines.md), you must configure the following output parameters. To do so, click the Data sink on the canvas and select **Configuration**.

### Output configuration

<table><thead><tr><th width="170.01953125">Parameter</th><th>Description</th></tr></thead><tbody><tr><td><strong>Message</strong><mark style="color:red;"><strong>*</strong></mark></td><td>Choose the field containing the message to send.</td></tr></tbody></table>

#### Advanced configuration

<table><thead><tr><th width="170.03515625">Parameter</th><th>Description</th></tr></thead><tbody><tr><td><strong>Message key</strong></td><td>If the message has a key, choose the field containing it here. When the partition is selected automatically, messages with the same key are sent to the same partition.</td></tr></tbody></table>
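
Key-based partitioning works by hashing the key, so equal keys always map to the same partition and keep their relative order. The Python sketch below uses a CRC32 hash modulo the partition count, similar to librdkafka's `consistent` partitioner; the Java client uses murmur2 instead, and this page does not say which partitioner Onum uses, so treat the choice as an assumption. `partition_for_key` is a hypothetical name.

```python
import zlib


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Pick a partition from the message key: CRC32 hash modulo the partition count."""
    if num_partitions < 1:
        raise ValueError("a topic has at least one partition")
    return zlib.crc32(key) % num_partitions
```

Because the result depends on the partition count, adding partitions to a topic can move existing keys to different partitions.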

Click **Save** to save your configuration.


