Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions backend/alembic/versions/d9d279892b5c_add_task_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""add task execution

Revision ID: d9d279892b5c
Revises: 7ba4634af5ad
Create Date: 2024-11-30 12:54:22.318425

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import func


# revision identifiers, used by Alembic.
revision = 'd9d279892b5c'
down_revision = '7ba4634af5ad'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Rebuild ``tasks_executions`` so it keeps one row per task execution.

    The previous table held a single row per task (``task_name`` was the
    primary key). The new layout adds a surrogate ``id``, the ``delta``
    between two consecutive executions and an ``active`` flag marking the
    latest execution of each task. The goal is to be able to detect time
    frames that were not covered by a Spire API interrogation.

    Steps:
      1. free the primary-key name and rename the old table,
      2. create the new table,
      3. copy the existing rows over as the active executions,
      4. back-fill historical Spire API executions from ``spire_ais_data``.
    """
    # Drop the constraint from the existing table to free its name,
    # which the new table's primary key will reuse.
    op.drop_constraint(
        table_name="tasks_executions",
        constraint_name="tasks_executions_pkey",
    )
    # Rename the existing table so its data survives until it is copied.
    op.rename_table("tasks_executions", "tasks_executions_tmp")

    op.create_table(
        "tasks_executions",
        # The primary key already provides an index; no extra index needed.
        sa.Column("id", sa.Integer(), sa.Identity(), primary_key=True),
        sa.Column("task_name", sa.String),
        sa.Column("point_in_time", sa.DateTime(timezone=True)),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=func.now(),
        ),
        # ``onupdate`` is applied by SQLAlchemy at statement time, not by
        # the database, so it is not declared in the DDL.
        sa.Column("updated_at", sa.DateTime(timezone=True)),
        # NULL for the first known execution of a task (nothing to diff).
        sa.Column("delta", sa.Interval, index=True, nullable=True),
        # A server-side default is required here: a Python-side ``default``
        # is ignored by the raw SQL inserts below and by the DDL.
        sa.Column(
            "active",
            sa.Boolean,
            index=True,
            nullable=False,
            server_default=sa.false(),
        ),
    )

    # Existing rows are the latest execution of each task: copy as active.
    op.execute(
        """
        insert into tasks_executions
            (task_name, point_in_time, created_at, updated_at, active)
        select task_name, point_in_time, created_at, updated_at, true
        from tasks_executions_tmp
        """
    )
    op.drop_table("tasks_executions_tmp")

    # Retrieve all historical Spire API interrogations from spire_ais_data.
    # Each distinct ``created_at`` is treated as one execution; ``delta`` is
    # the gap to the previous distinct ``created_at``. NOT EXISTS is used
    # instead of NOT IN because ``point_in_time`` is nullable and a single
    # NULL would make NOT IN reject every row.
    op.execute(
        """
        insert into public.tasks_executions
            (task_name, point_in_time, created_at, delta, active)
        select
            'load_spire_data_from_api' as "task_name",
            T1.created_at as "point_in_time",
            T1.created_at,
            T1.created_at - (
                select max(T2.created_at)
                from spire_ais_data T2
                where T2.created_at < T1.created_at
            ) as "delta",
            case
                when T1.created_at = (select max(created_at) from spire_ais_data)
                     and not exists (
                         select 1
                         from public.tasks_executions
                         where task_name = 'load_spire_data_from_api'
                           and active = true
                     )
                then true
                else false
            end as "active"
        from spire_ais_data T1
        where T1.created_at is not null
          and not exists (
              select 1
              from public.tasks_executions TE
              where TE.task_name = 'load_spire_data_from_api'
                and TE.point_in_time = T1.created_at
          )
        group by T1.created_at
        order by T1.created_at desc
        """
    )


def downgrade() -> None:
    """Restore the one-row-per-task layout of ``tasks_executions``.

    Only the active row of each task is kept, the columns added by the
    upgrade are dropped and ``task_name`` becomes the primary key again.
    Historical (non-active) executions are lost.
    """
    # Delete every row that is not the active one: they have no equivalent
    # in the old table. ``is not true`` also removes rows whose flag is NULL,
    # which a plain ``active = false`` comparison would leave behind.
    op.execute("delete from tasks_executions where active is not true")
    # Drop the columns added by the upgrade. On PostgreSQL, dropping "id"
    # also drops the primary-key constraint built on it, which frees the
    # "tasks_executions_pkey" name reused below.
    op.drop_column("tasks_executions", "delta")
    op.drop_column("tasks_executions", "active")
    op.drop_column("tasks_executions", "id")
    # Recreate the primary key of the old table (a plain unique constraint
    # would not restore the original schema).
    op.create_primary_key(
        "tasks_executions_pkey",
        "tasks_executions",
        ["task_name"],
    )
8 changes: 6 additions & 2 deletions backend/bloom/infra/database/sql_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
Integer,
Interval,
String,
PrimaryKeyConstraint
PrimaryKeyConstraint,
Identity
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func, select
Expand Down Expand Up @@ -224,10 +225,13 @@ class Segment(Base):

class TaskExecution(Base):
    """History of task executions, one row per execution.

    The row flagged ``active`` is the latest execution of a task; older rows
    are kept so that gaps between executions can be inspected.
    """

    __tablename__ = "tasks_executions"
    # Surrogate key: a task now has several rows, so ``task_name`` can no
    # longer be the primary key.
    id = Column("id", Integer, Identity(), primary_key=True)
    task_name = Column("task_name", String)
    point_in_time = Column("point_in_time", DateTime(timezone=True))
    created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
    updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now())
    # Time elapsed since the previous execution of the same task. Nullable:
    # the first execution of a task has no predecessor, and the migration
    # creates the column as nullable.
    delta = Column("delta", Interval, index=True, nullable=True)
    # True for the latest execution of the task only.
    active = Column("active", Boolean, index=True, nullable=False)


class RelSegmentZone(Base):
Expand Down
25 changes: 21 additions & 4 deletions backend/bloom/infra/repositories/repository_task_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,37 @@
from bloom.infra.database import sql_model
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.sql.expression import update,asc,desc
from sqlalchemy.orm import Session


class TaskExecutionRepository:
    """Read and record the point in time reached by each named task."""

    @staticmethod
    def get_point_in_time(session: Session, task_name: str) -> datetime:
        """Return the point in time of the active execution of ``task_name``.

        Args:
            session: open SQLAlchemy session used to run the query.
            task_name: name of the task to look up.

        Returns:
            The ``point_in_time`` of the active execution, or the Unix epoch
            (UTC) when the task has no active execution yet.

        Raises:
            sqlalchemy.exc.MultipleResultsFound: if several executions of the
                task are flagged active.
        """
        stmt = (
            select(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .where(sql_model.TaskExecution.active.is_(True))
        )
        execution = session.execute(stmt).scalar_one_or_none()
        if execution is None:
            return datetime.fromtimestamp(0, timezone.utc)
        return execution.point_in_time

    @staticmethod
    def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None:
        """Record a new execution of ``task_name`` and make it the active one.

        The previously active execution (if any) is deactivated, then a new
        active row is inserted whose ``delta`` is the time elapsed since the
        most recent recorded execution of the task (NULL when there is none).
        The caller is responsible for committing the session.

        Args:
            session: open SQLAlchemy session used to run the statements.
            task_name: name of the task that has just run.
            pit: point in time reached by this execution.
        """
        # Deactivate the current active execution before inserting the new one.
        deactivate_stmt = (
            update(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .where(sql_model.TaskExecution.active.is_(True))
            .values(active=False)
        )
        session.execute(deactivate_stmt)

        # Gap between this execution and the latest recorded one. It must be
        # a scalar subquery to be usable as a column value in the INSERT.
        delta_subquery = (
            select(pit - sql_model.TaskExecution.point_in_time)
            .select_from(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .order_by(desc(sql_model.TaskExecution.point_in_time))
            .limit(1)
            .scalar_subquery()
        )
        insert_stmt = insert(sql_model.TaskExecution).values(
            task_name=task_name,
            point_in_time=pit,
            delta=delta_subquery,
            active=True,
        )
        session.execute(insert_stmt)
7 changes: 6 additions & 1 deletion backend/bloom/tasks/load_spire_data_from_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from bloom.infra.http.spire_api_utils import map_raw_vessels_to_domain
from bloom.logger import logger
from pydantic import ValidationError
from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository


def run(dump_path: str) -> None:
Expand All @@ -24,9 +25,10 @@ def run(dump_path: str) -> None:
vessels: list[Vessel] = vessel_repository.get_vessels_list(session)
if len(vessels) > 0:
raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels)
current_datetime=datetime.now(timezone.utc)
if dump_path is not None:
try:
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
now =current_datetime.strftime("%Y-%m-%dT%H:%M:%S")
dump_file = Path(args.dump_path, f"spire_{now}").with_suffix(".json")
with dump_file.open("wt") as handle:
json.dump(raw_vessels, handle)
Expand All @@ -38,6 +40,9 @@ def run(dump_path: str) -> None:
spire_ais_data,
session,
)
TaskExecutionRepository.set_point_in_time(session,
"load_spire_data_from_api",
current_datetime)
session.commit()
except ValidationError as e:
logger.error("Erreur de validation des données JSON")
Expand Down