diff --git a/backend/alembic/versions/d9d279892b5c_add_task_execution.py b/backend/alembic/versions/d9d279892b5c_add_task_execution.py new file mode 100644 index 00000000..1a7bb9c3 --- /dev/null +++ b/backend/alembic/versions/d9d279892b5c_add_task_execution.py @@ -0,0 +1,89 @@ +"""add task execution + +Revision ID: d9d279892b5c +Revises: 7ba4634af5ad +Create Date: 2024-11-30 12:54:22.318425 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.sql import func + + +# revision identifiers, used by Alembic. +revision = 'd9d279892b5c' +down_revision = '7ba4634af5ad' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """ + Create a new task_executions table that permit to store historical data + for each task execution + The goal is to be able to detect timeframe not covered by Spire API interrogation + """ + # drop constraint from existing table to free the constraint name + op.drop_constraint(table_name="tasks_executions", + constraint_name="tasks_executions_pkey") + # rename existing table to keep existing data + op.rename_table('tasks_executions','tasks_executions_tmp') + # create the new task_executions table with id and active columns in addition + op.create_table("tasks_executions", + sa.Column("id", sa.Integer(),sa.Identity(), primary_key=True, index=True), + sa.Column("task_name", sa.String), + sa.Column("point_in_time", sa.DateTime(timezone=True)), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=func.now(), + ), + sa.Column("updated_at", sa.DateTime(timezone=True), onupdate=func.now()), + sa.Column("delta", + sa.Interval, + index=True, + nullable=True), + sa.Column("active", + sa.Boolean, + index=True, + default=False), + ) + # copy of existing data to new table with active=True + op.execute("insert into tasks_executions " + +"(task_name,point_in_time,created_at,updated_at,active) " + +"select task_name,point_in_time,created_at,updated_at,true " + +"from tasks_executions_tmp") + # drop old table + op.drop_table('tasks_executions_tmp') + + # Retrieve all historical spire api interrogation from spire_ais_data table + op.execute( + """insert into public.tasks_executions (task_name,point_in_time,created_at,delta,active) + select distinct + 'load_spire_data_from_api' as "task_name", + T1.created_at as "point_in_time", + T1.created_at, + T1.created_at-(select distinct created_at from spire_ais_data where created_at < T1.created_at group by created_at order by created_at desc limit 1) as "delta", + case when T1.created_at = (select MAX(created_at) from spire_ais_data) and not EXISTS(select 1 from public.tasks_executions where task_name = 'load_spire_data_from_api' and active = True) then true else false end as "active" + from spire_ais_data T1 + where T1.created_at not in (select point_in_time from public.tasks_executions where task_name = 'load_spire_data_from_api') + group by T1.created_at + order by T1.created_at desc + """) + pass + + +def downgrade() -> None: + # delete all lines active=False as they have no equivalent in old task_executions + op.execute("delete from tasks_executions where active=False") + # drop active and id table + op.drop_column("tasks_executions","delta") + op.drop_column("tasks_executions","active") + op.drop_column("tasks_executions","id") + # recreate the primary unique key constraint of old table + op.create_unique_constraint("tasks_executions_pkey", + "tasks_executions", + ["task_name"], + ) + pass diff --git a/backend/bloom/infra/database/sql_model.py b/backend/bloom/infra/database/sql_model.py index 57a3f855..311b01ba 100644 --- a/backend/bloom/infra/database/sql_model.py +++ b/backend/bloom/infra/database/sql_model.py @@ -10,7 +10,8 @@ Integer, Interval, String, - PrimaryKeyConstraint + PrimaryKeyConstraint, + Identity ) from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.sql import func, select @@ -224,10 +225,13 @@ class Segment(Base): class TaskExecution(Base): __tablename__ = "tasks_executions" - task_name = Column("task_name", String, primary_key=True) + id = Column("id", Integer, Identity(), primary_key=True) + task_name = Column("task_name", String) point_in_time = Column("point_in_time", DateTime(timezone=True)) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + delta = Column("delta", Interval, nullable=False) + active = Column("active", Boolean, nullable=False) class RelSegmentZone(Base): diff --git a/backend/bloom/infra/repositories/repository_task_execution.py b/backend/bloom/infra/repositories/repository_task_execution.py index b3bdefae..46d88141 100644 --- a/backend/bloom/infra/repositories/repository_task_execution.py +++ b/backend/bloom/infra/repositories/repository_task_execution.py @@ -3,20 +3,37 @@ from bloom.infra.database import sql_model from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.sql.expression import update,asc,desc from sqlalchemy.orm import Session class TaskExecutionRepository: @staticmethod def get_point_in_time(session: Session, task_name: str) -> datetime: - stmt = select(sql_model.TaskExecution).where(sql_model.TaskExecution.task_name == task_name) - e = session.execute(stmt).scalar() + stmt = select(sql_model.TaskExecution)\ + .where(sql_model.TaskExecution.task_name == task_name)\ + .where(sql_model.TaskExecution.active == True) + e = session.execute(stmt).scalar_one_or_none() if not e: return datetime.fromtimestamp(0, timezone.utc) else: return e.point_in_time def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None: - stmt = insert(sql_model.TaskExecution).values(task_name=task_name, point_in_time=pit).on_conflict_do_update( - index_elements=["task_name"], set_=dict(point_in_time=pit, updated_at=datetime.now(timezone.utc))) + stmt= ( update(sql_model.TaskExecution) + .where(sql_model.TaskExecution.task_name==task_name) + .where(sql_model.TaskExecution.active==True) + .values(active=False) + ) + session.execute(stmt) + subquery_delta=select(pit-sql_model.TaskExecution.point_in_time)\ + .select_from(sql_model.TaskExecution)\ + .where(sql_model.TaskExecution.task_name==task_name)\ + .order_by(desc(sql_model.TaskExecution.point_in_time))\ + .limit(1).subquery() + stmt = insert(sql_model.TaskExecution).values( + task_name=task_name, + point_in_time=pit, + delta=subquery_delta, + active=True) session.execute(stmt) diff --git a/backend/bloom/tasks/load_spire_data_from_api.py b/backend/bloom/tasks/load_spire_data_from_api.py index 4a6ee28c..6948f9bf 100644 --- a/backend/bloom/tasks/load_spire_data_from_api.py +++ b/backend/bloom/tasks/load_spire_data_from_api.py @@ -9,6 +9,7 @@ from bloom.infra.http.spire_api_utils import map_raw_vessels_to_domain from bloom.logger import logger from pydantic import ValidationError +from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository def run(dump_path: str) -> None: @@ -24,9 +25,10 @@ def run(dump_path: str) -> None: vessels: list[Vessel] = vessel_repository.get_vessels_list(session) if len(vessels) > 0: raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels) + current_datetime=datetime.now(timezone.utc) if dump_path is not None: try: - now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + now =current_datetime.strftime("%Y-%m-%dT%H:%M:%S") dump_file = Path(args.dump_path, f"spire_{now}").with_suffix(".json") with dump_file.open("wt") as handle: json.dump(raw_vessels, handle) @@ -38,6 +40,9 @@ def run(dump_path: str) -> None: spire_ais_data, session, ) + TaskExecutionRepository.set_point_in_time(session, + "load_spire_data_from_api", + current_datetime) session.commit() except ValidationError as e: logger.error("Erreur de validation des données JSON")