commit inicial

parents
# Consumidor de Datos de Kafka para el Cálculo Diario de Casos de COVID-19 para los Últimos 30 Días
## Descripción
Este consumidor de Kafka fue creado como parte de la plataforma EpI-PUMA y tiene como objetivo proveer una base de datos que contenga únicamente los casos confirmados de COVID-19 en México para los últimos 30 días agrupados por fecha, municipio, grupo de edad y tipo de caso.
La base de datos generada por este conector fue pensada como una herramienta para demostrar la flexibilidad del sistema Apache Kafka, así como para facilitar la comparación de las predicciones realizadas por los modelos estadísticos de la plataforma EpI-PUMA con los datos reales de los casos de COVID-19 en México.
## Base de Datos
Este conector genera una base de datos de PostgreSQL de manera automática. La base de datos generada posee la siguiente estructura:
![Screenshot](datos/database.png)
## Requisitos
Para poder ejecutar el proceso de carga de datos en Kafka es necesario tener instalados los siguientes paquetes en el sistema donde se ejecutará el proceso:
- Python 3 y Pip
Además, en el mismo sistema o en uno dedicado se debe tener instalado y configurado un servidor de Apache Kafka con un tópico alimentado por el productor [KafkaCovid19](https://git.c3.unam.mx/alonso.ballesteros/KafkaCovid19).
## Instalación
Basta con ejecutar el siguiente comando para instalar las librerías necesarias para ejecutar el proceso de carga de datos en Kafka:
```sh
pip install -r requirements.txt
```
## Configuración
Los ajustes del conector se realizan en el archivo **conector.py**. Estos ajustes deben realizarse antes de ejecutar el conector, ya que de lo contrario fallará con una excepción.
### Base de datos de SQLite
- Es simplemente el nombre del archivo que se usará para guardar la base de datos local. Esta base se emplea como un buffer para guardar los registros que se reciben de Kafka antes de guardarlos en la base de datos de PostgreSQL.
- No es necesario crear el archivo de la base de datos, puesto que el conector lo creará de manera automática.
```python
# ----------- Datos SQLite ------------
SQ_ARCHIVO = 'datos.db'
```
### Base de datos de PostgreSQL
- Debe ser una base de datos de PostgreSQL completamente vacía, puesto que el conector creará las tablas necesarias de manera automática.
```python
# --------- Datos PostgreSQL ----------
PG_HOST = 'base.datos.com'
PG_PUERTO = 5433
PG_USUARIO = 'usuario'
PG_CLAVE = 'clave'
PG_BASE_DATOS = 'epi_puma'
```
### Apache Kafka
- En el campo de **KAFKA_BOOTSTRAP_SERVER** se debe omitir el protocolo **http://**.
- El campo **KAFKA_TOPIC** debe coincidir con el nombre del tópico que alimenta el productor [KafkaCovid19](https://git.c3.unam.mx/alonso.ballesteros/KafkaCovid19).
- Se sugiere que el campo **KAFKA_GROUP_ID** sea único para este consumidor de Kafka.
```python
# ------------ Datos Kafka ------------
KAFKA_BOOTSTRAP_SERVER = 'servidor.kafka:9092'
KAFKA_SCHEMA_REGISTRY = 'http://servidor.kafka:8081'
KAFKA_GROUP_ID = 'covid19_epi_puma_30d'
KAFKA_TOPIC = 'covid19'
KAFKA_PARTITION = 0
```
### Conector
- El campo **ARCHIVO_FECHA** denota el nombre del archivo que se usará para guardar la fecha del último registro que se guardó en la base de datos de PostgreSQL. Se recomienda no modificar este campo.
- El campo **FECHA_INICIAL** denota la fecha a partir de la cual se comenzarán a guardar los registros en la base de datos de PostgreSQL. El valor se expresa en el formato **AAAA-MM-DD**.
- El campo **TIEMPO_PURGA_REGISTROS** denota el tiempo que esperará el consumidor sin recibir nuevos registros antes de guardar los registros que tiene en memoria en la base de datos. El valor se expresa en segundos.
- El campo **LIMITE_CARGA_REGISTROS** denota el número máximo de registros que se guardarán en memoria antes de hacerlos persistentes en la base de datos, esto se realiza de manera transaccional. El valor se expresa en número de registros.
```python
ARCHIVO_FECHA = 'fecha.txt'
FECHA_INICIAL = '2021-03-01'
TIEMPO_PURGA_REGISTROS = 60
LIMITE_CARGA_REGISTROS = 100000
```
## Conector
El conector posee una interfaz de línea de comandos que permite ejecutarlo de manera sencilla. La interfaz de línea de comandos se muestra a continuación:
```sh
python3 conector.py [-h] {estatus,iniciar,detener}
positional arguments:
{estatus,iniciar,detener}
optional arguments:
-h, --help show this help message and exit
```
## Ejecución
Para ejecutar el consumidor de Kafka basta con ingresar el siguiente comando:
```sh
python3 conector.py iniciar
```
Con este comando el consumidor de Kafka se ejecutará en segundo plano y se mantendrá en ejecución hasta que se ingrese el siguiente comando:
```sh
python3 conector.py detener
```
Además, se puede obtener el estatus del consumidor de Kafka con el siguiente comando:
```sh
python3 conector.py estatus
```
El conector guardará un archivo de registros en el archivo **conector.log**. Este archivo se encuentra en el directorio donde se ejecuta el conector.
\ No newline at end of file
This diff is collapsed. Click to expand it.
''' Clase para facilitar la creación de demonios '''
import os
import sys
import log
import time
import signal
import daemon
import argparse
from enum import Enum
from pid import PidFile
from pathlib import Path
from typing import Callable
class Resultado(Enum):
EXITO = 0
ERROR = 1
@classmethod
def negar(cls, resultado: 'Resultado'):
return cls.EXITO if resultado == cls.ERROR else cls.ERROR
class Demonio:
def __init__(self, inicio: Callable, ciclo: Callable, archivo_pid: str, archivo_log: str):
self.ruta = Path(__file__).resolve().parent
self.archivo_pid = archivo_pid
self.archivo_log = archivo_log
# La funcion de inicio se ejecuta una sola vez al iniciar el proceso
self.inicio = inicio
# La funcion de ciclo se ejecuta en un ciclo infinito
self.ciclo = ciclo
@property
def corriendo(self):
return (self.ruta / self.archivo_pid).exists()
def estatus(self) -> Resultado:
if self.corriendo:
print('El proceso se esta ejecutando')
return Resultado.EXITO
else:
print('El proceso no se esta ejecutando')
return Resultado.ERROR
def iniciar(self) -> Resultado:
if self.corriendo:
return Resultado.negar(self.estatus())
demonio = daemon.DaemonContext \
(
stdout= open(self.ruta / self.archivo_log, 'a+'),
stderr= open(self.ruta / self.archivo_log, 'a+'),
pidfile= PidFile(self.archivo_pid, self.ruta)
)
with demonio:
# Se redirigen los logs
log.configurar_logs()
sys.stdout = log.LogHandle(log.INFO)
sys.stderr = log.LogHandle(log.ERROR)
# Se muestra el mensaje de inicio
print('Proceso ejecutandose en segundo plano')
print(f'ID del proceso: {os.getpid()}')
# Se ejecuta la función de inicio
self.inicio()
# Se ejecuta la función de ciclo
while True:
try:
self.ciclo()
except KeyboardInterrupt:
print('Proceso detenido por el usuario')
return Resultado.EXITO
except Exception as e:
log.error('Proceso detenido debido a un error')
log.error(e)
return Resultado.ERROR
def detener(self):
if not self.corriendo:
return self.estatus()
# Se obtiene el PID del proceso
with (self.ruta / self.archivo_pid).open() as archivo:
pid = archivo.read()
print('Deteniendo proceso...')
# Se detiene el proceso
while self.corriendo:
try:
os.kill(int(pid), signal.SIGINT)
time.sleep(1)
except:
log.error('No se pudo detener el proceso')
return Resultado.ERROR
print('Proceso detenido')
return Resultado.EXITO
def interfaz(self):
parser = argparse.ArgumentParser()
parser.add_argument('accion', choices= ['estatus', 'iniciar', 'detener'])
args = parser.parse_args()
if args.accion == 'estatus':
resultado = self.estatus()
elif args.accion == 'iniciar':
resultado = self.iniciar()
elif args.accion == 'detener':
resultado = self.detener()
sys.exit(resultado.value)
\ No newline at end of file
''' Permite llevar el control de una fecha mediante un archivo de texto '''
# Librerías
import pandas as pd
from pathlib import Path
class Fecha:
''' Representa una fecha almacenada en un archivo de texto '''
def __init__(self, archivo: str, fecha = pd.Timestamp.now()):
ruta = Path(__file__).resolve().parent
self.archivo = ruta / archivo
if not self.archivo.exists():
self.guardar(fecha)
def __lt__(self, other: pd.Timestamp):
''' Regresa verdadero si la fecha almacenada en el archivo es menor a la fecha dada '''
return self.actual.date() < other.date()
@property
def actual(self):
''' Regresa la fecha almacenada en el archivo '''
return pd.Timestamp(self.archivo.read_text())
def guardar(self, fecha: pd.Timestamp):
''' Guarda la fecha dada en el archivo '''
self.archivo.write_text(str(fecha.strftime('%Y-%m-%d')))
def avanzar(self, dias: int = 1):
''' Avanza la fecha almacenada en el archivo '''
self.guardar(self.actual + pd.Timedelta(days = dias))
\ No newline at end of file
import pandas as pd
import sqlalchemy as sa
from typing import List
# Módulos locales
from modelos import Conexion, RegistroKafka, GRUPO_EDAD, TIPO_VALOR
from modelos import Estado, Municipio, GrupoEdad, TipoValor, DetalleValor, ValorReal
def generar_dataframe(registros: List[RegistroKafka]):
''' Genera un dataframe a partir de los valores de una lista de registros '''
# Si no hay registros, se regresa un dataframe vacío
if len(registros) == 0:
return pd.DataFrame()
# Se crea el dataframe con los registros y las columnas especificadas
df = pd.DataFrame \
(
[registro.__dict__ for registro in registros],
columns =
[
'ID_REGISTRO',
'FECHA_ACTUALIZACION',
'ENTIDAD_RES', 'MUNICIPIO_RES',
'FECHA_SINTOMAS', 'FECHA_DEF',
'EDAD', 'CLASIFICACION_FINAL'
]
)
# Se convierten los campos de fecha
df['FECHA_ACTUALIZACION'] = pd.to_datetime(df['FECHA_ACTUALIZACION'])
df['FECHA_SINTOMAS'] = pd.to_datetime(df['FECHA_SINTOMAS'])
df['FECHA_DEF'] = pd.to_datetime(df['FECHA_DEF'], format='AAAA-MM-DD', errors = 'coerce')
# Función 'resultado' que interpreta la columna de 'CLASIFICACION_FINAL' y 'FECHA_DEF'
def resultado(valor):
if valor in [1, 2, 3]:
return 'CONFIRMADO'
elif valor in [4, 5]:
return 'INVIABLE'
elif valor in [6]:
return 'SOSPECHOSO'
elif valor in [7]:
return 'NEGATIVO'
else:
return 'DESCONOCIDO'
# Se crea una nueva columna 'RESULTADO' y se elimina la columna 'CLASIFICACION_FINAL'
df['RESULTADO'] = df['CLASIFICACION_FINAL'].apply(resultado)
del df['CLASIFICACION_FINAL']
# Se crea una nueva columna 'GRUPO_EDAD' con los grupos de edad
grupos = [17, 29, 39, 49, 59, 1000000]
etiquetas = [str(grupo) for grupo in GRUPO_EDAD]
df['GRUPO_EDAD'] = pd.cut(df['EDAD'], grupos, labels = etiquetas)
# Se reemplazan todos los valores de 'NaT' por nulos
df.replace({pd.NaT: None}, inplace = True)
return df
def calcular_confirmados(registros: pd.DataFrame, tipo: TIPO_VALOR, fecha: pd.Timestamp):
''' Calcula el número de confirmados en los últimos 30 días a partir de la fecha dada '''
# Se filtran los datos positivos para la fecha dada según el pivote
confirmados = pd.DataFrame()
fecha_inicial = fecha - pd.Timedelta(days = 30)
datos_periodo = registros[registros[tipo.value] >= fecha_inicial]
positivos = datos_periodo[datos_periodo['RESULTADO'] == 'POSITIVO']
# Se calcula el número de registros confirmados agrupados por entidad y municipio
for grupo in GRUPO_EDAD:
grupo_confirmados = positivos[positivos['GRUPO_EDAD'] == str(grupo)]
grupo_confirmados = grupo_confirmados.groupby(['ENTIDAD_RES', 'MUNICIPIO_RES'])
grupo_confirmados = grupo_confirmados.size().reset_index(name = 'CONFIRMADOS')
grupo_confirmados['GRUPO_EDAD'] = str(grupo)
confirmados = pd.concat([confirmados, grupo_confirmados])
return confirmados
def obtener_foraneas_general(conexion: Conexion, datos: pd.DataFrame):
'''
Obtiene las llaves foráneas de los siguientes campos:
- ENTIDAD_RES -> 'fk_estado'
- MUNICIPIO_RES -> 'fk_municipio'
- GRUPO_EDAD -> 'fk_grupo_edad'
- TIPO_VALOR -> 'fk_tipo_valor'
'''
# Se obtienen las llaves foráneas de los estados
estados = pd.read_sql(conexion.sesion.query(Estado).statement, conexion.enlace)
estados = estados[['id', 'id_dge']]
estados.columns = ['fk_estado', 'ENTIDAD_RES']
datos = pd.merge(datos, estados, on = 'ENTIDAD_RES', how = 'left')
# Se obtienen las llaves foráneas de los municipios
municipios = pd.read_sql(conexion.sesion.query(Municipio).statement, conexion.enlace)
municipios = municipios[['id', 'id_dge', 'fk_estado']]
municipios.columns = ['fk_municipio', 'MUNICIPIO_RES', 'fk_estado']
datos = pd.merge(datos, municipios, on = ['fk_estado', 'MUNICIPIO_RES'], how = 'left')
# Se obtienen las llaves foráneas de los grupos de edad
grupos_edad = pd.read_sql(conexion.sesion.query(GrupoEdad).statement, conexion.enlace)
grupos_edad = grupos_edad[['id', 'nombre']]
grupos_edad.columns = ['fk_grupo_edad', 'GRUPO_EDAD']
datos = pd.merge(datos, grupos_edad, on = 'GRUPO_EDAD', how = 'left')
# Se obtienen las llaves foráneas de los tipos de valor
tipos_valor = pd.read_sql(conexion.sesion.query(TipoValor).statement, conexion.enlace)
tipos_valor = tipos_valor[['id', 'tipo']]
tipos_valor.columns = ['fk_tipo_valor', 'TIPO_VALOR']
datos = pd.merge(datos, tipos_valor, on = 'TIPO_VALOR', how = 'left')
# Se eliminan los renglones cuyo municipio no se encuentran en la base de datos
datos.dropna(subset = ['fk_municipio'], inplace = True)
# Se asigna la columna de 'valor' con los valores de la columna 'CONFIRMADOS'
datos['valor'] = datos['CONFIRMADOS']
return datos
def obtener_foraneas_detalles(conexion: Conexion, datos: pd.DataFrame):
''' Obtiene la llave foránea de la tabla de detalles '''
# Se obtienen las llaves foráneas de la tabla de detalles
detalle_valores = pd.read_sql(conexion.sesion.query(DetalleValor).statement, conexion.enlace)
detalle_valores.rename(columns = {'id': 'fk_detalle_valor'}, inplace = True)
detalle_valores['fecha'] = pd.to_datetime(detalle_valores['fecha'])
interseccion = ['fecha', 'fk_municipio', 'fk_grupo_edad', 'fk_tipo_valor']
return pd.merge(datos, detalle_valores, on = interseccion, how = 'left')
def cargar_valores_reales(conexion: Conexion, registros: pd.DataFrame, fecha: pd.Timestamp):
''' Carga los valores reales en la base de datos '''
datos = []
# Se obtienen los casos confirmados para cada tipo de valor
for tipo in TIPO_VALOR:
confirmados = calcular_confirmados(registros, tipo, fecha)
confirmados['TIPO_VALOR'] = str(tipo)
datos.append(confirmados)
# Se concatenan los datos de los diferentes tipos de valor y se agrega la fecha
confirmados = pd.concat(datos)
confirmados['fecha'] = fecha
# Se obtienen las llaves foráneas generales
confirmados = obtener_foraneas_general(conexion, confirmados)
# Se insertan los datos de la tabla de detalles en la base de datos
conexion.sesion.execute(sa.insert(DetalleValor), confirmados.to_dict(orient = 'records'))
# Se obtienen las llaves foráneas de la tabla de detalles
confirmados = obtener_foraneas_detalles(conexion, confirmados)
# Se insertan los valores reales en la base de datos
conexion.sesion.execute(sa.insert(ValorReal), confirmados.to_dict(orient = 'records'))
\ No newline at end of file
''' Contiene la clase 'Conexión' que representa la conexión de un consumidor de Kafka a un tópico '''
from typing import Optional, Callable, Any
# Confluent Kafka
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka import Consumer, TopicPartition, OFFSET_STORED
from confluent_kafka.serialization import SerializationContext, MessageField
class Conexion:
''' Representa la conexión de un consumidor de Kafka a un tópico '''
def __init__ \
(
self,
bootrap_servers: str,
group_id: str,
schema_registry_url: str,
topico_kafka: str,
particion_kafka: int,
from_dict: Callable[[dict, Any], Optional[Any]]
):
# Se inicializa el esquema y el deserializador de Avro
self.schema_registry_client = SchemaRegistryClient({'url': schema_registry_url})
self.avro_deserializer = AvroDeserializer(self.schema_registry_client, from_dict = from_dict)
# Se crea el consumidor de Kafka
self.consumer_settings = \
{
'bootstrap.servers': bootrap_servers,
'group.id': group_id,
'auto.offset.reset': 'earliest'
}
self.consumer = Consumer(self.consumer_settings)
# Se asigna el consumidor a la partición del tópico y se crea el contexto de serialización
self.topic_partition = TopicPartition(topico_kafka, particion_kafka, OFFSET_STORED)
self.serialization_context = SerializationContext(self.topic_partition.topic, MessageField.VALUE)
self.consumer.assign([self.topic_partition])
# Se usa para almacenar el indice local del primer mensaje leído por el consumidor de Kafka
self.indice_local = None
def cerrar(self):
''' Cierra el consumidor de Kafka '''
self.consumer.close()
def reasignar(self):
''' Reasigna el consumidor de Kafka si se ha perdido la conexión '''
try:
# Esta función devuelve un 'RuntimeError' si el consumidor de Kafka se ha desconectado
self.consumer.consumer_group_metadata()
except RuntimeError as e:
# Basta con crear un nuevo consumidor de Kafka para reasignarlo
self.consumer = Consumer(self.consumer_settings)
self.consumer.assign([self.topic_partition])
def mover_indice_remoto(self, indice: Optional[int]):
''' Posiciona el consumidor de Kafka en el indice indicado '''
# No se hace nada si el indice es nulo
if indice is None:
return
self.reasignar()
self.consumer.unassign()
self.topic_partition.offset = indice
self.consumer.commit(offsets=[self.topic_partition])
self.consumer.assign([self.topic_partition])
def indice_mensaje_remoto(self) -> Optional[int]:
'''
Obtiene el indice del ultimo mensaje leído por consumidor de Kafka
- Si no se ha leído ningún mensaje, devuelve nulo
'''
self.reasignar()
indice = self.consumer.position([self.topic_partition])[0].offset
return indice if indice > 0 else None
def regresar_indice_remoto(self):
'''
Regresa el indice remoto del consumidor para que coincida con el indice local
- Se usa para revertir el flujo de datos al punto de partida (indice local)
'''
self.mover_indice_remoto(self.indice_local)
def avanzar_indice_local(self):
'''
Avanza el indice local del consumidor para que coincida con el indice remoto
- Se usa para actualizar el punto de partida (indice local) del flujo de datos
'''
self.indice_local = self.indice_mensaje_remoto()
def obtener_registro(self) -> Optional[Any]:
''' Obtiene el siguiente registro desde el consumidor de Kafka '''
self.reasignar()
# Se lee el siguiente mensaje del consumidor de Kafka
mensaje = self.consumer.poll(1)
# Se regresa nulo si no se ha leído ningún mensaje o si el mensaje contiene un error
if mensaje is None or mensaje.error() is not None:
return None
# Se actualiza el indice del primer mensaje leído
if self.indice_local is None:
self.indice_local = self.indice_mensaje_remoto() - 1
# Se deserializa el mensaje y se regresa el registro
return self.avro_deserializer(mensaje.value(), self.serialization_context)
\ No newline at end of file
''' Utilidades para el manejo de logs '''
import logging
# Niveles de logs
INFO = logging.INFO
ERROR = logging.ERROR
class LogHandle:
''' Permite redirigir la salida estándar y de error a un logger '''
def __init__(self, level: int):
self.level = level
self.message = ''
def write(self, message: str):
self.message = self.message + message
while '\n' in self.message:
index = self.message.index('\n')
logging.log(self.level, self.message[:index])
self.message = self.message[(index + 1):]
def flush(self):
pass
def configurar_logs():
''' Configura los logs con el formato deseado '''
# Se configura el logger
logging.basicConfig \
(
format = '[%(asctime)-19s] [%(levelname)-8s]: %(message)s',
datefmt= '%d/%m/%Y %H:%M:%S',
level= logging.DEBUG
)
# Se desactivan los logs para la librería 'pid'
logging.getLogger('PidFile').setLevel(logging.WARNING)
# Se desactivan los logs de la librería 'requests'
logging.getLogger('urllib3').setLevel(logging.WARNING)
def error(mensaje: str):
''' Muestra un mensaje de error '''
logging.error(mensaje)
\ No newline at end of file
This diff is collapsed. Click to expand it.
pandas==2.0.2
fastavro==1.7.4
openpyxl==3.1.2
SQLAlchemy==2.0.16
GeoAlchemy2==0.13.3
psycopg2-binary==2.9.6
confluent-kafka==2.1.1
python-daemon==3.0.1
pid==3.0.4
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment