commit inicial

parents
# Consumidor de Datos de Kafka para la Plataforma EpI-PUMA
## Descripción
Este consumidor de Kafka fue creado como parte de la plataforma EpI-PUMA y tiene como objetivo reemplazar el proceso de carga de datos actuales de la plataforma. El proceso actual de carga de datos se realiza de la siguiente manera:
- Bajar diariamente el archivo actualizado con los casos de COVID-19 desde el sitio de la Dirección General de Epidemiología.
- Realizar un proceso de ETL y reconstruir la base de datos completa.
Este proceso presentó dos problemas importantes:
- El proceso de ETL es lento y requiere de mucho tiempo de procesamiento (al rededor de 5 horas)
- La DGE hace cambios en el conjunto de datos que incluye en sus archivos sin previo aviso, lo cual rompe el proceso de ETL y provoca que falten registros en la base de datos de la plataforma EpI-PUMA.
Por otro lado, la solución propuesta por este consumidor de Kafka realiza las siguientes tareas:
- Tomar los datos de los casos de COVID-19 en México desde un tópico de Apache Kafka, el cual debe ser alimentado diariamente mediante el productor [KafkaCovid19](https://git.c3.unam.mx/alonso.ballesteros/KafkaCovid19).
- Cargar los registros en la base de datos de PostgreSQL de la plataforma EpI-PUMA conforme se van generando en el tópico de Kafka.
El proceso propuesto tiene las siguientes ventajas respecto al proceso actual:
- El proceso de carga de datos es mucho más rápido (al rededor de 5 minutos).
- Los cambios en el contenido de los archivos de datos no afectan el proceso de carga de los registros, ya que son manejados independientemente por el productor [KafkaCovid19](https://git.c3.unam.mx/alonso.ballesteros/KafkaCovid19).
- Al consumir de un tópico de Kafka, cualquier aplicación que requiera datos de este tipo, puede tener acceso a los datos de los casos de COVID-19 en México sin realizar un costoso proceso de ETL.
## Requisitos
Para poder ejecutar el proceso de carga de datos en Kafka es necesario tener instalados los siguientes paquetes en el sistema donde se ejecutará el proceso:
- Python 3 y Pip
Además, en el mismo sistema o en uno dedicado se debe tener instalado y configurado un servidor de Apache Kafka con un tópico alimentado por el productor [KafkaCovid19](https://git.c3.unam.mx/alonso.ballesteros/KafkaCovid19).
## Instalación
Basta con ejecutar el siguiente comando para instalar las librerías necesarias para ejecutar el proceso de carga de datos en Kafka:
```sh
pip install -r requirements.txt
```
## Configuración
Los ajustes del conector se realizan en el archivo **conector.py**. Estos ajustes deben realizarse antes de ejecutar el conector, ya que de lo contrario fallará con una excepción.
### Base de datos de PostgreSQL
- Debe ser una base de datos de PostgreSQL con el esquema de la plataforma EpI-PUMA.
```python
# --------- Datos PostgreSQL ----------
PG_HOST = 'base.datos.com'
PG_PUERTO = 5433
PG_USUARIO = 'usuario'
PG_CLAVE = 'clave'
PG_BASE_DATOS = 'epi_puma'
```
### Apache Kafka
- En el campo de **KAFKA_BOOTSTRAP_SERVER** se debe omitir el protocolo **http://**.
- El campo **KAFKA_TOPIC** debe coincidir con el nombre del tópico que alimenta el productor [KafkaCovid19](https://git.c3.unam.mx/alonso.ballesteros/KafkaCovid19).
- Se sugiere que el campo **KAFKA_GROUP_ID** sea único para este consumidor de Kafka.
```python
# ------------ Datos Kafka ------------
KAFKA_BOOTSTRAP_SERVER = 'servidor.kafka:9092'
KAFKA_SCHEMA_REGISTRY = 'http://servidor.kafka:8081'
KAFKA_GROUP_ID = 'covid19_epi_puma'
KAFKA_TOPIC = 'covid19'
KAFKA_PARTITION = 0
```
### Conector
- El campo **TIEMPO_PURGA_REGISTROS** denota el tiempo que esperará el consumidor sin recibir nuevos registros antes de guardar los registros que tiene en memoria en la base de datos. El valor se expresa en segundos.
- El campo **LIMITE_CARGA_REGISTROS** denota el número máximo de registros que se guardarán en memoria antes de hacerlos persistentes en la base de datos, esto se realiza de manera transaccional. El valor se expresa en número de registros.
```python
TIEMPO_PURGA_REGISTROS = 60
LIMITE_CARGA_REGISTROS = 100000
```
## Conector
El conector posee una interfaz de línea de comandos que permite ejecutarlo de manera sencilla. La interfaz de línea de comandos se muestra a continuación:
```sh
python3 conector.py [-h] {estatus,iniciar,detener}
positional arguments:
{estatus,iniciar,detener}
optional arguments:
-h, --help show this help message and exit
```
## Ejecución
Para ejecutar el consumidor de Kafka basta con ingresar el siguiente comando:
```sh
python3 conector.py iniciar
```
Con este comando el consumidor de Kafka se ejecutará en segundo plano y se mantendrá en ejecución hasta que se ingrese el siguiente comando:
```sh
python3 conector.py detener
```
Además, se puede obtener el estatus del consumidor de Kafka con el siguiente comando:
```sh
python3 conector.py estatus
```
El conector guardará un archivo de registros en el archivo **conector.log**. Este archivo se encuentra en el directorio donde se ejecuta el conector.
\ No newline at end of file
This diff is collapsed. Click to expand it.
ID_ESTADO,ESTADO,LATITUD,LONGITUD
1,AGUASCALIENTES,21.875624,-102.29252
2,BAJA CALIFORNIA,31.871824,-116.602212
3,BAJA CALIFORNIA SUR,24.175677,-110.290567
4,CAMPECHE,19.817206,-90.526163
5,COAHUILA DE ZARAGOZA,25.521479,-103.446084
6,COLIMA,19.242548,-103.731729
7,CHIAPAS,16.750743,-93.129922
8,CHIHUAHUA,28.621999,-106.082803
9,CIUDAD DE MÉXICO,19.413606,-99.106485
10,DURANGO,24.000158,-104.68754
11,GUANAJUATO,21.114075,-101.677286
12,GUERRERO,17.543334,-99.507486
13,HIDALGO,20.111499,-98.727457
14,JALISCO,20.677866,-103.341715
15,MÉXICO,19.273662,-99.655802
16,MICHOACÁN DE OCAMPO,19.703472,-101.193888
17,MORELOS,18.915488,-99.232828
18,NAYARIT,21.503013,-104.879801
19,NUEVO LEÓN,25.665149,-100.314982
20,OAXACA,17.060831,-96.71696
21,PUEBLA,19.034964,-98.194621
22,QUERÉTARO,20.590472,-100.386393
23,QUINTANA ROO,19.568954,-88.048747
24,SAN LUIS POTOSÍ,22.14554,-100.968669
25,SINALOA,24.795564,-107.395671
26,SONORA,29.069472,-110.955241
27,TABASCO,17.998432,-92.899211
28,TAMAULIPAS,23.723857,-99.150432
29,TLAXCALA,19.322913,-98.228267
30,VERACRUZ DE IGNACIO DE LA LLAVE,19.534984,-96.924327
31,YUCATÁN,20.959942,-89.601939
32,ZACATECAS,22.767738,-102.581599
36,ESTADOS UNIDOS MEXICANOS,23.634501,-102.552784
97,NO APLICA,0,0
98,SE IGNORA,0,0
99,NO ESPECIFICADO,0,0
This source diff could not be displayed because it is too large. You can view the blob instead.
''' Clase para facilitar la creación de demonios '''
import os
import sys
import log
import time
import signal
import daemon
import argparse
from enum import Enum
from pid import PidFile
from pathlib import Path
from typing import Callable
class Resultado(Enum):
EXITO = 0
ERROR = 1
@classmethod
def negar(cls, resultado: 'Resultado'):
return cls.EXITO if resultado == cls.ERROR else cls.ERROR
class Demonio:
def __init__(self, inicio: Callable, ciclo: Callable, archivo_pid: str, archivo_log: str):
self.ruta = Path(__file__).resolve().parent
self.archivo_pid = archivo_pid
self.archivo_log = archivo_log
# La funcion de inicio se ejecuta una sola vez al iniciar el proceso
self.inicio = inicio
# La funcion de ciclo se ejecuta en un ciclo infinito
self.ciclo = ciclo
@property
def corriendo(self):
return (self.ruta / self.archivo_pid).exists()
def estatus(self) -> Resultado:
if self.corriendo:
print('El proceso se esta ejecutando')
return Resultado.EXITO
else:
print('El proceso no se esta ejecutando')
return Resultado.ERROR
def iniciar(self) -> Resultado:
if self.corriendo:
return Resultado.negar(self.estatus())
demonio = daemon.DaemonContext \
(
stdout= open(self.ruta / self.archivo_log, 'a+'),
stderr= open(self.ruta / self.archivo_log, 'a+'),
pidfile= PidFile(self.archivo_pid, self.ruta)
)
with demonio:
# Se redirigen los logs
log.configurar_logs()
sys.stdout = log.LogHandle(log.INFO)
sys.stderr = log.LogHandle(log.ERROR)
# Se muestra el mensaje de inicio
print('Proceso ejecutandose en segundo plano')
print(f'ID del proceso: {os.getpid()}')
# Se ejecuta la función de inicio
self.inicio()
# Se ejecuta la función de ciclo
while True:
try:
self.ciclo()
except KeyboardInterrupt:
print('Proceso detenido por el usuario')
return Resultado.EXITO
except Exception as e:
log.error('Proceso detenido debido a un error')
log.error(e)
return Resultado.ERROR
def detener(self):
if not self.corriendo:
return self.estatus()
# Se obtiene el PID del proceso
with (self.ruta / self.archivo_pid).open() as archivo:
pid = archivo.read()
print('Deteniendo proceso...')
# Se detiene el proceso
while self.corriendo:
try:
os.kill(int(pid), signal.SIGINT)
time.sleep(1)
except:
log.error('No se pudo detener el proceso')
return Resultado.ERROR
print('Proceso detenido')
return Resultado.EXITO
def interfaz(self):
parser = argparse.ArgumentParser()
parser.add_argument('accion', choices= ['estatus', 'iniciar', 'detener'])
args = parser.parse_args()
if args.accion == 'estatus':
resultado = self.estatus()
elif args.accion == 'iniciar':
resultado = self.iniciar()
elif args.accion == 'detener':
resultado = self.detener()
sys.exit(resultado.value)
\ No newline at end of file
''' Funciones para cargar o crear una especie en la base de datos '''
import pandas as pd
from typing import Optional, Union, Dict
# SQLAlchemy
import sqlalchemy as sa
import sqlalchemy.orm as orm
# Modelos
from modelos import Conexion, Especie
from modelos import GridGeo64km, GridGeo32km, GridGeo16km, GridGeo8km
# Variables globales de conexión a la base de datos
MOTOR_SQL: sa.engine.Engine
SESION: orm.Session
def usar_conexion(conexion: Conexion):
''' Usa el motor y la sesión de una conexión a la base de datos '''
global MOTOR_SQL, SESION
MOTOR_SQL = conexion.motor_sql
SESION = conexion.sesion
def cargar_especie(grupo_edad: str, sexo: str, tipo: str) -> Optional[Especie]:
''' Carga una especie desde la base de datos '''
conjunto, clasificacion = tipo.split(' ')
especie: Optional[Especie] = SESION \
.query(Especie) \
.filter(Especie.clasificacion == clasificacion) \
.filter(Especie.grupo_edad == grupo_edad) \
.filter(Especie.conjunto == conjunto) \
.filter(Especie.sexo == sexo) \
.first()
return especie
def crear_especie(grupo_edad: str, sexo: str, tipo: str):
''' Crea una especie y la inserta en la base de datos '''
conjunto, clasificacion = tipo.split(' ')
especie = Especie \
(
grupo_edad = grupo_edad,
sexo = sexo,
conjunto = conjunto,
clasificacion = clasificacion,
tipo = tipo,
validacion = 2,
cells_64km = [],
cells_32km = [],
cells_16km = [],
cells_8km = []
)
SESION.add(especie)
SESION.flush()
return especie
def cargar_o_crear_especie(grupo_edad, sexo, tipo: str):
''' Carga una especie de la base de datos, o la crea si no existe '''
especie = cargar_especie(grupo_edad, sexo, tipo)
return crear_especie(grupo_edad, sexo, tipo) if especie is None else especie
def agregar_celdas_especie(registros: pd.DataFrame, especie: pd.Series, resolucion):
'''
Dados los registros de una especie, obtiene las celdas de cierta resolución
y las agrega al conjunto de celdas de esa especie conservando unicamente
las celdas únicas
'''
nueva_especie: pd.DataFrame = registros[registros['spid'] == especie['spid']]
nuevas_celdas = nueva_especie[f'gridid_{resolucion}km'].unique().tolist()
celdas = especie[f'cells_{resolucion}km'] + nuevas_celdas
return pd.unique(celdas).tolist()
def actualizar_celdas_especie(registros: pd.DataFrame, especie: pd.Series):
''' Actualiza las celdas de todas las resoluciones de una especie con los nuevos registros '''
SESION.query(Especie) \
.filter(Especie.spid == especie['spid']) \
.update \
({
Especie.cells_64km: agregar_celdas_especie(registros, especie, 64),
Especie.cells_32km: agregar_celdas_especie(registros, especie, 32),
Especie.cells_16km: agregar_celdas_especie(registros, especie, 16),
Especie.cells_8km : agregar_celdas_especie(registros, especie, 8),
})
SESION.flush()
return especie
def actualizar_celdas_regiones(especie: Especie):
''' Actualiza las celdas de todas las regiones de una especie '''
resoluciones: Dict[int, Union[GridGeo64km, GridGeo32km, GridGeo16km, GridGeo8km]] = \
{
64: GridGeo64km,
32: GridGeo32km,
16: GridGeo16km,
8 : GridGeo8km
}
regiones = SESION.query(GridGeo64km.footprint_region).all()
for region, in regiones:
for resolucion, GridGeo in resoluciones.items():
nuevas_celdas: sa.Column = getattr(Especie, f'cells_{resolucion}km')
celdas_region: sa.Column = getattr(Especie, f'cells_{resolucion}km_{region}')
interseccion: sa.Column = nuevas_celdas.op('&')(GridGeo.cells).label('interseccion')
# Se obtiene la intersección de las celdas de la especie con las celdas de la región
subconsulta: orm.Query = SESION.query(Especie.spid, interseccion) \
.join(GridGeo, GridGeo.footprint_region == region) \
.filter(Especie.spid == especie.spid) \
.subquery()
# Se actualiza la especie con la intersección de las celdas
SESION.query(Especie) \
.filter(Especie.spid == subconsulta.c.spid) \
.update({celdas_region: subconsulta.c.interseccion}, synchronize_session = False)
SESION.flush()
return especie
\ No newline at end of file
import pandas as pd
from typing import List
from pathlib import Path
# Modelos
from modelos import RegistroKafka
# Nivel geográfico
NIVEL_GEOGRAFICO = 'municipio'
# Ruta del archivo actual
RUTA = Path(__file__).resolve().parent
# Ruta del archivo de estados
RUTA_ESTADOS = RUTA / 'datos/estados.csv'
# Ruta del archivo de municipios
RUTA_MUNICIPIOS = RUTA / 'datos/municipios.csv'
# Diccionario de sexos
SEXOS = \
{
1: 'M',
2: 'H',
99: 'NO ESPECIFICADO'
}
# Diccionario de nacionalidades
NACIONALIDADES = \
{
1: 'MEXICANA',
2: 'EXTRANJERA',
99: 'NO ESPECIFICADO'
}
# Diccionario de estados
ESTADOS = pd.read_csv(RUTA_ESTADOS, dtype =
{
'ID_ESTADO': int,
'ESTADO': str,
'LATITUD': float,
'LONGITUD': float
}) \
.set_index('ID_ESTADO') \
.to_dict(orient = 'index')
# Diccionario de municipios
MUNICIPIOS = pd.read_csv(RUTA_MUNICIPIOS, dtype =
{
'ID_MUNICIPIO': int,
'ID_ESTADO': int,
'MUNICIPIO': str,
'LATITUD': float,
'LONGITUD': float
}) \
.set_index \
([
'ID_MUNICIPIO',
'ID_ESTADO'
]) \
.to_dict(orient = 'index')
# Lista de grupos de edad
GRUPOS_EDAD = \
[
'0-18',
'19-30',
'31-40',
'41-49',
'50-55',
'56-60',
'61-65',
'66-70',
'70-75',
'76-120'
]
def inicializar_dataframe(registros: List[RegistroKafka]):
''' Inicializa el dataframe con los valores de una lista de registros '''
# Si no hay registros, se regresa un dataframe vacío
if len(registros) == 0:
return pd.DataFrame()
# Se crea el dataframe con los registros y las columnas especificadas
df = pd.DataFrame \
(
[registro.__dict__ for registro in registros],
columns =
[
'ID_REGISTRO',
'FECHA_ACTUALIZACION',
'ENTIDAD_RES', 'MUNICIPIO_RES',
'SEXO', 'FECHA_SINTOMAS', 'FECHA_DEF',
'EDAD', 'CLASIFICACION_FINAL', 'NACIONALIDAD'
]
)
# Se cambia el nombre de la columna 'ID_REGISTRO' a 'id'
df.rename(columns={'ID_REGISTRO': 'id'}, inplace= True)
# Se crea una nueva columna 'grupo_edad' con los grupos de edad
grupos = [-1, 18, 30, 40, 49, 55, 60, 65, 70, 75, 1000000]
df['grupo_edad'] = pd.cut(df['EDAD'], grupos, labels = GRUPOS_EDAD)
del df['EDAD']
# Se crea una nueva columna 'sexo'
df['sexo'] = df['SEXO'].apply(lambda id: SEXOS[id])
del df['SEXO']
# Se crea una nueva columna 'conjunto' con el valor 'COVID-19'
df['conjunto'] = 'COVID-19'
# Función 'clasificación' que interpreta la columna de 'CLASIFICACION_FINAL' y 'FECHA_DEF'
def clasificacion(valor):
if valor in [1, 2, 3]:
return 'CONFIRMADO'
elif valor in [4, 5]:
return 'INVIABLE'
elif valor in [6]:
return 'SOSPECHOSO'
elif valor in [7]:
return 'NEGATIVO'
else:
return 'DESCONOCIDO'
# Se crea una nueva columna 'clasificación'
df['clasificacion'] = df['CLASIFICACION_FINAL'].apply(clasificacion)
del df['CLASIFICACION_FINAL']
# Se crea una nueva columna 'tipo' y 'copia_tipo'
df['tipo'] = df['conjunto'] + ' ' + df['clasificacion']
df['copia_tipo'] = df['tipo']
# Se crea una nueva columna nacionalidad
df['nacionalidad'] = df['NACIONALIDAD'].apply(lambda id: NACIONALIDADES[id])
del df['NACIONALIDAD']
# Se crean las columnas de 'pais_geo', 'estado_geo', 'latitud' y 'longitud'
df['pais_geo'] = 'México'
if NIVEL_GEOGRAFICO == 'estado':
df['estado_geo'] = df['ENTIDAD_RES'].apply(lambda id: ESTADOS[id]['ESTADO'])
df['latitud'] = df['ENTIDAD_RES'].apply(lambda id: ESTADOS[id]['LATITUD'])
df['longitud'] = df['ENTIDAD_RES'].apply(lambda id: ESTADOS[id]['LONGITUD'])
elif NIVEL_GEOGRAFICO == 'municipio':
# Se combinan las columnas de 'MUNICIPIO_RES' y 'ENTIDAD_RES' en una sola
df['mun_ent'] = df['MUNICIPIO_RES'].astype(str) + ',' + df['ENTIDAD_RES'].astype(str)
df['mun_ent'] = df['mun_ent'].apply(lambda ren: tuple([int(x) for x in ren.split(',')]))
# Función para cargar cierto valor asociado a la columna de 'mun_ent'
def valor_mun_ent(id: tuple, tipo: str):
# Se intenta cargar la llave (ID_MUNICIPIO, ID_ESTADO)
if id in MUNICIPIOS:
return MUNICIPIOS[id][tipo]
# Se intenta cargar la llave (999, ID_ESTADO)
elif (999, id[1]) in MUNICIPIOS:
return MUNICIPIOS[(999, id[1])][tipo]
# Se regresa el valor de la llave (999, 99)
else:
return MUNICIPIOS[(999, 99)][tipo]
# Se crean las columnas de 'estado_geo', 'latitud' y 'longitud'
df['estado_geo'] = df['mun_ent'].apply(lambda id: valor_mun_ent(id, 'MUNICIPIO'))
df['latitud'] = df['mun_ent'].apply(lambda id: valor_mun_ent(id, 'LATITUD'))
df['longitud'] = df['mun_ent'].apply(lambda id: valor_mun_ent(id, 'LONGITUD'))
# Se elimina la columna de 'mun_ent'
del df['mun_ent']
# Se eliminan las columnas de 'ENTIDAD_RES' y 'MUNICIPIO_RES'
del df['ENTIDAD_RES']
del df['MUNICIPIO_RES']
# Se convierten los campos de fecha a datetime
df['FECHA_ACTUALIZACION'] = pd.to_datetime(df['FECHA_ACTUALIZACION'])
df['FECHA_SINTOMAS'] = pd.to_datetime(df['FECHA_SINTOMAS'])
df['FECHA_DEF'] = pd.to_datetime(df['FECHA_DEF'], format='AAAA-MM-DD', errors= 'coerce')
# Se cargan los valores de las columnas de fecha de ingreso
df['dia_ingreso'] = df['FECHA_ACTUALIZACION'].dt.day
df['mes_ingreso'] = df['FECHA_ACTUALIZACION'].dt.month
df['anio_ingreso'] = df['FECHA_ACTUALIZACION'].dt.year
del df['FECHA_ACTUALIZACION']
# Se cargan los valores de las columnas de fecha de síntomas
df['dia_sintomas_def'] = df['FECHA_SINTOMAS'].dt.day
df['mes_sintomas_def'] = df['FECHA_SINTOMAS'].dt.month
df['anio_sintomas_def'] = df['FECHA_SINTOMAS'].dt.year
del df['FECHA_SINTOMAS']
# Se crean nuevos registros para los casos fallecidos que son confirmados
fallecidos = df[df['FECHA_DEF'].notnull() & (df['clasificacion'] == 'CONFIRMADO')].copy()
fallecidos['clasificacion'] = 'FALLECIDO'
# Se modifica el 'id' de los fallecidos agregando '_f'
fallecidos['id'] = fallecidos['id'] + '_f'
# Se actualizan las columnas de 'tipo' y 'copia_tipo'
fallecidos['tipo'] = fallecidos['conjunto'] + ' ' + fallecidos['clasificacion']
fallecidos['copia_tipo'] = fallecidos['tipo']
# Se cargan los valores de las columnas de fecha de defunción
fallecidos['dia_sintomas_def'] = fallecidos['FECHA_DEF'].dt.day
fallecidos['mes_sintomas_def'] = fallecidos['FECHA_DEF'].dt.month
fallecidos['anio_sintomas_def'] = fallecidos['FECHA_DEF'].dt.year
del fallecidos['FECHA_DEF']
del df['FECHA_DEF']
# Se regresa el dataframe original concatenado con los fallecidos
return pd.concat([df, fallecidos])
\ No newline at end of file
''' Funciones para cargar o crear una geolocalización en la base de datos '''
from typing import Optional
# SQLAlchemy
import sqlalchemy as sa
import sqlalchemy.orm as orm
from geoalchemy2.functions import ST_GeomFromText
# Modelos
from modelos import Conexion
from modelos import Aoi, Grid8km, Geolocalizacion
from modelos import GridStateAoi, GridMunAoi, GridAgebAoi
# Variables globales de conexión a la base de datos
MOTOR_SQL: sa.engine.Engine
SESION: orm.Session
def usar_conexion(conexion: Conexion):
''' Usa el motor y la sesión de una conexión a la base de datos '''
global MOTOR_SQL, SESION
MOTOR_SQL = conexion.motor_sql
SESION = conexion.sesion
def cargar_geolocalizacion(longitud: float, latitud: float) -> Optional[Geolocalizacion]:
''' Carga una geolocalización desde la base de datos '''
the_geom = f'SRID=4326;POINT({longitud} {latitud})'
geolocalizacion = SESION \
.query(Geolocalizacion) \
.filter(Geolocalizacion.the_geom.ST_Equals(the_geom)) \
.first()
return geolocalizacion
def crear_geolocalizacion(longitud: float, latitud: float):
''' Crea una geolocalización y la inserta en la base de datos '''
the_geom = f'SRID=4326;POINT({longitud} {latitud})'
geom_m = f'SRID=900913;POINT({longitud} {latitud})'
# Se carga el campo 'gid' de la tabla 'aoi'
datos_aoi: Optional[Aoi] = SESION \
.query(Aoi.gid) \
.filter(Aoi.geom.ST_Intersects(ST_GeomFromText(the_geom))) \
.first()
# Se cargan las llaves foráneas 'gridid_8km', 'gridid_16km', 'gridid_32km' y 'gridid_64km' de la tabla 'grid_8km_aoi'
datos_grid: Optional[Grid8km] = SESION \
.query(Grid8km.gridid_8km, Grid8km.gridid_16km, Grid8km.gridid_32km, Grid8km.gridid_64km) \
.filter(Grid8km.the_geom.ST_Intersects(ST_GeomFromText(the_geom))) \
.first()
# Se carga la llave foránea 'gridid_statekm' de la tabla 'grid_statekm_aoi'
datos_statekm: Optional[GridStateAoi] = SESION \
.query(GridStateAoi.gridid_statekm) \
.filter(GridStateAoi.the_geom.ST_Intersects(ST_GeomFromText(the_geom))) \
.first()
# Se carga la llave foránea 'gridid_munkm' de la tabla 'grid_munkm_aoi'
datos_munkm: Optional[GridMunAoi] = SESION \
.query(GridMunAoi.gridid_munkm) \
.filter(GridMunAoi.the_geom.ST_Intersects(ST_GeomFromText(the_geom))) \
.first()
# Se carga la llave foránea 'gridid_agebkm' de la tabla 'grid_agebkm_aoi'
datos_agebkm: Optional[GridAgebAoi] = SESION \
.query(GridAgebAoi.gridid_agebkm) \
.filter(GridAgebAoi.the_geom.ST_Intersects(ST_GeomFromText(the_geom))) \
.first()
# Función utilizada para asegurar que las llaves foráneas sean siempre valores enteros
def llave_foranea(resultado: Optional[sa.engine.Row], campo: str):
try:
return int(resultado[campo])
except:
return 0
# Se crea el registro en la tabla 'geo_snib'
geolocalizacion = Geolocalizacion \
(
longitud = longitud,
latitud = latitud,
the_geom = the_geom,
geom_m = geom_m,
# Campos obtenidos de la tabla 'aoi'
gid = llave_foranea(datos_aoi, 'gid'),
# Campos obtenidos de la tabla 'grid_8km_aoi'
gridid_8km = llave_foranea(datos_grid, 'gridid_8km'),
gridid_16km = llave_foranea(datos_grid, 'gridid_16km'),
gridid_32km = llave_foranea(datos_grid, 'gridid_32km'),
gridid_64km = llave_foranea(datos_grid, 'gridid_64km'),
# Campos obtenidos de la tablas 'grid_statekm_aoi', 'grid_munkm_aoi' y 'grid_agebkm_aoi'
gridid_statekm = llave_foranea(datos_statekm, 'gridid_statekm'),
gridid_munkm = llave_foranea(datos_munkm, 'gridid_munkm'),
gridid_agebkm = llave_foranea(datos_agebkm, 'gridid_agebkm')
)
# Se guarda el registro en la base de datos
SESION.add(geolocalizacion)
SESION.flush()
return geolocalizacion
def cargar_o_crear_geolocalizacion(longitud: float, latitud: float):
''' Carga una geolocalización desde la base de datos o la crea si no existe '''
geolocation = cargar_geolocalizacion(longitud, latitud)
return crear_geolocalizacion(longitud, latitud) if geolocation is None else geolocation
\ No newline at end of file
''' Contiene la clase 'Conexión' que representa la conexión de un consumidor de Kafka a un tópico '''
from typing import Optional, Callable, Any
# Confluent Kafka
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka import Consumer, TopicPartition, OFFSET_STORED
from confluent_kafka.serialization import SerializationContext, MessageField
class Conexion:
''' Representa la conexión de un consumidor de Kafka a un tópico '''
def __init__ \
(
self,
bootrap_servers: str,
group_id: str,
schema_registry_url: str,
topico_kafka: str,
particion_kafka: int,
from_dict: Callable[[dict, Any], Optional[Any]]
):
# Se inicializa el esquema y el deserializador de Avro
self.schema_registry_client = SchemaRegistryClient({'url': schema_registry_url})
self.avro_deserializer = AvroDeserializer(self.schema_registry_client, from_dict = from_dict)
# Se crea el consumidor de Kafka
self.consumer_settings = \
{
'bootstrap.servers': bootrap_servers,
'group.id': group_id,
'auto.offset.reset': 'earliest'
}
self.consumer = Consumer(self.consumer_settings)
# Se asigna el consumidor a la partición del tópico y se crea el contexto de serialización
self.topic_partition = TopicPartition(topico_kafka, particion_kafka, OFFSET_STORED)
self.serialization_context = SerializationContext(self.topic_partition.topic, MessageField.VALUE)
self.consumer.assign([self.topic_partition])
# Se usa para almacenar el indice local del primer mensaje leído por el consumidor de Kafka
self.indice_local = None
def cerrar(self):
''' Cierra el consumidor de Kafka '''
self.consumer.close()
def reasignar(self):
''' Reasigna el consumidor de Kafka si se ha perdido la conexión '''
try:
# Esta función devuelve un 'RuntimeError' si el consumidor de Kafka se ha desconectado
self.consumer.consumer_group_metadata()
except RuntimeError as e:
# Basta con crear un nuevo consumidor de Kafka para reasignarlo
self.consumer = Consumer(self.consumer_settings)
self.consumer.assign([self.topic_partition])
def mover_indice_remoto(self, indice: Optional[int]):
''' Posiciona el consumidor de Kafka en el indice indicado '''
# No se hace nada si el indice es nulo
if indice is None:
return
self.reasignar()
self.consumer.unassign()
self.topic_partition.offset = indice
self.consumer.commit(offsets=[self.topic_partition])
self.consumer.assign([self.topic_partition])
def indice_mensaje_remoto(self) -> Optional[int]:
'''
Obtiene el indice del ultimo mensaje leído por consumidor de Kafka
- Si no se ha leído ningún mensaje, devuelve nulo
'''
self.reasignar()
indice = self.consumer.position([self.topic_partition])[0].offset
return indice if indice > 0 else None
def regresar_indice_remoto(self):
'''
Regresa el indice remoto del consumidor para que coincida con el indice local
- Se usa para revertir el flujo de datos al punto de partida (indice local)
'''
self.mover_indice_remoto(self.indice_local)
def avanzar_indice_local(self):
'''
Avanza el indice local del consumidor para que coincida con el indice remoto
- Se usa para actualizar el punto de partida (indice local) del flujo de datos
'''
self.indice_local = self.indice_mensaje_remoto()
def obtener_registro(self) -> Optional[Any]:
''' Obtiene el siguiente registro desde el consumidor de Kafka '''
self.reasignar()
# Se lee el siguiente mensaje del consumidor de Kafka
mensaje = self.consumer.poll(1)
# Se regresa nulo si no se ha leído ningún mensaje o si el mensaje contiene un error
if mensaje is None or mensaje.error() is not None:
return None
# Se actualiza el indice del primer mensaje leído
if self.indice_local is None:
self.indice_local = self.indice_mensaje_remoto() - 1
# Se deserializa el mensaje y se regresa el registro
return self.avro_deserializer(mensaje.value(), self.serialization_context)
\ No newline at end of file
''' Utilidades para el manejo de logs '''
import logging
# Niveles de logs
INFO = logging.INFO
ERROR = logging.ERROR
class LogHandle:
''' Permite redirigir la salida estándar y de error a un logger '''
def __init__(self, level: int):
self.level = level
self.message = ''
def write(self, message: str):
self.message = self.message + message
while '\n' in self.message:
index = self.message.index('\n')
logging.log(self.level, self.message[:index])
self.message = self.message[(index + 1):]
def flush(self):
pass
def configurar_logs():
''' Configura los logs con el formato deseado '''
# Se configura el logger
logging.basicConfig \
(
format = '[%(asctime)-19s] [%(levelname)-8s]: %(message)s',
datefmt= '%d/%m/%Y %H:%M:%S',
level= logging.DEBUG
)
# Se desactivan los logs para la librería 'pid'
logging.getLogger('PidFile').setLevel(logging.WARNING)
# Se desactivan los logs de la librería 'requests'
logging.getLogger('urllib3').setLevel(logging.WARNING)
def error(mensaje: str):
''' Muestra un mensaje de error '''
logging.error(mensaje)
\ No newline at end of file
This diff is collapsed. Click to expand it.
pandas==2.0.2
fastavro==1.7.4
SQLAlchemy==2.0.16
GeoAlchemy2==0.13.3
psycopg2-binary==2.9.6
confluent-kafka==2.1.1
python-daemon==3.0.1
pid==3.0.4
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment