Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
K
KafkaConector30Dias
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Alonso Ballesteros Torres
KafkaConector30Dias
Commits
f21ae81e
Commit
f21ae81e
authored
Jun 28, 2023
by
Alonso Ballesteros Torres
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
- Corrección del rango de fechas
- Corrección de error al convertir 'FECHA_DEF' a datetimes
parent
091cf0da
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
20 additions
and
18 deletions
+20
-18
conector.py
+10
-11
general.py
+5
-7
modelos.py
+5
-0
No files found.
conector.py
View file @
f21ae81e
...
@@ -105,10 +105,12 @@ def insertar_registros_locales(insertados: pd.DataFrame):
...
@@ -105,10 +105,12 @@ def insertar_registros_locales(insertados: pd.DataFrame):
ids_eliminados
=
no_positivos
[
'ID_REGISTRO'
]
.
to_list
()
ids_eliminados
=
no_positivos
[
'ID_REGISTRO'
]
.
to_list
()
eliminar
:
sa
.
sql
.
Delete
=
sa
.
delete
(
RegistroLocal
)
eliminar
:
sa
.
sql
.
Delete
=
sa
.
delete
(
RegistroLocal
)
CONEXION_LOCAL
.
sesion
.
execute
(
eliminar
.
where
(
RegistroLocal
.
ID_REGISTRO
.
in_
(
ids_eliminados
)))
CONEXION_LOCAL
.
sesion
.
execute
(
eliminar
.
where
(
RegistroLocal
.
ID_REGISTRO
.
in_
(
ids_eliminados
)))
CONEXION_LOCAL
.
sesion
.
flush
()
# Se insertan unicamente los registros positivos
# Se insertan unicamente los registros positivos
if
not
positivos
.
empty
:
if
not
positivos
.
empty
:
CONEXION_LOCAL
.
sesion
.
execute
(
sa
.
insert
(
RegistroLocal
),
positivos
.
to_dict
(
orient
=
'records'
))
CONEXION_LOCAL
.
sesion
.
execute
(
sa
.
insert
(
RegistroLocal
),
positivos
.
to_dict
(
orient
=
'records'
))
CONEXION_LOCAL
.
sesion
.
flush
()
def
eliminar_registros_locales
(
eliminados
:
pd
.
DataFrame
):
def
eliminar_registros_locales
(
eliminados
:
pd
.
DataFrame
):
''' Elimina los registros individuales de la lista ELIMINADOS de la base de datos local '''
''' Elimina los registros individuales de la lista ELIMINADOS de la base de datos local '''
...
@@ -120,6 +122,7 @@ def eliminar_registros_locales(eliminados: pd.DataFrame):
...
@@ -120,6 +122,7 @@ def eliminar_registros_locales(eliminados: pd.DataFrame):
ids_eliminados
=
eliminados
[
'ID_REGISTRO'
]
.
to_list
()
ids_eliminados
=
eliminados
[
'ID_REGISTRO'
]
.
to_list
()
eliminar
:
sa
.
sql
.
Delete
=
sa
.
delete
(
RegistroLocal
)
eliminar
:
sa
.
sql
.
Delete
=
sa
.
delete
(
RegistroLocal
)
CONEXION_LOCAL
.
sesion
.
execute
(
eliminar
.
where
(
RegistroLocal
.
ID_REGISTRO
.
in_
(
ids_eliminados
)))
CONEXION_LOCAL
.
sesion
.
execute
(
eliminar
.
where
(
RegistroLocal
.
ID_REGISTRO
.
in_
(
ids_eliminados
)))
CONEXION_LOCAL
.
sesion
.
flush
()
def
eliminar_registros_anteriores
():
def
eliminar_registros_anteriores
():
''' Elimina los registros de la base local que tengan una antigüedad mayor a 30 días '''
''' Elimina los registros de la base local que tengan una antigüedad mayor a 30 días '''
...
@@ -152,18 +155,12 @@ def insertar_registros_remotos():
...
@@ -152,18 +155,12 @@ def insertar_registros_remotos():
# Se cargan los registros almacenados localmente
# Se cargan los registros almacenados localmente
registros
=
pd
.
read_sql
(
CONEXION_LOCAL
.
sesion
.
query
(
RegistroLocal
)
.
statement
,
CONEXION_LOCAL
.
enlace
)
registros
=
pd
.
read_sql
(
CONEXION_LOCAL
.
sesion
.
query
(
RegistroLocal
)
.
statement
,
CONEXION_LOCAL
.
enlace
)
# Se agrega el campo de resultado
registros
[
'RESULTADO'
]
=
'POSITIVO'
# Se convierten las fechas a formato datetime
# Se convierten las fechas a formato datetime
registros
[
'FECHA_SINTOMAS'
]
=
pd
.
to_datetime
(
registros
[
'FECHA_SINTOMAS'
])
registros
[
'FECHA_SINTOMAS'
]
=
pd
.
to_datetime
(
registros
[
'FECHA_SINTOMAS'
])
registros
[
'FECHA_DEF'
]
=
pd
.
to_datetime
(
registros
[
'FECHA_DEF'
],
errors
=
'coerce'
)
registros
[
'FECHA_DEF'
]
=
pd
.
to_datetime
(
registros
[
'FECHA_DEF'
],
format
=
'
%
Y-
%
m-
%
d'
,
errors
=
'coerce'
)
# Se convierten los valores NaT a None
registros
.
replace
({
pd
.
NaT
:
None
},
inplace
=
True
)
# Se guardan los registros en la base de datos remota
# Se guardan los registros en la base de datos remota
general
.
cargar_valores_reales
(
CONEXION_REMOTA
,
registros
,
FECHA_LOCAL
.
actual
-
pd
.
Timedelta
(
days
=
1
)
)
general
.
cargar_valores_reales
(
CONEXION_REMOTA
,
registros
,
FECHA_LOCAL
.
actual
)
# ------------------------- Funciones del consumidor --------------------------
# ------------------------- Funciones del consumidor --------------------------
...
@@ -315,6 +312,7 @@ def ciclo():
...
@@ -315,6 +312,7 @@ def ciclo():
# Se hacen efectivos los cambios en la base de datos local
# Se hacen efectivos los cambios en la base de datos local
CONEXION_LOCAL
.
sesion
.
commit
()
CONEXION_LOCAL
.
sesion
.
commit
()
CONEXION_LOCAL
.
limpiar
()
# Se marca la transacción como finalizada
# Se marca la transacción como finalizada
TRANSACCION_FINALIZADA
=
True
TRANSACCION_FINALIZADA
=
True
...
@@ -332,17 +330,18 @@ def ciclo():
...
@@ -332,17 +330,18 @@ def ciclo():
# Se actualiza la base de datos remota hasta que se alcance el dia actual
# Se actualiza la base de datos remota hasta que se alcance el dia actual
while
FECHA_LOCAL
<
DIA_ACTUAL
:
while
FECHA_LOCAL
<
DIA_ACTUAL
:
print
(
'Día local: '
,
FECHA_LOCAL
.
actual
.
date
())
print
(
'Día local: '
,
FECHA_LOCAL
.
actual
.
date
())
print
(
'Actualizando la base de datos remota...'
)
print
(
'Actualizando la base de datos
local y
remota...'
)
# Se actualizan los datos remotos
# Se actualizan los datos
locales y
remotos
eliminar_registros_anteriores
()
eliminar_registros_anteriores
()
insertar_registros_remotos
()
insertar_registros_remotos
()
print
(
'Persistiendo los cambios en las bases de datos...'
)
print
(
'Persistiendo los cambios en las bases de datos...'
)
# Se hacen efectivos los cambios en las bases de datos
# Se hacen efectivos los cambios en las bases de datos
CONEXION_LOCAL
.
sesion
.
commit
()
CONEXION_REMOTA
.
sesion
.
commit
()
CONEXION_REMOTA
.
sesion
.
commit
()
CONEXION_LOCAL
.
sesion
.
commit
()
CONEXION_LOCAL
.
limpiar
()
print
(
' Base de datos remota actualizada!
\n
'
)
print
(
' Base de datos remota actualizada!
\n
'
)
...
...
general.py
View file @
f21ae81e
...
@@ -30,9 +30,10 @@ def generar_dataframe(registros: List[RegistroKafka]):
...
@@ -30,9 +30,10 @@ def generar_dataframe(registros: List[RegistroKafka]):
# Se convierten los campos de fecha
# Se convierten los campos de fecha
df
[
'FECHA_ACTUALIZACION'
]
=
pd
.
to_datetime
(
df
[
'FECHA_ACTUALIZACION'
])
df
[
'FECHA_ACTUALIZACION'
]
=
pd
.
to_datetime
(
df
[
'FECHA_ACTUALIZACION'
])
df
[
'FECHA_SINTOMAS'
]
=
pd
.
to_datetime
(
df
[
'FECHA_SINTOMAS'
])
df
[
'FECHA_SINTOMAS'
]
=
pd
.
to_datetime
(
df
[
'FECHA_SINTOMAS'
])
df
[
'FECHA_DEF'
]
=
pd
.
to_datetime
(
df
[
'FECHA_DEF'
],
format
=
'AAAA-MM-DD'
,
errors
=
'coerce'
)
df
[
'FECHA_DEF'
]
=
pd
.
to_datetime
(
df
[
'FECHA_DEF'
],
format
=
'
%
Y-
%
m-
%
d'
,
errors
=
'coerce'
)
df
[
'FECHA_DEF'
]
.
replace
({
pd
.
NaT
:
None
},
inplace
=
True
)
# Función 'resultado' que interpreta la columna de 'CLASIFICACION_FINAL'
y 'FECHA_DEF'
# Función 'resultado' que interpreta la columna de 'CLASIFICACION_FINAL'
def
resultado
(
valor
):
def
resultado
(
valor
):
if
valor
in
[
1
,
2
,
3
]:
if
valor
in
[
1
,
2
,
3
]:
return
'CONFIRMADO'
return
'CONFIRMADO'
...
@@ -54,9 +55,6 @@ def generar_dataframe(registros: List[RegistroKafka]):
...
@@ -54,9 +55,6 @@ def generar_dataframe(registros: List[RegistroKafka]):
etiquetas
=
[
str
(
grupo
)
for
grupo
in
GRUPO_EDAD
]
etiquetas
=
[
str
(
grupo
)
for
grupo
in
GRUPO_EDAD
]
df
[
'GRUPO_EDAD'
]
=
pd
.
cut
(
df
[
'EDAD'
],
grupos
,
labels
=
etiquetas
)
df
[
'GRUPO_EDAD'
]
=
pd
.
cut
(
df
[
'EDAD'
],
grupos
,
labels
=
etiquetas
)
# Se reemplazan todos los valores de 'NaT' por nulos
df
.
replace
({
pd
.
NaT
:
None
},
inplace
=
True
)
return
df
return
df
def
calcular_confirmados
(
registros
:
pd
.
DataFrame
,
tipo
:
TIPO_VALOR
,
fecha
:
pd
.
Timestamp
):
def
calcular_confirmados
(
registros
:
pd
.
DataFrame
,
tipo
:
TIPO_VALOR
,
fecha
:
pd
.
Timestamp
):
...
@@ -66,11 +64,11 @@ def calcular_confirmados(registros: pd.DataFrame, tipo: TIPO_VALOR, fecha: pd.Ti
...
@@ -66,11 +64,11 @@ def calcular_confirmados(registros: pd.DataFrame, tipo: TIPO_VALOR, fecha: pd.Ti
confirmados
=
pd
.
DataFrame
()
confirmados
=
pd
.
DataFrame
()
fecha_inicial
=
fecha
-
pd
.
Timedelta
(
days
=
30
)
fecha_inicial
=
fecha
-
pd
.
Timedelta
(
days
=
30
)
datos_periodo
=
registros
[
registros
[
tipo
.
value
]
>=
fecha_inicial
]
datos_periodo
=
registros
[
registros
[
tipo
.
value
]
>=
fecha_inicial
]
positivos
=
datos_periodo
[
datos_periodo
[
'RESULTADO'
]
==
'POSITIVO'
]
datos_periodo
=
datos_periodo
[
datos_periodo
[
tipo
.
value
]
<=
fecha
]
# Se calcula el número de registros confirmados agrupados por entidad y municipio
# Se calcula el número de registros confirmados agrupados por entidad y municipio
for
grupo
in
GRUPO_EDAD
:
for
grupo
in
GRUPO_EDAD
:
grupo_confirmados
=
positivos
[
positivos
[
'GRUPO_EDAD'
]
==
str
(
grupo
)]
grupo_confirmados
=
datos_periodo
[
datos_periodo
[
'GRUPO_EDAD'
]
==
str
(
grupo
)]
grupo_confirmados
=
grupo_confirmados
.
groupby
([
'ENTIDAD_RES'
,
'MUNICIPIO_RES'
])
grupo_confirmados
=
grupo_confirmados
.
groupby
([
'ENTIDAD_RES'
,
'MUNICIPIO_RES'
])
grupo_confirmados
=
grupo_confirmados
.
size
()
.
reset_index
(
name
=
'CONFIRMADOS'
)
grupo_confirmados
=
grupo_confirmados
.
size
()
.
reset_index
(
name
=
'CONFIRMADOS'
)
grupo_confirmados
[
'GRUPO_EDAD'
]
=
str
(
grupo
)
grupo_confirmados
[
'GRUPO_EDAD'
]
=
str
(
grupo
)
...
...
modelos.py
View file @
f21ae81e
...
@@ -58,6 +58,11 @@ class ConexionSQLite(Conexion):
...
@@ -58,6 +58,11 @@ class ConexionSQLite(Conexion):
cadena_conexion
=
f
'sqlite:///{RUTA / base_de_datos}'
cadena_conexion
=
f
'sqlite:///{RUTA / base_de_datos}'
super
()
.
__init__
(
cadena_conexion
,
SQLite
.
metadata
)
super
()
.
__init__
(
cadena_conexion
,
SQLite
.
metadata
)
def
limpiar
(
self
):
''' Libera el espacio no utilizado en la base de datos '''
self
.
sesion
.
execute
(
sa
.
text
(
'VACUUM'
))
class
ConexionPostgreSQL
(
Conexion
):
class
ConexionPostgreSQL
(
Conexion
):
''' Representa una conexión a una base de datos de PostgreSQL '''
''' Representa una conexión a una base de datos de PostgreSQL '''
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment