diff --git a/src/alembic/versions/4e912be8a176_alter_dim_vessel_gestion_maj_tracking.py b/src/alembic/versions/4e912be8a176_alter_dim_vessel_gestion_maj_tracking.py new file mode 100644 index 00000000..d2eff4a0 --- /dev/null +++ b/src/alembic/versions/4e912be8a176_alter_dim_vessel_gestion_maj_tracking.py @@ -0,0 +1,33 @@ +"""alter_dim_vessel_gestion_maj_tracking + +Revision ID: 4e912be8a176 +Revises: 65a36ee545fe +Create Date: 2024-03-30 10:38:00.015020 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '4e912be8a176' +down_revision = '65a36ee545fe' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.drop_constraint("dim_vessel_mmsi_key", "dim_vessel") + op.drop_column("dim_vessel", "mt_activated") + op.add_column("dim_vessel", sa.Column("tracking_activated", sa.Boolean, default=False)) + op.add_column("dim_vessel", sa.Column("tracking_status", sa.String)) + # A ce stade, on est certain que les MMSI sont uniques dans la table + op.execute("update dim_vessel set tracking_activated=true") + op.execute("update dim_vessel set tracking_activated=false, tracking_status='MMSI null' where mmsi is null") + op.alter_column("dim_vessel", "tracking_activated", nullable=False) + + +def downgrade() -> None: + op.drop_column("dim_vessel", "tracking_status") + op.drop_column("dim_vessel", "tracking_activated") + op.create_unique_constraint("dim_vessel_mmsi_key", "dim_vessel", ["mmsi"]) + op.add_column("dim_vessel", sa.Column("mt_activated", sa.Boolean)) diff --git a/src/bloom/domain/vessel.py b/src/bloom/domain/vessel.py index 88d35005..dada95fc 100644 --- a/src/bloom/domain/vessel.py +++ b/src/bloom/domain/vessel.py @@ -17,7 +17,8 @@ class Vessel(BaseModel): registration_number: Union[str, None] external_marking: Union[str, None] ircs: Union[str, None] - mt_activated: bool + tracking_activated: Union[bool, None] = None + tracking_status: Union[str, None] = None home_port_id: Union[int, None] = None 
created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None diff --git a/src/bloom/infra/database/sql_model.py b/src/bloom/infra/database/sql_model.py index e33df899..040fdfdb 100644 --- a/src/bloom/infra/database/sql_model.py +++ b/src/bloom/infra/database/sql_model.py @@ -26,7 +26,7 @@ class Vessel(Base): __tablename__ = "dim_vessel" id = Column("id", Integer, primary_key=True) - mmsi = Column("mmsi", Integer, unique=True) + mmsi = Column("mmsi", Integer) ship_name = Column("ship_name", String, nullable=False) width = Column("width", Double) length = Column("length", Double) @@ -37,7 +37,8 @@ class Vessel(Base): registration_number = Column("registration_number", String) external_marking = Column("external_marking", String) ircs = Column("ircs", String) - mt_activated = Column("mt_activated", Boolean, nullable=False) + tracking_activated = Column("tracking_activated", Boolean, nullable=False) + tracking_status = Column("tracking_status", String) home_port_id = Column("home_port_id", Integer, ForeignKey("dim_port.id")) created_at = Column( "created_at", DateTime(timezone=True), nullable=False, server_default=func.now(), diff --git a/src/bloom/infra/repositories/repository_vessel.py b/src/bloom/infra/repositories/repository_vessel.py index 96ca9928..bbfbda59 100644 --- a/src/bloom/infra/repositories/repository_vessel.py +++ b/src/bloom/infra/repositories/repository_vessel.py @@ -10,7 +10,7 @@ from dependency_injector.providers import Callable from shapely import Point, wkb from sqlalchemy.orm import Session -from sqlalchemy import select +from sqlalchemy import select, update, func class VesselRepository: @@ -20,27 +20,58 @@ def __init__( ) -> Callable[..., AbstractContextManager]: self.session_factory = session_factory - def load_vessel_metadata(self, session: Session) -> list[Vessel]: - stmt = select(sql_model.Vessel).where(sql_model.Vessel.mt_activated == True).where( # noqa: E712 - sql_model.Vessel.mmsi.isnot(None)) + def 
get_vessel_by_id(self, session: Session, vessel_id: int) -> Union[Vessel | None]: + return session.get(sql_model.Vessel, vessel_id) + + def get_vessels_list(self, session: Session) -> list[Vessel]: + """ + Liste l'ensemble des vessels actifs + """ + stmt = select(sql_model.Vessel).where(sql_model.Vessel.tracking_activated == True) e = session.execute(stmt).scalars() if not e: return [] return [VesselRepository.map_to_domain(vessel) for vessel in e] - def batch_create_vessel(self, session: Session, ports: list[Vessel]) -> list[Vessel]: - orm_list = [VesselRepository.map_to_sql(port) for port in ports] - session.add_all(orm_list) - return [VesselRepository.map_to_domain(orm) for orm in orm_list] - - def load_all_vessel_metadata(self, session: Session) -> list[Vessel]: - stmt = select(sql_model.Vessel).where(sql_model.Vessel.mmsi.isnot(None)) + def get_all_vessels_list(self, session: Session) -> list[Vessel]: + """ + Liste l'ensemble des vessels actifs ou inactifs + """ + stmt = select(sql_model.Vessel) e = session.execute(stmt).scalars() if not e: return [] return [VesselRepository.map_to_domain(vessel) for vessel in e] + def batch_create_vessel(self, session: Session, vessels: list[Vessel]) -> list[Vessel]: + orm_list = [VesselRepository.map_to_sql(port) for port in vessels] + session.add_all(orm_list) + return [VesselRepository.map_to_domain(orm) for orm in orm_list] + + def batch_update_vessel(self, session: Session, vessels: list[Vessel]) -> None: + updates = [{"id": v.id, "mmsi": v.mmsi, "ship_name": v.ship_name, "width": v.width, "length": v.length, + "country_iso3": v.country_iso3, "type": v.type, "imo": v.imo, "cfr": v.cfr, + "registration_number": v.registration_number, "external_marking": v.external_marking, + "ircs": v.ircs, "tracking_activated": v.tracking_activated, "tracking_status": v.tracking_status, + "home_port_id": v.home_port_id} for v in + vessels] + session.execute(update(sql_model.Vessel), updates) + + def set_tracking(self, session: Session, 
vessel_ids: list[int], tracking_activated: bool, + tracking_status: str) -> None: + updates = [{"id": vessel_id, "tracking_activated": tracking_activated, "tracking_status": tracking_status} for vessel_id in + vessel_ids] + session.execute(update(sql_model.Vessel), updates) + + def check_mmsi_integrity(self, session: Session) -> list[tuple[int, int]]: + # Recherche des valeurs distinctes de MMSI ayant un nombre de résultats actif > 1 + stmt = select(sql_model.Vessel.mmsi, func.count(sql_model.Vessel.id).label("count")).group_by( + sql_model.Vessel.mmsi).having( + func.count(sql_model.Vessel.id) > 1).where( + sql_model.Vessel.tracking_activated == True) + return session.execute(stmt).all() + @staticmethod def map_to_domain(sql_vessel: sql_model.Vessel) -> Vessel: return Vessel( @@ -56,7 +87,8 @@ def map_to_domain(sql_vessel: sql_model.Vessel) -> Vessel: registration_number=sql_vessel.registration_number, external_marking=sql_vessel.external_marking, ircs=sql_vessel.ircs, - mt_activated=sql_vessel.mt_activated, + tracking_activated=sql_vessel.tracking_activated, + tracking_status=sql_vessel.tracking_status, home_port_id=sql_vessel.home_port_id, created_at=sql_vessel.created_at, updated_at=sql_vessel.updated_at, @@ -77,7 +109,8 @@ def map_to_sql(vessel: Vessel) -> sql_model.Vessel: registration_number=vessel.registration_number, external_marking=vessel.external_marking, ircs=vessel.ircs, - mt_activated=vessel.mt_activated, + tracking_activated=vessel.tracking_activated, + tracking_status=vessel.tracking_status, home_port_id=vessel.home_port_id, created_at=vessel.created_at, updated_at=vessel.updated_at, diff --git a/src/bloom/tasks/clean_positions.py b/src/bloom/tasks/clean_positions.py index 1886ad6e..8e59f086 100644 --- a/src/bloom/tasks/clean_positions.py +++ b/src/bloom/tasks/clean_positions.py @@ -43,7 +43,7 @@ def run(): nb_donnees = 0 nb_au_port = 0 nb_pas_au_port = 0 - vessels = vessel_repository.load_vessel_metadata(session) + vessels = 
vessel_repository.get_vessels_list(session) logger.info(f"{len(vessels)} bateaux à traiter") # Foreach vessel for vessel in vessels: diff --git a/src/bloom/tasks/load_dim_vessel_from_csv.py b/src/bloom/tasks/load_dim_vessel_from_csv.py index 891f7b87..52e4686c 100644 --- a/src/bloom/tasks/load_dim_vessel_from_csv.py +++ b/src/bloom/tasks/load_dim_vessel_from_csv.py @@ -13,20 +13,22 @@ def map_to_domain(row: pd.Series) -> Vessel: isna = row.isna() return Vessel( + id=int(row["id"]) if not isna["id"] else None, mmsi=int(row["mmsi"]) if not isna["mmsi"] else None, ship_name=row["ship_name"], - width=None, - length=row["loa"], + width=int(row["width"]) if not isna["width"] else None, + length=int(row["length"]) if not isna["length"] else None, country_iso3=row["country_iso3"] if not isna["country_iso3"] else "XXX", type=row["type"], - imo=row["IMO"] if not isna["IMO"] else None, + imo=row["imo"] if not isna["imo"] else None, cfr=row["cfr"] if not isna["cfr"] else None, registration_number=row["registration_number"] if not isna["registration_number"] else None, external_marking=row["external_marking"] if not isna["external_marking"] else None, ircs=row["ircs"] if not isna["ircs"] else None, - mt_activated=row["mt_activated"].strip(), + tracking_activated=row["tracking_activated"], + tracking_status=row["tracking_status"] if not isna["tracking_status"] else None, ) @@ -35,21 +37,54 @@ def run(csv_file_name: str) -> None: vessel_repository = use_cases.vessel_repository() db = use_cases.db() - ports = [] - total = 0 + inserted_ports = [] + deleted_ports = [] try: df = pd.read_csv(csv_file_name, sep=";") vessels = df.apply(map_to_domain, axis=1) with db.session() as session: - ports = vessel_repository.batch_create_vessel(session, list(vessels)) - session.commit() - total = len(ports) + ports_inserts = [] + ports_updates = [] + # Pour chaque enregistrement du fichier CSV + for vessel in vessels: + if vessel.id and vessel_repository.get_vessel_by_id(session, vessel.id): + # 
si la valeur du champ id n'est pas vide: + # rechercher l'enregistrement correspondant dans la table dim_vessel + # mettre à jour l'enregistrement à partir des données CSV. + ports_updates.append(vessel) + else: + # sinon: + # insérer les données CSV dans la table dim_vessel; + ports_inserts.append(vessel) + # Insertions / MAJ en batch + inserted_ports = vessel_repository.batch_create_vessel(session, ports_inserts) + vessel_repository.batch_update_vessel(session, ports_updates) + + # En fin de traitement: + # les enregistrements de la table dim_vessel portant un MMSI absent du fichier CSV sont mis à jour + # avec la valeur tracking_activated=FALSE + csv_mmsi = list(df['mmsi']) + deleted_ports = list( + filter(lambda v: v.mmsi not in csv_mmsi, vessel_repository.get_all_vessels_list(session))) + vessel_repository.set_tracking(session, [v.id for v in deleted_ports], False, + "Suppression logique suite import nouveau fichier CSV") + # le traitement vérifie qu'il n'existe qu'un seul enregistrement à l'état tracking_activated==True + # pour chaque valeur distincte de MMSI. 
+ integrity_errors = vessel_repository.check_mmsi_integrity(session) + if not integrity_errors: + session.commit() + else: + logger.error( + f"Erreur d'intégrité fonctionnelle, plusieurs bateaux actifs avec le même MMSI: {integrity_errors}") + session.rollback() except ValidationError as e: logger.error("Erreur de validation des données de bateau") logger.error(e.errors()) - except DBException as e: + except DBException: logger.error("Erreur d'insertion en base") - logger.info(f"{total} bateau(x) créés") + logger.info(f"{len(inserted_ports)} bateau(x) créés") + logger.info(f"{len(ports_updates)} bateau(x) mise à jour ou inchangés") + logger.info(f"{len(deleted_ports)} bateau(x) désactivés") if __name__ == "__main__": diff --git a/src/bloom/tasks/load_spire_data_from_api.py b/src/bloom/tasks/load_spire_data_from_api.py index 807b4f98..29d9b1e5 100644 --- a/src/bloom/tasks/load_spire_data_from_api.py +++ b/src/bloom/tasks/load_spire_data_from_api.py @@ -21,7 +21,7 @@ def run(dump_path: str) -> None: orm_data = [] try: with db.session() as session: - vessels: list[Vessel] = vessel_repository.load_all_vessel_metadata(session) + vessels: list[Vessel] = vessel_repository.get_vessels_list(session) raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels) if dump_path is not None: @@ -44,7 +44,7 @@ def run(dump_path: str) -> None: logger.error(e.errors()) except Exception as e: logger.error("Echec de l'appel API", exc_info=e) - logger.info(f"{len(orm_data)} vessel data loaded") + logger.info(f"{len(orm_data)} données chargées") if __name__ == "__main__": diff --git a/src/bloom/tasks/update_vessel_data_voyage.py b/src/bloom/tasks/update_vessel_data_voyage.py index bf94dfe7..c8dd4290 100644 --- a/src/bloom/tasks/update_vessel_data_voyage.py +++ b/src/bloom/tasks/update_vessel_data_voyage.py @@ -52,7 +52,7 @@ def run() -> None: nb_donnees = 0 nb_insert_data = 0 nb_insert_voyage = 0 - vessels = vessel_repository.load_vessel_metadata(session) + vessels = 
vessel_repository.get_vessels_list(session) logger.info(f"{len(vessels)} bateaux à traiter") # Foreach vessel for vessel in vessels: