- Se añadio coloración a los mensajes de error y de información

- Los logs ya no se guardan en "/var/log/syslog", ahora se guardan en "instalacion/Script.log"
- Se modifico el script de diferencias para que elimine los archivos descargados si hubo un error durante su descarga
parent dcf9d7ab
import sys import Log
import json import json
import tempfile import tempfile
from typing import Dict from typing import Dict
from pathlib import Path from pathlib import Path
# Codigos de salida
SALIR_EXITO = 0
SALIR_ERROR = 1
# Carpetas globales # Carpetas globales
CARPETA_TEMPORAL = Path(tempfile.gettempdir()) CARPETA_TEMPORAL = Path(tempfile.gettempdir())
CARPETA_ACTUAL = Path(__file__).parent.resolve() CARPETA_ACTUAL = Path(__file__).parent.resolve()
...@@ -19,15 +15,6 @@ ARCHIVO_AJUSTES = CARPETA_ACTUAL / 'Ajustes.json' ...@@ -19,15 +15,6 @@ ARCHIVO_AJUSTES = CARPETA_ACTUAL / 'Ajustes.json'
with open(ARCHIVO_AJUSTES, 'r') as archivo_json: with open(ARCHIVO_AJUSTES, 'r') as archivo_json:
AJUSTES: Dict[str, object] = json.load(archivo_json) AJUSTES: Dict[str, object] = json.load(archivo_json)
def error(mensaje: str, salir = True):
''' Muestra un mensaje de error y opcionalmente termina el programa '''
print(f'ERROR: {mensaje}')
# Se termina el programa si se solicita
if salir:
sys.exit(SALIR_ERROR)
def ajuste(ruta_ajuste: str) -> object: def ajuste(ruta_ajuste: str) -> object:
''' Regresa el valor de un ajuste ''' ''' Regresa el valor de un ajuste '''
...@@ -40,7 +27,7 @@ def ajuste(ruta_ajuste: str) -> object: ...@@ -40,7 +27,7 @@ def ajuste(ruta_ajuste: str) -> object:
return ajuste return ajuste
except: except:
error(f'El ajuste \'{ruta_ajuste}\' no existe') Log.error(f'El ajuste \'{ruta_ajuste}\' no existe')
def ruta_kafka(servicio: str): def ruta_kafka(servicio: str):
''' Regresa el url del servicio de Kafka solicitado ''' ''' Regresa el url del servicio de Kafka solicitado '''
...@@ -52,6 +39,7 @@ CARPETA_CSVDIFF = CARPETA_ACTUAL / 'CSVDiff' ...@@ -52,6 +39,7 @@ CARPETA_CSVDIFF = CARPETA_ACTUAL / 'CSVDiff'
CARPETA_INSTALACION = Path(ajuste('instalacion')).expanduser() CARPETA_INSTALACION = Path(ajuste('instalacion')).expanduser()
# Archivos locales # Archivos locales
ARCHIVO_LOG_PY = CARPETA_ACTUAL / 'Log.py'
ARCHIVO_SCRIPT = CARPETA_ACTUAL / 'Script.py' ARCHIVO_SCRIPT = CARPETA_ACTUAL / 'Script.py'
ARCHIVO_AJUSTES_PY = CARPETA_ACTUAL / 'Ajustes.py' ARCHIVO_AJUSTES_PY = CARPETA_ACTUAL / 'Ajustes.py'
ARCHIVO_FILEPULSE = CARPETA_ACTUAL / 'FilePulse.json' ARCHIVO_FILEPULSE = CARPETA_ACTUAL / 'FilePulse.json'
...@@ -61,5 +49,6 @@ ARCHIVO_COVID_BASE = CARPETA_ACTUAL / 'datos_abiertos_20000101.zip' ...@@ -61,5 +49,6 @@ ARCHIVO_COVID_BASE = CARPETA_ACTUAL / 'datos_abiertos_20000101.zip'
INSTALACION_BUILD = CARPETA_INSTALACION / 'build' INSTALACION_BUILD = CARPETA_INSTALACION / 'build'
INSTALACION_CSVDIFF = INSTALACION_BUILD / 'CSVDiff' INSTALACION_CSVDIFF = INSTALACION_BUILD / 'CSVDiff'
INSTALACION_SCRIPT = CARPETA_INSTALACION / 'Script.py' INSTALACION_SCRIPT = CARPETA_INSTALACION / 'Script.py'
INSTALACION_SCRIPT_LOG = CARPETA_INSTALACION / 'Script.log'
INSTALACION_DATOS_COVID = CARPETA_INSTALACION / 'datos_covid' INSTALACION_DATOS_COVID = CARPETA_INSTALACION / 'datos_covid'
INSTALACION_DIFERENCIAS = CARPETA_INSTALACION / 'diferencias' INSTALACION_DIFERENCIAS = CARPETA_INSTALACION / 'diferencias'
\ No newline at end of file
...@@ -2,21 +2,24 @@ ...@@ -2,21 +2,24 @@
import sys import sys
sys.dont_write_bytecode = True sys.dont_write_bytecode = True
# Librerías
import subprocess
# Ajustes # Ajustes
from Ajustes import * from Ajustes import *
# Se configuran los logs
Log.configurar_logs()
# Se elimina la carpeta de instalación # Se elimina la carpeta de instalación
subprocess.run(f'rm -rf {CARPETA_INSTALACION}', shell=True) print('Eliminando la carpeta de instalación...')
Log.subproceso(f'rm -rf {CARPETA_INSTALACION}', True, False)
# Se elimina el cronjob # Se elimina el cronjob
subprocess.run(f'crontab -l | grep -v "{ajuste("cronjob.nombre")}" | crontab -', shell=True) print('Eliminando el cronjob...')
Log.subproceso(f'crontab -l | grep -v "# {ajuste("cronjob.nombre")}" | crontab -', True, False)
# Se desinstalan los paquetes de Python # Se desinstalan los paquetes de Python
subprocess.run('pip3 uninstall zipfile_deflate64 python-crontab requests -y', shell=True, stderr=subprocess.DEVNULL) print('Desinstalando paquetes de Python...')
Log.subproceso('pip3 uninstall zipfile_deflate64 python-crontab requests -y', True, False)
# Se elimina el conector de FilePulse # Se elimina el conector de FilePulse
subprocess.run(f'curl -X DELETE {ruta_kafka("kafka_connect")}/connectors/{ajuste("kafka.conector")}', shell=True) print('Eliminando el conector de FilePulse...')
print() Log.subproceso(f'curl -X DELETE {ruta_kafka("kafka_connect")}/connectors/{ajuste("kafka.conector")}', True, False)
\ No newline at end of file \ No newline at end of file
...@@ -5,7 +5,6 @@ sys.dont_write_bytecode = True ...@@ -5,7 +5,6 @@ sys.dont_write_bytecode = True
# Librerías # Librerías
import shutil import shutil
import getpass import getpass
import subprocess
import importlib.util import importlib.util
# Ajustes # Ajustes
...@@ -17,7 +16,7 @@ def comprobar_comando(comando: str, sugerencia: str = None): ...@@ -17,7 +16,7 @@ def comprobar_comando(comando: str, sugerencia: str = None):
if shutil.which(comando) is None: if shutil.which(comando) is None:
if sugerencia is not None: if sugerencia is not None:
print(f'Se recomienda el comando \'{sugerencia}\'') print(f'Se recomienda el comando \'{sugerencia}\'')
error(f'\'{comando}\' no está instalado') Log.error(f'\'{comando}\' no está instalado')
def instalar_paquete(paquete: str, nombre_import: str = None): def instalar_paquete(paquete: str, nombre_import: str = None):
''' Instala un paquete de Python si no está instalado ''' ''' Instala un paquete de Python si no está instalado '''
...@@ -29,11 +28,9 @@ def instalar_paquete(paquete: str, nombre_import: str = None): ...@@ -29,11 +28,9 @@ def instalar_paquete(paquete: str, nombre_import: str = None):
# Se comprueba que el paquete esté instalado # Se comprueba que el paquete esté instalado
if importlib.util.find_spec(nombre_import) is None: if importlib.util.find_spec(nombre_import) is None:
print(f'El paquete \'{paquete}\' no está instalado. Instalando...') print(f'El paquete \'{paquete}\' no está instalado. Instalando...')
sub = subprocess.run(['pip3', 'install', paquete],
stdout = subprocess.DEVNULL, stderr = subprocess.STDOUT)
if sub.returncode != 0: if not Log.subproceso(['pip3', 'install', paquete]):
error(f'No se pudo instalar el paquete \'{paquete}\'') Log.error(f'No se pudo instalar el paquete \'{paquete}\'')
def crear_carpeta(carpeta: Path): def crear_carpeta(carpeta: Path):
''' Crea una carpeta si no existe ''' ''' Crea una carpeta si no existe '''
...@@ -52,14 +49,12 @@ def compilar_csvdiff(): ...@@ -52,14 +49,12 @@ def compilar_csvdiff():
''' Compila el programa CSVDiff ''' ''' Compila el programa CSVDiff '''
print('Compilando CSVDiff...') print('Compilando CSVDiff...')
sub_cmake = subprocess.run(['cmake', '-S', str(CARPETA_CSVDIFF), '-B', str(INSTALACION_BUILD)], exito_cmake = Log.subproceso(['cmake', '-S', str(CARPETA_CSVDIFF), '-B', str(INSTALACION_BUILD)])
stdout = subprocess.DEVNULL, stderr = subprocess.STDOUT) exito_make = Log.subproceso(['make', '-C', str(INSTALACION_BUILD)])
sub_make = subprocess.run(['make', '-C', str(INSTALACION_BUILD)],
stdout = subprocess.DEVNULL, stderr = subprocess.STDOUT)
# Se comprueba que la compilación haya sido exitosa # Se comprueba que la compilación haya sido exitosa
if sub_cmake.returncode != SALIR_EXITO or sub_make.returncode != SALIR_EXITO: if not (exito_cmake and exito_make):
error('No se pudo compilar CSVDiff') Log.error('No se pudo compilar CSVDiff')
def crear_topico(): def crear_topico():
''' Crea el tópico de Kafka ''' ''' Crea el tópico de Kafka '''
...@@ -68,21 +63,17 @@ def crear_topico(): ...@@ -68,21 +63,17 @@ def crear_topico():
print('Creando tópico de Kafka...') print('Creando tópico de Kafka...')
# Se manda una solicitud para consultar los tópicos de Kafka # Se manda una solicitud para consultar los tópicos de Kafka
respuesta = requests.get(ruta_kafka('rest_proxy') + '/topics') respuesta = requests.get(ruta_kafka('rest_proxy') + '/topics/' + ajuste('kafka.topic'))
# Se comprueba la respuesta de la solicitud
if not respuesta.ok:
error('No se pudiieron consultar los tópicos de Kafka')
# Solo se crea el tópico si no existe # Solo se crea el tópico si no existe
if ajuste('kafka.topic') not in respuesta.json(): if not respuesta.ok:
# Se manda una solicitud para obtener el cluster de Kafka # Se manda una solicitud para obtener el cluster de Kafka
respuesta = requests.get(ruta_kafka('rest_proxy') + '/v3/clusters') respuesta = requests.get(ruta_kafka('rest_proxy') + '/v3/clusters')
# Se comprueba la respuesta de la solicitud # Se comprueba la respuesta de la solicitud
if not respuesta.ok: if not respuesta.ok:
error('No se pudo obtener el cluster de Kafka') Log.error('No se pudo obtener el cluster de Kafka')
# Se obtiene el id del cluster # Se obtiene el id del cluster
cluster_id = respuesta.json()['data'][0]['cluster_id'] cluster_id = respuesta.json()['data'][0]['cluster_id']
...@@ -94,7 +85,7 @@ def crear_topico(): ...@@ -94,7 +85,7 @@ def crear_topico():
# Se comprueba la respuesta de la solicitud # Se comprueba la respuesta de la solicitud
if not respuesta.ok: if not respuesta.ok:
error('No se pudo crear el tópico de Kafka') Log.error('No se pudo crear el tópico de Kafka')
def cambiar_retencion(): def cambiar_retencion():
''' Modifica el tiempo de retención del tópico de Kafka ''' ''' Modifica el tiempo de retención del tópico de Kafka '''
...@@ -107,7 +98,7 @@ def cambiar_retencion(): ...@@ -107,7 +98,7 @@ def cambiar_retencion():
# Se comprueba la respuesta de la solicitud # Se comprueba la respuesta de la solicitud
if not respuesta.ok: if not respuesta.ok:
error('No se pudo obtener el cluster de Kafka') Log.error('No se pudo obtener el cluster de Kafka')
# Se obtiene el id del cluster # Se obtiene el id del cluster
cluster_id = respuesta.json()['data'][0]['cluster_id'] cluster_id = respuesta.json()['data'][0]['cluster_id']
...@@ -119,7 +110,7 @@ def cambiar_retencion(): ...@@ -119,7 +110,7 @@ def cambiar_retencion():
# Se comprueba la respuesta de la solicitud # Se comprueba la respuesta de la solicitud
if not respuesta.ok: if not respuesta.ok:
error('No se pudo modificar el tiempo de retención del tópico de Kafka') Log.error('No se pudo modificar el tiempo de retención del tópico de Kafka')
def crear_conector_filepulse(): def crear_conector_filepulse():
''' Crea el conector de FilePulse ''' ''' Crea el conector de FilePulse '''
...@@ -141,7 +132,7 @@ def crear_conector_filepulse(): ...@@ -141,7 +132,7 @@ def crear_conector_filepulse():
# Se comprueba la respuesta de la solicitud # Se comprueba la respuesta de la solicitud
if not respuesta.ok: if not respuesta.ok:
error('No se pudo crear el conector de FilePulse') Log.error('No se pudo crear el conector de FilePulse')
def crear_cronjob(): def crear_cronjob():
''' Crea el cronjob para ejecutar el script diariamente ''' ''' Crea el cronjob para ejecutar el script diariamente '''
...@@ -154,18 +145,27 @@ def crear_cronjob(): ...@@ -154,18 +145,27 @@ def crear_cronjob():
HORA = int(ajuste('cronjob.hora').split(':')[0]) HORA = int(ajuste('cronjob.hora').split(':')[0])
MINUTO = int(ajuste('cronjob.hora').split(':')[1]) MINUTO = int(ajuste('cronjob.hora').split(':')[1])
# Se obtiene la ruta del ejecutable de Python3 # Se obtiene la ruta de Python3
sub = subprocess.run(['whereis', 'python3'], capture_output = True) resultado = Log.subproceso('whereis python3', regresar_stdout= True)
# Se crea el cronjob si se pudo obtener la ruta de Python3 # Solo se crea el cronjob si se puede obtener la ruta de Python3
if sub.returncode == SALIR_EXITO: if resultado:
cron = crontab.CronTab(user = getpass.getuser()) cron = crontab.CronTab(user = getpass.getuser())
ruta_python3 = sub.stdout.decode('utf-8').split(' ')[1]
tarea = cron.new(f'{ruta_python3} {INSTALACION_SCRIPT} 2>&1 | logger -t {NOMBRE}') # Solo se crea el cronjob si no existe
tarea.hour.on(HORA); tarea.minute.on(MINUTO) if len(list(cron.find_comment(NOMBRE))) == 0:
cron.write() ruta_python3 = resultado.split(' ')[1]
tarea = cron.new(f'{ruta_python3} {INSTALACION_SCRIPT}')
tarea.hour.on(HORA); tarea.minute.on(MINUTO)
tarea.comment = NOMBRE
cron.write()
else:
Log.error('Ya existe un cronjob con ese nombre')
else: else:
error('No se pudo crear el cronjob') Log.error('No se pudo crear el cronjob')
# Se configuran los logs
Log.configurar_logs()
# Se comprueban los comandos de sistema # Se comprueban los comandos de sistema
comprobar_comando('pip3', 'sudo apt install python3-pip') comprobar_comando('pip3', 'sudo apt install python3-pip')
...@@ -184,6 +184,7 @@ crear_carpeta(INSTALACION_DATOS_COVID) ...@@ -184,6 +184,7 @@ crear_carpeta(INSTALACION_DATOS_COVID)
crear_carpeta(INSTALACION_DIFERENCIAS) crear_carpeta(INSTALACION_DIFERENCIAS)
# Se copian los archivos de instalación # Se copian los archivos de instalación
copiar_archivo(ARCHIVO_LOG_PY, CARPETA_INSTALACION)
copiar_archivo(ARCHIVO_SCRIPT, CARPETA_INSTALACION) copiar_archivo(ARCHIVO_SCRIPT, CARPETA_INSTALACION)
copiar_archivo(ARCHIVO_AJUSTES, CARPETA_INSTALACION) copiar_archivo(ARCHIVO_AJUSTES, CARPETA_INSTALACION)
copiar_archivo(ARCHIVO_AJUSTES_PY, CARPETA_INSTALACION) copiar_archivo(ARCHIVO_AJUSTES_PY, CARPETA_INSTALACION)
...@@ -192,6 +193,12 @@ copiar_archivo(ARCHIVO_COVID_BASE, INSTALACION_DATOS_COVID) ...@@ -192,6 +193,12 @@ copiar_archivo(ARCHIVO_COVID_BASE, INSTALACION_DATOS_COVID)
# Se compila CSVDiff # Se compila CSVDiff
compilar_csvdiff() compilar_csvdiff()
# Se crea el tópico de Kafka
crear_topico()
# Se modifica el tiempo de retención del tópico de Kafka
cambiar_retencion()
# Se crea el conector de FilePulse # Se crea el conector de FilePulse
crear_conector_filepulse() crear_conector_filepulse()
......
import sys
import logging
import subprocess
from enum import Enum
from typing import Optional, Union, List
class Resultado(Enum):
''' Resultado de ejecución de un subproceso '''
EXITO = 0
ERROR = 1
@classmethod
def _missing_(cls, _):
return cls.ERROR
def __bool__(self):
return self == Resultado.EXITO
class LogHandle:
''' Permite redirigir la salida estándar y de error a un logger '''
def __init__(self, level: int):
self.level = level
self.message = ''
def write(self, message: str):
self.message = self.message + message
while '\n' in self.message:
index = self.message.index('\n')
logging.log(self.level, self.message[:index])
self.message = self.message[(index + 1):]
def flush(self):
pass
class TerminalColor(Enum):
''' Colores para la terminal '''
GREY = '\x1b[38;5;248m'
BLUE = '\x1b[38;5;39m'
YELLOW = '\x1b[38;5;226m'
RED = '\x1b[38;5;196m'
BOLD_RED = '\x1b[31;1m'
RESET = '\x1b[0m'
def wrap(self, text: str):
return self.value + text + TerminalColor.RESET.value
class ConsoleFormatter(logging.Formatter):
''' Formatea los logs para que se vean bien en la terminal '''
def __init__(self, format: str):
super().__init__()
self.fmt = format
self.formats = \
{
logging.DEBUG: TerminalColor.GREY.wrap(self.fmt),
logging.INFO: TerminalColor.BLUE.wrap(self.fmt),
logging.WARNING: TerminalColor.YELLOW.wrap(self.fmt),
logging.ERROR: TerminalColor.RED.wrap(self.fmt),
logging.CRITICAL: TerminalColor.BOLD_RED.wrap(self.fmt)
}
def format(self, record: logging.LogRecord):
format = self.formats.get(record.levelno)
formatter = logging.Formatter(format)
return formatter.format(record)
def configurar_logs(archivo: Optional[str] = None):
''' Configura los logs para redirigirlos a la salida estándar y a un archivo '''
# Formatos para los logs
FORMATO_TERMINAL = '%(message)s'
FORMATO_ARCHIVO = '[%(asctime)-19s] [%(levelname)-8s]: %(message)s'
# Se redirigen los logs a la salida estándar y se configura el formato
manejadores = [logging.StreamHandler(sys.stdout)]
manejadores[0].setFormatter(ConsoleFormatter(FORMATO_TERMINAL))
# Si se especificó un archivo, se redirigen los logs a él también
if archivo:
manejadores.append(logging.FileHandler(archivo))
# Se configura el logger
logging.basicConfig \
(
datefmt= '%d/%m/%Y %H:%M:%S',
format= FORMATO_ARCHIVO,
handlers= manejadores,
level= logging.DEBUG
)
# Se redirigen las salidas estándar y de error al logger
sys.stdout = LogHandle(logging.DEBUG)
sys.stderr = LogHandle(logging.ERROR)
# Se desactivan los logs de la librería 'requests'
logging.getLogger('urllib3').setLevel(logging.WARNING)
def subproceso \
(
comandos: Union[str, List[str]],
mostrar_stdout: bool = False,
mostrar_stderr: bool = True,
regresar_stdout: bool = False,
redirigir_stderr: bool = False
):
''' Ejecuta un subproceso y redirige su salida a los logs '''
sub = subprocess.run(comandos, capture_output= True, shell= isinstance(comandos, str))
# Se imprimen los logs de la salida estándar
if mostrar_stdout:
for line in sub.stdout.decode('utf-8').split('\n'):
if len(line) > 0:
logging.debug(line)
# Se imprimen los logs de la salida de error
if mostrar_stderr:
for line in sub.stderr.decode('utf-8').split('\n'):
if len(line) > 0:
if not redirigir_stderr:
logging.error(line)
else:
logging.info(line)
# Se retorna la salida estándar si se solicita
if regresar_stdout:
return sub.stdout.decode('utf-8') if sub.returncode == Resultado.EXITO.value else ''
# Si la salida estándar contiene el mensaje 'error_code' se retorna un error
if 'error_code' in sub.stdout.decode('utf-8'):
return Resultado.ERROR
return Resultado(sub.returncode)
def info(mensaje: str):
''' Muestra un mensaje de información '''
logging.info(mensaje)
def error(mensaje: str, salir= True):
''' Muestra un mensaje de error y opcionalmente termina el programa '''
logging.error(mensaje)
# Se termina el programa si se solicita
if salir:
sys.exit(Resultado.ERROR.value)
\ No newline at end of file
...@@ -88,9 +88,9 @@ Una vez que se ha instalado el proceso de carga de datos en Kafka, es el archivo ...@@ -88,9 +88,9 @@ Una vez que se ha instalado el proceso de carga de datos en Kafka, es el archivo
Este archivo tiene una interfaz de línea de comandos que permite ejecutarlo de manera manual para obtener las diferencias entre dos archivos. La sintaxis es la siguiente: Este archivo tiene una interfaz de línea de comandos que permite ejecutarlo de manera manual para obtener las diferencias entre dos archivos. La sintaxis es la siguiente:
```sh ```sh
python3 Script.py [-h] [-b BASE] [-d DIFF] [-s] [-n] python3 Script.py [-h] [-b BASE] [-d DIFF] [-e] [-n]
options: optional arguments:
-h, --help show this help message and exit -h, --help show this help message and exit
-b BASE, --base BASE Archivo base -b BASE, --base BASE Archivo base
-d DIFF, --diff DIFF Archivo diff -d DIFF, --diff DIFF Archivo diff
......
...@@ -7,7 +7,6 @@ import glob ...@@ -7,7 +7,6 @@ import glob
import shutil import shutil
import argparse import argparse
import requests import requests
import subprocess
from os import cpu_count from os import cpu_count
from datetime import date from datetime import date
import zipfile_deflate64 as zipfile import zipfile_deflate64 as zipfile
...@@ -15,41 +14,43 @@ import zipfile_deflate64 as zipfile ...@@ -15,41 +14,43 @@ import zipfile_deflate64 as zipfile
# Ajustes # Ajustes
from Ajustes import * from Ajustes import *
# Se configuran los logs
Log.configurar_logs(INSTALACION_SCRIPT_LOG)
# Interfaz de línea de comandos # Interfaz de línea de comandos
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-b', '--base', help='Archivo base', type=str) parser.add_argument('-b', '--base', help='Archivo base', type=str)
parser.add_argument('-d', '--diff', help='Archivo diff', type=str) parser.add_argument('-d', '--diff', help='Archivo diff', type=str)
parser.add_argument('-e', '--especial', help='Dia especial', action='store_true') parser.add_argument('-e', '--especial', help='Dia especial', action='store_true')
parser.add_argument('-n', '--no-descargar', help='No descargar archivo de datos del día actual', action='store_true') parser.add_argument('-n', '--no-descargar', help='No descargar archivo de datos del día actual', action='store_true')
parser.add_argument('-f', '--forzar_actual', help='Inicializa el tópico de Kafka con el archivo mas reciente', action='store_true')
argumentos = parser.parse_args() argumentos = parser.parse_args()
# Variables de la interfaz de línea de comandos # Variables de la interfaz de línea de comandos
DIA_ESPECIAL: bool = argumentos.especial DIA_ESPECIAL: bool = argumentos.especial
DESCARGAR_ACTUAL: bool = not argumentos.no_descargar DESCARGAR_ACTUAL: bool = not argumentos.no_descargar
FORZAR_ACTUAL: bool = argumentos.forzar_actual
# Se manda una solicitud a Kafka para obtener el tópico
respuesta = requests.get(ruta_kafka('rest_proxy') + '/topics/' + ajuste('kafka.topic'))
# Si no existe el tópico se termina el script # Se genera la ruta del archivo de datos del día actual
if not respuesta.ok: NOMBRE_ARCHIVO = 'datos_abiertos_' + date.today().strftime('%Y%m%d') + '.zip'
error('No se ha creado el tópico de Kafka') ARCHIVO_DIA_ACTUAL = INSTALACION_DATOS_COVID / NOMBRE_ARCHIVO
# Se genera el nombre del archivo de datos del día actual
ARCHIVO_DIA_ACTUAL = 'datos_abiertos_' + date.today().strftime('%Y%m%d') + '.zip'
# Solo se descarga el archivo de datos del día actual si no existe # Solo se descarga el archivo de datos del día actual si no existe
if DESCARGAR_ACTUAL and not (INSTALACION_DATOS_COVID / ARCHIVO_DIA_ACTUAL).exists(): if DESCARGAR_ACTUAL and not ARCHIVO_DIA_ACTUAL.exists():
# Se descarga el archivo de datos del día actual # Se descarga el archivo de datos del día actual
sub = subprocess.run(['wget', '-O', exito = Log.subproceso(['curl', ajuste('url_datos_dge'),
str(INSTALACION_DATOS_COVID / ARCHIVO_DIA_ACTUAL), '-o', str(ARCHIVO_DIA_ACTUAL)],
ajuste('url_datos_dge')]) True, redirigir_stderr= True)
# Se comprueba que la descarga haya sido exitosa # Se comprueba que la descarga haya sido exitosa
if sub.returncode != SALIR_EXITO: if not exito:
error(f'No se pudo descargar el archivo de datos del día actual')
# Se borra el archivo de datos del día actual ya que probablemente esté corrupto
if ARCHIVO_DIA_ACTUAL.exists():
ARCHIVO_DIA_ACTUAL.unlink()
Log.error(f'No se pudo descargar el archivo de datos del día actual')
else:
print('Archivo de datos del día actual descargado')
# Se obtiene una lista de todos los archivos de datos # Se obtiene una lista de todos los archivos de datos
ARCHIVOS_COVID = sorted(glob.glob(f'{INSTALACION_DATOS_COVID}/datos_abiertos_*.zip')) ARCHIVOS_COVID = sorted(glob.glob(f'{INSTALACION_DATOS_COVID}/datos_abiertos_*.zip'))
...@@ -58,6 +59,9 @@ ARCHIVOS_COVID = sorted(glob.glob(f'{INSTALACION_DATOS_COVID}/datos_abiertos_*.z ...@@ -58,6 +59,9 @@ ARCHIVOS_COVID = sorted(glob.glob(f'{INSTALACION_DATOS_COVID}/datos_abiertos_*.z
ARCHIVO_BASE = ARCHIVOS_COVID[-2] if argumentos.base is None else argumentos.base ARCHIVO_BASE = ARCHIVOS_COVID[-2] if argumentos.base is None else argumentos.base
ARCHIVO_DIFF = ARCHIVOS_COVID[-1] if argumentos.diff is None else argumentos.diff ARCHIVO_DIFF = ARCHIVOS_COVID[-1] if argumentos.diff is None else argumentos.diff
print(f'Archivo base: {ARCHIVO_BASE}')
print(f'Archivo diff: {ARCHIVO_DIFF}')
# Archivos de diferencias # Archivos de diferencias
ARCHIVO_INSERT = CARPETA_TEMPORAL / 'insert.csv' ARCHIVO_INSERT = CARPETA_TEMPORAL / 'insert.csv'
ARCHIVO_DELETE = CARPETA_TEMPORAL / 'delete.csv' ARCHIVO_DELETE = CARPETA_TEMPORAL / 'delete.csv'
...@@ -78,7 +82,7 @@ with zipfile.ZipFile(ARCHIVO_DIFF, 'r') as archivo_zip: ...@@ -78,7 +82,7 @@ with zipfile.ZipFile(ARCHIVO_DIFF, 'r') as archivo_zip:
ARCHIVO_DIFF_TEMPORAL = CARPETA_TEMPORAL / zipinfo.filename ARCHIVO_DIFF_TEMPORAL = CARPETA_TEMPORAL / zipinfo.filename
# Se hace la diferencia de ambos archivos # Se hace la diferencia de ambos archivos
sub = subprocess.run( exito = Log.subproceso(
[ [
INSTALACION_CSVDIFF, '-cfu', INSTALACION_CSVDIFF, '-cfu',
...@@ -99,15 +103,16 @@ sub = subprocess.run( ...@@ -99,15 +103,16 @@ sub = subprocess.run(
# Archivo base y de diferencias # Archivo base y de diferencias
str(ARCHIVO_BASE_TEMPORAL), str(ARCHIVO_BASE_TEMPORAL),
str(ARCHIVO_DIFF_TEMPORAL) str(ARCHIVO_DIFF_TEMPORAL)
]) ],
True)
# Se borran los archivos temporales # Se borran los archivos temporales
ARCHIVO_BASE_TEMPORAL.unlink() ARCHIVO_BASE_TEMPORAL.unlink()
ARCHIVO_DIFF_TEMPORAL.unlink() ARCHIVO_DIFF_TEMPORAL.unlink()
# Se comprueba que la diferencia haya sido exitosa # Se comprueba que la diferencia haya sido exitosa
if sub.returncode != SALIR_EXITO: if not exito:
error('No se pudo hacer la diferencia de los archivos') Log.error('No se pudo obtener la diferencia de los archivos')
# Se autodetecta el día especial # Se autodetecta el día especial
if ajuste('dia_especial.autodetectar') and not DIA_ESPECIAL: if ajuste('dia_especial.autodetectar') and not DIA_ESPECIAL:
...@@ -129,10 +134,10 @@ if any(INSTALACION_DIFERENCIAS.iterdir()): ...@@ -129,10 +134,10 @@ if any(INSTALACION_DIFERENCIAS.iterdir()):
# Se comprueba la respuesta de la solicitud # Se comprueba la respuesta de la solicitud
if not respuesta.ok: if not respuesta.ok:
error('No se pudo reiniciar el conector de FilePulse') Log.error('No se pudo reiniciar el conector de FilePulse')
# Se termina el script con un error # Se termina el script con un error
error('La carpeta de diferencias no esta vacia') Log.error('La carpeta de diferencias no esta vacia')
else: else:
# Se mueven los archivos de diferencias # Se mueven los archivos de diferencias
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment