• /
  • EnglishEspañolFrançais日本語한국어Português
  • Inicia sesiónComenzar ahora

Te ofrecemos esta traducción automática para facilitar la lectura.

En caso de que haya discrepancias entre la versión en inglés y la versión traducida, se entiende que prevalece la versión en inglés. Visita esta página para obtener más información.

Crea una propuesta

Monitorea Kafka autohospedado con OpenTelemetry

Supervise su clúster de Apache Kafka autohospedado instalando el recopilador de OpenTelemetry directamente en los hosts de Linux.

Antes de que empieces

Asegúrese de tener:

  • Una cuenta de New Relic con un

  • OpenJDK instalado en el host de monitoreo

  • JMX habilitado en los brokers de Kafka (típicamente en el puerto 9999)

  • Acceso a la red desde el recopilador a los brokers de Kafka:

    • Puerto del servidor de arranque (normalmente 9092)
    • Puerto JMX (típicamente 9999)

Paso 1: Instale el recopilador OpenTelemetry

Descargue e instale el binario de OpenTelemetry Collector Contrib para el sistema operativo de su host desde las versiones de OpenTelemetry Collector.

Paso 2: Descargue el scraper de JMX

El scraper JMX recopila métricas detalladas de los MBeans del broker de Kafka:

bash
$
# Create directory in user home (no sudo needed)
$
mkdir -p ~/opentelemetry
$
curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \
>
https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jar

Importante

Compatibilidad de versiones: Esta guía utiliza JMX Scraper 1.52.0. Es posible que las versiones anteriores del OpenTelemetry Collector no incluyan el hash de este scraper en su lista de compatibilidad. Para obtener los mejores resultados, utilice la última versión de OpenTelemetry Collector, que incluye soporte para esta versión de JMX Scraper.

Paso 3: Crear la configuración de métricas personalizadas JMX

Cree un archivo de configuración JMX personalizado para recopilar métricas adicionales de Kafka que no están incluidas en el sistema de destino predeterminado.

Cree el archivo ~/opentelemetry/kafka-jmx-config.yaml con la siguiente configuración:

---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages in per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: current heap usage
type: gauge
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

Sugerencia

Personalizar la recopilación de métricas: Puede extraer métricas de Kafka adicionales agregando reglas MBean personalizadas al archivo kafka-jmx-config.yaml:

Paso 4: Crear la configuración del recopilador

Cree la configuración principal de OpenTelemetry Collector en ~/opentelemetry/config.yaml.

receivers:
# Kafka metrics receiver for cluster-level metrics
kafkametrics:
brokers:
- ${env:KAFKA_BROKER_ADDRESS}
protocol_version: 2.8.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
topic_match: ".*"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
# JMX receiver for broker-specific metrics
jmx/kafka_broker-1:
jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar
endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
target_system: kafka
collection_interval: 30s
jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml
resource_attributes:
broker.id: "1"
broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors:
batch/aggregation:
send_batch_size: 1024
timeout: 30s
resourcedetection:
detectors: [env, ec2, system]
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
resource:
attributes:
- action: insert
key: kafka.cluster.name
value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id:
metric_statements:
- context: resource
statements:
- delete_key(attributes, "broker.id")
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
transform/des_units:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [ topic ]
aggregation_type: sum
exporters:
otlp/newrelic:
endpoint: https://otlp.nr-data.net:4317
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
compression: gzip
timeout: 30s
service:
pipelines:
metrics/brokers-cluster-topics:
receivers: [jmx/kafka_broker-1, kafkametrics]
processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation]
exporters: [otlp/newrelic]
metrics/jmx-cluster:
receivers: [jmx/kafka_broker-1]
processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation]
exporters: [otlp/newrelic]

Notas de configuración:

  • Punto final OTLP: Usa https://otlp.nr-data.net:4317 (región de EE. UU.) o https://otlp.eu01.nr-data.net:4317 (región de la UE). Consulte Configure su punto final OTLP para otras regiones

Importante

Para múltiples brokers, agregue receptores JMX adicionales con diferentes puntos finales e ID de broker para monitorear cada broker en su clúster.

