
Monitor Kafka on Kubernetes (Strimzi) with OpenTelemetry

Monitor your Kafka cluster running on Kubernetes with the Strimzi operator by deploying the OpenTelemetry Collector. The collector automatically discovers Kafka broker pods and collects a comprehensive set of metrics.

Before you begin

Make sure you have:

  • A Kubernetes cluster running Kafka managed by the Strimzi operator
  • kubectl access to the cluster and a container registry you can push images to
  • Your New Relic license key

Enable JMX on Strimzi Kafka

Make sure your Kafka cluster has JMX enabled in the Strimzi Kafka resource:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    jmxOptions: {} # Enables JMX with default settings
    # ...other broker configuration
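
Optionally, once the change has been applied, you can confirm that JMX is enabled by querying the resource; this check assumes the cluster is named my-cluster in the kafka namespace. The field should be present (even if empty), and Strimzi then exposes JMX on port 9999 of each broker pod, which matches the endpoint used in the collector configuration later in this guide:

$ kubectl get kafka my-cluster -n kafka -o jsonpath='{.spec.kafka.jmxOptions}'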

Step 1: Create a namespace

Create a dedicated namespace for the OpenTelemetry Collector (or use your existing Kafka namespace):

$ kubectl create namespace kafka

Step 2: Create a license key secret

Store your New Relic license key as a Kubernetes secret:

$ kubectl create secret generic nr-license-key \
    --from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \
    -n kafka

Replace YOUR_LICENSE_KEY with your actual New Relic license key.
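
If you want to confirm the secret was stored correctly, you can decode it with an optional check like this:

$ kubectl get secret nr-license-key -n kafka -o jsonpath='{.data.NEW_RELIC_LICENSE_KEY}' | base64 -d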

Step 3: Deploy the OpenTelemetry Collector

3.1 Build a custom collector image

Build a custom OpenTelemetry Collector image that bundles a Java runtime and the JMX scraper.

Important

Version compatibility: This guide uses JMX Scraper 1.52.0 and OpenTelemetry Collector 0.143.1. Older collector versions may not include this scraper's hash in their compatibility list. For best results, use the latest versions as shown in this guide.

Target architecture: Check the OpenTelemetry Collector releases page to find the correct binary for your system architecture (for example, linux_amd64, linux_arm64, darwin_amd64). Update the TARGETARCH variable in the Dockerfile accordingly.

Save as Dockerfile:

# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver
# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep
# OpenTelemetry Collector Binary
ARG OTEL_VERSION=0.143.1
ARG TARGETARCH=linux_amd64
ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcol
RUN tar -zxvf /otelcontribcol
# JMX Scraper JAR (for JMX receiver with YAML-based configuration)
ARG JMX_SCRAPER_JAR_VERSION=1.52.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
# Set permissions for nonroot user (uid 65532)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar
# Final minimal image with Java runtime
FROM openjdk:17-jre-slim
COPY --from=prep /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
COPY --from=prep /otelcol-contrib /otelcol-contrib
EXPOSE 4317 4318 8888
ENTRYPOINT ["/otelcol-contrib"]
CMD ["--config", "/conf/otel-agent-config.yaml"]

Build and push the image:

$ docker build -t your-registry/otel-collector-kafka:latest .
$ docker push your-registry/otel-collector-kafka:latest
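
If you're building for a different architecture, you can override the TARGETARCH build argument defined in the Dockerfile instead of editing the file. For example, for an ARM64 Linux host:

$ docker build --build-arg TARGETARCH=linux_arm64 -t your-registry/otel-collector-kafka:latest .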

3.2 Create the JMX custom metrics ConfigMap

First, create a ConfigMap with the custom JMX metrics configuration. Save as jmx-kafka-config.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: jmx-kafka-config
  namespace: kafka
data:
  jmx-kafka-config.yaml: |
    ---
    rules:
      # Per-topic custom metrics using custom MBean commands
      - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
        metricAttribute:
          topic: param(topic)
        mapping:
          Count:
            metric: kafka.prod.msg.count
            type: counter
            desc: The number of messages in per topic
            unit: "{message}"
      - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
        metricAttribute:
          topic: param(topic)
          direction: const(in)
        mapping:
          Count:
            metric: kafka.topic.io
            type: counter
            desc: The bytes received or sent per topic
            unit: By
      - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
        metricAttribute:
          topic: param(topic)
          direction: const(out)
        mapping:
          Count:
            metric: kafka.topic.io
            type: counter
            desc: The bytes received or sent per topic
            unit: By
      # Cluster-level metrics using controller-based MBeans
      - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
        mapping:
          Value:
            metric: kafka.cluster.topic.count
            type: gauge
            desc: The total number of global topics in the cluster
            unit: "{topic}"
      - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
        mapping:
          Value:
            metric: kafka.cluster.partition.count
            type: gauge
            desc: The total number of global partitions in the cluster
            unit: "{partition}"
      - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
        mapping:
          Value:
            metric: kafka.broker.fenced.count
            type: gauge
            desc: The number of fenced brokers in the cluster
            unit: "{broker}"
      - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
        mapping:
          Value:
            metric: kafka.partition.non_preferred_leader
            type: gauge
            desc: The count of topic partitions for which the leader is not the preferred leader
            unit: "{partition}"
      # Broker-level metrics using ReplicaManager MBeans
      - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
        mapping:
          Value:
            metric: kafka.partition.under_min_isr
            type: gauge
            desc: The number of partitions where the number of in-sync replicas is less than the minimum
            unit: "{partition}"
      # Broker uptime metric using JVM Runtime
      - bean: java.lang:type=Runtime
        mapping:
          Uptime:
            metric: kafka.broker.uptime
            type: gauge
            desc: Broker uptime in milliseconds
            unit: ms
      # Leader count per broker
      - bean: kafka.server:type=ReplicaManager,name=LeaderCount
        mapping:
          Value:
            metric: kafka.broker.leader.count
            type: gauge
            desc: Number of partitions for which this broker is the leader
            unit: "{partition}"
      # JVM metrics
      - bean: java.lang:type=GarbageCollector,name=*
        mapping:
          CollectionCount:
            metric: jvm.gc.collections.count
            type: counter
            unit: "{collection}"
            desc: total number of collections that have occurred
            metricAttribute:
              name: param(name)
          CollectionTime:
            metric: jvm.gc.collections.elapsed
            type: counter
            unit: ms
            desc: the approximate accumulated collection elapsed time in milliseconds
            metricAttribute:
              name: param(name)
      - bean: java.lang:type=Memory
        unit: By
        prefix: jvm.memory.
        dropNegativeValues: true
        mapping:
          HeapMemoryUsage.committed:
            metric: heap.committed
            desc: current heap usage
            type: gauge
          HeapMemoryUsage.max:
            metric: heap.max
            desc: current heap usage
            type: gauge
          HeapMemoryUsage.used:
            metric: heap.used
            desc: current heap usage
            type: gauge
      - bean: java.lang:type=Threading
        mapping:
          ThreadCount:
            metric: jvm.thread.count
            type: gauge
            unit: "{thread}"
            desc: Total thread count (Kafka typical range 100-300 threads)
      - bean: java.lang:type=OperatingSystem
        prefix: jvm.
        dropNegativeValues: true
        mapping:
          SystemLoadAverage:
            metric: system.cpu.load_1m
            type: gauge
            unit: "{run_queue_item}"
            desc: System load average (1 minute) - alert if > CPU count
          AvailableProcessors:
            metric: cpu.count
            type: gauge
            unit: "{cpu}"
            desc: Number of processors available
          ProcessCpuLoad:
            metric: cpu.recent_utilization
            type: gauge
            unit: '1'
            desc: Recent CPU utilization for JVM process (0.0 to 1.0)
          SystemCpuLoad:
            metric: system.cpu.utilization
            type: gauge
            unit: '1'
            desc: Recent CPU utilization for whole system (0.0 to 1.0)
          OpenFileDescriptorCount:
            metric: file_descriptor.count
            type: gauge
            unit: "{file_descriptor}"
            desc: Number of open file descriptors - alert if > 80% of ulimit
      - bean: java.lang:type=ClassLoading
        mapping:
          LoadedClassCount:
            metric: jvm.class.count
            type: gauge
            unit: "{class}"
            desc: Currently loaded class count
      - bean: java.lang:type=MemoryPool,name=*
        type: gauge
        unit: By
        metricAttribute:
          name: param(name)
        mapping:
          Usage.used:
            metric: jvm.memory.pool.used
            desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
          Usage.max:
            metric: jvm.memory.pool.max
            desc: Maximum memory pool size
          CollectionUsage.used:
            metric: jvm.memory.pool.used_after_last_gc
            desc: Memory used after last GC (shows retained memory baseline)

Tip

Customize metric collection: You can scrape additional Kafka metrics by adding custom MBean rules to the jmx-kafka-config.yaml file, as shown in the example below.
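
For example, a rule along these lines would export the per-broker count of under-replicated partitions. The rule itself is an illustrative addition rather than part of the configuration above, though UnderReplicatedPartitions is a standard Kafka server MBean:

- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
  mapping:
    Value:
      metric: kafka.partition.under_replicated
      type: gauge
      desc: Number of under-replicated partitions on this broker
      unit: "{partition}"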

Apply the JMX ConfigMap:

$ kubectl apply -f jmx-kafka-config.yaml

3.3 Create the collector ConfigMap

Create a ConfigMap with the OpenTelemetry Collector configuration. Save as otel-kafka-config.yaml:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: otel-collector-config
  namespace: kafka
  labels:
    app: otel-collector
data:
  otel-collector-config.yaml: |
    receivers:
      # Kafka cluster-level metrics (runs once per OTEL collector)
      kafkametrics/cluster:
        brokers:
          - "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
        protocol_version: 2.8.0
        scrapers:
          - brokers
          - topics
          - consumers
        collection_interval: 30s
        metrics:
          kafka.topic.min_insync_replicas:
            enabled: true
          kafka.topic.replication_factor:
            enabled: true
          kafka.partition.replicas:
            enabled: false
          kafka.partition.oldest_offset:
            enabled: false
          kafka.partition.current_offset:
            enabled: false
      # Receiver creator for dynamic per-broker JMX receivers
      receiver_creator:
        watch_observers: [k8s_observer]
        receivers:
          # JMX receiver template (created per discovered broker pod)
          jmx:
            rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka"
            config:
              endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi'
              jar_path: /opt/opentelemetry-jmx-scraper.jar
              target_system: kafka
              jmx_configs: /conf-jmx/jmx-kafka-config.yaml
              collection_interval: 30s
            # Set dynamic resource attributes from discovered pod
            resource_attributes:
              broker.endpoint: '`endpoint`'
    exporters:
      otlp:
        endpoint: https://otlp.nr-data.net:4317
        tls:
          insecure: false
        sending_queue:
          num_consumers: 12
          queue_size: 5000
        retry_on_failure:
          enabled: true
        headers:
          api-key: ${NEW_RELIC_LICENSE_KEY}
    processors:
      # Batch processor for efficiency
      batch/aggregation:
        send_batch_size: 1024
        timeout: 30s
      # Memory limiter to prevent OOM
      memory_limiter:
        limit_percentage: 80
        spike_limit_percentage: 30
        check_interval: 1s
      # Detect system resources
      resourcedetection:
        detectors: [env, docker, system]
        timeout: 5s
        override: false
      # Add Kafka cluster metadata
      resource/kafka_metadata:
        attributes:
          - key: kafka.cluster.name
            value: my-cluster
            action: upsert
      # Extract Kubernetes attributes
      k8sattributes:
        auth_type: serviceAccount
        passthrough: false
        extract:
          metadata:
            - k8s.pod.name
            - k8s.pod.uid
            - k8s.namespace.name
            - k8s.node.name
          labels:
            - tag_name: strimzi.cluster
              key: strimzi.io/cluster
              from: pod
            - tag_name: strimzi.kind
              key: strimzi.io/kind
              from: pod
      # Transform metrics for New Relic UI
      transform:
        metric_statements:
          - context: metric
            statements:
              # Clean up descriptions and units
              - set(description, "") where description != ""
              - set(unit, "") where unit != ""
          - context: resource
            statements:
              # Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit)
              - set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil
      # Remove broker.id for cluster-level metrics
      transform/remove_broker_id:
        metric_statements:
          - context: resource
            statements:
              - delete_key(attributes, "broker.id")
              - delete_key(attributes, "broker.endpoint")
              - delete_key(attributes, "k8s.pod.name")
      # Topic sum aggregation for replicas_in_sync
      metricstransform/kafka_topic_sum_aggregation:
        transforms:
          - include: kafka.partition.replicas_in_sync
            action: insert
            new_name: kafka.partition.replicas_in_sync.total
            operations:
              - action: aggregate_labels
                label_set: [ topic ]
                aggregation_type: sum
      # Filter to include only cluster-level metrics
      filter/include_cluster_metrics:
        metrics:
          include:
            match_type: regexp
            metric_names:
              - "kafka\\.partition\\.offline"
              - "kafka\\.(leader|unclean)\\.election\\.rate"
              - "kafka\\.partition\\.non_preferred_leader"
              - "kafka\\.broker\\.fenced\\.count"
              - "kafka\\.cluster\\.partition\\.count"
              - "kafka\\.cluster\\.topic\\.count"
      # Filter to exclude cluster-level metrics from broker pipeline
      filter/exclude_cluster_metrics:
        metrics:
          exclude:
            match_type: regexp
            metric_names:
              - "kafka\\.partition\\.offline"
              - "kafka\\.(leader|unclean)\\.election\\.rate"
              - "kafka\\.partition\\.non_preferred_leader"
              - "kafka\\.broker\\.fenced\\.count"
              - "kafka\\.cluster\\.partition\\.count"
              - "kafka\\.cluster\\.topic\\.count"
      # Convert cumulative metrics to delta for New Relic
      cumulativetodelta:
    extensions:
      # K8s observer extension
      k8s_observer:
        auth_type: serviceAccount
        observe_pods: true
        observe_nodes: false
    service:
      extensions: [k8s_observer]
      pipelines:
        # Per-broker metrics pipeline (with broker.id)
        metrics/broker:
          receivers:
            - receiver_creator
            - kafkametrics/cluster
          processors:
            - memory_limiter
            - resourcedetection
            - resource/kafka_metadata
            - k8sattributes
            - filter/exclude_cluster_metrics
            - transform
            - metricstransform/kafka_topic_sum_aggregation
            - cumulativetodelta
            - batch/aggregation
          exporters: [otlp]
        # Cluster-level metrics pipeline (without broker.id, aggregated)
        metrics/cluster:
          receivers:
            - receiver_creator
          processors:
            - memory_limiter
            - resourcedetection
            - resource/kafka_metadata
            - k8sattributes
            - filter/include_cluster_metrics
            - transform/remove_broker_id
            - metricstransform/kafka_topic_sum_aggregation
            - cumulativetodelta
            - batch/aggregation
          exporters: [otlp]

Configuration notes:

  • Replace my-cluster-kafka-bootstrap with the name of your Strimzi Kafka bootstrap service (see the command after this list)
  • Replace my-cluster in rule and kafka.cluster.name with your cluster name
  • Update the namespace if it differs from kafka
  • OTLP endpoint: Use https://otlp.nr-data.net:4317 (US region) or https://otlp.eu01.nr-data.net:4317 (EU region). See Configure your OTLP endpoint for other regions
  • The receiver_creator automatically discovers Kafka broker pods using the Strimzi labels
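
If you're not sure of the bootstrap service name, you can list the services Strimzi created for your cluster; this assumes the cluster is named my-cluster in the kafka namespace:

$ kubectl get svc -n kafka -l strimzi.io/cluster=my-cluster

The service whose name ends in -kafka-bootstrap is the one to reference in the kafkametrics receiver configuration.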

Apply the ConfigMap:

$ kubectl apply -f otel-kafka-config.yaml

3.4 Deploy the collector

Create the deployment. Save as otel-collector-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: otel-collector
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: otel-collector
  template:
    metadata:
      labels:
        app: otel-collector
    spec:
      serviceAccountName: otel-collector
      containers:
        - name: otel-collector
          image: your-registry/otel-collector-kafka:latest
          env:
            - name: NEW_RELIC_LICENSE_KEY
              valueFrom:
                secretKeyRef:
                  name: nr-license-key
                  key: NEW_RELIC_LICENSE_KEY
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"
            requests:
              cpu: "500m"
              memory: "1Gi"
          volumeMounts:
            - name: vol-kafka-test-cluster
              mountPath: /conf
            - name: jmx-config
              mountPath: /conf-jmx
          ports:
            - containerPort: 4317 # OTLP gRPC
            - containerPort: 4318 # OTLP HTTP
            - containerPort: 8888 # Metrics
      volumes:
        - name: vol-kafka-test-cluster
          configMap:
            name: otel-collector-config
            items:
              - key: otel-collector-config.yaml
                path: otel-agent-config.yaml
        - name: jmx-config
          configMap:
            name: jmx-kafka-config
            items:
              - key: jmx-kafka-config.yaml
                path: jmx-kafka-config.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: otel-collector
  namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: otel-collector
rules:
  - apiGroups: [""]
    resources: ["pods", "nodes"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: otel-collector
subjects:
  - kind: ServiceAccount
    name: otel-collector
    namespace: kafka
roleRef:
  kind: ClusterRole
  name: otel-collector
  apiGroup: rbac.authorization.k8s.io

Resource sizing:

  • The resource limits above are suitable for medium-sized Kafka clusters (5-10 brokers, 20-100 topics)

Apply the deployment:

$ kubectl apply -f otel-collector-deployment.yaml

Verify that the collector is running:

$ kubectl get pods -n kafka -l app=otel-collector
$ kubectl logs -n kafka -l app=otel-collector -f
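
You can also check the collector's own telemetry endpoint (port 8888, exposed in the deployment above) to confirm that data is flowing; the metric names below are part of the collector's standard internal metrics:

$ kubectl port-forward -n kafka deploy/otel-collector 8888:8888 &
$ curl -s http://localhost:8888/metrics | grep -E 'otelcol_receiver_accepted|otelcol_exporter_sent'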

Step 4: (Optional) Instrument producer and consumer applications

To collect application-level telemetry from Kafka producer and consumer applications running on Kubernetes, instrument them with the OpenTelemetry Java agent.

Instrument your Kafka application

To instrument your Kafka producer or consumer applications, add the OpenTelemetry Java agent to your existing deployment:

  1. Download the Java agent: Add an init container to download the agent JAR:

     initContainers:
     - name: download-otel-agent
       image: busybox:latest
       command:
       - sh
       - -c
       - |
         wget -O /otel/opentelemetry-javaagent.jar \
           https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
       volumeMounts:
       - name: otel-agent
         mountPath: /otel
  2. Configure the Java agent: Add environment variables to your application container:

     env:
     - name: JAVA_TOOL_OPTIONS
       value: >-
         -javaagent:/otel/opentelemetry-javaagent.jar
         -Dotel.service.name="kafka-producer"
         -Dotel.resource.attributes="kafka.cluster.name=my-cluster"
         -Dotel.exporter.otlp.endpoint="http://localhost:4317"
         -Dotel.exporter.otlp.protocol="grpc"
         -Dotel.metrics.exporter="otlp"
         -Dotel.traces.exporter="otlp"
         -Dotel.logs.exporter="otlp"
         -Dotel.instrumentation.kafka.experimental-span-attributes="true"
         -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"
         -Dotel.instrumentation.kafka.producer-propagation.enabled="true"
         -Dotel.instrumentation.kafka.enabled="true"
     volumeMounts:
     - name: otel-agent
       mountPath: /otel
  3. Add the volume: Include the volume definition:

     volumes:
     - name: otel-agent
       emptyDir: {}

Replace:

  • kafka-producer with a unique name for your application
  • my-cluster with the name of your Kafka cluster

Tip

The configuration above sends telemetry to an OpenTelemetry Collector listening on localhost:4317. Deploy your own collector with this configuration:

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
exporters:
  otlp/newrelic:
    endpoint: https://otlp.nr-data.net:4317
    headers:
      api-key: "${NEW_RELIC_LICENSE_KEY}"
    compression: gzip
    timeout: 30s
service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [otlp/newrelic]
    metrics:
      receivers: [otlp]
      exporters: [otlp/newrelic]
    logs:
      receivers: [otlp]
      exporters: [otlp/newrelic]

This lets you customize processing, add filters, or route to multiple backends. For other endpoint configurations, see Configure your OTLP endpoint.

The Java agent provides out-of-the-box Kafka instrumentation with no code changes, capturing:

  • Request latencies
  • Throughput metrics
  • Error rates
  • Distributed traces

For advanced configuration, see the Kafka instrumentation documentation.

Step 5: (Optional) Forward Kafka broker logs

To collect Kafka broker logs from your Kubernetes pods and send them to New Relic, configure the filelog receiver in your OpenTelemetry Collector.
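
As a starting point, a minimal filelog receiver for Strimzi broker pods could look like the sketch below. The include path assumes the default Kubernetes container log layout and a cluster named my-cluster in the kafka namespace, and the collector pod needs a hostPath mount of /var/log/pods (typically run as a DaemonSet) for this to work:

receivers:
  filelog:
    include:
      # /var/log/pods/<namespace>_<pod>_<uid>/<container>/*.log
      - /var/log/pods/kafka_my-cluster-kafka-*/**/*.log
    operators:
      # Parse the standard container log format (timestamp, stream, body)
      - type: container
service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [memory_limiter, batch/aggregation]
      exporters: [otlp]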

Find your data

After a few minutes, your Kafka metrics should appear in New Relic. See Find your data for detailed instructions on exploring your Kafka metrics across the different views in the New Relic UI.

You can also query your data with NRQL:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-cluster'
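
For example, to chart per-broker, per-topic message throughput, you could query the custom counter defined earlier; this query is an illustration built from the metric and attribute names in the configuration above:

FROM Metric SELECT sum(kafka.prod.msg.count) WHERE kafka.cluster.name = 'my-cluster' FACET broker.id, topic TIMESERIES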

Troubleshooting

Next steps
