Monitor your self-hosted Apache Kafka cluster by installing the OpenTelemetry Collector directly on Linux hosts. Choose either the OpenTelemetry Java agent approach or the Prometheus JMX Exporter approach to collect JMX metrics from your Kafka brokers.
Architecture
New Relic supports two approaches for monitoring self-hosted Kafka: the OpenTelemetry Java agent or the Prometheus JMX Exporter. The following diagram illustrates the data flow for each approach.

Installation steps
Follow these steps to set up comprehensive Kafka monitoring. You install the OpenTelemetry Java agent on your brokers and deploy a collector that gathers metrics and logs and sends them to New Relic.
Before you begin
Make sure you have:
- A New Relic account with a license key
- Network access from the collector to the Kafka bootstrap server port (typically 9092)
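Before you start the collector, you can confirm that the monitoring host can reach each bootstrap broker. The following is a minimal sketch and is not part of the official setup. It assumes that bash is installed, because it relies on bash's /dev/tcp pseudo-device, and that the coreutils `timeout` command is available. The function names `port_open` and `check_brokers` are hypothetical.

```shell
# Return 0 if a TCP connection to host:port opens within N seconds (default 3).
# Relies on bash's /dev/tcp pseudo-device, so bash must be installed.
port_open() {
  timeout "${3:-3}" bash -c ">/dev/tcp/$1/$2" 2>/dev/null
}

# Probe every entry of a comma-separated "host:port" list, such as the value
# of KAFKA_BOOTSTRAP_BROKER_ADDRESSES. Prints one status line per broker and
# returns 1 if any broker is unreachable. The body runs in a subshell so the
# IFS change does not leak into the calling shell.
check_brokers() (
  status=0
  IFS=','
  for entry in $1; do
    host="${entry%:*}"   # everything before the last ':'
    port="${entry##*:}"  # everything after the last ':'
    if port_open "$host" "$port"; then
      echo "OK      $host:$port"
    else
      echo "FAILED  $host:$port"
      status=1
    fi
  done
  exit "$status"
)

# Example usage:
#   check_brokers "broker1-host:9092,broker2-host:9092,broker3-host:9092"
```

Each connection attempt is capped by `timeout`, so a firewalled broker delays the check by only a few seconds instead of hanging it.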
Create the collector configuration
On a monitoring host, create the main OpenTelemetry Collector configuration at ~/opentelemetry/collector-kafka-config.yaml.
receivers:
  # OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

  # Kafka metrics receiver for cluster-level metrics
  kafkametrics:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    # Exclude internal Kafka topics (prefixed with __) at the source
    topic_match: "^[^_].*$"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false

processors:
  batch/aggregation:
    send_batch_size: 1024
    timeout: 30s

  resourcedetection:
    detectors: [env, ec2, system]
    system:
      resource_attributes:
        host.name:
          enabled: true
        host.id:
          enabled: true

  resource:
    attributes:
      - action: insert
        key: kafka.cluster.name
        value: ${env:KAFKA_CLUSTER_NAME}

  transform/remove_broker_id:
    metric_statements:
      # Remove broker.id for cluster-level metrics; these represent the whole cluster,
      # not a specific broker. broker.id is retained on broker-level metrics pipelines.
      - context: resource
        statements:
          - delete_key(attributes, "broker.id")

  transform/remove_extra_attributes:
    metric_statements:
      - context: resource
        statements:
          # Delete all attributes starting with "process."
          - delete_matching_keys(attributes, "^process\\..*")
          # Delete all attributes starting with "telemetry."
          - delete_matching_keys(attributes, "^telemetry\\..*")
          - delete_key(attributes, "host.arch")
          - delete_key(attributes, "os.description")
          - delete_key(attributes, "host.image.id")
          - delete_key(attributes, "host.type")
          - delete_matching_keys(attributes, "^cloud\\..*")
          - delete_key(attributes, "service.instance.id") where IsMatch(attributes["service.name"], "^unknown_service:")
          - delete_key(attributes, "service.name") where IsMatch(attributes["service.name"], "^unknown_service:")

  # Filter internal Kafka topics as a safety net (kafkametrics topic_match handles the receiver side)
  filter/internal_topics:
    metrics:
      datapoint:
        - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'

  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"

  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"

  transform/des_units:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""

  cumulativetodelta:

  metricstransform/kafka_topic_sum_aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum

      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum

  filter/remove_partition_level_replicas:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync

  groupbyattrs/cluster:
    keys: [kafka.cluster.name]

  metricstransform/cluster_max:
    transforms:
      - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels
            aggregation_type: max
            label_set: []

exporters:
  otlp/newrelic:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    compression: gzip
    timeout: 30s

service:
  pipelines:
    # Broker metrics pipeline (excludes cluster-level metrics)
    metrics/broker:
      receivers: [otlp, kafkametrics]
      processors:
        - resourcedetection
        - resource
        - filter/exclude_cluster_metrics
        - filter/internal_topics
        - transform/remove_extra_attributes
        - transform/des_units
        - cumulativetodelta
        - metricstransform/kafka_topic_sum_aggregation
        - filter/remove_partition_level_replicas
        - batch/aggregation
      exporters: [otlp/newrelic]

    # Cluster metrics pipeline (controller-emitted metrics like offline partitions, topic/partition counts; no broker.id)
    metrics/cluster:
      receivers: [otlp]
      processors:
        - resourcedetection
        - resource
        - filter/include_cluster_metrics
        - transform/remove_broker_id
        - transform/remove_extra_attributes
        - transform/des_units
        - cumulativetodelta
        - groupbyattrs/cluster
        - metricstransform/cluster_max
        - batch/aggregation
      exporters: [otlp/newrelic]

    # APM traces pipeline (producer + consumer spans via OTel Java agent)
    traces/apps:
      receivers: [otlp]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]

    # APM logs pipeline (producer + consumer logs via OTel Java agent)
    logs/apps:
      receivers: [otlp]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]
Set environment variables
Before you install the collector, set the required environment variables on the monitoring host:
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
Configuration parameters
The following table describes the key configuration parameters:
| Variable | Description |
|---|---|
| NEW_RELIC_LICENSE_KEY | Your New Relic license key, for example YOUR_LICENSE_KEY |
| KAFKA_CLUSTER_NAME | A unique name for your Kafka cluster, for example my-kafka-cluster |
| KAFKA_BOOTSTRAP_BROKER_ADDRESSES | Your Kafka bootstrap broker addresses, for example broker1-host:9092,broker2-host:9092,broker3-host:9092 |
| NEW_RELIC_OTLP_ENDPOINT | The OTLP ingest endpoint. Use https://otlp.nr-data.net:4317 for the US region or https://otlp.eu01.nr-data.net:4317 for the EU region. For other configurations, see Configure your OTLP endpoint. |
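The collector reads these values through ${env:...} references, so a missing variable usually surfaces only when the collector starts. The following minimal shell sketch fails earlier. The helper names `require_env` and `nr_otlp_endpoint` and the region codes `us` and `eu` are illustrative assumptions. Only the two endpoints listed in the table are mapped.

```shell
# Print the New Relic OTLP endpoint for a region code ("us" or "eu"),
# using the two endpoints from the table above; return 1 for anything else.
nr_otlp_endpoint() {
  case "$1" in
    us|US) echo "https://otlp.nr-data.net:4317" ;;
    eu|EU) echo "https://otlp.eu01.nr-data.net:4317" ;;
    *) echo "unknown region: $1" >&2; return 1 ;;
  esac
}

# Return 1, naming each culprit on stderr, if any listed variable is unset or empty.
require_env() {
  local name value missing=0
  for name in "$@"; do
    # Indirect lookup of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example usage before starting the collector:
#   export NEW_RELIC_OTLP_ENDPOINT="$(nr_otlp_endpoint us)"
#   require_env NEW_RELIC_LICENSE_KEY KAFKA_CLUSTER_NAME \
#     KAFKA_BOOTSTRAP_BROKER_ADDRESSES NEW_RELIC_OTLP_ENDPOINT || exit 1
```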
Install and start the collector
Install and run the collector on the monitoring host. Choose either the NRDOT Collector (New Relic's distribution) or the OpenTelemetry Collector:
Tip
The NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector and is backed by New Relic support.
Step 1. Download and install the binary
Download and install the NRDOT Collector binary for your host operating system. The following example is for the linux_amd64 architecture:
# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64" # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version
Important
For other operating systems and architectures, visit NRDOT Collector releases and download the appropriate binary for your system.
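If you script the installation across mixed hardware, you can derive the ARCH value from `uname -m` instead of hard-coding it. The following is a minimal sketch. It assumes you need only the amd64 and arm64 suffixes used in the commands above. The function name `release_arch` is hypothetical. Confirm the exact archive names on the releases page.

```shell
# Map a machine name (default: the output of `uname -m`) to the
# architecture suffix used in the release archive names: amd64 or arm64.
release_arch() {
  local machine="${1:-$(uname -m)}"
  case "$machine" in
    x86_64|amd64)  echo "amd64" ;;
    aarch64|arm64) echo "arm64" ;;
    *) echo "unsupported architecture: $machine" >&2; return 1 ;;
  esac
}

# Example usage:
#   ARCH="$(release_arch)" || exit 1
```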
Step 2. Start the collector
Run the collector with your configuration file to start monitoring:
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml
The collector is now running and ready to receive data. Metrics appear in New Relic only after you complete the remaining steps, which attach the Java agent to your Kafka brokers.
Step 1. Download and install the binary
Download and install the OpenTelemetry Collector Contrib binary for your host operating system. The following example is for the linux_amd64 architecture:
# Set version and architecture
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"
ARCH="amd64"

# Download the collector
curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

# Extract the binary
tar -xzf otelcol-contrib.tar.gz

# Move to a location in PATH (optional)
sudo mv otelcol-contrib /usr/local/bin/

# Verify installation
otelcol-contrib --version
For other operating systems, visit the OpenTelemetry Collector releases page.
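A common mistake with the commands above is leaving the <collector_version> placeholder in place, which produces a broken download URL. The following sketch is a hypothetical helper, not part of the collector tooling. It builds the URL from the same pattern, accepts the version with or without a leading v, and rejects anything that is not a plain MAJOR.MINOR.PATCH number. Note that the release tag carries the v prefix but the archive name does not.

```shell
# Print the otelcol-contrib download URL for a version and architecture,
# following the pattern used in the download command above.
contrib_url() {
  local version="${1#v}" arch="$2"   # accept "1.2.3" or "v1.2.3"
  if ! printf '%s\n' "$version" | grep -Eq '^[0-9]+\.[0-9]+\.[0-9]+$'; then
    echo "invalid collector version: $1" >&2
    return 1
  fi
  printf 'https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v%s/otelcol-contrib_%s_linux_%s.tar.gz\n' \
    "$version" "$version" "$arch"
}

# Example usage:
#   URL="$(contrib_url "$OTEL_VERSION" "$ARCH")" || exit 1
#   curl -L -o otelcol-contrib.tar.gz "$URL"
```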
Step 2. Start the collector
Run the collector with your configuration file to start monitoring:
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml
The collector is now running and ready to receive data. Metrics appear in New Relic only after you complete the remaining steps, which attach the Java agent to your Kafka brokers.
Download the OpenTelemetry Java agent
Important
Make sure your OpenTelemetry Collector is running before you (re)start the Kafka brokers with the Java agent attached. The agent starts sending metrics as soon as the broker starts, so the collector must already be available to receive them.
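One way to enforce this ordering in a startup script is to wait until the collector's OTLP gRPC port (4317 in this guide) accepts TCP connections, and only then launch the broker. The following is a minimal sketch. It assumes that bash and the coreutils `timeout` command are installed. The names `wait_for_port` and `COLLECTOR_HOST` are hypothetical. An open port only shows that something is listening; it does not prove that the collector pipelines are healthy.

```shell
# Wait until host:port accepts TCP connections, polling once per second for
# up to max_wait seconds (default 60). Returns 1 on timeout.
# Uses bash's /dev/tcp pseudo-device, so bash must be installed.
wait_for_port() {
  local host="$1" port="$2" max_wait="${3:-60}" waited=0
  until timeout 2 bash -c ">/dev/tcp/$host/$port" 2>/dev/null; do
    if [ "$waited" -ge "$max_wait" ]; then
      echo "timed out waiting for $host:$port" >&2
      return 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
}

# Example usage before (re)starting a broker with the agent attached:
#   wait_for_port "$COLLECTOR_HOST" 4317 60 || exit 1
#   # ...then start the broker with KAFKA_OPTS as described in this guide
```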
The OpenTelemetry Java agent runs attached to your Kafka brokers. It collects Kafka and JMX metrics and sends them to the collector over OTLP:
# Create directory for OpenTelemetry components
mkdir -p ~/opentelemetry

# Download OpenTelemetry Java agent
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
Create a custom JMX configuration
Create a JMX configuration file for the OpenTelemetry Java agent so that it collects Kafka metrics from the JMX MBeans.
On each broker host, create the file ~/opentelemetry/kafka-jmx-config.yaml with the following configuration:
---
rules:
  # Per-topic custom metrics using custom MBean commands
  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
    mapping:
      Count:
        metric: kafka.prod.msg.count
        type: counter
        desc: The number of messages per topic
        unit: "{message}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(in)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(out)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By

  # Cluster-level metrics using controller-based MBeans
  - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
    mapping:
      Value:
        metric: kafka.cluster.topic.count
        type: gauge
        desc: The total number of global topics in the cluster
        unit: "{topic}"

  - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
    mapping:
      Value:
        metric: kafka.cluster.partition.count
        type: gauge
        desc: The total number of global partitions in the cluster
        unit: "{partition}"

  - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
    mapping:
      Value:
        metric: kafka.broker.fenced.count
        type: gauge
        desc: The number of fenced brokers in the cluster
        unit: "{broker}"

  - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
    mapping:
      Value:
        metric: kafka.partition.non_preferred_leader
        type: gauge
        desc: The count of topic partitions for which the leader is not the preferred leader
        unit: "{partition}"

  # Broker-level metrics using ReplicaManager MBeans
  - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
    mapping:
      Value:
        metric: kafka.partition.under_min_isr
        type: gauge
        desc: The number of partitions where the number of in-sync replicas is less than the minimum
        unit: "{partition}"

  # Broker uptime metric using JVM Runtime
  - bean: java.lang:type=Runtime
    mapping:
      Uptime:
        metric: kafka.broker.uptime
        type: gauge
        desc: Broker uptime in milliseconds
        unit: ms

  # Leader count per broker
  - bean: kafka.server:type=ReplicaManager,name=LeaderCount
    mapping:
      Value:
        metric: kafka.broker.leader.count
        type: gauge
        desc: Number of partitions for which this broker is the leader
        unit: "{partition}"

  # JVM metrics
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionCount:
        metric: jvm.gc.collections.count
        type: counter
        unit: "{collection}"
        desc: total number of collections that have occurred
        metricAttribute:
          name: param(name)

  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.max:
        metric: heap.max
        desc: maximum heap size
        type: gauge
      HeapMemoryUsage.used:
        metric: heap.used
        desc: current heap usage
        type: gauge

  - bean: java.lang:type=Threading
    mapping:
      ThreadCount:
        metric: jvm.thread.count
        type: gauge
        unit: "{thread}"
        desc: Total thread count (Kafka typical range 100-300 threads)

  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemCpuLoad:
        metric: system.cpu.utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for whole system (0.0 to 1.0)

  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
    mapping:
      Count:
        metric: kafka.message.count
        type: counter
        desc: The number of messages received by the broker
        unit: "{message}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.count
        type: &type counter
        desc: &desc The number of requests received by the broker
        unit: &unit "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.failed
        type: &type counter
        desc: &desc The number of requests to the broker resulting in a failure
        unit: &unit "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      99thPercentile:
        metric: kafka.request.time.99p
        type: gauge
        desc: The 99th percentile time the broker has taken to service requests

  - bean: kafka.network:type=RequestChannel,name=RequestQueueSize
    mapping:
      Value:
        metric: kafka.request.queue
        type: gauge
        desc: Size of the request queue
        unit: "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
    metricAttribute:
      direction: const(in)
    mapping:
      Count:
        metric: &metric kafka.network.io
        type: &type counter
        desc: &desc The bytes received or sent by the broker
        unit: &unit By

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
    metricAttribute:
      direction: const(out)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - beans:
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
    metricAttribute:
      type: param(delayedOperation)
    mapping:
      Value:
        metric: kafka.purgatory.size
        type: gauge
        desc: The number of requests waiting in purgatory
        unit: "{request}"

  - bean: kafka.server:type=ReplicaManager,name=PartitionCount
    mapping:
      Value:
        metric: kafka.partition.count
        type: gauge
        desc: The number of partitions on the broker
        unit: "{partition}"

  - bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
    mapping:
      Value:
        metric: kafka.partition.offline
        type: gauge
        desc: The number of partitions offline
        unit: "{partition}"

  - bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
    mapping:
      Value:
        metric: kafka.partition.under_replicated
        type: gauge
        desc: The number of under replicated partitions
        unit: "{partition}"

  - bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
    metricAttribute:
      operation: const(shrink)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"

  - bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
    metricAttribute:
      operation: const(expand)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"

  - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
    mapping:
      Value:
        metric: kafka.max.lag
        type: gauge
        desc: The max lag in messages between follower and leader replicas
        unit: "{message}"

  - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
    mapping:
      Value:
        metric: kafka.controller.active.count
        type: gauge
        desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
        unit: "{controller}"

  - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
    mapping:
      Count:
        metric: kafka.leader.election.rate
        type: counter
        desc: The leader election count
        unit: "{election}"

  - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
    mapping:
      Count:
        metric: kafka.unclean.election.rate
        type: counter
        desc: Unclean leader election count - increasing indicates broker failures
        unit: "{election}"

  # ── Additional metrics: remove this section to reduce data ingest ───────────

  # Request latency: total count, 50th percentile, and average (99p kept above)
  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      Count:
        metric: kafka.request.time.total
        type: counter
        desc: The total time the broker has taken to service requests
      50thPercentile:
        metric: kafka.request.time.50p
        type: gauge
        desc: The 50th percentile time the broker has taken to service requests
      Mean:
        metric: kafka.request.time.avg
        type: gauge
        desc: The average time the broker has taken to service requests

  - bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
    unit: ms
    type: gauge
    prefix: kafka.logs.flush.
    mapping:
      Count:
        metric: count
        unit: '{flush}'
        type: counter
        desc: Log flush count
      50thPercentile:
        metric: time.50p
        desc: Log flush time - 50th percentile
      99thPercentile:
        metric: time.99p
        desc: Log flush time - 99th percentile

  # GC elapsed time (cumulative collection time in ms)
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionTime:
        metric: jvm.gc.collections.elapsed
        type: counter
        unit: ms
        desc: the approximate accumulated collection elapsed time in milliseconds
        metricAttribute:
          name: param(name)

  # JVM class loading
  - bean: java.lang:type=ClassLoading
    mapping:
      LoadedClassCount:
        metric: jvm.class.count
        type: gauge
        unit: "{class}"
        desc: Currently loaded class count

  # JVM heap committed (in addition to heap.used and heap.max)
  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.committed:
        metric: heap.committed
        desc: Committed heap memory
        type: gauge

  # Additional JVM CPU and system metrics
  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemLoadAverage:
        metric: system.cpu.load_1m
        type: gauge
        unit: "{run_queue_item}"
        desc: System load average (1 minute) - alert if > CPU count
      AvailableProcessors:
        metric: cpu.count
        type: gauge
        unit: "{cpu}"
        desc: Number of processors available
      ProcessCpuLoad:
        metric: cpu.recent_utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for JVM process (0.0 to 1.0)
      OpenFileDescriptorCount:
        metric: file_descriptor.count
        type: gauge
        unit: "{file_descriptor}"
        desc: Number of open file descriptors - alert if > 80% of ulimit

  # JVM memory pool breakdown (by generation: G1 Old Gen, Eden, Survivor, etc.)
  - bean: java.lang:type=MemoryPool,name=*
    type: gauge
    unit: By
    metricAttribute:
      name: param(name)
    mapping:
      Usage.used:
        metric: jvm.memory.pool.used
        desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
      Usage.max:
        metric: jvm.memory.pool.max
        desc: Maximum memory pool size
      CollectionUsage.used:
        metric: jvm.memory.pool.used_after_last_gc
        desc: Memory used after last GC (shows retained memory baseline)
Configure the Kafka broker
Attach the OpenTelemetry Java agent to your Kafka broker by setting the KAFKA_OPTS environment variable before you start Kafka.
Single-broker example:
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"

nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
  -Dotel.jmx.enabled=true \
  -Dotel.jmx.config=$JMX_CONFIG \
  -Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol=grpc \
  -Dotel.metrics.exporter=otlp \
  -Dotel.logs.exporter=otlp \
  -Dotel.instrumentation.runtime-telemetry.enabled=false \
  -Dotel.metric.export.interval=30000" \
  bin/kafka-server-start.sh config/server.properties &
Important
Multi-broker clusters: For multiple brokers, use the same configuration on every broker, but set a unique broker.id value for each one in the -Dotel.resource.attributes parameter (for example, broker.id=1, broker.id=2, broker.id=3).
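To keep the per-broker differences down to the broker ID, you can generate the KAFKA_OPTS value from the single-broker example. The following sketch is illustrative. The function name `kafka_otel_opts` and its argument order are assumptions, and the paths follow the ~/opentelemetry layout used in this guide. An optional fourth argument switches broker log forwarding between otlp and none.

```shell
# Print a KAFKA_OPTS value for one broker, based on the single-broker
# example: only broker.id (and optionally the logs exporter) varies.
#   $1 broker ID   $2 cluster name   $3 collector host   $4 otlp|none (default otlp)
kafka_otel_opts() {
  local broker_id="$1" cluster="$2" collector="$3" logs="${4:-otlp}"
  local dir="$HOME/opentelemetry"
  printf '%s ' \
    "-javaagent:$dir/opentelemetry-javaagent.jar" \
    "-Dotel.jmx.enabled=true" \
    "-Dotel.jmx.config=$dir/kafka-jmx-config.yaml" \
    "-Dotel.resource.attributes=broker.id=$broker_id,kafka.cluster.name=$cluster" \
    "-Dotel.exporter.otlp.endpoint=http://$collector:4317" \
    "-Dotel.exporter.otlp.protocol=grpc" \
    "-Dotel.metrics.exporter=otlp" \
    "-Dotel.logs.exporter=$logs" \
    "-Dotel.instrumentation.runtime-telemetry.enabled=false" \
    "-Dotel.metric.export.interval=30000"
}

# Example usage on the host that runs broker 2:
#   nohup env KAFKA_OPTS="$(kafka_otel_opts 2 my-kafka-cluster collector-host-ip)" \
#     bin/kafka-server-start.sh config/server.properties &
```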
Tip
The -Dotel.logs.exporter=otlp flag above automatically enables broker logs. To disable broker log collection, set -Dotel.logs.exporter=none instead.
Configuration parameters
The following table describes the key configuration parameters:
| Parameter | Description |
|---|---|
| otlp.endpoint | Replace with the IP address or hostname of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317 |
| broker.id | Replace 1 with the unique broker ID for each broker, for example broker.id=1, broker.id=2, broker.id=3 |
| kafka.cluster.name | Replace my-kafka-cluster with the name of your Kafka cluster. It must match the value set in the collector configuration. |
| logs.exporter | Set to otlp to enable broker log collection. Set to none to disable broker log forwarding. |
For the full set of configuration options, see the Java agent configuration guide.
(Optional) Instrument producer or consumer applications
Important
Language support: currently, the OpenTelemetry Java agent supports Kafka client instrumentation only for Java applications.
To collect application-level telemetry from your Kafka producer and consumer applications, first download the OpenTelemetry Java agent as described in the Download the OpenTelemetry Java agent step above.
Start your application with the agent:
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"

java \
  -javaagent:"$OTEL_AGENT" \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -Dotel.instrumentation.runtime-telemetry.enabled="false" \
  -jar your-kafka-application.jar
Configuration parameters
The following table describes the key configuration parameters:
| Parameter | Description |
|---|---|
| service.name | Replace with a unique name for your producer or consumer application, for example order-process-service |
| kafka.cluster.name | Replace with the same cluster name used in your collector configuration, for example my-kafka-cluster |
| otlp.endpoint | Replace with the hostname or IP address of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317 |
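If you run several producer and consumer services, you can keep the agent flags from the command above in one helper so that only the service name changes between services. The following is a minimal sketch. `otel_app_opts` is a hypothetical helper. The unquoted expansion in the usage example assumes that neither the agent path nor any value contains spaces.

```shell
# Print the OpenTelemetry Java agent flags used above for one service,
# separated by spaces, so they can be placed on a java command line.
#   $1 service name   $2 cluster name   $3 collector host
otel_app_opts() {
  local service="$1" cluster="$2" collector="$3"
  printf '%s ' \
    "-javaagent:$HOME/opentelemetry/opentelemetry-javaagent.jar" \
    "-Dotel.service.name=$service" \
    "-Dotel.resource.attributes=kafka.cluster.name=$cluster" \
    "-Dotel.exporter.otlp.endpoint=http://$collector:4317" \
    "-Dotel.exporter.otlp.protocol=grpc" \
    "-Dotel.metrics.exporter=otlp" \
    "-Dotel.traces.exporter=otlp" \
    "-Dotel.logs.exporter=otlp" \
    "-Dotel.instrumentation.kafka.experimental-span-attributes=true" \
    "-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true" \
    "-Dotel.instrumentation.kafka.producer-propagation.enabled=true" \
    "-Dotel.instrumentation.kafka.enabled=true" \
    "-Dotel.instrumentation.runtime-telemetry.enabled=false"
}

# Example usage (unquoted on purpose so each flag becomes its own argument):
#   java $(otel_app_opts order-process-service my-kafka-cluster collector-host-ip) \
#     -jar your-kafka-application.jar
```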
Tip
The configuration above sends telemetry to an OpenTelemetry Collector running at collector-host-ip:4317. If you want a standalone collector dedicated to application telemetry, create one with the following configuration:
The Java agent provides out-of-the-box Kafka instrumentation without code changes. It captures request latency, throughput metrics, error rates, and distributed traces.
For advanced configuration, see the Kafka instrumentation documentation.
Follow these steps to set up comprehensive Kafka monitoring. You install the Prometheus JMX Exporter on your brokers and deploy a collector that gathers metrics and sends them to New Relic.
Before you begin
Make sure you have:
- A New Relic account with a license key
- Network access from the collector host to each broker on port 9404
- Network access from the collector to the Kafka bootstrap port (typically 9092)
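You can derive the exporter endpoints from the same bootstrap list you use for the Kafka port. This assumes that every bootstrap host is also a broker running the JMX Exporter on port 9404; adjust the list if your bootstrap addresses cover only some brokers. The helper name `exporter_targets` is hypothetical.

```shell
# Turn a comma-separated "host:port" bootstrap list into a comma-separated
# list of JMX Exporter targets on the same hosts (default port 9404).
# The body runs in a subshell so the IFS change stays local.
exporter_targets() (
  IFS=','
  out=''
  for entry in $1; do
    # Keep the host (everything before the last ':') and swap in the exporter port.
    out="${out:+$out,}${entry%:*}:${2:-9404}"
  done
  printf '%s\n' "$out"
)

# Example usage, once the brokers run with the exporter attached
# (the JMX Exporter serves its metrics on the /metrics path):
#   for target in $(exporter_targets "$KAFKA_BOOTSTRAP_BROKER_ADDRESSES" | tr ',' ' '); do
#     curl -sf "http://$target/metrics" >/dev/null && echo "OK $target" || echo "FAILED $target"
#   done
```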
Download the Prometheus JMX Exporter
Download the Prometheus JMX Exporter JAR on each Kafka broker host:
# Create directory for Prometheus components
mkdir -p ~/opentelemetry

# Download the Prometheus JMX Exporter agent JAR
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
JMX_EXPORTER_VERSION="1.5.0"
curl -L -o ~/opentelemetry/jmx_prometheus_javaagent.jar \
  "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"
Create the JMX metrics configuration
Create the JMX Exporter configuration file, which defines the Kafka metrics to collect. Save it as ~/opentelemetry/kafka-jmx-config.yaml on each broker host:
startDelaySeconds: 0lowercaseOutputName: truelowercaseOutputLabelNames: true
rules: # Cluster-level controller metrics - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value' name: kafka_cluster_topic_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value' name: kafka_cluster_partition_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value' name: kafka_broker_fenced_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value' name: kafka_partition_non_preferred_leader type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value' name: kafka_partition_offline type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value' name: kafka_controller_active_count type: GAUGE
# Broker-level replica metrics - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value' name: kafka_partition_under_min_isr type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value' name: kafka_broker_leader_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value' name: kafka_partition_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value' name: kafka_partition_under_replicated type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value' name: kafka_max_lag type: GAUGE
# Broker topic metrics (totals) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count' name: kafka_message_count type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
  name: kafka_request_count
  type: COUNTER
  labels:
    type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
  name: kafka_request_failed
  type: COUNTER
  labels:
    type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
  name: kafka_request_failed
  type: COUNTER
  labels:
    type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
  name: kafka_network_io
  type: COUNTER
  labels:
    direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
  name: kafka_network_io
  type: COUNTER
  labels:
    direction: "out"
# Per-topic metrics (only appear after traffic flows)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
  name: kafka_prod_msg_count
  type: COUNTER
  labels:
    topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
  name: kafka_topic_io
  type: COUNTER
  labels:
    topic: "$1"
    direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
  name: kafka_topic_io
  type: COUNTER
  labels:
    topic: "$1"
    direction: "out"
# Request metrics
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
  name: kafka_request_time_99p
  type: GAUGE
  labels:
    type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
  name: kafka_request_queue
  type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
  name: kafka_purgatory_size
  type: GAUGE
  labels:
    type: "$1"
# Controller stats
- pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
  name: kafka_leader_election_rate
  type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
  name: kafka_unclean_election_rate
  type: COUNTER
# JVM Garbage Collection
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
  name: jvm_gc_collections_count
  type: COUNTER
  labels:
    name: "$1"
# JVM Memory
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
  name: jvm_memory_heap_max
  type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
  name: jvm_memory_heap_used
  type: GAUGE
# JVM Threading and System
- pattern: 'java.lang<type=Threading><>ThreadCount'
  name: jvm_thread_count
  type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
  name: jvm_system_cpu_utilization
  type: GAUGE
# Broker uptime
- pattern: 'java.lang<type=Runtime><>Uptime'
  name: kafka_broker_uptime
  type: GAUGE
# Additional metrics: remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above)
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
  name: kafka_request_time_total
  type: COUNTER
  labels:
    type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
  name: kafka_request_time_50p
  type: GAUGE
  labels:
    type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
  name: kafka_request_time_avg
  type: GAUGE
  labels:
    type: "$1"
# Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
  name: kafka_logs_flush_count
  type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
  name: kafka_logs_flush_time_50p
  type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
  name: kafka_logs_flush_time_99p
  type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
  name: jvm_gc_collections_elapsed
  type: COUNTER
  labels:
    name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
  name: jvm_memory_heap_committed
  type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
  name: jvm_class_count
  type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
  name: jvm_system_cpu_load_1m
  type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
  name: jvm_cpu_count
  type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
  name: jvm_cpu_recent_utilization
  type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
  name: jvm_file_descriptor_count
  type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
  name: jvm_memory_pool_used
  type: GAUGE
  labels:
    name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
  name: jvm_memory_pool_max
  type: GAUGE
  labels:
    name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
  name: jvm_memory_pool_used_after_last_gc
  type: GAUGE
  labels:
    name: "$1"
Tip
Customize metrics: You can add or modify patterns by referring to the Prometheus JMX Exporter examples and the Kafka MBean documentation. See the JMX Exporter rules documentation for additional configuration options.
Configure the Kafka brokers to use the JMX Exporter
Attach the Prometheus JMX Exporter as a Java agent to each Kafka broker by adding it to the broker's Kafka startup options.
Single-broker example:
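If you add your own rules, it helps to preview the metric name that will reach New Relic. The collector configuration in a later step renames exporter metrics with the replace_pattern statements of the transform/metric-naming processor. The sketch below approximates that chain with sed so you can check a name locally. The function name nr_metric_name is hypothetical. The sketch assumes the name reaches the collector exactly as it appears on the exporter's /metrics endpoint; for COUNTER rules, that name typically ends in _total.

```shell
# Sketch only: approximates the transform/metric-naming statements with sed.
# It is not the collector's own code; the collector applies OTTL replace_pattern.
nr_metric_name() {
  printf '%s\n' "$1" | sed -E \
    -e 's/_/./g' \
    -e 's/\.load\.1/.load_1/g' \
    -e 's/\.recent\.util/.recent_util/g' \
    -e 's/file\.descriptor\.count/file_descriptor.count/g' \
    -e 's/\.memory\.pool\.used\.bytes$/.memory.pool.used/' \
    -e 's/\.memory\.pool\.max\.bytes$/.memory.pool.max/' \
    -e 's/\.memory\.pool\.collection\.used\.bytes$/.memory.pool.used_after_last_gc/' \
    -e 's/\.non\.preferred\.leader/.non_preferred_leader/g' \
    -e 's/\.under\.min\.isr/.under_min_isr/g' \
    -e 's/\.under\.replicated/.under_replicated/g' \
    -e '/^kafka\.request\.time\.total$/!s/\.total$//'
}

# Example usage:
nr_metric_name kafka_request_count_total   # kafka.request.count
nr_metric_name jvm_system_cpu_load_1m      # jvm.system.cpu.load_1m
```

The last sed command mirrors the where clause in the collector config: kafka.request.time.total keeps its suffix, while every other trailing .total is dropped.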
JMX_JAR="$HOME/opentelemetry/jmx_prometheus_javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"

nohup env KAFKA_OPTS="-javaagent:${JMX_JAR}=9404:${JMX_CONFIG}" \
  bin/kafka-server-start.sh config/server.properties &
Each broker now exposes Prometheus metrics on port 9404. To verify, run:
curl http://localhost:9404/metrics | grep kafka_
Important
Multi-broker clusters: Apply the same KAFKA_OPTS configuration to every broker. Each broker exposes metrics on port 9404 at its own host IP.
Create the collector configuration
Create the OpenTelemetry Collector configuration in ~/opentelemetry/collector-kafka-config.yaml on a monitoring host.
The Prometheus receiver scrapes every broker endpoint. In addition to scraping the Prometheus endpoints, the collector listens on 0.0.0.0:4317 for any OTLP data, such as application traces and logs.
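On a multi-broker cluster, checking each exporter by hand quickly gets tedious. The sketch below assumes that every broker runs the exporter on port 9404, as configured above. It also assumes your brokers are listed in a comma-separated host:port string, such as the bootstrap list you later set in KAFKA_BOOTSTRAP_BROKER_ADDRESSES. Both function names are hypothetical.

```shell
# Hypothetical helper: turn "host1:9092,host2:9092" into one exporter URL per broker.
# Assumes each broker serves JMX Exporter metrics on the same port (default 9404).
jmx_targets() {
  port="${2:-9404}"
  printf '%s\n' "$1" | tr ',' '\n' | while read -r addr; do
    [ -n "$addr" ] || continue
    # Drop the Kafka listener port and use the exporter port instead.
    printf 'http://%s:%s/metrics\n' "${addr%:*}" "$port"
  done
}

# Hypothetical helper: report brokers whose exporter does not return kafka_* series.
check_jmx_targets() {
  failed=0
  for url in $(jmx_targets "$1" "$2"); do
    if curl -fsS --max-time 5 "$url" | grep -q '^kafka_'; then
      echo "OK   $url"
    else
      echo "FAIL $url"
      failed=1
    fi
  done
  return "$failed"
}

# Usage (run from a host that can reach every broker):
# check_jmx_targets "broker1-host:9092,broker2-host:9092,broker3-host:9092"
```

check_jmx_targets returns a non-zero status if any broker fails. This lets you use it as a gate in a setup script before you start the collector.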
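The static_configs list in the configuration that follows needs one target and one broker.id label per broker. As a sketch, the hypothetical helper below prints those entries from a comma-separated host:port list. It assumes broker IDs 0, 1, 2, and so on, in list order. If you want the labels to match your brokers' configured IDs, edit the printed values. Indent the output so it lines up under static_configs in your file.

```shell
# Hypothetical helper: print static_configs entries for the prometheus/kafka-jmx receiver.
# Assumption: broker.id values are assigned 0, 1, 2, ... in list order.
static_configs_for() {
  port="${2:-9404}"
  id=0
  for addr in $(printf '%s\n' "$1" | tr ',' '\n'); do
    host="${addr%:*}"   # drop the Kafka listener port
    printf '%s\n' "- targets: ['${host}:${port}']"
    printf '%s\n' "  labels:"
    printf '%s\n' "    broker.id: '${id}'"
    id=$((id + 1))
  done
}

# Usage:
static_configs_for "broker1-host:9092,broker2-host:9092,broker3-host:9092"
```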
receivers:
  # OTLP receiver for application traces, metrics, and logs (listens on port 4317)
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

  # Prometheus receiver scrapes JMX metrics from Kafka brokers
  prometheus/kafka-jmx:
    config:
      scrape_configs:
        - job_name: 'kafka-jmx-metrics'
          metrics_path: /metrics
          scrape_interval: 30s
          static_configs:
            # TODO: Replace each target with your broker hostname or IP, and set a unique broker.id per broker
            - targets: ['broker1-host:9404']
              labels:
                broker.id: '0'
            - targets: ['broker2-host:9404']
              labels:
                broker.id: '1'
            - targets: ['broker3-host:9404']
              labels:
                broker.id: '2'

  # Kafka metrics receiver for cluster-level consumer lag, topic, and partition metrics
  kafkametrics/cluster:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    # Exclude internal Kafka topics (prefixed with __) at the source
    topic_match: "^[^_].*$"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false

exporters:
  otlp/backend:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    tls:
      insecure: false
    sending_queue:
      num_consumers: 12
      queue_size: 5000
    retry_on_failure:
      enabled: true

processors:
  # Batch processor for efficient export
  batch/export:
    send_batch_size: 1024
    timeout: 30s

  # Memory limiter to prevent OOM
  memory_limiter:
    limit_percentage: 80
    spike_limit_percentage: 30
    check_interval: 1s

  # Transform metric naming conventions (underscore to dot, normalize special names)
  transform/metric-naming:
    metric_statements:
      - context: metric
        statements:
          - replace_pattern(name, "_", ".")
          - replace_pattern(name, "\\.load\\.1", ".load_1")
          - replace_pattern(name, "\\.recent\\.util", ".recent_util")
          - replace_pattern(name, "file\\.descriptor\\.count", "file_descriptor.count")
          - replace_pattern(name, "\\.memory\\.pool\\.used\\.bytes$", ".memory.pool.used")
          - replace_pattern(name, "\\.memory\\.pool\\.max\\.bytes$", ".memory.pool.max")
          - replace_pattern(name, "\\.memory\\.pool\\.collection\\.used\\.bytes$", ".memory.pool.used_after_last_gc")
          - replace_pattern(name, "\\.non\\.preferred\\.leader", ".non_preferred_leader")
          - replace_pattern(name, "\\.under\\.min\\.isr", ".under_min_isr")
          - replace_pattern(name, "\\.under\\.replicated", ".under_replicated")
          - replace_pattern(name, "\\.total$", "") where name != "kafka.request.time.total"
      - context: datapoint
        statements:
          - set(attributes["name"], attributes["gc"]) where attributes["gc"] != nil
          - delete_key(attributes, "gc") where attributes["gc"] != nil
          - set(attributes["name"], attributes["pool"]) where attributes["pool"] != nil
          - delete_key(attributes, "pool") where attributes["pool"] != nil

  # Add cluster name to all metrics
  resource/cluster-name:
    attributes:
      - key: kafka.cluster.name
        # TODO: Replace with your Kafka cluster name
        value: ${env:KAFKA_CLUSTER_NAME}
        action: upsert

  # Remove broker.id for cluster-level metrics
  transform/remove_broker_id:
    metric_statements:
      - context: datapoint
        statements:
          - delete_key(attributes, "broker.id")

  # Filter out scrape overhead metrics
  filter/scrape-overhead:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "^jmx_.*"
          - "^process_.*"
          - "^jvm_buffer_pool_.*"
          - "^jvm_threads_.*"
          - "^jvm_classes_.*"
          - "^jvm_memory_(heap|non_heap)_(committed|init|max|used)_bytes$"
          - "^jvm_compilation_.*"
          - "^jvm_(runtime|info).*"
          - "^jvm_memory_pool_(allocated_bytes_total|committed_bytes|init_bytes|collection_(committed|init|max)_bytes)$"

  # Include only cluster-level metrics for the cluster pipeline
  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "^kafka\\.partition\\.offline$"
          - "^kafka\\.(leader|unclean)\\.election\\.rate$"
          - "^kafka\\.partition\\.non_preferred_leader$"
          - "^kafka\\.broker\\.fenced\\.count$"
          - "^kafka\\.cluster\\.partition\\.count$"
          - "^kafka\\.cluster\\.topic\\.count$"

  # Exclude cluster-level metrics from the broker pipeline
  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "^kafka\\.partition\\.offline$"
          - "^kafka\\.(leader|unclean)\\.election\\.rate$"
          - "^kafka\\.partition\\.non_preferred_leader$"
          - "^kafka\\.broker\\.fenced\\.count$"
          - "^kafka\\.cluster\\.partition\\.count$"
          - "^kafka\\.cluster\\.topic\\.count$"

  # Remove unnecessary attributes
  transform/remove_attributes:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""
      - context: resource
        statements:
          - delete_key(attributes, "server.address")
          - delete_key(attributes, "server.port")
          - delete_key(attributes, "service.instance.id")
          - delete_key(attributes, "host.name")
          - delete_key(attributes, "url.scheme")

  # Aggregate partition metrics to topic level
  metricstransform/topic-aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum

  # Filter out original partition replicas metric
  filter/exclude_partition_replicas_metric:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync

  # Filter internal Kafka topics as a safety net
  filter/internal_topics:
    metrics:
      datapoint:
        - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'

  # Convert cumulative to delta metrics
  cumulativetodelta:

  groupbyattrs/cluster:
    keys: [kafka.cluster.name]

  metricstransform/cluster_max:
    transforms:
      - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels
            aggregation_type: max
            label_set: []

service:
  pipelines:
    # Application traces from instrumented Kafka clients and apps
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]

    # Application metrics from instrumented Kafka clients and apps
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]

    # Application logs from instrumented Kafka clients and apps
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]

    # Broker-level metrics from Prometheus JMX scraping
    metrics/broker:
      receivers:
        - prometheus/kafka-jmx
      processors:
        - resource/cluster-name
        - filter/scrape-overhead
        - transform/metric-naming
        - transform/remove_attributes
        - filter/exclude_cluster_metrics
        - memory_limiter
        - cumulativetodelta
        - batch/export
      exporters:
        - otlp/backend

    # Cluster-level metrics from Prometheus JMX scraping
    metrics/cluster/prometheus:
      receivers:
        - prometheus/kafka-jmx
      processors:
        - resource/cluster-name
        - filter/scrape-overhead
        - transform/metric-naming
        - transform/remove_attributes
        - filter/include_cluster_metrics
        - transform/remove_broker_id
        - memory_limiter
        - cumulativetodelta
        - groupbyattrs/cluster
        - metricstransform/cluster_max
        - batch/export
      exporters:
        - otlp/backend

    # Cluster-level metrics from Kafka metrics receiver (consumer lag, topics, partitions)
    metrics/cluster/kafkametrics:
      receivers:
        - kafkametrics/cluster
      processors:
        - resource/cluster-name
        - filter/internal_topics
        - transform/remove_attributes
        - metricstransform/topic-aggregation
        - filter/exclude_partition_replicas_metric
        - memory_limiter
        - cumulativetodelta
        - batch/export
      exporters:
        - otlp/backend
Set environment variables
Set the required environment variables on the monitoring host before you start the collector:
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317"  # US region
# EU region: https://otlp.eu01.nr-data.net:4317
Configuration parameters
The following table describes the key configuration parameters:
| Variable | Description |
|---|---|
| NEW_RELIC_LICENSE_KEY | Your New Relic license key, for example YOUR_LICENSE_KEY |
| KAFKA_CLUSTER_NAME | A unique name for your Kafka cluster, for example my-kafka-cluster |
| KAFKA_BOOTSTRAP_BROKER_ADDRESSES | Your Kafka bootstrap broker addresses, for example broker1-host:9092,broker2-host:9092,broker3-host:9092 |
| NEW_RELIC_OTLP_ENDPOINT | OTLP ingest endpoint. Use https://otlp.nr-data.net:4317 for the US region or https://otlp.eu01.nr-data.net:4317 for the EU region. For other setups, see Configure your OTLP endpoint. |
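The collector reads these values from its own environment when it starts. They must therefore be exported in the shell, or the service definition, that launches the collector. A minimal preflight check could look like the following sketch; the function name is hypothetical.

```shell
# Hypothetical preflight check: list any required variable that is unset or empty.
# printenv only sees exported variables, which is what the collector process inherits.
check_collector_env() {
  missing=0
  for var in NEW_RELIC_LICENSE_KEY KAFKA_CLUSTER_NAME \
             KAFKA_BOOTSTRAP_BROKER_ADDRESSES NEW_RELIC_OTLP_ENDPOINT; do
    if [ -z "$(printenv "$var")" ]; then
      echo "missing: $var" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage:
# check_collector_env && otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml
```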
Install and start the collector
Install and run the collector on the monitoring host. Choose either the NRDOT Collector (New Relic's distribution) or the OpenTelemetry Collector:
Tip
The NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector, supported by New Relic. For more information, see the NRDOT Collector GitHub repository.
NRDOT Collector
Step 1. Download and install the binary
# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64"  # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version
Important
For other operating systems and architectures, visit the NRDOT Collector releases page and download the binary for your system.
Step 2. Start the collector
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml
Within a few minutes, the collector will begin collecting Kafka metrics and sending them to New Relic.
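Both the NRDOT and the OpenTelemetry Collector download URLs in this guide use amd64 or arm64 in the archive name. On Linux, uname -m reports x86_64 or aarch64 instead, so a small mapping avoids downloading the wrong archive. The helper name is hypothetical, and it only handles the two architectures that this guide uses.

```shell
# Hypothetical helper: map a machine name (default: `uname -m`) to the ARCH value
# used in the release archive names (amd64 or arm64).
release_arch() {
  machine="${1:-$(uname -m)}"
  case "$machine" in
    x86_64|amd64) echo amd64 ;;
    aarch64|arm64) echo arm64 ;;
    *) echo "unsupported architecture: $machine" >&2; return 1 ;;
  esac
}

# Usage, in place of the hard-coded ARCH="amd64" line:
# ARCH="$(release_arch)"
```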
OpenTelemetry Collector
Step 1. Download and install the binary
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"  # version number without the leading "v"
ARCH="amd64"

curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

tar -xzf otelcol-contrib.tar.gz
sudo mv otelcol-contrib /usr/local/bin/
otelcol-contrib --version
For other operating systems, visit the OpenTelemetry Collector releases page.
Step 2. Start the collector
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml
Within a few minutes, the collector will begin collecting Kafka metrics and sending them to New Relic.
(Optional) Instrument producer or consumer applications
Important
Language support: Currently, only Java applications are supported for Kafka client instrumentation, through the OpenTelemetry Java agent.
To collect application-level telemetry from your Kafka producer and consumer applications, download the OpenTelemetry Java agent if you haven't already:
mkdir -p ~/opentelemetry
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
Start your application with the agent:
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"

java \
  -javaagent:$OTEL_AGENT \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -Dotel.instrumentation.runtime-telemetry.enabled="false" \
  -jar your-kafka-application.jar
Configuration parameters
The following table describes the key configuration parameters:
| Parameter | Description |
|---|---|
| service.name | Replace with a unique name for your producer or consumer application, for example order-process-service |
| kafka.cluster.name | Replace with the same cluster name that you used in your collector configuration, for example my-kafka-cluster |
| otlp.endpoint | Replace with the hostname or IP of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317 |
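The -D system properties above also have standard OpenTelemetry environment-variable equivalents, which the Java agent reads as well. These are convenient when the startup command is fixed but the environment is not. The naming rule is to uppercase the property name and replace dots and hyphens with underscores. If the same setting is given both ways, the system property takes precedence. The sketch below shows the conversion with a hypothetical prop_to_env helper. It then sets the variables that must agree with the collector setup, reusing KAFKA_CLUSTER_NAME so that the cluster name cannot drift. The values are the same examples used above.

```shell
# Hypothetical helper: convert an OTel Java system property name to its environment
# variable form (uppercase, with "." and "-" replaced by "_").
prop_to_env() {
  printf '%s\n' "$1" | tr 'a-z.-' 'A-Z__'
}

# Settings that must match the collector setup (example values from above).
KAFKA_CLUSTER_NAME="${KAFKA_CLUSTER_NAME:-my-kafka-cluster}"
export OTEL_SERVICE_NAME="order-process-service"
export OTEL_RESOURCE_ATTRIBUTES="kafka.cluster.name=${KAFKA_CLUSTER_NAME}"
export OTEL_EXPORTER_OTLP_ENDPOINT="http://collector-host-ip:4317"
export OTEL_EXPORTER_OTLP_PROTOCOL="grpc"

# Prints OTEL_INSTRUMENTATION_KAFKA_EXPERIMENTAL_SPAN_ATTRIBUTES
prop_to_env otel.instrumentation.kafka.experimental-span-attributes

# Then start the application with only the agent flag, for example:
# java -javaagent:"$HOME/opentelemetry/opentelemetry-javaagent.jar" -jar your-kafka-application.jar
```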
Tip
The configuration above sends telemetry to an OpenTelemetry Collector running at collector-host-ip:4317. If you want a separate collector dedicated to application telemetry, create one with the following configuration:
The Java agent provides out-of-the-box Kafka instrumentation without code changes, capturing request latencies, throughput metrics, error rates, and distributed traces. For advanced configuration, see the Kafka instrumentation documentation.
(Optional) Send Kafka broker logs
To collect Kafka broker logs and send them to New Relic, configure the filelog receiver in your OpenTelemetry Collector.
Find your data
After a few minutes, your Kafka data should appear in New Relic. See Find your data for detailed instructions on exploring your Kafka data in the different views of the New Relic UI.
The following table summarizes where each signal type is stored. In all of the queries below, replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value:
| Signal | Event type | What it includes |
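The filelog receiver's include path must point at the broker's log files, not at the Kafka data directories set in log.dirs. With Kafka's stock start scripts and log4j configuration, broker logs are written to server.log in the directory named by LOG_DIR, which defaults to logs under the Kafka installation directory. The hypothetical helper below resolves that path. If you have customized your log4j configuration, check it instead.

```shell
# Hypothetical helper: resolve the broker's server.log path for the filelog receiver's include list.
# Assumes the stock kafka-run-class.sh behavior: $LOG_DIR, else <kafka install dir>/logs.
kafka_server_log_path() {
  kafka_home="${1:?usage: kafka_server_log_path KAFKA_INSTALL_DIR}"
  printf '%s/server.log\n' "${LOG_DIR:-$kafka_home/logs}"
}

# Usage on a broker host (the install path is an example):
kafka_server_log_path /opt/kafka
```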
|---|---|---|
| Metrics | Metric | Broker, topic, partition, consumer group, and JVM metrics |
| Logs | Log | Producer and consumer application logs (through the OTel Java agent) and broker logs collected through the Java agent |
| Traces | Span | Producer and consumer spans, including per-message publish and receive operations across all topics |
Metrics
Broker, topic, partition, consumer group, and JVM metrics are stored in the Metric event type. Replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago
Logs
Logs from producer and consumer applications instrumented with the OpenTelemetry Java agent, and broker logs when -Dotel.logs.exporter=otlp is set, are stored in the Log event type:
FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago
Traces
Producer and consumer spans, including per-message publish and receive operations across all topics, are stored in the Span event type:
FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago
Example
A complete working example, including a Docker Compose setup, the OTel Collector configuration, the OTel Java agent configuration, and sample producer and consumer applications, is available in the New Relic OpenTelemetry Examples repository.
Troubleshooting
Next steps
- Explore Kafka metrics - See the complete metrics reference
- Create custom dashboards - Build visualizations for your Kafka data
- Set up alerts - Monitor critical metrics such as consumer lag and under-replicated partitions