Monitor self-hosted Kafka with OpenTelemetry

Monitor your self-hosted Apache Kafka cluster by installing the OpenTelemetry Collector directly on Linux hosts. Choose between the OpenTelemetry Java agent and the Prometheus JMX Exporter to collect JMX metrics from your Kafka brokers.

Architecture

New Relic supports two approaches to monitoring self-hosted Kafka: the OpenTelemetry Java agent or the Prometheus JMX Exporter. The following diagram illustrates the data flow for each approach.

[Figure: Self-hosted Kafka monitoring architecture]

Installation steps

Follow these steps to set up comprehensive Kafka monitoring by installing the OpenTelemetry Java agent on your brokers and deploying a collector that gathers metrics and logs and sends them to New Relic.

Before you begin

Make sure you have:

  • A New Relic account with a license key
  • Network access from the collector to the Kafka bootstrap server port (typically 9092)

Create the collector configuration

Create the main OpenTelemetry Collector configuration at ~/opentelemetry/collector-kafka-config.yaml on a monitoring host.

receivers:
  # OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
  # Kafka metrics receiver for cluster-level metrics
  kafkametrics:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    # Exclude internal Kafka topics (prefixed with __) at the source
    topic_match: "^[^_].*$"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false
processors:
  batch/aggregation:
    send_batch_size: 1024
    timeout: 30s
  resourcedetection:
    detectors: [env, ec2, system]
    system:
      resource_attributes:
        host.name:
          enabled: true
        host.id:
          enabled: true
  resource:
    attributes:
      - action: insert
        key: kafka.cluster.name
        value: ${env:KAFKA_CLUSTER_NAME}
  transform/remove_broker_id:
    metric_statements:
      # Remove broker.id for cluster-level metrics. These represent the whole cluster,
      # not a specific broker. broker.id is retained on broker-level metrics pipelines.
      - context: resource
        statements:
          - delete_key(attributes, "broker.id")
  transform/remove_extra_attributes:
    metric_statements:
      - context: resource
        statements:
          # Delete all attributes starting with "process."
          - delete_matching_keys(attributes, "^process\\..*")
          # Delete all attributes starting with "telemetry."
          - delete_matching_keys(attributes, "^telemetry\\..*")
          - delete_key(attributes, "host.arch")
          - delete_key(attributes, "os.description")
          - delete_key(attributes, "host.image.id")
          - delete_key(attributes, "host.type")
          - delete_matching_keys(attributes, "^cloud\\..*")
          - delete_key(attributes, "service.instance.id") where IsMatch(attributes["service.name"], "^unknown_service:")
          - delete_key(attributes, "service.name") where IsMatch(attributes["service.name"], "^unknown_service:")
  # Filter internal Kafka topics as a safety net (kafkametrics topic_match handles the receiver side)
  filter/internal_topics:
    metrics:
      datapoint:
        - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"
  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"
  transform/des_units:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""
  cumulativetodelta:
  metricstransform/kafka_topic_sum_aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
  filter/remove_partition_level_replicas:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync
  groupbyattrs/cluster:
    keys: [kafka.cluster.name]
  metricstransform/cluster_max:
    transforms:
      - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels
            aggregation_type: max
            label_set: []
exporters:
  otlp/newrelic:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    compression: gzip
    timeout: 30s
service:
  pipelines:
    # Broker metrics pipeline (excludes cluster-level metrics)
    metrics/broker:
      receivers: [otlp, kafkametrics]
      processors:
        - resourcedetection
        - resource
        - filter/exclude_cluster_metrics
        - filter/internal_topics
        - transform/remove_extra_attributes
        - transform/des_units
        - cumulativetodelta
        - metricstransform/kafka_topic_sum_aggregation
        - filter/remove_partition_level_replicas
        - batch/aggregation
      exporters: [otlp/newrelic]
    # Cluster metrics pipeline (controller-emitted metrics like offline partitions and topic/partition counts; no broker.id)
    metrics/cluster:
      receivers: [otlp]
      processors:
        - resourcedetection
        - resource
        - filter/include_cluster_metrics
        - transform/remove_broker_id
        - transform/remove_extra_attributes
        - transform/des_units
        - cumulativetodelta
        - groupbyattrs/cluster
        - metricstransform/cluster_max
        - batch/aggregation
      exporters: [otlp/newrelic]
    # APM traces pipeline (producer + consumer spans via OTel Java agent)
    traces/apps:
      receivers: [otlp]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]
    # APM logs pipeline (producer + consumer logs via OTel Java agent)
    logs/apps:
      receivers: [otlp]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]
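
The topic_match pattern in the kafkametrics receiver can be tried against sample topic names before you deploy the configuration. The following is a minimal sketch that uses grep -E for the check. The topic names are made-up examples, and grep's extended regular expressions are only a stand-in for the regular expression engine inside the collector, although this simple pattern should behave the same way in both.

```shell
#!/usr/bin/env bash
# Pattern copied from the kafkametrics receiver's topic_match setting.
TOPIC_MATCH='^[^_].*$'

# Print only the topic names that the pattern keeps.
# Reads one topic name per line on stdin.
filter_topics() {
  # grep exits with status 1 when nothing matches; that is not an error here.
  grep -E "$TOPIC_MATCH" || true
}

# Hypothetical topic names, used only for illustration.
printf '%s\n' orders payments __consumer_offsets __transaction_state _schemas | filter_topics
```

Because the pattern rejects any name that starts with an underscore, a topic with a single leading underscore (such as the hypothetical _schemas above) is dropped along with the double-underscore internal topics. If you need metrics for such topics, adjust the pattern accordingly.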

Set environment variables

Set the required environment variables on the monitoring host before installing the collector:

bash
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
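
Since the collector configuration reads these four variables through ${env:...} references, it can help to confirm they are all set in the shell that will launch the collector. The sketch below is one possible pre-flight check; the function name missing_vars is an illustrative helper, not part of any New Relic or OpenTelemetry tooling.

```shell
#!/usr/bin/env bash
# Variables that the collector configuration references via ${env:...}.
REQUIRED_VARS="NEW_RELIC_LICENSE_KEY KAFKA_CLUSTER_NAME KAFKA_BOOTSTRAP_BROKER_ADDRESSES NEW_RELIC_OTLP_ENDPOINT"

# Print the name of every required variable that is unset or empty.
# Returns 0 when all are set, 1 otherwise.
missing_vars() {
  local name value missing
  missing=0
  for name in $REQUIRED_VARS; do
    # Look up the value of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "$name"
      missing=1
    fi
  done
  return "$missing"
}

if missing_vars >/dev/null; then
  echo "All collector variables are set."
else
  echo "Missing variables:" $(missing_vars)
fi
```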

Configuration parameters

The following table describes the key configuration parameters:

| Variable | Description |
| --- | --- |
| NEW_RELIC_LICENSE_KEY | Your New Relic license key, for example YOUR_LICENSE_KEY |
| KAFKA_CLUSTER_NAME | A unique name for your Kafka cluster, for example my-kafka-cluster |
| KAFKA_BOOTSTRAP_BROKER_ADDRESSES | The addresses of your Kafka bootstrap brokers, for example broker1-host:9092,broker2-host:9092,broker3-host:9092 |
| NEW_RELIC_OTLP_ENDPOINT | OTLP ingest endpoint. Use https://otlp.nr-data.net:4317 for the US region or https://otlp.eu01.nr-data.net:4317 for the EU region. For other configurations, see Configure your OTLP endpoint. |
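
If you provision hosts in more than one region, the endpoint choice described above can be derived from a region code rather than typed by hand. This is a small sketch under that assumption; NR_REGION and otlp_endpoint_for_region are hypothetical names introduced here, and only the two endpoint URLs come from this guide.

```shell
# Map a region code to the OTLP ingest endpoint listed in this guide.
otlp_endpoint_for_region() {
  case "$1" in
    us|US) echo "https://otlp.nr-data.net:4317" ;;
    eu|EU) echo "https://otlp.eu01.nr-data.net:4317" ;;
    *) echo "unknown region: $1 (expected us or eu)" >&2; return 1 ;;
  esac
}

# NR_REGION is a hypothetical helper variable; the collector itself does not read it.
NR_REGION="${NR_REGION:-us}"
NEW_RELIC_OTLP_ENDPOINT="$(otlp_endpoint_for_region "$NR_REGION")" && export NEW_RELIC_OTLP_ENDPOINT
echo "Using OTLP endpoint: ${NEW_RELIC_OTLP_ENDPOINT:-<not set>}"
```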

Install and start the collector

Install and run the collector on the monitoring host. Choose between the NRDOT Collector (New Relic's distribution) and the OpenTelemetry Collector:

Tip

NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector and is covered by New Relic support.

Step 1. Download and install the binary

Download and install the NRDOT Collector binary for your host operating system. The following example is for the linux_amd64 architecture:

bash
# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64" # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version

Important

For other operating systems and architectures, visit the NRDOT Collector releases page and download the appropriate binary for your system.
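
The download example sets ARCH by hand to amd64 or arm64. As a rough sketch, the value could instead be derived from the machine name reported by uname -m. The function name release_arch is illustrative, and the mapping covers only the two architectures mentioned in this guide.

```shell
# Translate the kernel's machine name into the architecture suffix
# used in the release file names (amd64 or arm64).
release_arch() {
  case "$1" in
    x86_64|amd64)  echo "amd64" ;;
    aarch64|arm64) echo "arm64" ;;
    *) echo "unsupported architecture: $1" >&2; return 1 ;;
  esac
}

# Set ARCH only when the current machine maps to a known suffix.
ARCH="$(release_arch "$(uname -m)")" && echo "Detected release architecture: $ARCH"
```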

Step 2. Start the collector

Run the collector with your configuration file to start monitoring:

bash
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

The collector is now running and ready to receive data. Metrics will not appear in New Relic until you complete the remaining steps and attach the Java agent to your Kafka brokers.

Step 1. Download and install the binary

Download and install the OpenTelemetry Collector Contrib binary for your host operating system. The following example is for the linux_amd64 architecture:

bash
# Set version and architecture
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"
ARCH="amd64"

# Download the collector
curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

# Extract the binary
tar -xzf otelcol-contrib.tar.gz

# Move to a location in PATH (optional)
sudo mv otelcol-contrib /usr/local/bin/

# Verify installation
otelcol-contrib --version

For other operating systems, visit the OpenTelemetry Collector releases page.
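
In the download URL used above, the release tag carries a leading v while the archive file name does not, which is easy to get wrong when scripting the download. The sketch below builds the URL from a version and architecture following that same pattern. The function name otelcol_contrib_url is illustrative, and 0.0.0 is a placeholder rather than a real release.

```shell
# Build the release download URL for a given version and architecture,
# following the URL pattern used in the download step.
otelcol_contrib_url() {
  # Accept "v1.2.3" or "1.2.3": the tag needs the v, the file name does not.
  local version="${1#v}"
  local arch="$2"
  echo "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${version}/otelcol-contrib_${version}_linux_${arch}.tar.gz"
}

# 0.0.0 is a placeholder version, not a real release.
otelcol_contrib_url "v0.0.0" "amd64"
```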

Step 2. Start the collector

Run the collector with your configuration file to start monitoring:

bash
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

The collector is now running and ready to receive data. Metrics will not appear in New Relic until you complete the remaining steps and attach the Java agent to your Kafka brokers.

Download the OpenTelemetry Java agent

Important

Make sure your OpenTelemetry Collector is running before you start or restart the Kafka brokers with the Java agent attached. The agent begins sending metrics as soon as the broker starts, so the collector must be available to receive them.
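
One way to enforce this ordering in a startup script is to wait until the collector's OTLP port accepts TCP connections before launching the broker. The following is a sketch only: wait_for_port is a hypothetical helper, it relies on bash's /dev/tcp pseudo-device (so it needs bash rather than a plain POSIX shell), and a successful TCP connection shows that something is listening on the port, not that the collector pipeline is healthy.

```shell
#!/usr/bin/env bash
# Return 0 as soon as a TCP connection to host:port succeeds,
# or 1 after the given number of attempts.
# Arguments: host, port, attempts (default 30), delay in seconds (default 2).
wait_for_port() {
  local host="$1" port="$2" attempts="${3:-30}" delay="${4:-2}"
  local i=1
  while [ "$i" -le "$attempts" ]; do
    # The subshell opens and immediately closes the connection.
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  return 1
}

# collector-host-ip is the placeholder host name used throughout this guide.
# Example:
#   wait_for_port collector-host-ip 4317 && echo "Collector is reachable."
```

If the host is unreachable rather than refusing connections, each attempt may block until the operating system's connect timeout expires, so the total wait can be longer than attempts multiplied by delay.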

The OpenTelemetry Java agent runs attached to your Kafka brokers, collects Kafka and JMX metrics, and sends them to the collector over OTLP:

bash
# Create directory for OpenTelemetry components
mkdir -p ~/opentelemetry

# Download OpenTelemetry Java agent
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

Create a custom JMX configuration

Create a JMX configuration file for the OpenTelemetry Java agent to collect Kafka metrics from JMX MBeans.

Create the file ~/opentelemetry/kafka-jmx-config.yaml on each broker host with the following configuration:

---
rules:
  # Per-topic custom metrics using custom MBean commands
  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
    mapping:
      Count:
        metric: kafka.prod.msg.count
        type: counter
        desc: The number of messages per topic
        unit: "{message}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(in)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By
  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(out)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By
  # Cluster-level metrics using controller-based MBeans
  - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
    mapping:
      Value:
        metric: kafka.cluster.topic.count
        type: gauge
        desc: The total number of global topics in the cluster
        unit: "{topic}"
  - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
    mapping:
      Value:
        metric: kafka.cluster.partition.count
        type: gauge
        desc: The total number of global partitions in the cluster
        unit: "{partition}"
  - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
    mapping:
      Value:
        metric: kafka.broker.fenced.count
        type: gauge
        desc: The number of fenced brokers in the cluster
        unit: "{broker}"
  - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
    mapping:
      Value:
        metric: kafka.partition.non_preferred_leader
        type: gauge
        desc: The count of topic partitions for which the leader is not the preferred leader
        unit: "{partition}"
  # Broker-level metrics using ReplicaManager MBeans
  - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
    mapping:
      Value:
        metric: kafka.partition.under_min_isr
        type: gauge
        desc: The number of partitions where the number of in-sync replicas is less than the minimum
        unit: "{partition}"
  # Broker uptime metric using JVM Runtime
  - bean: java.lang:type=Runtime
    mapping:
      Uptime:
        metric: kafka.broker.uptime
        type: gauge
        desc: Broker uptime in milliseconds
        unit: ms
  # Leader count per broker
  - bean: kafka.server:type=ReplicaManager,name=LeaderCount
    mapping:
      Value:
        metric: kafka.broker.leader.count
        type: gauge
        desc: Number of partitions for which this broker is the leader
        unit: "{partition}"
  # JVM metrics
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionCount:
        metric: jvm.gc.collections.count
        type: counter
        unit: "{collection}"
        desc: total number of collections that have occurred
        metricAttribute:
          name: param(name)
  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.max:
        metric: heap.max
        desc: maximum heap size
        type: gauge
      HeapMemoryUsage.used:
        metric: heap.used
        desc: current heap usage
        type: gauge
  - bean: java.lang:type=Threading
    mapping:
      ThreadCount:
        metric: jvm.thread.count
        type: gauge
        unit: "{thread}"
        desc: Total thread count (Kafka typical range 100-300 threads)
  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemCpuLoad:
        metric: system.cpu.utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for whole system (0.0 to 1.0)
  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
    mapping:
      Count:
        metric: kafka.message.count
        type: counter
        desc: The number of messages received by the broker
        unit: "{message}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.count
        type: &type counter
        desc: &desc The number of requests received by the broker
        unit: &unit "{request}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit
  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.failed
        type: &type counter
        desc: &desc The number of requests to the broker resulting in a failure
        unit: &unit "{request}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit
  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      99thPercentile:
        metric: kafka.request.time.99p
        type: gauge
        desc: The 99th percentile time the broker has taken to service requests
  - bean: kafka.network:type=RequestChannel,name=RequestQueueSize
    mapping:
      Value:
        metric: kafka.request.queue
        type: gauge
        desc: Size of the request queue
        unit: "{request}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
    metricAttribute:
      direction: const(in)
    mapping:
      Count:
        metric: &metric kafka.network.io
        type: &type counter
        desc: &desc The bytes received or sent by the broker
        unit: &unit By
  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
    metricAttribute:
      direction: const(out)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit
  - beans:
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
    metricAttribute:
      type: param(delayedOperation)
    mapping:
      Value:
        metric: kafka.purgatory.size
        type: gauge
        desc: The number of requests waiting in purgatory
        unit: "{request}"
  - bean: kafka.server:type=ReplicaManager,name=PartitionCount
    mapping:
      Value:
        metric: kafka.partition.count
        type: gauge
        desc: The number of partitions on the broker
        unit: "{partition}"
  - bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
    mapping:
      Value:
        metric: kafka.partition.offline
        type: gauge
        desc: The number of partitions offline
        unit: "{partition}"
  - bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
    mapping:
      Value:
        metric: kafka.partition.under_replicated
        type: gauge
        desc: The number of under replicated partitions
        unit: "{partition}"
  - bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
    metricAttribute:
      operation: const(shrink)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"
  - bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
    metricAttribute:
      operation: const(expand)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"
  - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
    mapping:
      Value:
        metric: kafka.max.lag
        type: gauge
        desc: The max lag in messages between follower and leader replicas
        unit: "{message}"
  - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
    mapping:
      Value:
        metric: kafka.controller.active.count
        type: gauge
        desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
        unit: "{controller}"
  - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
    mapping:
      Count:
        metric: kafka.leader.election.rate
        type: counter
        desc: The leader election count
        unit: "{election}"
  - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
    mapping:
      Count:
        metric: kafka.unclean.election.rate
        type: counter
        desc: Unclean leader election count - increasing indicates broker failures
        unit: "{election}"
  # Additional metrics: remove this section to reduce data ingest
  # Request latency: total count, 50th percentile, and average (99p kept above)
  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      Count:
        metric: kafka.request.time.total
        type: counter
        desc: The total time the broker has taken to service requests
      50thPercentile:
        metric: kafka.request.time.50p
        type: gauge
        desc: The 50th percentile time the broker has taken to service requests
      Mean:
        metric: kafka.request.time.avg
        type: gauge
        desc: The average time the broker has taken to service requests
  - bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
    unit: ms
    type: gauge
    prefix: kafka.logs.flush.
    mapping:
      Count:
        metric: count
        unit: '{flush}'
        type: counter
        desc: Log flush count
      50thPercentile:
        metric: time.50p
        desc: Log flush time - 50th percentile
      99thPercentile:
        metric: time.99p
        desc: Log flush time - 99th percentile
  # GC elapsed time (cumulative collection time in ms)
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionTime:
        metric: jvm.gc.collections.elapsed
        type: counter
        unit: ms
        desc: the approximate accumulated collection elapsed time in milliseconds
        metricAttribute:
          name: param(name)
  # JVM class loading
  - bean: java.lang:type=ClassLoading
    mapping:
      LoadedClassCount:
        metric: jvm.class.count
        type: gauge
        unit: "{class}"
        desc: Currently loaded class count
  # JVM heap committed (in addition to heap.used and heap.max)
  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.committed:
        metric: heap.committed
        desc: Committed heap memory
        type: gauge
  # Additional JVM CPU and system metrics
  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemLoadAverage:
        metric: system.cpu.load_1m
        type: gauge
        unit: "{run_queue_item}"
        desc: System load average (1 minute) - alert if > CPU count
      AvailableProcessors:
        metric: cpu.count
        type: gauge
        unit: "{cpu}"
        desc: Number of processors available
      ProcessCpuLoad:
        metric: cpu.recent_utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for JVM process (0.0 to 1.0)
      OpenFileDescriptorCount:
        metric: file_descriptor.count
        type: gauge
        unit: "{file_descriptor}"
        desc: Number of open file descriptors - alert if > 80% of ulimit
  # JVM memory pool breakdown (by generation: G1 Old Gen, Eden, Survivor, etc.)
  - bean: java.lang:type=MemoryPool,name=*
    type: gauge
    unit: By
    metricAttribute:
      name: param(name)
    mapping:
      Usage.used:
        metric: jvm.memory.pool.used
        desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
      Usage.max:
        metric: jvm.memory.pool.max
        desc: Maximum memory pool size
      CollectionUsage.used:
        metric: jvm.memory.pool.used_after_last_gc
        desc: Memory used after last GC (shows retained memory baseline)
Configure the Kafka broker

Attach the OpenTelemetry Java agent to your Kafka broker by setting the KAFKA_OPTS environment variable before starting Kafka.

Single-broker example:

bash
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"

nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
  -Dotel.jmx.enabled=true \
  -Dotel.jmx.config=$JMX_CONFIG \
  -Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol=grpc \
  -Dotel.metrics.exporter=otlp \
  -Dotel.logs.exporter=otlp \
  -Dotel.instrumentation.runtime-telemetry.enabled=false \
  -Dotel.metric.export.interval=30000" \
  bin/kafka-server-start.sh config/server.properties &

Important

Multi-broker clusters: For multiple brokers, use the same configuration with a unique broker.id value for each broker (for example, broker.id=1, broker.id=2, broker.id=3) in the -Dotel.resource.attributes parameter.
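
To keep the options identical across brokers apart from broker.id, the KAFKA_OPTS value can be generated rather than copied and edited by hand. The sketch below assumes the same file locations and flags as the single-broker example; kafka_opts_for_broker is a hypothetical helper name, and the cluster name and collector address are the placeholders used in this guide.

```shell
# Print the KAFKA_OPTS value for one broker.
# Arguments: broker id, cluster name, collector endpoint.
# Only broker.id differs between brokers of the same cluster.
kafka_opts_for_broker() {
  local broker_id="$1" cluster="$2" collector="$3"
  local agent="$HOME/opentelemetry/opentelemetry-javaagent.jar"
  local jmx_config="$HOME/opentelemetry/kafka-jmx-config.yaml"
  echo "-javaagent:${agent}" \
    "-Dotel.jmx.enabled=true" \
    "-Dotel.jmx.config=${jmx_config}" \
    "-Dotel.resource.attributes=broker.id=${broker_id},kafka.cluster.name=${cluster}" \
    "-Dotel.exporter.otlp.endpoint=${collector}" \
    "-Dotel.exporter.otlp.protocol=grpc" \
    "-Dotel.metrics.exporter=otlp" \
    "-Dotel.logs.exporter=otlp" \
    "-Dotel.instrumentation.runtime-telemetry.enabled=false" \
    "-Dotel.metric.export.interval=30000"
}

# Show the options that brokers 1 to 3 would each be started with.
for id in 1 2 3; do
  echo "broker ${id}: $(kafka_opts_for_broker "$id" my-kafka-cluster http://collector-host-ip:4317)"
done
```

On each broker host, the printed value would be passed as KAFKA_OPTS in the same way as in the single-broker example.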

Tip

Broker logs are enabled automatically by the -Dotel.logs.exporter=otlp flag above. To disable broker log collection, set -Dotel.logs.exporter=none instead.

Configuration parameters

The following table describes the key configuration parameters:

| Parameter | Description |
| --- | --- |
| otlp.endpoint | Replace with the IP address or hostname of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317 |
| broker.id | Replace 1 with the unique broker ID for each broker, for example broker.id=1, broker.id=2, broker.id=3 |
| kafka.cluster.name | Replace my-kafka-cluster with the name of your Kafka cluster. It must match the value set in the collector configuration. |
| logs.exporter | Enables broker log collection when set to otlp. Set it to none to disable broker log forwarding. |

For the full set of configuration options, see the Java agent configuration guide.

(Optional) Instrument producer or consumer applications

Important

Language support: Currently, only Java applications are supported for Kafka client instrumentation with the OpenTelemetry Java agent.

To collect application-level telemetry from your Kafka producer and consumer applications, download the OpenTelemetry Java agent as described in the Download the OpenTelemetry Java agent step above.

Start your application with the agent:

bash
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"

java \
  -javaagent:$OTEL_AGENT \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -Dotel.instrumentation.runtime-telemetry.enabled="false" \
  -jar your-kafka-application.jar

Configuration parameters

The following table describes the key configuration parameters:

| Parameter | Description |
| --- | --- |
| service.name | Replace with a unique name for your producer or consumer application, for example order-process-service |
| kafka.cluster.name | Replace with the same cluster name used in your collector configuration, for example my-kafka-cluster |
| otlp.endpoint | Replace with the hostname or IP address of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317 |

Tip

The configuration above sends telemetry to an OpenTelemetry Collector running at collector-host-ip:4317. If you want a separate collector dedicated to application telemetry, create one with the following configuration:

The Java agent provides out-of-the-box Kafka instrumentation with no code changes, capturing request latency, throughput metrics, error rates, and distributed traces.

For advanced configuration, see the Kafka instrumentation documentation.

Follow these steps to set up comprehensive Kafka monitoring by installing the Prometheus JMX Exporter on your brokers and deploying a collector that gathers metrics and sends them to New Relic.

Before you begin

Make sure you have:

  • A New Relic account with a license key
  • Network access from the collector host to each broker on port 9404
  • Network access from the collector to the Kafka bootstrap port (typically 9092)

Download the Prometheus JMX Exporter

Download the Prometheus JMX Exporter JAR on each Kafka broker host:

bash
# Create directory for Prometheus components
mkdir -p ~/opentelemetry

# Download the Prometheus JMX Exporter agent JAR
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
JMX_EXPORTER_VERSION="1.5.0"
curl -L -o ~/opentelemetry/jmx_prometheus_javaagent.jar \
  "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"

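Because the download step states that 1.5.0 is the minimum required JMX Exporter version, a script that lets you override JMX_EXPORTER_VERSION may want to reject older values before downloading. The following is a minimal sketch that assumes GNU sort with the -V (version sort) option, which is typical on Linux hosts; version_ge is an illustrative helper name.

```shell
# Return 0 when version $1 is greater than or equal to version $2.
# Sorts the two versions in ascending order and checks that $2 comes first.
version_ge() {
  [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -n 1)" = "$2" ]
}

MIN_JMX_EXPORTER_VERSION="1.5.0"
JMX_EXPORTER_VERSION="${JMX_EXPORTER_VERSION:-1.5.0}"

if version_ge "$JMX_EXPORTER_VERSION" "$MIN_JMX_EXPORTER_VERSION"; then
  echo "JMX Exporter ${JMX_EXPORTER_VERSION} meets the minimum (${MIN_JMX_EXPORTER_VERSION})."
else
  echo "JMX Exporter ${JMX_EXPORTER_VERSION} is older than ${MIN_JMX_EXPORTER_VERSION}." >&2
fi
```

A plain string comparison would rank 1.10.0 below 1.5.0, which is why the sketch uses version sort instead.
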
Create the JMX metrics configuration

Create the JMX Exporter configuration file that defines which Kafka metrics to collect. Save it as ~/opentelemetry/kafka-jmx-config.yaml on each broker host:

startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # Cluster-level controller metrics
  - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
    name: kafka_cluster_topic_count
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
    name: kafka_cluster_partition_count
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
    name: kafka_broker_fenced_count
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
    name: kafka_partition_non_preferred_leader
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_partition_offline
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
    name: kafka_controller_active_count
    type: GAUGE
  # Broker-level replica metrics
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
    name: kafka_partition_under_min_isr
    type: GAUGE
  - pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
    name: kafka_broker_leader_count
    type: GAUGE
  - pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
    name: kafka_partition_count
    type: GAUGE
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
    name: kafka_partition_under_replicated
    type: GAUGE
  - pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
    name: kafka_isr_operation_count
    type: COUNTER
    labels:
      operation: "shrink"
  - pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
    name: kafka_isr_operation_count
    type: COUNTER
    labels:
      operation: "expand"
  - pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
    name: kafka_max_lag
    type: GAUGE
  # Broker topic metrics (totals)
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
    name: kafka_message_count
    type: COUNTER
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
    name: kafka_request_count
    type: COUNTER
    labels:
      type: "fetch"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
    name: kafka_request_count
    type: COUNTER
    labels:
      type: "produce"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
    name: kafka_request_failed
    type: COUNTER
    labels:
      type: "fetch"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
    name: kafka_request_failed
    type: COUNTER
    labels:
      type: "produce"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
    name: kafka_network_io
    type: COUNTER
    labels:
      direction: "in"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
    name: kafka_network_io
    type: COUNTER
    labels:
      direction: "out"
  # Per-topic metrics (only appear after traffic flows)
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
    name: kafka_prod_msg_count
    type: COUNTER
    labels:
      topic: "$1"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
    name: kafka_topic_io
    type: COUNTER
    labels:
      topic: "$1"
      direction: "in"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
    name: kafka_topic_io
    type: COUNTER
    labels:
      topic: "$1"
      direction: "out"
  # Request metrics
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
    name: kafka_request_time_99p
    type: GAUGE
    labels:
      type: "$1"
  - pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
    name: kafka_request_queue
    type: GAUGE
  - pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
    name: kafka_purgatory_size
    type: GAUGE
    labels:
      type: "$1"
  # Controller stats
  - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
    name: kafka_leader_election_rate
    type: COUNTER
  - pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
    name: kafka_unclean_election_rate
    type: COUNTER
  # JVM Garbage Collection
  - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
    name: jvm_gc_collections_count
    type: COUNTER
    labels:
      name: "$1"
  # JVM Memory
  - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
    name: jvm_memory_heap_max
    type: GAUGE
  - pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
    name: jvm_memory_heap_used
    type: GAUGE
  # JVM Threading and System
  - pattern: 'java.lang<type=Threading><>ThreadCount'
    name: jvm_thread_count
    type: GAUGE
  - pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
    name: jvm_system_cpu_utilization
    type: GAUGE
  # Broker uptime
  - pattern: 'java.lang<type=Runtime><>Uptime'
    name: kafka_broker_uptime
    type: GAUGE
  # Additional metrics: remove this section to reduce data ingest
  # Request latency: total count, 50th percentile, and average (99p kept above)
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
    name: kafka_request_time_total
    type: COUNTER
    labels:
      type: "$1"
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
    name: kafka_request_time_50p
    type: GAUGE
    labels:
      type: "$1"
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
    name: kafka_request_time_avg
    type: GAUGE
    labels:
      type: "$1"
  # Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
name: kafka_logs_flush_count
type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
name: kafka_logs_flush_time_50p
type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
name: kafka_logs_flush_time_99p
type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

Tip

Customize metrics: you can add or modify patterns by consulting the Prometheus JMX Exporter examples and the Kafka MBean documentation. See the JMX Exporter rules documentation for additional configuration options.

Configure Kafka brokers to use the JMX Exporter

Attach the Prometheus JMX Exporter as a Java agent to each Kafka broker by adding it to your Kafka startup options.

Single-broker example:

bash
JMX_JAR="$HOME/opentelemetry/jmx_prometheus_javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"

nohup env KAFKA_OPTS="-javaagent:${JMX_JAR}=9404:${JMX_CONFIG}" \
  bin/kafka-server-start.sh config/server.properties &

Each broker now exposes Prometheus metrics on port 9404. Verify:

bash
curl http://localhost:9404/metrics | grep kafka_
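
The same check can be scripted so that it yields a number rather than a wall of text. The sketch below is only an illustration and not part of the official setup: `count_kafka_series` is a hypothetical helper that reads Prometheus text exposition on standard input and counts the sample lines whose metric name starts with `kafka_`, skipping the `# HELP` and `# TYPE` comment lines.

```shell
# Hypothetical helper: count sample lines whose metric name starts with kafka_.
# Comment lines begin with "#", so anchoring the pattern at "^kafka_" skips them.
count_kafka_series() {
  # grep -c prints the count; "|| true" keeps a zero count from being an error.
  grep -c '^kafka_' || true
}

# Usage against a live broker (assumes the exporter listens on localhost:9404):
#   curl -s http://localhost:9404/metrics | count_kafka_series
```

A count of 0 right after broker startup usually just means that no rule has matched yet. The per-topic metrics in particular only appear once traffic flows.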

Important

Multi-broker clusters: apply the same KAFKA_OPTS setting to every broker. Each broker exposes its metrics on port 9404 at its own host IP.
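
For a multi-broker cluster, it can help to derive the exporter endpoints from the bootstrap list you already maintain instead of typing each host twice. The following is a minimal sketch under stated assumptions: `jmx_targets` is a hypothetical helper, it assumes every broker uses the same exporter port (9404 unless overridden), and it assumes plain `host:port` entries (IPv6 literals are not handled).

```shell
# Hypothetical helper: turn "host1:9092,host2:9092" into one "host:9404" per line.
# Arguments: comma-separated broker list, optional exporter port (default 9404).
jmx_targets() {
  brokers=$1
  port=${2:-9404}
  # Split the list on commas, drop the Kafka port, and append the exporter port.
  printf '%s\n' "$brokers" | tr ',' '\n' | while IFS= read -r entry; do
    if [ -n "$entry" ]; then
      printf '%s:%s\n' "${entry%%:*}" "$port"
    fi
  done
}

# Usage: probe every broker's exporter endpoint (needs network access to the brokers).
#   for target in $(jmx_targets "broker1-host:9092,broker2-host:9092"); do
#     curl -sf --max-time 5 "http://${target}/metrics" >/dev/null \
#       && echo "OK   ${target}" || echo "FAIL ${target}"
#   done
```

The printed `host:9404` values are the same strings that belong in the `targets` entries of the Prometheus receiver.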

Create the collector configuration

Create the OpenTelemetry Collector configuration at ~/opentelemetry/collector-kafka-config.yaml on a monitoring host.

The Prometheus receiver scrapes every broker endpoint. In addition to scraping those endpoints, the collector listens on 0.0.0.0:4317 for any OTLP data (application traces, metrics, and logs).

receivers:
  # OTLP receiver for application traces, metrics, and logs (listens on port 4317)
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
  # Prometheus receiver scrapes JMX metrics from Kafka brokers
  prometheus/kafka-jmx:
    config:
      scrape_configs:
        - job_name: 'kafka-jmx-metrics'
          metrics_path: /metrics
          scrape_interval: 30s
          static_configs:
            # TODO: Replace each target with your broker hostname or IP, and set a unique broker.id per broker
            - targets: ['broker1-host:9404']
              labels:
                broker.id: '0'
            - targets: ['broker2-host:9404']
              labels:
                broker.id: '1'
            - targets: ['broker3-host:9404']
              labels:
                broker.id: '2'
  # Kafka metrics receiver for cluster-level consumer lag, topic, and partition metrics
  kafkametrics/cluster:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    # Exclude internal Kafka topics (prefixed with __) at the source
    topic_match: "^[^_].*$"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false
exporters:
  otlp/backend:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    tls:
      insecure: false
    sending_queue:
      num_consumers: 12
      queue_size: 5000
    retry_on_failure:
      enabled: true
processors:
  # Batch processor for efficient export
  batch/export:
    send_batch_size: 1024
    timeout: 30s
  # Memory limiter to prevent OOM
  memory_limiter:
    limit_percentage: 80
    spike_limit_percentage: 30
    check_interval: 1s
  # Transform metric naming conventions (underscore to dot, normalize special names)
  transform/metric-naming:
    metric_statements:
      - context: metric
        statements:
          - replace_pattern(name, "_", ".")
          - replace_pattern(name, "\\.load\\.1", ".load_1")
          - replace_pattern(name, "\\.recent\\.util", ".recent_util")
          - replace_pattern(name, "file\\.descriptor\\.count", "file_descriptor.count")
          - replace_pattern(name, "\\.memory\\.pool\\.used\\.bytes$", ".memory.pool.used")
          - replace_pattern(name, "\\.memory\\.pool\\.max\\.bytes$", ".memory.pool.max")
          - replace_pattern(name, "\\.memory\\.pool\\.collection\\.used\\.bytes$", ".memory.pool.used_after_last_gc")
          - replace_pattern(name, "\\.non\\.preferred\\.leader", ".non_preferred_leader")
          - replace_pattern(name, "\\.under\\.min\\.isr", ".under_min_isr")
          - replace_pattern(name, "\\.under\\.replicated", ".under_replicated")
          - replace_pattern(name, "\\.total$", "") where name != "kafka.request.time.total"
      - context: datapoint
        statements:
          - set(attributes["name"], attributes["gc"]) where attributes["gc"] != nil
          - delete_key(attributes, "gc") where attributes["gc"] != nil
          - set(attributes["name"], attributes["pool"]) where attributes["pool"] != nil
          - delete_key(attributes, "pool") where attributes["pool"] != nil
  # Add cluster name to all metrics
  resource/cluster-name:
    attributes:
      - key: kafka.cluster.name
        # TODO: Replace with your Kafka cluster name
        value: ${env:KAFKA_CLUSTER_NAME}
        action: upsert
  # Remove broker.id for cluster-level metrics
  transform/remove_broker_id:
    metric_statements:
      - context: datapoint
        statements:
          - delete_key(attributes, "broker.id")
  # Filter out scrape overhead metrics
  filter/scrape-overhead:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "^jmx_.*"
          - "^process_.*"
          - "^jvm_buffer_pool_.*"
          - "^jvm_threads_.*"
          - "^jvm_classes_.*"
          - "^jvm_memory_(heap|non_heap)_(committed|init|max|used)_bytes$"
          - "^jvm_compilation_.*"
          - "^jvm_(runtime|info).*"
          - "^jvm_memory_pool_(allocated_bytes_total|committed_bytes|init_bytes|collection_(committed|init|max)_bytes)$"
  # Include only cluster-level metrics for the cluster pipeline
  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "^kafka\\.partition\\.offline$"
          - "^kafka\\.(leader|unclean)\\.election\\.rate$"
          - "^kafka\\.partition\\.non_preferred_leader$"
          - "^kafka\\.broker\\.fenced\\.count$"
          - "^kafka\\.cluster\\.partition\\.count$"
          - "^kafka\\.cluster\\.topic\\.count$"
  # Exclude cluster-level metrics from the broker pipeline
  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "^kafka\\.partition\\.offline$"
          - "^kafka\\.(leader|unclean)\\.election\\.rate$"
          - "^kafka\\.partition\\.non_preferred_leader$"
          - "^kafka\\.broker\\.fenced\\.count$"
          - "^kafka\\.cluster\\.partition\\.count$"
          - "^kafka\\.cluster\\.topic\\.count$"
  # Remove unnecessary attributes
  transform/remove_attributes:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""
      - context: resource
        statements:
          - delete_key(attributes, "server.address")
          - delete_key(attributes, "server.port")
          - delete_key(attributes, "service.instance.id")
          - delete_key(attributes, "host.name")
          - delete_key(attributes, "url.scheme")
  # Aggregate partition metrics to topic level
  metricstransform/topic-aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
  # Filter out original partition replicas metric
  filter/exclude_partition_replicas_metric:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync
  # Filter internal Kafka topics as a safety net
  filter/internal_topics:
    metrics:
      datapoint:
        - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
  # Convert cumulative to delta metrics
  cumulativetodelta:
  groupbyattrs/cluster:
    keys: [kafka.cluster.name]
  metricstransform/cluster_max:
    transforms:
      - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels
            aggregation_type: max
            label_set: []
service:
  pipelines:
    # Application traces from instrumented Kafka clients and apps
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
    # Application metrics from instrumented Kafka clients and apps
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
    # Application logs from instrumented Kafka clients and apps
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
    # Broker-level metrics from Prometheus JMX scraping
    metrics/broker:
      receivers:
        - prometheus/kafka-jmx
      processors:
        - resource/cluster-name
        - filter/scrape-overhead
        - transform/metric-naming
        - transform/remove_attributes
        - filter/exclude_cluster_metrics
        - memory_limiter
        - cumulativetodelta
        - batch/export
      exporters:
        - otlp/backend
    # Cluster-level metrics from Prometheus JMX scraping
    metrics/cluster/prometheus:
      receivers:
        - prometheus/kafka-jmx
      processors:
        - resource/cluster-name
        - filter/scrape-overhead
        - transform/metric-naming
        - transform/remove_attributes
        - filter/include_cluster_metrics
        - transform/remove_broker_id
        - memory_limiter
        - cumulativetodelta
        - groupbyattrs/cluster
        - metricstransform/cluster_max
        - batch/export
      exporters:
        - otlp/backend
    # Cluster-level metrics from Kafka metrics receiver (consumer lag, topics, partitions)
    metrics/cluster/kafkametrics:
      receivers:
        - kafkametrics/cluster
      processors:
        - resource/cluster-name
        - filter/internal_topics
        - transform/remove_attributes
        - metricstransform/topic-aggregation
        - filter/exclude_partition_replicas_metric
        - memory_limiter
        - cumulativetodelta
        - batch/export
      exporters:
        - otlp/backend
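
The `topic_match` expression in this configuration deserves a quick sanity check, because `^[^_].*$` rejects every topic whose name starts with an underscore, not only the double-underscore internal topics. The sketch below replays the pattern with `grep -E`. The topic names are made-up examples, and it assumes that grep's extended syntax and the collector's regex engine agree on a pattern this simple.

```shell
# Print only the topic names that the topic_match pattern would keep.
matching_topics() {
  # "|| true" keeps an empty result from being treated as an error.
  grep -E '^[^_].*$' || true
}

# Prints "orders" and "payments.v1"; both underscore-prefixed names are dropped.
printf '%s\n' orders payments.v1 __consumer_offsets _schemas | matching_topics
```

If you have user topics that start with a single underscore (for example `_schemas`), the pattern as written excludes them from the kafkametrics receiver as well.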

Set environment variables

Set the required environment variables on the monitoring host before starting the collector:

bash
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
# EU region: https://otlp.eu01.nr-data.net:4317

Configuration parameters

The following table describes the key configuration parameters:

| Variable | Description |
| --- | --- |
| NEW_RELIC_LICENSE_KEY | Your New Relic license key, for example YOUR_LICENSE_KEY |
| KAFKA_CLUSTER_NAME | A unique name for your Kafka cluster, for example my-kafka-cluster |
| KAFKA_BOOTSTRAP_BROKER_ADDRESSES | The addresses of your Kafka bootstrap brokers, for example broker1-host:9092,broker2-host:9092,broker3-host:9092 |
| NEW_RELIC_OTLP_ENDPOINT | The OTLP ingest endpoint. Use https://otlp.nr-data.net:4317 for the US region or https://otlp.eu01.nr-data.net:4317 for the EU region. For other configurations, see Configure your OTLP endpoint. |
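
Because the collector configuration reads all four values through `${env:...}` references, a missing variable tends to surface only as a confusing startup or export error. A small preflight check can catch that earlier. This is a sketch, not an official tool: `missing_collector_env` is a hypothetical helper that reports every required variable that is unset or empty.

```shell
# Hypothetical preflight helper: report each required variable that is unset or empty.
# Returns 0 when all four are set, 1 otherwise.
missing_collector_env() {
  missing=0
  for name in NEW_RELIC_LICENSE_KEY KAFKA_CLUSTER_NAME \
              KAFKA_BOOTSTRAP_BROKER_ADDRESSES NEW_RELIC_OTLP_ENDPOINT; do
    # Look up the value of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage: missing_collector_env && echo "environment looks complete"
```

Run it in the same shell session (or service environment) that will start the collector, since exported variables do not carry over between sessions.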

Install and start the collector

Install and run the collector on the monitoring host. Choose between the NRDOT Collector (New Relic's distribution) and the OpenTelemetry Collector:

Tip

NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector and is backed by New Relic support. For more information, see the NRDOT Collector GitHub repository.

NRDOT Collector

Step 1. Download and install the binary

bash
# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64" # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version

Important

For other operating systems and architectures, visit the NRDOT Collector releases page and download the appropriate binary for your system.

Step 2. Start the collector

bash
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

The collector starts collecting Kafka metrics and sending them to New Relic within a few minutes.

OpenTelemetry Collector

Step 1. Download and install the binary

bash
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"
ARCH="amd64"

curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

tar -xzf otelcol-contrib.tar.gz
sudo mv otelcol-contrib /usr/local/bin/
otelcol-contrib --version

For other operating systems, visit the OpenTelemetry Collector releases page.

Step 2. Start the collector

bash
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

The collector starts collecting Kafka metrics and sending them to New Relic within a few minutes.
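
With a configuration this long, it can save time to check the file before starting the collector for real. The sketch below wraps that check in a hypothetical helper, `validate_collector_config`. It assumes the collector binary offers a `validate` subcommand, which is an assumption to confirm against your installed version (for example with `otelcol-contrib --help`). It also assumes the environment variables are already exported, because the configuration references them.

```shell
# Hypothetical wrapper: validate a collector configuration before starting the collector.
# Arguments: collector binary name, path to the configuration file.
# Assumption: the binary supports "validate --config <file>".
validate_collector_config() {
  collector_bin=$1
  config_file=$2
  if ! command -v "$collector_bin" >/dev/null 2>&1; then
    echo "collector binary not found: $collector_bin" >&2
    return 127
  fi
  if [ ! -r "$config_file" ]; then
    echo "config file not readable: $config_file" >&2
    return 2
  fi
  "$collector_bin" validate --config "$config_file"
}

# Usage:
#   validate_collector_config otelcol-contrib "$HOME/opentelemetry/collector-kafka-config.yaml" \
#     && otelcol-contrib --config "$HOME/opentelemetry/collector-kafka-config.yaml"
```

The two early returns separate "binary not installed" (127) from "file missing or unreadable" (2), so a wrapper script or service unit can report which prerequisite failed.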

(Optional) Instrument producer or consumer applications

Important

Language support: currently, only Java applications are supported for Kafka client instrumentation with the OpenTelemetry Java agent.

To collect application-level telemetry from your Kafka producer and consumer applications, download the OpenTelemetry Java agent if you haven't already:

bash
mkdir -p ~/opentelemetry
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

Start your application with the agent:

bash
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"

java \
  -javaagent:"$OTEL_AGENT" \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -Dotel.instrumentation.runtime-telemetry.enabled="false" \
  -jar your-kafka-application.jar

Configuration parameters

The following table describes the key configuration parameters:

| Parameter | Description |
| --- | --- |
| service.name | Replace with a unique name for your producer or consumer application, for example order-process-service |
| kafka.cluster.name | Replace with the same cluster name used in your collector configuration, for example my-kafka-cluster |
| otlp.endpoint | Replace with the hostname or IP of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317 |
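
The three values in this table can also be supplied through the standard OpenTelemetry environment variables instead of `-D` system properties, which is convenient when the same launch script serves several services. The following is a minimal sketch: `set_otel_env` is a hypothetical helper, and the example values are the placeholders used earlier on this page.

```shell
# Hypothetical helper: export the OpenTelemetry settings for one application.
# Arguments: service name, Kafka cluster name, collector endpoint.
set_otel_env() {
  export OTEL_SERVICE_NAME="$1"
  export OTEL_RESOURCE_ATTRIBUTES="kafka.cluster.name=$2"
  export OTEL_EXPORTER_OTLP_ENDPOINT="$3"
  export OTEL_EXPORTER_OTLP_PROTOCOL="grpc"
}

set_otel_env "order-process-service" "my-kafka-cluster" "http://collector-host-ip:4317"

# Usage: start the application from the same shell. The agent reads the variables
# at startup, and the -Dotel.instrumentation.* flags stay on the command line:
#   java -javaagent:"$HOME/opentelemetry/opentelemetry-javaagent.jar" \
#     -Dotel.instrumentation.kafka.enabled="true" \
#     -jar your-kafka-application.jar
```

If both a system property and its environment variable are set, expect the system property to take precedence, so avoid setting the same option in both places.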

Tip

The configuration above sends telemetry to an OpenTelemetry Collector running at collector-host-ip:4317. If you want a separate collector dedicated to application telemetry, create one with the following configuration:

The Java agent provides out-of-the-box Kafka instrumentation with no code changes, capturing request latencies, throughput metrics, error rates, and distributed traces. For advanced configuration, see the Kafka instrumentation documentation.

(Optional) Send Kafka broker logs

To collect Kafka broker logs and send them to New Relic, configure the filelog receiver in your OpenTelemetry Collector.
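
This page does not show the receiver configuration itself, so the following is only a sketch of what such a fragment might look like, not the official configuration. Everything specific in it is an assumption: the log path `/opt/kafka/logs/server.log`, the receiver name `filelog/kafka-broker`, and the pipeline name `logs/broker` are hypothetical, while the processor and exporter names are the ones defined in the collector configuration above. The script writes the fragment to a scratch file so you can review it and merge it by hand.

```shell
# Write a hypothetical filelog fragment to merge into collector-kafka-config.yaml.
# KAFKA_LOG_FILE is an assumed path; point it at your broker's actual log file.
KAFKA_LOG_FILE="${KAFKA_LOG_FILE:-/opt/kafka/logs/server.log}"
FRAGMENT="${FRAGMENT:-${TMPDIR:-/tmp}/filelog-fragment.yaml}"

cat > "$FRAGMENT" <<EOF
receivers:
  filelog/kafka-broker:
    include:
      - ${KAFKA_LOG_FILE}
    # Only read lines written after the collector starts.
    start_at: end
service:
  pipelines:
    logs/broker:
      receivers: [filelog/kafka-broker]
      processors: [resource/cluster-name, memory_limiter, batch/export]
      exporters: [otlp/backend]
EOF

echo "wrote ${FRAGMENT}"
```

Including `resource/cluster-name` in the pipeline is a deliberate choice in this sketch: it stamps `kafka.cluster.name` on the broker logs, so they can be found with the same cluster filter used for metrics. The collector must run on (or have file access to) the broker host for the filelog receiver to read the file.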

Find your data

After a few minutes, your Kafka data should appear in New Relic. See Find your data for detailed instructions on exploring your Kafka data in the different views of the New Relic UI.

The following table summarizes where each signal type is stored. Replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value in all of the queries below:

| Signal | Event type | What it includes |
| --- | --- | --- |
| Metrics | Metric | Broker, topic, partition, consumer group, and JVM metrics |
| Logs | Log | Logs from producer and consumer applications (via the OTel Java agent) and broker logs collected via the Java agent |
| Traces | Span | Producer and consumer spans, including per-message publish and receive operations across all topics |

Metrics

Broker, topic, partition, consumer group, and JVM metrics are stored in the Metric event type. Replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Logs

Logs from producer and consumer applications instrumented with the OpenTelemetry Java agent, and broker logs when -Dotel.logs.exporter=otlp is configured, are stored in the Log event type:

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Traces

Producer and consumer spans, including per-message publish and receive operations across all topics, are stored in the Span event type:

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Example

A complete working example with the Docker Compose setup, OTel Collector configuration, OTel Java agent configuration, and sample producer and consumer applications is available in the New Relic OpenTelemetry Examples repository.

Troubleshooting

Next steps

Copyright © 2026 New Relic Inc.
