From 5948ddca1327296ea257f3dcd7f3f3d751c11f73 Mon Sep 17 00:00:00 2001 From: "herve.le-bars" Date: Sat, 30 Nov 2024 21:48:01 +0100 Subject: [PATCH 1/6] feat: historisation des task executions --- .../d9d279892b5c_add_task_execution.py | 54 +++++++++++++++++++ backend/bloom/infra/database/sql_model.py | 7 ++- .../repositories/repository_task_execution.py | 17 ++++-- 3 files changed, 73 insertions(+), 5 deletions(-) create mode 100644 backend/alembic/versions/d9d279892b5c_add_task_execution.py diff --git a/backend/alembic/versions/d9d279892b5c_add_task_execution.py b/backend/alembic/versions/d9d279892b5c_add_task_execution.py new file mode 100644 index 00000000..ffc8381d --- /dev/null +++ b/backend/alembic/versions/d9d279892b5c_add_task_execution.py @@ -0,0 +1,54 @@ +"""add task execution + +Revision ID: d9d279892b5c +Revises: 7ba4634af5ad +Create Date: 2024-11-30 12:54:22.318425 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'd9d279892b5c' +down_revision = '7ba4634af5ad' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute("delete from tasks_executions") + op.drop_constraint(table_name="tasks_executions", + constraint_name="tasks_executions_pkey") + op.add_column( "tasks_executions", + sa.Column("id", + sa.Integer, + sa.Identity(), + primary_key=True, + index=True,)) + op.create_primary_key("tasks_executions_pkey", + "tasks_executions", + ["id"]) + #op.add_column( "tasks_executions", + # sa.Column("id", + # sa.Integer, + # sa.Identity(), + # primary_key=True, + # index=True,)) + op.add_column( "tasks_executions", + sa.Column( + "active", + sa.Boolean, + index=True, + default=False)) + pass + + +def downgrade() -> None: + op.drop_column("tasks_executions","active") + op.drop_column("tasks_executions","id") + op.create_unique_constraint("tasks_executions_pkey", + "tasks_executions", + ["task_name"], + ) + pass diff --git a/backend/bloom/infra/database/sql_model.py 
b/backend/bloom/infra/database/sql_model.py index 57a3f855..9cfa521a 100644 --- a/backend/bloom/infra/database/sql_model.py +++ b/backend/bloom/infra/database/sql_model.py @@ -10,7 +10,8 @@ Integer, Interval, String, - PrimaryKeyConstraint + PrimaryKeyConstraint, + Identity ) from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.sql import func, select @@ -224,10 +225,12 @@ class Segment(Base): class TaskExecution(Base): __tablename__ = "tasks_executions" - task_name = Column("task_name", String, primary_key=True) + id = Column("id", Integer, Identity(), primary_key=True) + task_name = Column("task_name", String) point_in_time = Column("point_in_time", DateTime(timezone=True)) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + active = Column("active", Boolean, nullable=False) class RelSegmentZone(Base): diff --git a/backend/bloom/infra/repositories/repository_task_execution.py b/backend/bloom/infra/repositories/repository_task_execution.py index b3bdefae..a8094ae0 100644 --- a/backend/bloom/infra/repositories/repository_task_execution.py +++ b/backend/bloom/infra/repositories/repository_task_execution.py @@ -3,13 +3,16 @@ from bloom.infra.database import sql_model from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.sql.expression import update from sqlalchemy.orm import Session class TaskExecutionRepository: @staticmethod def get_point_in_time(session: Session, task_name: str) -> datetime: - stmt = select(sql_model.TaskExecution).where(sql_model.TaskExecution.task_name == task_name) + stmt = select(sql_model.TaskExecution)\ + .where(sql_model.TaskExecution.task_name == task_name)\ + .where(sql_model.TaskExecution.active == True) e = session.execute(stmt).scalar() if not e: return datetime.fromtimestamp(0, timezone.utc) @@ -17,6 +20,14 @@ def get_point_in_time(session: Session, task_name: str) 
-> datetime: return e.point_in_time def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None: - stmt = insert(sql_model.TaskExecution).values(task_name=task_name, point_in_time=pit).on_conflict_do_update( - index_elements=["task_name"], set_=dict(point_in_time=pit, updated_at=datetime.now(timezone.utc))) + stmt= ( update(sql_model.TaskExecution) + .where(sql_model.TaskExecution.task_name==task_name) + .where(sql_model.TaskExecution.active==True) + .values(active=False) + ) + session.execute(stmt) + stmt = insert(sql_model.TaskExecution).values( + task_name=task_name, + point_in_time=pit, + active=True) session.execute(stmt) From 88089c5fc4a9910e4f90eb24b7d7ac950cd312b2 Mon Sep 17 00:00:00 2001 From: "herve.le-bars" Date: Sat, 30 Nov 2024 21:59:43 +0100 Subject: [PATCH 2/6] feat: add tracing for task load_spire_data_from_api --- .../bloom/infra/repositories/repository_task_execution.py | 3 ++- backend/bloom/tasks/load_spire_data_from_api.py | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/backend/bloom/infra/repositories/repository_task_execution.py b/backend/bloom/infra/repositories/repository_task_execution.py index a8094ae0..6cf01fb4 100644 --- a/backend/bloom/infra/repositories/repository_task_execution.py +++ b/backend/bloom/infra/repositories/repository_task_execution.py @@ -29,5 +29,6 @@ def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None: stmt = insert(sql_model.TaskExecution).values( task_name=task_name, point_in_time=pit, - active=True) + active=True, + updated_at=datetime(timezone.utc)) session.execute(stmt) diff --git a/backend/bloom/tasks/load_spire_data_from_api.py b/backend/bloom/tasks/load_spire_data_from_api.py index 4a6ee28c..6948f9bf 100644 --- a/backend/bloom/tasks/load_spire_data_from_api.py +++ b/backend/bloom/tasks/load_spire_data_from_api.py @@ -9,6 +9,7 @@ from bloom.infra.http.spire_api_utils import map_raw_vessels_to_domain from bloom.logger import logger from 
pydantic import ValidationError +from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository def run(dump_path: str) -> None: @@ -24,9 +25,10 @@ def run(dump_path: str) -> None: vessels: list[Vessel] = vessel_repository.get_vessels_list(session) if len(vessels) > 0: raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels) + current_datetime=datetime.now(timezone.utc) if dump_path is not None: try: - now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + now =current_datetime.strftime("%Y-%m-%dT%H:%M:%S") dump_file = Path(args.dump_path, f"spire_{now}").with_suffix(".json") with dump_file.open("wt") as handle: json.dump(raw_vessels, handle) @@ -38,6 +40,9 @@ def run(dump_path: str) -> None: spire_ais_data, session, ) + TaskExecutionRepository.set_point_in_time(session, + "load_spire_data_from_api", + current_datetime) session.commit() except ValidationError as e: logger.error("Erreur de validation des données JSON") From 855f6225e2f2ef0d2b26589fa36d692dbf60534c Mon Sep 17 00:00:00 2001 From: "herve.le-bars" Date: Sat, 30 Nov 2024 22:39:26 +0100 Subject: [PATCH 3/6] feat: keep data from existing task_executions table in alembic migration --- .../d9d279892b5c_add_task_execution.py | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/backend/alembic/versions/d9d279892b5c_add_task_execution.py b/backend/alembic/versions/d9d279892b5c_add_task_execution.py index ffc8381d..1aa69a2f 100644 --- a/backend/alembic/versions/d9d279892b5c_add_task_execution.py +++ b/backend/alembic/versions/d9d279892b5c_add_task_execution.py @@ -7,6 +7,7 @@ """ from alembic import op import sqlalchemy as sa +from sqlalchemy.sql import func # revision identifiers, used by Alembic. 
@@ -17,34 +18,43 @@ def upgrade() -> None: - op.execute("delete from tasks_executions") op.drop_constraint(table_name="tasks_executions", constraint_name="tasks_executions_pkey") - op.add_column( "tasks_executions", - sa.Column("id", - sa.Integer, - sa.Identity(), - primary_key=True, - index=True,)) - op.create_primary_key("tasks_executions_pkey", - "tasks_executions", - ["id"]) - #op.add_column( "tasks_executions", - # sa.Column("id", - # sa.Integer, - # sa.Identity(), - # primary_key=True, - # index=True,)) - op.add_column( "tasks_executions", + op.rename_table('tasks_executions','tasks_executions_tmp') + op.create_table("tasks_executions", + sa.Column("id", sa.Integer(),sa.Identity(), primary_key=True, index=True), + sa.Column("task_name", sa.String), + sa.Column("point_in_time", sa.DateTime(timezone=True)), sa.Column( - "active", - sa.Boolean, - index=True, - default=False)) + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=func.now(), + ), + sa.Column("updated_at", sa.DateTime(timezone=True), onupdate=func.now()), + sa.Column("active", + sa.Boolean, + index=True, + default=False), + ) + op.execute("insert into tasks_executions " + +"(task_name,point_in_time,created_at,updated_at,active) " + +"select task_name,point_in_time,created_at,updated_at,true " + +"from tasks_executions_tmp") + #conn=op.get_bind() + #query=conn.execute("select task_name,point_in_time,created_at,updated_at from tasks_executions_tmp") + #results = query.fetchall() + #executions=[{'task_name': row[0], + # 'point_in_time': row[1], + # 'created_at': row[2], + # 'updated_at': row[3]} for row in results] + #op.bulk_insert('tasks_executions',executions) + op.drop_table('tasks_executions_tmp') pass def downgrade() -> None: + op.execute("delete from tasks_executions where active=False") op.drop_column("tasks_executions","active") op.drop_column("tasks_executions","id") op.create_unique_constraint("tasks_executions_pkey", From 2c321c14ce34070f1aa384b2be0f95dc39336187 
Mon Sep 17 00:00:00 2001 From: "herve.le-bars" Date: Sat, 30 Nov 2024 22:50:06 +0100 Subject: [PATCH 4/6] doc: add comments --- .../d9d279892b5c_add_task_execution.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/backend/alembic/versions/d9d279892b5c_add_task_execution.py b/backend/alembic/versions/d9d279892b5c_add_task_execution.py index 1aa69a2f..64a5c2a9 100644 --- a/backend/alembic/versions/d9d279892b5c_add_task_execution.py +++ b/backend/alembic/versions/d9d279892b5c_add_task_execution.py @@ -18,9 +18,17 @@ def upgrade() -> None: + """ + Create a new tasks_executions table that stores historical data + for each task execution. + The goal is to be able to detect time frames not covered by Spire API queries. + """ + # drop constraint from existing table to free the constraint name op.drop_constraint(table_name="tasks_executions", constraint_name="tasks_executions_pkey") + # rename existing table to keep existing data op.rename_table('tasks_executions','tasks_executions_tmp') + # create the new task_executions table with id and active columns in addition op.create_table("tasks_executions", sa.Column("id", sa.Integer(),sa.Identity(), primary_key=True, index=True), sa.Column("task_name", sa.String), @@ -37,26 +45,23 @@ def upgrade() -> None: index=True, default=False), ) + # copy of existing data to new table with active=True op.execute("insert into tasks_executions " +"(task_name,point_in_time,created_at,updated_at,active) " +"select task_name,point_in_time,created_at,updated_at,true " +"from tasks_executions_tmp") - #conn=op.get_bind() - #query=conn.execute("select task_name,point_in_time,created_at,updated_at from tasks_executions_tmp") - #results = query.fetchall() - #executions=[{'task_name': row[0], - # 'point_in_time': row[1], - # 'created_at': row[2], - # 'updated_at': row[3]} for row in results] - #op.bulk_insert('tasks_executions',executions) + # drop old table op.drop_table('tasks_executions_tmp') pass 
def downgrade() -> None: + # delete all lines active=False as they have no equivalent in old task_executions op.execute("delete from tasks_executions where active=False") + # drop active and id table op.drop_column("tasks_executions","active") op.drop_column("tasks_executions","id") + # recreate the primary unique key constraint of old table op.create_unique_constraint("tasks_executions_pkey", "tasks_executions", ["task_name"], From 563fb02f96f9d4ab1656cd05ea241d9634372172 Mon Sep 17 00:00:00 2001 From: RV Date: Mon, 2 Dec 2024 11:37:33 +0100 Subject: [PATCH 5/6] feat: TaskExecution add delta column time to last task execution --- .../versions/d9d279892b5c_add_task_execution.py | 4 ++++ backend/bloom/infra/database/sql_model.py | 1 + .../infra/repositories/repository_task_execution.py | 11 ++++++++--- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/backend/alembic/versions/d9d279892b5c_add_task_execution.py b/backend/alembic/versions/d9d279892b5c_add_task_execution.py index 64a5c2a9..28a4c161 100644 --- a/backend/alembic/versions/d9d279892b5c_add_task_execution.py +++ b/backend/alembic/versions/d9d279892b5c_add_task_execution.py @@ -40,6 +40,10 @@ def upgrade() -> None: server_default=func.now(), ), sa.Column("updated_at", sa.DateTime(timezone=True), onupdate=func.now()), + sa.Column("delta", + sa.Interval, + index=True, + nullable=True), sa.Column("active", sa.Boolean, index=True, diff --git a/backend/bloom/infra/database/sql_model.py b/backend/bloom/infra/database/sql_model.py index 9cfa521a..311b01ba 100644 --- a/backend/bloom/infra/database/sql_model.py +++ b/backend/bloom/infra/database/sql_model.py @@ -230,6 +230,7 @@ class TaskExecution(Base): point_in_time = Column("point_in_time", DateTime(timezone=True)) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + delta = Column("delta", Interval, nullable=False) active = 
Column("active", Boolean, nullable=False) diff --git a/backend/bloom/infra/repositories/repository_task_execution.py b/backend/bloom/infra/repositories/repository_task_execution.py index 6cf01fb4..51d9172b 100644 --- a/backend/bloom/infra/repositories/repository_task_execution.py +++ b/backend/bloom/infra/repositories/repository_task_execution.py @@ -3,7 +3,7 @@ from bloom.infra.database import sql_model from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert -from sqlalchemy.sql.expression import update +from sqlalchemy.sql.expression import update,asc,desc from sqlalchemy.orm import Session @@ -26,9 +26,14 @@ def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None: .values(active=False) ) session.execute(stmt) + subquery_delta=select(pit-sql_model.TaskExecution.point_in_time)\ + .select_from(sql_model.TaskExecution)\ + .where(sql_model.TaskExecution.task_name==task_name)\ + .order_by(desc(sql_model.TaskExecution.point_in_time))\ + .limit(1).subquery() stmt = insert(sql_model.TaskExecution).values( task_name=task_name, point_in_time=pit, - active=True, - updated_at=datetime(timezone.utc)) + delta=subquery_delta, + active=True) session.execute(stmt) From 7aa02059528de4b91df4e2d75f04d4b9b46b5475 Mon Sep 17 00:00:00 2001 From: RV Date: Mon, 2 Dec 2024 16:52:35 +0100 Subject: [PATCH 6/6] feat: add generation of task_execution for all past spire_ais_data calls --- .../versions/d9d279892b5c_add_task_execution.py | 16 ++++++++++++++++ .../repositories/repository_task_execution.py | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/backend/alembic/versions/d9d279892b5c_add_task_execution.py b/backend/alembic/versions/d9d279892b5c_add_task_execution.py index 28a4c161..1a7bb9c3 100644 --- a/backend/alembic/versions/d9d279892b5c_add_task_execution.py +++ b/backend/alembic/versions/d9d279892b5c_add_task_execution.py @@ -56,6 +56,21 @@ def upgrade() -> None: +"from tasks_executions_tmp") # drop old table 
op.drop_table('tasks_executions_tmp') + + # Retrieve all historical spire api interrogation from spire_ais_data table + op.execute( + """insert into public.tasks_executions (task_name,point_in_time,created_at,delta,active) + select distinct + 'load_spire_data_from_api' as "task_name", + T1.created_at as "point_in_time", + T1.created_at, + T1.created_at-(select distinct created_at from spire_ais_data where created_at < T1.created_at group by created_at order by created_at desc limit 1) as "delta", + case when T1.created_at = (select MAX(created_at) from spire_ais_data) and not EXISTS(select 1 from public.tasks_executions where task_name = 'load_spire_data_from_api' and active = True) then true else false end as "active" + from spire_ais_data T1 + where T1.created_at not in (select point_in_time from public.tasks_executions where task_name = 'load_spire_data_from_api') + group by T1.created_at + order by T1.created_at desc + """) pass @@ -63,6 +78,7 @@ def downgrade() -> None: # delete all lines active=False as they have no equivalent in old task_executions op.execute("delete from tasks_executions where active=False") # drop active and id table + op.drop_column("tasks_executions","delta") op.drop_column("tasks_executions","active") op.drop_column("tasks_executions","id") # recreate the primary unique key constraint of old table diff --git a/backend/bloom/infra/repositories/repository_task_execution.py b/backend/bloom/infra/repositories/repository_task_execution.py index 51d9172b..46d88141 100644 --- a/backend/bloom/infra/repositories/repository_task_execution.py +++ b/backend/bloom/infra/repositories/repository_task_execution.py @@ -13,7 +13,7 @@ def get_point_in_time(session: Session, task_name: str) -> datetime: stmt = select(sql_model.TaskExecution)\ .where(sql_model.TaskExecution.task_name == task_name)\ .where(sql_model.TaskExecution.active == True) - e = session.execute(stmt).scalar() + e = session.execute(stmt).scalar_one_or_none() if not e: return 
datetime.fromtimestamp(0, timezone.utc) else: