ETL para carga Dimension Producto. Mas ejemplos de Talend. Uso de logs, metricas y estadisticas.

El siguiente proceso ETL que vamos a abordar es el de la carga de la Dimensión Producto. En esta dimensión se ubicarán, como ya vimos, los diferentes atributos relacionados con el producto que utilizaremos en nuestro sistema BI. Después de la revisión de la estructura física, teniendo en cuenta los origenes de datos, la estructura física de la tabla es la siguiente:

Diseño Fisico Tabla Dimensión Producto en MySql

Los procesos que tendremos que implementar utilizando Talend serán los siguientes (tal y como vimos en una entrada anterior del Blog):

Transformaciones para la creación de la Dimensión Producto

Para el desarrollo del proceso con Talend, vamos a ir un paso mas alla, definiendo información de logs, registro de estadísticas y métricas. Esto nos permitira conocer otra funcionalidad de Talend, y orquestar la ejecución de los jobs (para permitir, por ejemplo, la recuperación de la fecha de ultima ejecución del job para poder buscar los cambios en el maestro de materiales desde la ultima ejecución del proceso de traspaso), controlar los errores de ejecución y notificar por email en el caso de que se produzca algún problema.

El esquema completo del job con todos los pasos sería el siguiente (a continuación veremos de forma detallada cada uno de los pasos incluidos):

Proceso ETL en Talend completo para la dimensión Producto

Los pasos del proceso ETL para llenar la Dimensión Producto serán los siguientes:

1) Ejecución de un prejob que recuperará la fecha de ultima ejecución del proceso y registrara en el log el inicio de la ejecución del job. Además, se lanzará un logCatcher (para recoger las excepciones Java o errores en el proceso), que generará el envio de un email de aviso en el caso de que se produzca algún problema en cualquier paso del job.

2) A partir de la fecha de ultima ejecución, recuperamos del maestro de materiales en el ERP, todas las modificaciones o altas de productos realizadas en la tabla desde dicha fecha.

3) Realizamos una sustitución de valores erroneos o en blanco en los registros seleccionados según los criterios establecidos.

4) Completamos el mapeo de dimensión Producto, llenando el resto de campos que provienen de otras tablas en la base de datos (los campos de lookup),  con sus correspondientes consultas SQL.

5) En el mapeo se podrán generar materiales erróneos que habrá que verificar y corregir en el sistema origen (por tener valores incorrectos). La lista de materiales se introduciran en un fichero excel. Igualmente, contaremos el número de registros leidos desde el sistema origen correctos para guardarlos en las tablas de metricas.

6) Verificamos que realmente haya modificaciones con los datos existentes en la base de datos del DW, y para los registros que si tienen modificaciones (o si son nuevos registros), realizamos la actualización. En principio, no vamos a realizar gestión de Dimensiones Lentamente Cambiantes, sino que siempre tendremos la foto de los datos tal y como están los ficheros maestros en el momento actual (mas adelante si gestionaremos una dimensión con esta casuistica para conocer los componentes de los que dispone Talend para este cometido).

7) Concluimos el proceso guardandonos en el log el correspondiente mensaje de finalización correcta del proceso.

Antes de profundizar y ver en detalle cada unos de los pasos, vamos a ver la forma de activar en Talend la gestión de logs, estadisticas y metricas. El significado de cada uno de estos elementos es el siguiente:

  • Logs: sería el registro de los mensajes de error generados por los procesos, las excepciones Java al ejecutar o los mensajes generados por nosotros utilizando los componentes tDie o tWarn (para generar diferentes tipos de mensajes).
  • Stats (estadísticas): es el registro de la información estadística de los diferentes pasos y componentes que forman un job. Para cada paso, se puede activar el registro de estadísticas seleccionando la opción tStatCatcher Statistics. Se guarda información de cada paso, cuando empieza, cuando termina, su status de ejecución, duración, etc.
  • Meter, volumetrics  (métricas): es el registro de metricas (numero de registros de un flujo) que podemos intercalar en los procesos utilizando el control tFlowMeter.

Talend nos permite trabajar de dos maneras con todos estos registros (se pueden combinar): una sería gestionando nosotros mismos los logs, stats y meters que se generan (recolectandolas en tiempo de ejecución con los componentes tLogCatcher, tStatcatcher o tFlowMeterCatcher en cada job y dandoles el tratamiento oportuno) o bien activar la gestión automática a nivel de cada proyecto o job, seleccionando en que lugar queremos guardar los registros generados(con tres opciones: visualización en consola, registro en ficheros de texto planos o registro en base de datos). Con esta ultima opción, toda la información que se genere quedara registrada en tablas y será mas fácil explotarla para revisión de procesos realizados, orquestación de procesos, verificación de cuellos de botella, corrección de errores, etc.

Para activar la gestión a nivel de proyecto seleccionaremos la opción de menú Archivo –> Edit Project Properties, opción Job Settings, Stats & Logs (tal y como vemos en la imagen):

Opciones de estadísticas y logs a nivel de Proyecto

Estas propiedades quedan habilitadas para todos los jobs del proyecto, aunque luego se pueden ajustar en cada job según nos interese (puede haber jobs para los que no queremos guardar nada de información). Esto lo modificaremos en las propiedades de cada job, en la pestaña Stats & Logs. En principio, mandan las propiedades establecidas en el proyecto, pero se pueden generar excepciones (llevar los logs a un sitio diferente o no generar) para un job en concreto:

Opciones de estadísticas y logs a nivel de Job

Una vez realizada esta aclaración, veamos en detalle cada uno de los pasos del ETL:

1) Ejecución de un prejob que recuperará la fecha de ultima ejecución del proceso y registrara en el log el inicio de la ejecución del job. Además, se lanzará un logCatcher (para recoger las excepciones Java o errores en el proceso), que generará el envio de un email de aviso en el caso de que se produzca algún problema en cualquier paso del job. Los componentes utilizados en este paso inicial son los siguientes:

  • Lanzador Prejob (componente tPrejob): sirve para realizar el lanzamiento de un pretrabajo, anterior al proceso principal.
  • Ultima Fecha Ejecución (componente tMySqlInput): recuperamos de la tabla de registro de logs, la ultima fecha de ejecución correcta del job. Esta fecha nos servira de fecha de referencia para buscar las altas/modificaciones de productos en el ERP desde la ultima ejecución. Sino hay una ejecución anterior del job, construimos una fecha de referencia para lo que sería la carga inicial de la dimensión producto.
  • Mensaje Log Inicio (componente tWarn): genera un mensaje de log indicando que se comienza la ejecución del job.
  • Set Variable Fecha (componente tSetGlobalVar): la fecha de ultima ejecución recuperada en el paso anterior se registra en una variable global que se podrá utilizar posteriormente en cualquier paso del job.
  • Control Errores (componente tLogCatcher): activamos el componente que “escuchara” durante toda la ejecución del job, esperando que se produzca algún tipo de error. En ese momento se activara para recuperar el error y pasarlo al componente siguiente para el envio de un email de notificación.
  • tFlowtoIterate: convertimos el flujo de registros de log a una iteración para poder realizar el envio del correo electrónico.
  • Envio Email Notif (componente tSendMail): generamos el envio de un email de notificación de errores, incluyendo el paso donde se paro el proceso, y el mesaje de error generado. Es una forma de avisar que ha fallado algo en el proceso.

Para ver en detalle como hemos definido cada componente, podeis pulsar en el link de cada componente o para ver la documentación HTML completa generada por Talend pulsando aquí.

2) A partir de la fecha de ultima ejecución, recuperamos del maestro de materiales en el ERP, todas las modificaciones o altas de productos realizadas en la tabla desde dicha fecha.

Job DimProducto - Lectura Maestro Productos Sap

