Nuestra dbt Cloud integración con Airflow monitorea el estado de sus dbt Cloud trabajos y recursos, ayudándolo a identificar problemas como cuando fallan ejecuciones, modelos o pruebas.
Esta integración se ejecuta en Apache Airflow y consulta a Snowflake sobre cualquier prueba fallida si está configurada para hacerlo.
Requisitos previos
- Cuenta de dbt Cloud con API habilitadas y usando Snowflake como base de datos.
- Acceso a la cuenta Snowflake donde se ejecuta la cuenta dbt Cloud .
- Entorno Airflow existente versión 2.8.1 o superior, o capacidad para ejecutar Docker Compose.
Instalar la integración
Puedes instalar la integración de New Relic dbt Cloud con Airflow de la siguiente manera:
- Instalación en su entorno Airflow existente. Esto se recomienda para entorno de producción.
- Instalación con docker Compose. Esto es adecuado para pruebas de concepto rápidas.
Seleccione la opción más adecuada a sus necesidades haciendo clic en su pestaña:
Cerciorar de tener el proveedor Snowflake y luego clone el repositorio newrelic-dbt-cloud-integration
ejecutando estos comandos:
$pip install apache-airflow-providers-snowflake>=3.0.0
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
Copie el contenido de airflow/dags
a la raíz de su carpeta Airflow dags
Cree las cinco conexiones de flujo de aire necesarias para el DAG. La siguiente tabla proporciona el nombre de la conexión y la información para configurarla. Tenga en cuenta que para todos estos, el tipo es http
:
Nombre de la conexión | Descripción | Tipo | Host y contraseña | |
---|---|---|---|---|
| Le permite conectarse a la API de administración de dbt Cloud con |
| Host: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT\_ID/ (Reemplace Contraseña: Su token de API de dbt Cloud (Configuración de perfil) o un token de cuenta de servicio | |
| Le permite conectarse a la API de descubrimiento de dbt |
| Host: https://metadata.cloud.getdbt.com/graphql Contraseña: token de cuenta de servicios en Dbt Cloud | |
| Permite subir eventos personalizados a New Relic |
| Host: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT\_ID/events (Reemplace Contraseña: Su NR información valiosa insertar clave de API | |
| Te permite consultar el evento New Relic personalizado |
| Host: https://insights-api.newrelic.com/v1/accounts/ACCOUNT\_ID/query (Reemplace Contraseña: Su NR información valiosa consulta clave de API |
Una vez que configuró los cuatro anteriores, deberá configurar la conexión Snowflake. Snowflake le permite consultar filas de prueba fallidas. Hay muchas formas de configurar una conexión de copo de nieve. Para configurar usando un par de claves privadas, complete el siguiente atributo:
Type
: Copo de nieveLogin
: Su nombre de usuario de SnowflakeAccount
: Su cuenta de SnowflakeWarehouse
: Su almacén de SnowflakeRole
: Tu papel de Copo de nieve. El rol debe tener acceso a todas las bases de datos empleadas en dbt Cloud para obtener todas las filas de prueba fallidas.Private Key Text
: la clave privada completa empleada para esta conexión.Password
: frase de contraseña para la clave privada si está cifrada. En blanco si no está cifrado.
Complete la configuración habilitando el new_relic_data_pipeline_observability_get_dbt_run_metadata2
DAG.
Ejecute el siguiente comando para clonar el repositorio newrelic-dbt-cloud-integration
:
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
Luego cd
en el directorio Airflow:
$cd newrelic-dbt-cloud-integration/airflow
Luego inicialice y ejecute docker Compose ejecutando los siguientes comandos:
$docker-compose up airflow-init
$docker-compose up
lanza la UI de Airflow: http://localhost:8080
Cree las cinco conexiones de flujo de aire necesarias para el DAG. La siguiente tabla proporciona el nombre de la conexión y la información para configurarla. Tenga en cuenta que para todos estos, el tipo es http
:
Nombre de la conexión | Descripción | Tipo | Host y contraseña | |
---|---|---|---|---|
| Le permite conectarse a la API de administración de dbt Cloud con |
| Host: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT\_ID/ (Reemplace Contraseña: Su token de API de dbt Cloud (Configuración de perfil) o un token de cuenta de servicio | |
| Le permite conectarse a la API de descubrimiento de dbt |
| Host: https://metadata.cloud.getdbt.com/graphql Contraseña: token de cuenta de servicios en Dbt Cloud | |
| Permite subir eventos personalizados a New Relic |
| Host: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT\_ID/events (Reemplace Contraseña: Su NR información valiosa insertar clave de API | |
| Te permite consultar el evento New Relic personalizado |
| Host: https://insights-api.newrelic.com/v1/accounts/ACCOUNT\_ID/query (Reemplace Contraseña: Su NR información valiosa consulta clave de API |
Una vez que configuró los cuatro anteriores, deberá configurar la conexión Snowflake. Snowflake le permite consultar filas de prueba fallidas. Hay muchas formas de configurar una conexión de copo de nieve. Para configurar usando un par de claves privadas, complete el siguiente atributo:
Type
: Copo de nieveLogin
: Su nombre de usuario de SnowflakeAccount
: Su cuenta de SnowflakeWarehouse
: Su almacén de SnowflakeRole
: Tu papel de Copo de nieve. El rol debe tener acceso a todas las bases de datos empleadas en dbt Cloud para obtener todas las filas de prueba fallidas.Private Key Text
: la clave privada completa empleada para esta conexión.Password
: frase de contraseña para la clave privada si está cifrada. En blanco si no está cifrado.
Complete la configuración habilitando el new_relic_data_pipeline_observability_get_dbt_run_metadata2
DAG.
Encuentra tus datos
Esta integración crea y reporta tres eventos personalizados a New Relic:
Configuración DAG
Conexiones:
Este DAG está diseñado para ejecutar tal como está sin configuración. Al mismo tiempo, sabemos que su compañía puede tener sus propias convenciones de nomenclatura para las conexiones. Como tal, tenemos una configuración simple dentro de dag_config.yml
donde puedes establecer el nombre de las distintas conexiones.
connections: dbt_cloud_admin_api: dbt_cloud_admin_api dbt_cloud_discovery_api: dbt_cloud_discovery_api nr_insights_query: nr_insights_query nr_insights_insert: nr_insights_insert snowflake_api: SNOWFLAKE
Ejecutar equipo:
Los trabajos de dbt pueden ser propiedad de diferentes equipos, pero no hay ningún lugar para configurarlos dentro de dbt Cloud. Podemos usar código Python para configurar dinámicamente el equipo. Para escribir su propio código, modifique airflow/dags/nr_utils/nr_utils.py
y coloque la lógica necesaria en get_team_from_run()
. Los datos de ejecución pasados a esa función tienen acceso al siguiente atributo.
- nombre del proyecto
- nombre_entorno
- Todos los campos enumerados en la API de dbt Cloud v2 para ejecuciones. Todos los atributos van precedidos de "run_"
Aquí hay una función de ejemplo:
def get_team_from_run(run: dict) -> str: team = 'Data Engineering' if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']: team = 'Platform' if re.match(r'Catch-all', run['job_name']): team = 'Project Catch All' return team
configuración del proyecto dbt
Dentro del proyecto Dbt, podemos usar la metaconfiguración para establecer un equipo adicional y configuraciones específicas de prueba.
Team
: Si bienrun_team determines
es el propietario de los trabajos, a veces necesitamos equipos ascendentes o descendentes para recibir alertas sobre recursos fallidos, como pruebas y modelos. Configurar el equipo nos ayuda a lograrlo.alert_failed_test_rows
: Configurar enTrue
habilitará las filas de pruebas fallidas donde ejecutamos la consulta para pruebas fallidas y enviamos hasta las primeras 10 columnas a New Relicfailed_test_rows_limit
: Número máximo de filas de prueba fallidas para enviar a New Relic. Tenemos un límite codificado de 100 filas para evitar situaciones en las que enviemos cantidades irrazonables a New Relic.slack_mentions
: si habilita las alertas de holgura, este campo le permite establecer quién debe mencionar en el mensaje.
Configurar esto en dbt_project.yml
configuraría el equipo en "Ingeniería de datos" y habilitaría filas de prueba fallidas.
models: dbt_fake_company: +meta: nr_config: team: 'Data Engineering' alert_failed_test_rows: False failed_test_rows_limit: 5 slack_mentions: '@channel, @business_users'
Podemos agregar otro atributo llamado mensaje a los recursos. En la siguiente configuración, un equipo empresarial asociado puede recibir alertas sobre pruebas fallidas específicas. Además, podemos configurar alertas en las propias filas de prueba fallidas.
models: - name: important_business_model tests: - some_custom_test: config: meta: nr_config: team: 'Upstream Business Team' alert_failed_test_rows: true failed_test_rows_limit: 10 slack_mentions: '@channel, @business_user1, @engineer1' message: 'Important business process produced invalid data. Please check X tool'
Resolución de problemas
Diferentes versiones de Airflow combinadas con diferentes versiones de proveedores pueden inducir cambios importantes. En algunos casos, es posible que necesite modificar el código para que coincida con las versiones específicas de su entorno Airflow. Realizamos un seguimiento de los problemas conocidos en nuestro repositorio de Github.