Skip to content

Commit

Permalink
Impact de l'algo de création/maj et de suppression logique des bateau…
Browse files Browse the repository at this point in the history
…x décrit dans l'issue #140
  • Loading branch information
njouanin committed Mar 30, 2024
1 parent f1e1228 commit 9510619
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 17 deletions.
38 changes: 32 additions & 6 deletions src/bloom/infra/repositories/repository_vessel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dependency_injector.providers import Callable
from shapely import Point, wkb
from sqlalchemy.orm import Session
from sqlalchemy import select
from sqlalchemy import select, update, func


class VesselRepository:
Expand All @@ -20,6 +20,9 @@ def __init__(
) -> Callable[..., AbstractContextManager]:
self.session_factory = session_factory

def get_vessel_by_id(self, session: Session, vessel_id: int) -> Union[Vessel | None]:
    """Look up a single vessel by its primary key.

    Args:
        session: Open database session used for the lookup.
        vessel_id: Primary key of the vessel to fetch.

    Returns:
        Whatever ``session.get`` yields for ``sql_model.Vessel`` and the given
        key, returned unchanged.

    NOTE(review): the result is not passed through ``map_to_domain`` here,
    unlike the list helpers of this repository -- confirm callers only rely
    on presence/absence of the row.
    """
    found = session.get(sql_model.Vessel, vessel_id)
    return found

def get_vessels_list(self, session: Session) -> list[Vessel]:
"""
Liste l'ensemble des vessels actifs
Expand All @@ -30,11 +33,6 @@ def get_vessels_list(self, session: Session) -> list[Vessel]:
return []
return [VesselRepository.map_to_domain(vessel) for vessel in e]

def batch_create_vessel(self, session: Session, ports: list[Vessel]) -> list[Vessel]:
    """Register a batch of vessels with the session for insertion.

    Args:
        session: Open database session; nothing is committed here.
        ports: Domain vessels to insert (the name is historical, the items
            are mapped with ``VesselRepository.map_to_sql``).

    Returns:
        The inserted objects mapped back with ``VesselRepository.map_to_domain``,
        in the same order as the input.
    """
    sql_rows = []
    for item in ports:
        sql_rows.append(VesselRepository.map_to_sql(item))
    session.add_all(sql_rows)
    return list(map(VesselRepository.map_to_domain, sql_rows))

def get_all_vessels_list(self, session: Session) -> list[Vessel]:
"""
Liste l'ensemble des vessels actifs ou inactifs
Expand All @@ -46,6 +44,34 @@ def get_all_vessels_list(self, session: Session) -> list[Vessel]:
return []
return [VesselRepository.map_to_domain(vessel) for vessel in e]

def batch_create_vessel(self, session: Session, vessels: list[Vessel]) -> list[Vessel]:
    """Register a batch of vessels with the session for insertion.

    Args:
        session: Open database session; nothing is committed here.
        vessels: Domain vessels to insert.

    Returns:
        The inserted objects mapped back with ``VesselRepository.map_to_domain``,
        in the same order as the input.
    """
    sql_rows = []
    for vessel in vessels:
        sql_rows.append(VesselRepository.map_to_sql(vessel))
    session.add_all(sql_rows)
    return list(map(VesselRepository.map_to_domain, sql_rows))

def batch_update_vessel(self, session: Session, vessels: list[Vessel]) -> None:
    """Update existing vessel rows in bulk, one parameter set per vessel.

    Every vessel is converted to a dict keyed by column name (``id`` included)
    and the resulting list is passed as the parameter sets of
    ``update(sql_model.Vessel)``. Nothing is committed here.

    Args:
        session: Open database session used to execute the update.
        vessels: Vessels carrying the new values; an empty list is a no-op.
    """
    if not vessels:
        # Nothing to update: never execute the UPDATE statement with an
        # empty list of parameter sets.
        return
    fields = (
        "id", "mmsi", "ship_name", "width", "length",
        "country_iso3", "type", "imo", "cfr",
        "registration_number", "external_marking",
        "ircs", "tracking_activated", "tracking_status",
        "home_port_id",
    )
    updates = [{name: getattr(v, name) for name in fields} for v in vessels]
    session.execute(update(sql_model.Vessel), updates)

def set_tracking(self, session: Session, vessel_ids: list[int], tracking_activated: bool,
                 tracking_status: str) -> None:
    """Set the tracking flag and status on a list of vessels.

    One parameter set per vessel id is passed to ``update(sql_model.Vessel)``.
    Nothing is committed here.

    Args:
        session: Open database session used to execute the update.
        vessel_ids: Primary keys of the vessels to modify; an empty list is a no-op.
        tracking_activated: New value of the ``tracking_activated`` column.
        tracking_status: New value of the ``tracking_status`` column.
    """
    if not vessel_ids:
        # Nothing to update: never execute the UPDATE statement with an
        # empty list of parameter sets.
        return
    updates = [
        {
            "id": vessel_id,
            "tracking_activated": tracking_activated,
            "tracking_status": tracking_status,
        }
        for vessel_id in vessel_ids
    ]
    session.execute(update(sql_model.Vessel), updates)