Para ello, utilizamos el componente LECT_MAESTRO_PRODUC TO del tipo tOracleInput (ver detalle aqui ), que nos permite acceder a la base de datos de Sap, que en este caso es Oracle. También podiamos haber realizado la lectura de Sap utilizando los componentes tSAPConnectiony tSAPInput). Observar la sentencia SQL ejecutada en el componente, y ver como combinamos el texto escrito con el uso de una variable global y el uso de funciones de Taled (que también podrían ser Java).

3) Realizamos una sustitución de valores erroneos o en blanco en los registros seleccionados según los criterios establecidos. Utilizando el componente AJUSTE_CAMPOS, del tipo tReplace (ver detalle aqui), con el que podemos buscar valores existentes en los registros recuperados (utilizando cadena regulares) e indicar una cadena de sustitución. Puede ser util para cualquier transformación que haya que realizar para determinados valores, para normalizar, para corregir registros erróneos, etc. En nuestro caso, la vamos a utilizar para sustituir registros sin valor (se podría haber optado también por rechazar dichos registros).

Job DimProducto - Ajuste Campos

4) Completamos el mapeo de dimensión Producto, llenando el resto de campos que provienen de otras tablas en la base de datos (los campos de lookup),  con sus correspondientes consultas SQL. Para ello utilizamos el componente MAPEO_PRODUCTO del tipo tMap (ver detalle aqui ).

Job DimProducto - Mapeo Producto

Con este componente, juntamos en un paso los registros leidos del maestro de materiales, y los valores adicionales que estan en otras tablas del ERP que nos permiten completar cada registro (como son los valores de descripciones, unidades de medida, calculo de equivalencia de unidades de medida, etc).

En este mapeo, utilizamos una funcionalidad de Talend que no habiamos visto, que es realizar un filtrado selectivo de los registros. Esto nos permite generar en el paso dos flujos de datos, uno con los registros correctos, y otro con los registros que no cumplan unas determinadas condiciones. En nuestro caso, los registros que tengan un valor NULL en el campo umren, serán rechazados y pasados a un paso posterior donde serán grabados en un fichero excel para su revisión.

5) En el mapeo se podrán generar materiales erróneos que habrá que verificar por parte de algún usuario y corregir en el sistema origen (por tener valores incorrectos). La lista de materiales se introducira en un fichero excel a través del control  REGISTROS_ERRONEOS del tipo tFileOutputExcel (ver aqui) . Igualmente, utilizando el componente CUENTA_LEIDOS_SAP del tipo tFlowMeter (ver aqui ),contaremos el número de registros correctos leidos desde el sistema origen para guardarlos en las tablas de metricas (para luego poder otener información estadística de los procesos realizados).

6) Verificamos que realmente haya modificaciones con los datos existentes en la base de datos del DW , y para los registros que si tienen modificaciones (o si son nuevos registros), realizamos la actualización. Para ello, utilizamos el componente VERIF_MODIFICACIONES del tipo tMap (ver aqui ) que recibira como flujo principal los registros que hemos leido desde Sap totalmente completados. Por otro lado, recibira como flujo de lookup (ver aqui) los registros que ya tenemos cargados en Myql en la tabla DWD_PRODUCTO (la tabla de la dimensión Producto). Con esta información, podremos realizar la comparación en el mapeo,  realizarando un filtrado selectivo de los registros, pasando al siguiente componente solo aquellos que tienen modificaciones (hay alguna diferencia en alguno de los campos).

Job DimProducto - Verificacion Modificaciones

El código utilizado en el expression filter, que es la condición de filtrado, es el siguiente:

!row11.material_desc.equals(row15.material_desc) ||
row11.familia_id!=row15.familia_id||
!row11.familia_desc.equals(row15.familia_desc)||
!row11.denom_id.equals(row15.denom_id)||
!row11.variet_id.equals(row15.variet_id)||
!row11.formato_id.equals(row15.formato_id)||
!row11.um_id.equals(row15.um_id)||
row11.litros_id!=row15.litros_id||
row11.linprod_id!=row15.linprod_id||
!row11.linprod_desc.equals(row15.linprod_desc)||
!row11.target_id.equals(row15.target_id)