Paso 5: Establecer variables de entorno

Establezca las variables de entorno requeridas:

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BROKER_ADDRESS="localhost:9092"
$
export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"

Reemplazar:

  • YOUR_LICENSE_KEY con su clave de licencia de New Relic
  • my-kafka-cluster con un nombre único para su clúster de Kafka
  • localhost:9092 con la dirección de su servidor de arranque de Kafka
  • localhost:9999 con su punto final JMX del broker de Kafka

Paso 6: Inicie el recopilador

Ejecute el recopilador directamente (no se necesita sudo):

bash
$
# Start the collector with your config
$
otelcol-contrib --config ~/opentelemetry/config.yaml

El recopilador comenzará a enviar métricas de Kafka a New Relic en unos minutos.

Cree un servicio systemd para la ejecución persistente (requiere sudo para la configuración única):

bash
$
# Create systemd service file
$
sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF
$
[Unit]
$
Description=OpenTelemetry Collector for Kafka
$
After=network.target
$
$
[Service]
$
Type=simple
$
User=$USER
$
WorkingDirectory=$HOME/opentelemetry
$
ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml
$
Restart=on-failure
$
Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"
$
Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"
$
Environment="KAFKA_BROKER_ADDRESS=localhost:9092"
$
Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"
$
$
[Install]
$
WantedBy=multi-user.target
$
EOF

Reemplace YOUR_LICENSE_KEY y otros valores, luego habilite e inicie el servicio:

bash
$
sudo systemctl daemon-reload
$
sudo systemctl enable otelcol-contrib
$
sudo systemctl start otelcol-contrib
$
sudo systemctl status otelcol-contrib

Paso 7: (Opcional) Instrumente las aplicaciones de productor o consumidor

Para recopilar telemetría a nivel de aplicación de sus aplicaciones de productor y consumidor de Kafka, use el Agente Java de OpenTelemetry:

  1. Descargue el agente Java:

    bash
    $
    mkdir -p ~/otel-java
    $
    curl -L -o ~/otel-java/opentelemetry-javaagent.jar \
    >
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
  2. Inicie su aplicación con el agente:

    bash
    $
    java \
    >
    -javaagent:~/otel-java/opentelemetry-javaagent.jar \
    >
    -Dotel.service.name="kafka-producer-1" \
    >
    -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
    >
    -Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \
    >
    -Dotel.exporter.otlp.protocol="grpc" \
    >
    -Dotel.metrics.exporter="otlp" \
    >
    -Dotel.traces.exporter="otlp" \
    >
    -Dotel.logs.exporter="otlp" \
    >
    -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
    >
    -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
    >
    -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
    >
    -Dotel.instrumentation.kafka.enabled="true" \
    >
    -jar your-kafka-application.jar

Reemplazar:

  • kafka-producer-1 con un nombre único para su aplicación de productor o consumidor
  • my-kafka-cluster con el mismo nombre de clúster utilizado en la configuración de su recolector
  • https://otlp.nr-data.net:4317 con su punto final OTLP de New Relic (use https://otlp.eu01.nr-data.net:4317 para la región de la UE). Para otros endpoints y opciones de configuración, consulte Configure su endpoint OTLP.

El agente Java proporciona instrumentación de Kafka lista para usar sin cambios de código, capturando:

  • Latencias de solicitud
  • Métricas de rendimiento
  • Tasas de error
  • Rastreo distribuido

Para una configuración avanzada, consulte la documentación de instrumentación de Kafka.

Paso 6: (Opcional) Reenviar los logs del broker Kafka

Para recopilar logs de agente de Kafka de sus hosts y enviarlos a New Relic, configure el receptor de log de archivos en su OpenTelemetry Collector.

Encuentra tus datos

Después de unos minutos, sus métricas de Kafka deberían aparecer en New Relic. Consulte Encuentre sus datos para obtener instrucciones detalladas sobre cómo explorar sus métricas de Kafka en diferentes vistas en la interfaz de usuario de New Relic.

También puede consultar sus datos con NRQL:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'

Resolución de problemas

Próximos pasos

Copyright © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.