# Kafka Receiver
Distributions: core, contrib
Maintainers: @pavolloffay, @MovieStoreGuy, @axw, @paulojmdias
Source: opentelemetry-collector-contrib
## Supported Telemetry

## Overview

## Getting Started
> [!NOTE]
> The Kafka receiver uses the franz-go client library, which provides better performance and support for modern Kafka features. The franz-go client supports directly consuming from multiple topics by specifying a regex expression; to enable this feature, prefix your topic with the `^` character. This is identical to how the librdkafka client works. In the deprecated `topic` setting, if any of the topics have the `^` prefix, regex consuming will be enabled.

There are no required settings. The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of Kafka brokers.
- `protocol_version` (default = 2.1.0): The Kafka protocol version.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve and then reverse-lookup broker IPs during startup.
- `logs`
  - `topic` (Deprecated [v0.142.0]: use `topics`) (default = otlp_logs): If this is set, it takes precedence over the default value of `topics`.
  - `topics` (default = otlp_logs): The list of Kafka topics from which to consume logs.
  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
  - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) (default = ""): If this is set, it takes precedence over the default value of `exclude_topics`.
  - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics.
- `metrics`
  - `topic` (Deprecated [v0.142.0]: use `topics`) (default = otlp_metrics): If this is set, it takes precedence over the default value of `topics`.
  - `topics` (default = otlp_metrics): The list of Kafka topics from which to consume metrics.
  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
  - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) (default = ""): If this is set, it takes precedence over the default value of `exclude_topics`.
  - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics.
- `traces`
  - `topic` (Deprecated [v0.142.0]: use `topics`) (default = otlp_spans): If this is set, it takes precedence over the default value of `topics`.
  - `topics` (default = otlp_spans): The list of Kafka topics from which to consume traces.
  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
  - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) (default = ""): If this is set, it takes precedence over the default value of `exclude_topics`.
  - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics.
- `profiles`
  - `topic` (Deprecated [v0.142.0]: use `topics`) (default = otlp_profiles): If this is set, it takes precedence over the default value of `topics`.
  - `topics` (default = otlp_profiles): The list of Kafka topics from which to consume profiles.
  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
  - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) (default = ""): If this is set, it takes precedence over the default value of `exclude_topics`.
  - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics.
- `group_id` (default = otel-collector): The consumer group that the receiver will be consuming messages from.
- `client_id` (default = otel-collector): The consumer client ID that the receiver will use.
- `rack_id` (default = ""): The rack identifier for this client. When set and brokers are configured with a rack-aware replica selector, the client will prefer fetching from the closest replica.
- `use_leader_epoch` (default = true): (Experimental) When enabled, the consumer uses the leader epoch returned by brokers (KIP-320) to detect log truncation. Setting this to false clears the leader epoch from fetch offsets, disabling KIP-320. Disabling can improve compatibility with brokers that don't fully support leader epochs (e.g., Azure Event Hubs), at the cost of losing automatic log-truncation safety.
- `conn_idle_timeout` (default = 9m): The time after which idle connections to Kafka brokers are not reused and may be closed.
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
- `session_timeout` (default = 10s): The request timeout for detecting client failures when using Kafka's group management facilities.
- `heartbeat_interval` (default = 3s): The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
- `group_rebalance_strategy` (default = cooperative-sticky): The strategy used to assign partitions to consumers within a consumer group. This setting determines how Kafka distributes topic partitions among the consumers in the group during rebalances. Supported strategies are:
  - `range`: Assigns partitions to consumers based on a range. It aims to distribute partitions evenly across consumers, but it can lead to uneven distribution if the number of partitions is not a multiple of the number of consumers. For more information, see the Kafka RangeAssignor documentation.
  - `roundrobin`: Assigns partitions to consumers in a round-robin fashion. It ensures a more even distribution of partitions across consumers, especially when the number of partitions is not a multiple of the number of consumers. For more information, see the Kafka RoundRobinAssignor documentation.
  - `sticky`: Aims to keep partition assignments unchanged across rebalances as much as possible. It minimizes the number of partition movements, which can be beneficial for stateful consumers. For more information, see the Kafka StickyAssignor documentation.
  - `cooperative-sticky`: Similar to `sticky`, but supports cooperative rebalancing. It allows consumers to incrementally adjust their partition assignments without requiring a full rebalance, which can reduce downtime during rebalances. For more information, see the Kafka CooperativeStickyAssignor documentation.
- `group_instance_id`: A unique identifier for the consumer instance within a consumer group.
  - If set to a non-empty string, the consumer is treated as a static member of the group. This means that the consumer will maintain its partition assignments across restarts and rebalances, as long as it rejoins the group with the same `group_instance_id`.
  - If set to an empty string (or not set), the consumer is treated as a dynamic member. In this case, the consumer's partition assignments may change during rebalances.
  - Using a `group_instance_id` is useful for stateful consumers or when you need to ensure that a specific consumer instance is always assigned the same set of partitions.
- `min_fetch_size` (default = 1): The minimum number of message bytes to fetch in a request.
- `max_fetch_size` (default = 1048576): The maximum number of message bytes to fetch in a request (1 MiB). Must be greater than or equal to `min_fetch_size`.
- `max_fetch_wait` (default = 250ms): The maximum amount of time the broker should wait for `min_fetch_size` bytes to be available before returning anyway.
- `max_partition_fetch_size` (default = 1048576): The number of message bytes to fetch in a request per partition (1 MiB). If a single record batch is larger than this value, the broker will still return it to ensure the consumer can make progress.
- `tls`: See TLS Configuration Settings for the full set of available options.
- `auth`
  - `plain_text` (Deprecated in v0.123.0: use `sasl` with mechanism set to PLAIN instead.)
    - `username`: The username to use.
    - `password`: The password to use.
  - `sasl`
    - `username`: The username to use.
    - `password`: The password to use.
    - `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM_OAUTHBEARER, or PLAIN).
    - `aws_msk`
      - `region`: The AWS region, for the AWS_MSK_IAM_OAUTHBEARER mechanism.
  - `tls` (Deprecated in v0.124.0: configure `tls` at the top level): An alias for `tls` at the top level.
  - `kerberos`
    - `service_name`: Kerberos service name.
    - `realm`: Kerberos realm.
    - `use_keytab`: If true, the keytab file will be used for authentication instead of the password.
    - `username`: The Kerberos username used to authenticate with the KDC.
    - `password`: The Kerberos password used to authenticate with the KDC.
    - `config_file`: Path to the Kerberos configuration, e.g. /etc/krb5.conf.
    - `keytab_file`: Path to the keytab file, e.g. /etc/security/kafka.keytab.
    - `disable_fast_negotiation`: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to `false` by default.
- `metadata`
  - `full` (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to brokers at startup.
  - `refresh_interval` (default = 10m): Controls the frequency at which cluster metadata is refreshed in the background.
  - `retry`
    - `max` (default = 3): The number of retries to get metadata.
    - `backoff` (default = 250ms): How long to wait between metadata retries.
- `autocommit`
  - `enable` (default = true): Whether or not to auto-commit updated offsets back to the broker.
  - `interval` (default = 1s): How frequently to commit updated offsets. Ineffective unless auto-commit is enabled.
- `message_marking`
  - `after` (default = false): If true, the messages are marked after the pipeline execution.
  - `on_error` (default = false): If false, only the successfully processed messages are marked. This applies to non-permanent errors. Note: this can block the entire partition if processing a message returns a non-permanent error.
  - `on_permanent_error` (default = value of `on_error`): If false, messages that generate permanent errors are not marked; if true, they are. Note: this can block the entire partition if processing a message returns a permanent error.
- `header_extraction`
  - `extract_headers` (default = false): Whether to attach header fields to resource attributes in the OTel pipeline.
  - `headers` (default = []): The list of headers to extract from each Kafka record. Note: matching is exact; regexes are not supported at this time.
- `error_backoff`: BackOff configuration in case of errors.
  - `enabled` (default = false): Whether to enable backoff when the next consumers return errors.
  - `initial_interval`: The time to wait after the first error before retrying.
  - `max_interval`: The upper bound on the backoff interval between consecutive retries.
  - `multiplier`: The value multiplied by the backoff interval bounds.
  - `randomization_factor`: A random factor used to calculate the next backoff: Randomized interval = RetryInterval * (1 ± RandomizationFactor).
  - `max_elapsed_time`: The maximum amount of time to keep backing off before giving up. If set to 0, retries are never stopped.
- `telemetry`
  - `metrics`
    - `kafka_receiver_records_delay`
      - `enabled` (default = false): Whether the metric `kafka_receiver_records_delay` will be reported.
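As a sketch of how these optional settings fit together, a configuration might look like the following (broker addresses, topic names, and timing values are illustrative, not recommendations):

```yaml
receivers:
  kafka:
    brokers: ["broker-1:9092", "broker-2:9092"]  # illustrative broker addresses
    protocol_version: 2.1.0
    group_id: otel-collector
    initial_offset: earliest
    logs:
      topics: [otlp_logs]
      encoding: otlp_proto
    autocommit:
      enable: true
      interval: 1s
    message_marking:
      after: true
      on_error: true
    error_backoff:
      enabled: true
      initial_interval: 1s
      max_interval: 30s
      multiplier: 1.5
```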
## Supported encodings
The Kafka receiver supports encoding extensions, as well as the following built-in encodings.

Available for all signals:

- `otlp_proto`: the payload is decoded as OTLP Protobuf.
- `otlp_json`: the payload is decoded as OTLP JSON.

Available only for traces:

- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
- `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.

Available only for logs:

- `raw`: the payload's bytes are inserted as the body of a log record.
- `text`: the payload is decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior.
- `json`: the payload is decoded as JSON and inserted as the body of a log record.
- `azure_resource_logs` (Deprecated [v0.149.0]: use the `azure` encoding extension): the payload is converted from Azure Resource Logs format to OTel format.
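As an illustration, a receiver decoding plain-text log payloads could be configured like this (the topic name is an assumption for the example):

```yaml
receivers:
  kafka:
    logs:
      topics: [app-logs]    # illustrative topic name
      encoding: text_utf-8  # decode each payload as UTF-8 text into the log body
```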
## Message metadata propagation
The Kafka receiver includes the following record metadata as request metadata (context) for each consumed message:

- `kafka.topic`: the topic the message was consumed from
- `kafka.partition`: the partition the message was consumed from
- `kafka.offset`: the offset of the message within the partition
## Example configurations

### Minimal configuration
By default, the receiver does not require any configuration. With the following configuration, the receiver will consume messages from the default topics on localhost:9092 using the `otlp_proto` encoding:
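A minimal sketch of such a configuration, relying entirely on the defaults:

```yaml
receivers:
  kafka:
```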
### TLS and authentication

In this example the receiver is configured to connect to Kafka using TLS for encryption, and SASL/SCRAM for authentication:
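A sketch of such a configuration, assuming the collector's standard TLS settings (`ca_file`, `cert_file`, `key_file`); the certificate paths and credentials are illustrative:

```yaml
receivers:
  kafka:
    tls:
      # illustrative certificate paths
      ca_file: /etc/ssl/certs/ca.pem
      cert_file: /etc/ssl/certs/client.pem
      key_file: /etc/ssl/certs/client-key.pem
    auth:
      sasl:
        username: otel-user              # illustrative credentials
        password: ${env:KAFKA_PASSWORD}
        mechanism: SCRAM-SHA-512
```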
### Header extraction

In addition to propagating Kafka message metadata as described above in Message metadata propagation, the Kafka receiver can also be configured to extract and attach specific headers as resource attributes, e.g.:
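For example (the header names are illustrative):

```yaml
receivers:
  kafka:
    header_extraction:
      extract_headers: true
      headers: ["tenant", "environment"]  # illustrative header names to attach as resource attributes
```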
### Regex topic patterns with exclusions

When using the franz-go client, you can consume from multiple topics using regex patterns and exclude specific topics from consumption. This is useful when you want to consume from a dynamic set of topics but need to filter out certain ones.
Note: Both `topic` and `exclude_topic` must use regex patterns (prefix with `^`) for exclusion to work. This feature is only available with the franz-go client.
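For example (the topic patterns are illustrative, and `exclude_topics` is written as a single regex string, consistent with its `""` default above):

```yaml
receivers:
  kafka:
    logs:
      topics: ["^logs-.*"]                 # regex: all topics starting with logs-
      exclude_topics: "^logs-(test|dev)$"  # but skip logs-test and logs-dev
    metrics:
      topics: ["^metrics-.*"]              # regex: all topics starting with metrics-
      exclude_topics: "^metrics-internal-.*"
```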
- For logs: the receiver will consume from topics like `logs-prod`, `logs-staging`, and `logs-app`, but will exclude `logs-test` and `logs-dev`.
- For metrics: the receiver will consume from topics like `metrics-app` and `metrics-infra`, but will exclude any topics starting with `metrics-internal-`.
## Attributes
| Attribute Name | Description | Type | Values |
|---|---|---|---|
| node_id | The Kafka node ID. | int | |
| outcome | The operation outcome. | string | success, failure |
| partition | The Kafka topic partition. | int | |
| topic | The Kafka topic. | string | |
## Configuration

## Example Configuration
Last generated: 2026-04-13