Observar como vamos realizando la comparación campo a campo de los dos flujos, para determinar si hay algún cambio entre los datos ya cargados y los provenientes del ERP.

En principio, no vamos a realizar gestión de Dimensiones Lentamente Cambiantes, sino que siempre tendremos la foto de los datos tal y como están los ficheros maestros en el momento actual (mas adelante si gestionaremos una dimensión con esta casuistica para conocer los componentes de los que dispone Talend para este cometido). Los registros que superan el mapeo, son registrados en la base de datos Mysql utilizando el control ACTUALIZA_DWD_PRODUCTO, del tipo tMySqlOutput (ver detalle aqui ), grabandose como ya hemos dicho en la tabla DWD_PRODUCTO.

Con el componente tFlowMeter, en el paso CUENTA_MODIFICADOS (ver detalle aquí ), registramos en las tablas de metricas el número de registros que van a generar actualización.

7) Concluimos el proceso guardándonos en el log el correspondiente mensaje de finalización correcta del proceso, con el componente MENSAJE_LOG_FIN del tipo tWarn (ver detalle aqui).

Para ver en detalle como hemos definido cada componente, podeís acceder a la documentación HTML completa generada por Talend aquí. Podeis descargaros el fichero zip que contiene dicha documentación aquí.

Conclusiones

Hemos realizado nuestro primer trabajo ETL complejo utilizando Talend. Ha sido un poco complicado entender como funciona la herramienta, como se vinculan y encadenan los diferentes componentes, la forma de pasar la información y los registros entre ellos, como se gestionan los errores, etc. Pero una vez comprendidos los conceptos básicos, se observa el gran potencial que tiene Talend (que ademas puede ser completado con el uso del lenguaje Java por todas partes).  Os recomiendo si quereis ampliar el conocimiento de la herramienta, la lectura del Manual de Usuario de la herramienta y la Guia de Referencia de Componentes (ambos en inglés) en este link y visitar la web del proyecto Open TalendForge, donde podeis encontrar Tutoriales, Foros, gestión de bugs, etc.

También he visto lo importante que es el proceso de análisis de los origenes de información y el detallado de todas las transformaciones que hay que realizar, las excepciones, que criterio se sigue para ellas, que sustituciones realizamos, etc. Quizas deberiamos de haber realizado ese paso mas en profundidad, pues luego al realizar los procesos ETL han aparecido situaciones que no habiamos previsto. Lo tenemos en cuenta para nuestro próximo proyecto.

Igualmente hemos visto que es importante establecer un mecanismo para la gestión de todos los logs de la ejecución de procesos, estadísticas y metricas, pues luego nos serán de utilidad para montar los procesos de automatización, seguimiento y depuración de errores necesarios para la puesta en productivo del proyecto y para su mantenimiento a lo largo del tiempo.

Y como no, la documentación. El hecho de ir documentando y explicando cada componente que utilizamos en Talend (nombrado de los pasos, texto explicativo en cada componente, comentarios en los metadatos, etc), ha permitido que utilizando una funcionalidad estandar de Talend, la generación de documentación HTML, disponer de un repositorio donde poder consultar como estan construidos los procesos, de una forma muy completa y detallada, de cara a la comprensión de como están montados los procesos por parte de terceras personas, para su posterior modificación o mantenimiento en el futuro.

En la siguiente entrada del Blog, detallaremos los procesos ETL para la carga del resto de dimensiones de nuestro proyecto. Posteriormente, abordaremos el proceso ETL para la carga de la tabla de Hechos, que será la mas compleja y para la que habrá que establecer un mecanismo de orquestación para su automatización. En dicha entrada incluiremos igualmente el estudio del particionado de tablas utilizando Mysql, que veremos en profundidad.