def check_mmsi_integrity(self, session: Session) -> list[tuple[int, int]]:
    """Report MMSI values shared by more than one tracked vessel.

    Args:
        session: Open database session used to run the query.

    Returns:
        The rows produced by the query, each holding an MMSI and the number
        of vessels with ``tracking_activated`` set that carry it; only MMSI
        values with a count greater than 1 are returned, so an empty list
        means no duplicate was found.
    """
    vessel = sql_model.Vessel
    vessel_count = func.count(vessel.id)
    # Distinct MMSI values having more than one active (tracked) record.
    stmt = (
        select(vessel.mmsi, vessel_count.label("count"))
        .where(vessel.tracking_activated == True)  # noqa: E712 -- SQL expression, not a Python comparison
        .group_by(vessel.mmsi)
        .having(vessel_count > 1)
    )
    return session.execute(stmt).all()

@staticmethod
def map_to_domain(sql_vessel: sql_model.Vessel) -> Vessel:
return Vessel(
Expand Down
57 changes: 46 additions & 11 deletions src/bloom/tasks/load_dim_vessel_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@
def map_to_domain(row: pd.Series) -> Vessel:
    """Build a ``Vessel`` from one row of the vessels CSV file.

    Missing cells (as reported by ``row.isna()``) become ``None``, except
    ``country_iso3`` which falls back to ``"XXX"``. ``id``, ``mmsi``,
    ``width`` and ``length`` are converted with ``int`` when present.
    ``ship_name``, ``type`` and ``tracking_activated`` are passed through
    as read from the row.

    Args:
        row: One CSV record, indexed by column name.

    Returns:
        The ``Vessel`` built from the row.
    """
    isna = row.isna()

    def optional(column, cast=None):
        # Return None for a missing cell, otherwise the (optionally cast) value.
        if isna[column]:
            return None
        value = row[column]
        return cast(value) if cast is not None else value

    # Each keyword is given exactly once, using the current CSV column
    # names (``width``, ``length``, ``imo``, ``tracking_activated``).
    return Vessel(
        id=optional("id", int),
        mmsi=optional("mmsi", int),
        ship_name=row["ship_name"],
        width=optional("width", int),
        length=optional("length", int),
        country_iso3=row["country_iso3"] if not isna["country_iso3"] else "XXX",
        type=row["type"],
        imo=optional("imo"),
        cfr=optional("cfr"),
        registration_number=optional("registration_number"),
        external_marking=optional("external_marking"),
        ircs=optional("ircs"),
        tracking_activated=row["tracking_activated"],
        tracking_status=optional("tracking_status"),
    )


Expand All @@ -35,21 +37,54 @@ def run(csv_file_name: str) -> None:
vessel_repository = use_cases.vessel_repository()
db = use_cases.db()

ports = []
total = 0
inserted_ports = []
deleted_ports = []
try:
df = pd.read_csv(csv_file_name, sep=";")
vessels = df.apply(map_to_domain, axis=1)
with db.session() as session:
ports = vessel_repository.batch_create_vessel(session, list(vessels))
session.commit()
total = len(ports)
ports_inserts = []
ports_updates = []
# Pour chaque enregistrement du fichier CSV
for vessel in vessels:
if vessel.id and vessel_repository.get_vessel_by_id(session, vessel.id):
# si la valeur du champ id n'est pas vide:
# rechercher l'enregistrement correspondant dans la table dim_vessel
# mettre à jour l'enregistrement à partir des données CSV.
ports_updates.append(vessel)
else:
# sinon:
# insérer les données CSV dans la table dim_vessel;
ports_inserts.append(vessel)
# Insertions / MAJ en batch
inserted_ports = vessel_repository.batch_create_vessel(session, ports_inserts)
vessel_repository.batch_update_vessel(session, ports_updates)

# En fin de traitement:
# les enregistrements de la table dim_vessel portant un MMSI absent du fichier CSV sont mis à jour
# avec la valeur tracking_activated=FALSE
csv_mmsi = list(df['mmsi'])
deleted_ports = list(
filter(lambda v: v.mmsi not in csv_mmsi, vessel_repository.get_all_vessels_list(session)))
vessel_repository.set_tracking(session, [v.id for v in deleted_ports], False,
"Suppression logique suite import nouveau fichier CSV")
# le traitement vérifie qu'il n'existe qu'un seul enregistrement à l'état tracking_activated==True
# pour chaque valeur distincte de MMSI.
integrity_errors = vessel_repository.check_mmsi_integrity(session)
if not integrity_errors:
session.commit()
else:
logger.error(
f"Erreur d'intégrité fonctionnelle, plusieurs bateaux actifs avec le même MMSI: {integrity_errors}")
session.rollback()
except ValidationError as e:
logger.error("Erreur de validation des données de bateau")
logger.error(e.errors())
except DBException as e:
except DBException:
logger.error("Erreur d'insertion en base")
logger.info(f"{total} bateau(x) créés")
logger.info(f"{len(inserted_ports)} bateau(x) créés")
logger.info(f"{len(ports_updates)} bateau(x) mise à jour ou inchangés")
logger.info(f"{len(deleted_ports)} bateau(x) désactivés")


if __name__ == "__main__":
Expand Down

0 comments on commit 9510619

Please sign in to comment.