Supervise su clúster de Apache Kafka autohospedado instalando el recopilador de OpenTelemetry directamente en los hosts de Linux.
Antes de que empieces
Asegúrese de tener:
Una cuenta de New Relic con un
OpenJDK instalado en el host de monitoreo
JMX habilitado en los brokers de Kafka (típicamente en el puerto 9999)
Acceso a la red desde el recopilador a los brokers de Kafka:
- Puerto del servidor de arranque (normalmente 9092)
- Puerto JMX (típicamente 9999)
Paso 1: Instale el recopilador OpenTelemetry
Descargue e instale el binario de OpenTelemetry Collector Contrib para el sistema operativo de su host desde las versiones de OpenTelemetry Collector.
Paso 2: Descargue el scraper de JMX
El scraper JMX recopila métricas detalladas de los MBeans del broker de Kafka:
$# Create directory in user home (no sudo needed)$mkdir -p ~/opentelemetry$curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \> https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jarImportante
Compatibilidad de versiones: Esta guía utiliza JMX Scraper 1.52.0. Es posible que las versiones anteriores del OpenTelemetry Collector no incluyan el hash de este scraper en su lista de compatibilidad. Para obtener los mejores resultados, utilice la última versión de OpenTelemetry Collector, que incluye soporte para esta versión de JMX Scraper.
Paso 3: Crear la configuración de métricas personalizadas JMX
Cree un archivo de configuración JMX personalizado para recopilar métricas adicionales de Kafka que no están incluidas en el sistema de destino predeterminado.
Cree el archivo ~/opentelemetry/kafka-jmx-config.yaml con la siguiente configuración:
---rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages in per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name) CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: current heap usage type: gauge HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline)Sugerencia
Personalizar la recopilación de métricas: Puede extraer métricas de Kafka adicionales agregando reglas MBean personalizadas al archivo kafka-jmx-config.yaml:
Encuentre los nombres de MBean disponibles en la documentación de monitoreo de Kafka
Esto le permite recopilar cualquier métrica JMX expuesta por los brokers de Kafka en función de sus necesidades específicas de monitoreo.
Paso 4: Crear la configuración del recopilador
Cree la configuración principal de OpenTelemetry Collector en ~/opentelemetry/config.yaml.
receivers: # Kafka metrics receiver for cluster-level metrics kafkametrics: brokers: - ${env:KAFKA_BROKER_ADDRESS} protocol_version: 2.8.0 scrapers: - brokers - topics - consumers collection_interval: 30s topic_match: ".*" metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
# JMX receiver for broker-specific metrics jmx/kafka_broker-1: jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS} target_system: kafka collection_interval: 30s jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml resource_attributes: broker.id: "1" broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors: batch/aggregation: send_batch_size: 1024 timeout: 30s
resourcedetection: detectors: [env, ec2, system] system: resource_attributes: host.name: enabled: true host.id: enabled: true
resource: attributes: - action: insert key: kafka.cluster.name value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id: metric_statements: - context: resource statements: - delete_key(attributes, "broker.id")
filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
transform/des_units: metric_statements: - context: metric statements: - set(description, "") where description != "" - set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [ topic ] aggregation_type: sum
exporters: otlp/newrelic: endpoint: https://otlp.nr-data.net:4317 headers: api-key: ${env:NEW_RELIC_LICENSE_KEY} compression: gzip timeout: 30s
service: pipelines: metrics/brokers-cluster-topics: receivers: [jmx/kafka_broker-1, kafkametrics] processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation] exporters: [otlp/newrelic]
metrics/jmx-cluster: receivers: [jmx/kafka_broker-1] processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation] exporters: [otlp/newrelic]Notas de configuración:
- Punto final OTLP: Usa
https://otlp.nr-data.net:4317(región de EE. UU.) ohttps://otlp.eu01.nr-data.net:4317(región de la UE). Consulte Configure su punto final OTLP para otras regiones
Importante
Para múltiples brokers, agregue receptores JMX adicionales con diferentes puntos finales e ID de broker para monitorear cada broker en su clúster.
Paso 5: Establecer variables de entorno
Establezca las variables de entorno requeridas:
$export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"$export KAFKA_CLUSTER_NAME="my-kafka-cluster"$export KAFKA_BROKER_ADDRESS="localhost:9092"$export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"Reemplazar:
YOUR_LICENSE_KEYcon su clave de licencia de New Relicmy-kafka-clustercon un nombre único para su clúster de Kafkalocalhost:9092con la dirección de su servidor de arranque de Kafkalocalhost:9999con su punto final JMX del broker de Kafka
Paso 6: Inicie el recopilador
Ejecute el recopilador directamente (no se necesita sudo):
$# Start the collector with your config$otelcol-contrib --config ~/opentelemetry/config.yamlEl recopilador comenzará a enviar métricas de Kafka a New Relic en unos minutos.
Cree un servicio systemd para la ejecución persistente (requiere sudo para la configuración única):
$# Create systemd service file$sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF$[Unit]$Description=OpenTelemetry Collector for Kafka$After=network.target$
$[Service]$Type=simple$User=$USER$WorkingDirectory=$HOME/opentelemetry$ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml$Restart=on-failure$Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"$Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"$Environment="KAFKA_BROKER_ADDRESS=localhost:9092"$Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"$
$[Install]$WantedBy=multi-user.target$EOFReemplace YOUR_LICENSE_KEY y otros valores, luego habilite e inicie el servicio:
$sudo systemctl daemon-reload$sudo systemctl enable otelcol-contrib$sudo systemctl start otelcol-contrib$sudo systemctl status otelcol-contribPaso 7: (Opcional) Instrumente las aplicaciones de productor o consumidor
Para recopilar telemetría a nivel de aplicación de sus aplicaciones de productor y consumidor de Kafka, use el Agente Java de OpenTelemetry:
Descargue el agente Java:
bash$mkdir -p ~/otel-java$curl -L -o ~/otel-java/opentelemetry-javaagent.jar \>https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarInicie su aplicación con el agente:
bash$java \>-javaagent:~/otel-java/opentelemetry-javaagent.jar \>-Dotel.service.name="kafka-producer-1" \>-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \>-Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \>-Dotel.exporter.otlp.protocol="grpc" \>-Dotel.metrics.exporter="otlp" \>-Dotel.traces.exporter="otlp" \>-Dotel.logs.exporter="otlp" \>-Dotel.instrumentation.kafka.experimental-span-attributes="true" \>-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \>-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \>-Dotel.instrumentation.kafka.enabled="true" \>-jar your-kafka-application.jar
Reemplazar:
kafka-producer-1con un nombre único para su aplicación de productor o consumidormy-kafka-clustercon el mismo nombre de clúster utilizado en la configuración de su recolectorhttps://otlp.nr-data.net:4317con su punto final OTLP de New Relic (usehttps://otlp.eu01.nr-data.net:4317para la región de la UE). Para otros endpoints y opciones de configuración, consulte Configure su endpoint OTLP.
El agente Java proporciona instrumentación de Kafka lista para usar sin cambios de código, capturando:
- Latencias de solicitud
- Métricas de rendimiento
- Tasas de error
- Rastreo distribuido
Para una configuración avanzada, consulte la documentación de instrumentación de Kafka.
Paso 6: (Opcional) Reenviar los logs del broker Kafka
Para recopilar logs de agente de Kafka de sus hosts y enviarlos a New Relic, configure el receptor de log de archivos en su OpenTelemetry Collector.
Encuentra tus datos
Después de unos minutos, sus métricas de Kafka deberían aparecer en New Relic. Consulte Encuentre sus datos para obtener instrucciones detalladas sobre cómo explorar sus métricas de Kafka en diferentes vistas en la interfaz de usuario de New Relic.
También puede consultar sus datos con NRQL:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'Resolución de problemas
Próximos pasos
- Explorar las métricas de Kafka - Vea la referencia completa de métricas
- Crear dashboards personalizados - Cree visualizaciones para sus datos de Kafka
- Configure alertas - Monitoree métricas críticas como el retraso del consumidor y las particiones subreplicadas