Monitor your Kafka cluster running on Kubernetes with the Strimzi operator by deploying the OpenTelemetry Collector. The collector automatically discovers Kafka broker pods and collects a comprehensive set of metrics.
Before you begin

Make sure you have:
- A New Relic account with a license key
- A Kubernetes cluster with kubectl access
- Kafka deployed through the Strimzi operator with JMX enabled

Enable JMX on Strimzi Kafka

Make sure your Kafka cluster has JMX enabled in the Strimzi Kafka resource:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    jmxOptions: {} # Enables JMX with default settings
    # ...other broker configuration
```

Step 1: Create a namespace
Create a dedicated namespace for the OpenTelemetry Collector (or use your existing Kafka namespace):
```shell
kubectl create namespace kafka
```

Step 2: Create a secret with the license key
Save your New Relic license key as a Kubernetes secret:
```shell
kubectl create secret generic nr-license-key \
  --from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \
  -n kafka
```

Replace YOUR_LICENSE_KEY with your actual New Relic license key.
Step 3: Deploy the OpenTelemetry Collector

3.1 Build a custom collector image

Build a custom OpenTelemetry Collector image that bundles a Java runtime and the JMX scraper.

Important

Version compatibility: This guide uses JMX Scraper 1.52.0 and OpenTelemetry Collector 0.143.1. Earlier collector versions may not include this scraper's hash in their compatibility list. For best results, use the latest versions as shown in this guide.

Target architecture: Check the OpenTelemetry Collector releases page to find the correct binary for your system architecture (for example, linux_amd64, linux_arm64, darwin_amd64). Update the TARGETARCH variable in the Dockerfile accordingly.
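If you build on the same architecture you deploy to, a small sketch like the following can map the local machine type to the TARGETARCH value used by the Dockerfile (the `unsupported` fallback is our addition; extend the `case` for other platforms as needed):

```shell
# Map `uname -m` output to the collector release naming (linux builds assumed)
arch=$(uname -m)
case "$arch" in
  x86_64)          echo "linux_amd64" ;;
  aarch64 | arm64) echo "linux_arm64" ;;
  *)               echo "unsupported: $arch" >&2; exit 1 ;;
esac
```

You can pass the result to `docker build` with `--build-arg TARGETARCH=...`.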
Save as Dockerfile:
```dockerfile
# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver
# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep

# OpenTelemetry Collector Binary
ARG OTEL_VERSION=0.143.1
ARG TARGETARCH=linux_amd64
ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcol
RUN tar -zxvf /otelcontribcol

# JMX Scraper JAR (for JMX receiver with YAML-based configuration)
ARG JMX_SCRAPER_JAR_VERSION=1.52.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar

# Set permissions for nonroot user (uid 65532)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar

# Final minimal image with Java runtime
FROM openjdk:17-jre-slim

COPY --from=prep /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
COPY --from=prep /otelcol-contrib /otelcol-contrib

EXPOSE 4317 4318 8888
ENTRYPOINT ["/otelcol-contrib"]
CMD ["--config", "/conf/otel-agent-config.yaml"]
```

Build and push the image:
```shell
docker build -t your-registry/otel-collector-kafka:latest .
docker push your-registry/otel-collector-kafka:latest
```

3.2 Create the JMX custom metrics ConfigMap
First, create a ConfigMap with the custom JMX metrics configuration. Save as jmx-kafka-config.yaml:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: jmx-kafka-config
  namespace: kafka
data:
  jmx-kafka-config.yaml: |
    ---
    rules:
      # Per-topic custom metrics using custom MBean commands
      - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
        metricAttribute:
          topic: param(topic)
        mapping:
          Count:
            metric: kafka.prod.msg.count
            type: counter
            desc: The number of messages in per topic
            unit: "{message}"

      - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
        metricAttribute:
          topic: param(topic)
          direction: const(in)
        mapping:
          Count:
            metric: kafka.topic.io
            type: counter
            desc: The bytes received or sent per topic
            unit: By

      - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
        metricAttribute:
          topic: param(topic)
          direction: const(out)
        mapping:
          Count:
            metric: kafka.topic.io
            type: counter
            desc: The bytes received or sent per topic
            unit: By

      # Cluster-level metrics using controller-based MBeans
      - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
        mapping:
          Value:
            metric: kafka.cluster.topic.count
            type: gauge
            desc: The total number of global topics in the cluster
            unit: "{topic}"

      - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
        mapping:
          Value:
            metric: kafka.cluster.partition.count
            type: gauge
            desc: The total number of global partitions in the cluster
            unit: "{partition}"

      - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
        mapping:
          Value:
            metric: kafka.broker.fenced.count
            type: gauge
            desc: The number of fenced brokers in the cluster
            unit: "{broker}"

      - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
        mapping:
          Value:
            metric: kafka.partition.non_preferred_leader
            type: gauge
            desc: The count of topic partitions for which the leader is not the preferred leader
            unit: "{partition}"

      # Broker-level metrics using ReplicaManager MBeans
      - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
        mapping:
          Value:
            metric: kafka.partition.under_min_isr
            type: gauge
            desc: The number of partitions where the number of in-sync replicas is less than the minimum
            unit: "{partition}"

      # Broker uptime metric using JVM Runtime
      - bean: java.lang:type=Runtime
        mapping:
          Uptime:
            metric: kafka.broker.uptime
            type: gauge
            desc: Broker uptime in milliseconds
            unit: ms

      # Leader count per broker
      - bean: kafka.server:type=ReplicaManager,name=LeaderCount
        mapping:
          Value:
            metric: kafka.broker.leader.count
            type: gauge
            desc: Number of partitions for which this broker is the leader
            unit: "{partition}"

      # JVM metrics
      - bean: java.lang:type=GarbageCollector,name=*
        mapping:
          CollectionCount:
            metric: jvm.gc.collections.count
            type: counter
            unit: "{collection}"
            desc: total number of collections that have occurred
            metricAttribute:
              name: param(name)
          CollectionTime:
            metric: jvm.gc.collections.elapsed
            type: counter
            unit: ms
            desc: the approximate accumulated collection elapsed time in milliseconds
            metricAttribute:
              name: param(name)

      - bean: java.lang:type=Memory
        unit: By
        prefix: jvm.memory.
        dropNegativeValues: true
        mapping:
          HeapMemoryUsage.committed:
            metric: heap.committed
            desc: current heap usage
            type: gauge
          HeapMemoryUsage.max:
            metric: heap.max
            desc: current heap usage
            type: gauge
          HeapMemoryUsage.used:
            metric: heap.used
            desc: current heap usage
            type: gauge

      - bean: java.lang:type=Threading
        mapping:
          ThreadCount:
            metric: jvm.thread.count
            type: gauge
            unit: "{thread}"
            desc: Total thread count (Kafka typical range 100-300 threads)

      - bean: java.lang:type=OperatingSystem
        prefix: jvm.
        dropNegativeValues: true
        mapping:
          SystemLoadAverage:
            metric: system.cpu.load_1m
            type: gauge
            unit: "{run_queue_item}"
            desc: System load average (1 minute) - alert if > CPU count
          AvailableProcessors:
            metric: cpu.count
            type: gauge
            unit: "{cpu}"
            desc: Number of processors available
          ProcessCpuLoad:
            metric: cpu.recent_utilization
            type: gauge
            unit: '1'
            desc: Recent CPU utilization for JVM process (0.0 to 1.0)
          SystemCpuLoad:
            metric: system.cpu.utilization
            type: gauge
            unit: '1'
            desc: Recent CPU utilization for whole system (0.0 to 1.0)
          OpenFileDescriptorCount:
            metric: file_descriptor.count
            type: gauge
            unit: "{file_descriptor}"
            desc: Number of open file descriptors - alert if > 80% of ulimit

      - bean: java.lang:type=ClassLoading
        mapping:
          LoadedClassCount:
            metric: jvm.class.count
            type: gauge
            unit: "{class}"
            desc: Currently loaded class count

      - bean: java.lang:type=MemoryPool,name=*
        type: gauge
        unit: By
        metricAttribute:
          name: param(name)
        mapping:
          Usage.used:
            metric: jvm.memory.pool.used
            desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
          Usage.max:
            metric: jvm.memory.pool.max
            desc: Maximum memory pool size
          CollectionUsage.used:
            metric: jvm.memory.pool.used_after_last_gc
            desc: Memory used after last GC (shows retained memory baseline)
```

Tip
Customize metric collection: You can scrape additional Kafka metrics by adding custom MBean rules to the jmx-kafka-config.yaml file:

Find the available MBean names in the Kafka monitoring documentation

This lets you collect any JMX metric exposed by the Kafka brokers, based on your specific monitoring needs.
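As an illustration, a rule like the following could be appended under `rules:` to export the per-request-type request rate from the well-known `kafka.network:type=RequestMetrics` MBean (the metric name `kafka.request.count` is our own choice, not part of this guide's metric set):

```yaml
# Hypothetical extra rule: request rate faceted by request type (Produce, Fetch, ...)
- bean: kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*
  metricAttribute:
    request: param(request)
  mapping:
    Count:
      metric: kafka.request.count
      type: counter
      desc: Number of requests by request type
      unit: "{request}"
```

The syntax follows the same pattern as the rules above: `param(...)` pulls an attribute from the MBean name, and each attribute under `mapping` becomes one metric.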
Apply the JMX ConfigMap:
```shell
kubectl apply -f jmx-kafka-config.yaml
```

3.3 Create the collector ConfigMap
Create a ConfigMap with the OpenTelemetry Collector configuration. Save as otel-kafka-config.yaml:
```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: otel-collector-config
  namespace: kafka
  labels:
    app: otel-collector
data:
  otel-collector-config.yaml: |
    receivers:
      # Kafka cluster-level metrics (runs once per OTEL collector)
      kafkametrics/cluster:
        brokers:
          - "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
        protocol_version: 2.8.0
        scrapers:
          - brokers
          - topics
          - consumers
        collection_interval: 30s
        metrics:
          kafka.topic.min_insync_replicas:
            enabled: true
          kafka.topic.replication_factor:
            enabled: true
          kafka.partition.replicas:
            enabled: false
          kafka.partition.oldest_offset:
            enabled: false
          kafka.partition.current_offset:
            enabled: false

      # Receiver creator for dynamic per-broker JMX receivers
      receiver_creator:
        watch_observers: [k8s_observer]
        receivers:
          # JMX receiver template (created per discovered broker pod)
          jmx:
            rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka"
            config:
              endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi'
              jar_path: /opt/opentelemetry-jmx-scraper.jar
              target_system: kafka
              jmx_configs: /conf-jmx/jmx-kafka-config.yaml
              collection_interval: 30s
              # Set dynamic resource attributes from discovered pod
              resource_attributes:
                broker.endpoint: '`endpoint`'

    exporters:
      otlp:
        endpoint: https://otlp.nr-data.net:4317
        tls:
          insecure: false
        sending_queue:
          num_consumers: 12
          queue_size: 5000
        retry_on_failure:
          enabled: true
        headers:
          api-key: ${NEW_RELIC_LICENSE_KEY}

    processors:
      # Batch processor for efficiency
      batch/aggregation:
        send_batch_size: 1024
        timeout: 30s

      # Memory limiter to prevent OOM
      memory_limiter:
        limit_percentage: 80
        spike_limit_percentage: 30
        check_interval: 1s

      # Detect system resources
      resourcedetection:
        detectors: [env, docker, system]
        timeout: 5s
        override: false

      # Add Kafka cluster metadata
      resource/kafka_metadata:
        attributes:
          - key: kafka.cluster.name
            value: my-cluster
            action: upsert

      # Extract Kubernetes attributes
      k8sattributes:
        auth_type: serviceAccount
        passthrough: false
        extract:
          metadata:
            - k8s.pod.name
            - k8s.pod.uid
            - k8s.namespace.name
            - k8s.node.name
          labels:
            - tag_name: strimzi.cluster
              key: strimzi.io/cluster
              from: pod
            - tag_name: strimzi.kind
              key: strimzi.io/kind
              from: pod

      # Transform metrics for New Relic UI
      transform:
        metric_statements:
          - context: metric
            statements:
              # Clean up descriptions and units
              - set(description, "") where description != ""
              - set(unit, "") where unit != ""
          - context: resource
            statements:
              # Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit)
              - set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil

      # Remove broker.id for cluster-level metrics
      transform/remove_broker_id:
        metric_statements:
          - context: resource
            statements:
              - delete_key(attributes, "broker.id")
              - delete_key(attributes, "broker.endpoint")
              - delete_key(attributes, "k8s.pod.name")

      # Topic sum aggregation for replicas_in_sync
      metricstransform/kafka_topic_sum_aggregation:
        transforms:
          - include: kafka.partition.replicas_in_sync
            action: insert
            new_name: kafka.partition.replicas_in_sync.total
            operations:
              - action: aggregate_labels
                label_set: [ topic ]
                aggregation_type: sum

      # Filter to include only cluster-level metrics
      filter/include_cluster_metrics:
        metrics:
          include:
            match_type: regexp
            metric_names:
              - "kafka\\.partition\\.offline"
              - "kafka\\.(leader|unclean)\\.election\\.rate"
              - "kafka\\.partition\\.non_preferred_leader"
              - "kafka\\.broker\\.fenced\\.count"
              - "kafka\\.cluster\\.partition\\.count"
              - "kafka\\.cluster\\.topic\\.count"

      # Filter to exclude cluster-level metrics from broker pipeline
      filter/exclude_cluster_metrics:
        metrics:
          exclude:
            match_type: regexp
            metric_names:
              - "kafka\\.partition\\.offline"
              - "kafka\\.(leader|unclean)\\.election\\.rate"
              - "kafka\\.partition\\.non_preferred_leader"
              - "kafka\\.broker\\.fenced\\.count"
              - "kafka\\.cluster\\.partition\\.count"
              - "kafka\\.cluster\\.topic\\.count"

      # Convert cumulative metrics to delta for New Relic
      cumulativetodelta:

    extensions:
      # K8s observer extension
      k8s_observer:
        auth_type: serviceAccount
        observe_pods: true
        observe_nodes: false

    service:
      extensions: [k8s_observer]
      pipelines:
        # Per-broker metrics pipeline (with broker.id)
        metrics/broker:
          receivers:
            - receiver_creator
            - kafkametrics/cluster
          processors:
            - memory_limiter
            - resourcedetection
            - resource/kafka_metadata
            - k8sattributes
            - filter/exclude_cluster_metrics
            - transform
            - metricstransform/kafka_topic_sum_aggregation
            - cumulativetodelta
            - batch/aggregation
          exporters: [otlp]

        # Cluster-level metrics pipeline (without broker.id, aggregated)
        metrics/cluster:
          receivers:
            - receiver_creator
          processors:
            - memory_limiter
            - resourcedetection
            - resource/kafka_metadata
            - k8sattributes
            - filter/include_cluster_metrics
            - transform/remove_broker_id
            - metricstransform/kafka_topic_sum_aggregation
            - cumulativetodelta
            - batch/aggregation
          exporters: [otlp]
```

Configuration notes:
- Replace `my-cluster-kafka-bootstrap` with the name of your Strimzi Kafka service
- Replace `my-cluster` in `rule` and `kafka.cluster.name` with the name of your cluster
- Update the namespace if it differs from `kafka`
- OTLP endpoint: Use `https://otlp.nr-data.net:4317` (US region) or `https://otlp.eu01.nr-data.net:4317` (EU region). See Configure your OTLP endpoint for other regions
- The `receiver_creator` automatically discovers Kafka broker pods using the Strimzi labels
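The `broker.id` extraction in the `transform` processor keys off the trailing digits of the StatefulSet pod name. The same pattern can be sanity-checked with a one-liner (a demonstration only; the collector itself uses the OTTL `ExtractPatterns` function with the regex `.*-(?P<broker_id>\d+)$`):

```shell
# Trailing digits of the pod name become the broker id
echo "my-cluster-kafka-12" | sed -E 's/.*-([0-9]+)$/\1/'
```

This prints `12`, matching the multi-digit support noted in the config comment.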
Apply the ConfigMap:
```shell
kubectl apply -f otel-kafka-config.yaml
```

3.4 Deploy the collector
Create the Deployment. Save as otel-collector-deployment.yaml:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: otel-collector
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: otel-collector
  template:
    metadata:
      labels:
        app: otel-collector
    spec:
      serviceAccountName: otel-collector
      containers:
        - name: otel-collector
          image: your-registry/otel-collector-kafka:latest
          env:
            - name: NEW_RELIC_LICENSE_KEY
              valueFrom:
                secretKeyRef:
                  name: nr-license-key
                  key: NEW_RELIC_LICENSE_KEY
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"
            requests:
              cpu: "500m"
              memory: "1Gi"
          volumeMounts:
            - name: vol-kafka-test-cluster
              mountPath: /conf
            - name: jmx-config
              mountPath: /conf-jmx
          ports:
            - containerPort: 4317 # OTLP gRPC
            - containerPort: 4318 # OTLP HTTP
            - containerPort: 8888 # Metrics
      volumes:
        - name: vol-kafka-test-cluster
          configMap:
            name: otel-collector-config
            items:
              - key: otel-collector-config.yaml
                path: otel-agent-config.yaml
        - name: jmx-config
          configMap:
            name: jmx-kafka-config
            items:
              - key: jmx-kafka-config.yaml
                path: jmx-kafka-config.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: otel-collector
  namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: otel-collector
rules:
  - apiGroups: [""]
    resources: ["pods", "nodes"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: otel-collector
subjects:
  - kind: ServiceAccount
    name: otel-collector
    namespace: kafka
roleRef:
  kind: ClusterRole
  name: otel-collector
  apiGroup: rbac.authorization.k8s.io
```

Resource configuration:
- The resource limits above are suitable for medium-sized Kafka clusters (5-10 brokers, 20-100 topics)
Apply the deployment:
```shell
kubectl apply -f otel-collector-deployment.yaml
```

Verify that the collector is running:

```shell
kubectl get pods -n kafka -l app=otel-collector
kubectl logs -n kafka -l app=otel-collector -f
```

Step 4: (Optional) Instrument producer or consumer applications
To collect application-level telemetry from Kafka producer and consumer applications running on Kubernetes, instrument them with the OpenTelemetry Java agent.

Instrument your Kafka application

To instrument your Kafka producer or consumer applications, add the OpenTelemetry Java agent to your existing deployment:

Download the Java agent: Add an init container to download the agent JAR:
```yaml
initContainers:
  - name: download-otel-agent
    image: busybox:latest
    command:
      - sh
      - -c
      - |
        wget -O /otel/opentelemetry-javaagent.jar \
          https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
    volumeMounts:
      - name: otel-agent
        mountPath: /otel
```

Configure the Java agent: Add environment variables to your application container:
```yaml
env:
  - name: JAVA_TOOL_OPTIONS
    value: >-
      -javaagent:/otel/opentelemetry-javaagent.jar
      -Dotel.service.name="kafka-producer"
      -Dotel.resource.attributes="kafka.cluster.name=my-cluster"
      -Dotel.exporter.otlp.endpoint="http://localhost:4317"
      -Dotel.exporter.otlp.protocol="grpc"
      -Dotel.metrics.exporter="otlp"
      -Dotel.traces.exporter="otlp"
      -Dotel.logs.exporter="otlp"
      -Dotel.instrumentation.kafka.experimental-span-attributes="true"
      -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"
      -Dotel.instrumentation.kafka.producer-propagation.enabled="true"
      -Dotel.instrumentation.kafka.enabled="true"
volumeMounts:
  - name: otel-agent
    mountPath: /otel
```

Add the volume: Include the volume definition:
```yaml
volumes:
  - name: otel-agent
    emptyDir: {}
```
Replace:

- `kafka-producer` with a unique name for your application
- `my-cluster` with the name of your Kafka cluster
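Put together, the three fragments above slot into an existing Deployment roughly as follows (a sketch only; `my-app` and its image are placeholders for your own application):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      initContainers:
        - name: download-otel-agent
          # ...as in the init container fragment above
      containers:
        - name: my-app
          image: your-registry/my-app:latest
          env:
            - name: JAVA_TOOL_OPTIONS
              # ...as in the env fragment above
          volumeMounts:
            - name: otel-agent
              mountPath: /otel
      volumes:
        - name: otel-agent
          emptyDir: {}
```

The init container writes the agent JAR into the shared `emptyDir` volume, and `JAVA_TOOL_OPTIONS` attaches it when the JVM in the application container starts.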
Tip

The configuration above sends telemetry to an OpenTelemetry Collector running at localhost:4317. Deploy your own collector with this configuration:
```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

exporters:
  otlp/newrelic:
    endpoint: https://otlp.nr-data.net:4317
    headers:
      api-key: "${NEW_RELIC_LICENSE_KEY}"
    compression: gzip
    timeout: 30s

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [otlp/newrelic]
    metrics:
      receivers: [otlp]
      exporters: [otlp/newrelic]
    logs:
      receivers: [otlp]
      exporters: [otlp/newrelic]
```

This lets you customize processing, add filters, or route to multiple backends. For other endpoint configurations, see Configure your OTLP endpoint.
The Java agent provides out-of-the-box Kafka instrumentation with no code changes, capturing:

- Request latencies
- Throughput metrics
- Error rates
- Distributed traces

For advanced configuration, see the Kafka instrumentation documentation.
Step 5: (Optional) Forward Kafka broker logs

To collect Kafka broker logs from your Kubernetes pods and send them to New Relic, configure the filelog receiver in your OpenTelemetry Collector.
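A minimal sketch of what that receiver might look like (the log path pattern is an assumption based on Strimzi's default pod naming in the `kafka` namespace; adjust it to your cluster):

```yaml
receivers:
  filelog:
    include:
      # Broker pod logs on the node filesystem (pattern assumes Strimzi defaults)
      - /var/log/pods/kafka_my-cluster-kafka-*/kafka/*.log
    operators:
      # Parse the container runtime log format (containerd/CRI or Docker JSON)
      - type: container
```

Note that reading pod logs from the node filesystem typically requires running the collector as a DaemonSet with `/var/log/pods` mounted, and adding the receiver to a `logs` pipeline that exports to the same `otlp` exporter.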
Find your data

After a few minutes, your Kafka metrics should appear in New Relic. See Find your data for detailed instructions on how to explore your Kafka metrics in different views in the New Relic UI.

You can also query your data with NRQL:
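For example, a couple of starting points beyond the basic query below (these assume the custom metric names defined in the JMX config above and the `my-cluster` cluster name):

```sql
-- Per-topic message rate from the custom JMX metric
FROM Metric SELECT rate(sum(kafka.prod.msg.count), 1 minute)
  WHERE kafka.cluster.name = 'my-cluster' FACET topic TIMESERIES

-- Partition leadership distribution across brokers
FROM Metric SELECT latest(kafka.broker.leader.count)
  WHERE kafka.cluster.name = 'my-cluster' FACET broker.id
```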
```sql
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-cluster'
```

Troubleshooting
Next steps

- Explore Kafka metrics - See the full metric reference
- Create custom dashboards - Build visualizations for your Kafka data
- Set up alerts - Monitor critical metrics such as consumer lag and under-replicated partitions