Skip to content

Commit

Permalink
Merge pull request #146 from dataforgoodfr/feature_maj_dim_vessels_is…
Browse files Browse the repository at this point in the history
…sue_140

Feature maj dim vessels issue 140
  • Loading branch information
njouanin authored Mar 30, 2024
2 parents a1372e4 + 9510619 commit c32a25d
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""alter_dim_vessel_gestion_maj_tracking
Revision ID: 4e912be8a176
Revises: 65a36ee545fe
Create Date: 2024-03-30 10:38:00.015020
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = '4e912be8a176'
down_revision = '65a36ee545fe'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.drop_constraint("dim_vessel_mmsi_key", "dim_vessel")
op.drop_column("dim_vessel", "mt_activated")
op.add_column("dim_vessel", sa.Column("tracking_activated", sa.Boolean, default=False))
op.add_column("dim_vessel", sa.Column("tracking_status", sa.String))
# A ce stade, on est certain que les MMSI sont uniques dans la table
op.execute("update dim_vessel set tracking_activated=true")
op.execute("update dim_vessel set tracking_activated=false, tracking_status='MMSI null' where mmsi is null")
op.alter_column("dim_vessel", "tracking_activated", nullable=False)


def downgrade() -> None:
op.drop_column("dim_vessel", "tracking_status")
op.drop_column("dim_vessel", "tracking_activated")
op.create_unique_constraint("dim_vessel_mmsi_key", "dim_vessel", ["mmsi"])
op.add_column("dim_vessel", sa.Column("mt_activated", sa.Boolean))
3 changes: 2 additions & 1 deletion src/bloom/domain/vessel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class Vessel(BaseModel):
registration_number: Union[str, None]
external_marking: Union[str, None]
ircs: Union[str, None]
mt_activated: bool
tracking_activated: Union[bool, None] = None
tracking_status: Union[str, None] = None
home_port_id: Union[int, None] = None
created_at: Union[datetime, None] = None
updated_at: Union[datetime, None] = None
5 changes: 3 additions & 2 deletions src/bloom/infra/database/sql_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
class Vessel(Base):
__tablename__ = "dim_vessel"
id = Column("id", Integer, primary_key=True)
mmsi = Column("mmsi", Integer, unique=True)
mmsi = Column("mmsi", Integer)
ship_name = Column("ship_name", String, nullable=False)
width = Column("width", Double)
length = Column("length", Double)
Expand All @@ -37,7 +37,8 @@ class Vessel(Base):
registration_number = Column("registration_number", String)
external_marking = Column("external_marking", String)
ircs = Column("ircs", String)
mt_activated = Column("mt_activated", Boolean, nullable=False)
tracking_activated = Column("tracking_activated", Boolean, nullable=False)
tracking_status = Column("tracking_status", String)
home_port_id = Column("home_port_id", Integer, ForeignKey("dim_port.id"))
created_at = Column(
"created_at", DateTime(timezone=True), nullable=False, server_default=func.now(),
Expand Down
59 changes: 46 additions & 13 deletions src/bloom/infra/repositories/repository_vessel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dependency_injector.providers import Callable
from shapely import Point, wkb
from sqlalchemy.orm import Session
from sqlalchemy import select
from sqlalchemy import select, update, func


class VesselRepository:
Expand All @@ -20,27 +20,58 @@ def __init__(
) -> Callable[..., AbstractContextManager]:
self.session_factory = session_factory

def load_vessel_metadata(self, session: Session) -> list[Vessel]:
stmt = select(sql_model.Vessel).where(sql_model.Vessel.mt_activated == True).where( # noqa: E712
sql_model.Vessel.mmsi.isnot(None))
def get_vessel_by_id(self, session: Session, vessel_id: int) -> Union[Vessel | None]:
return session.get(sql_model.Vessel, vessel_id)

def get_vessels_list(self, session: Session) -> list[Vessel]:
"""
Liste l'ensemble des vessels actifs
"""
stmt = select(sql_model.Vessel).where(sql_model.Vessel.tracking_activated == True)
e = session.execute(stmt).scalars()
if not e:
return []
return [VesselRepository.map_to_domain(vessel) for vessel in e]

def batch_create_vessel(self, session: Session, ports: list[Vessel]) -> list[Vessel]:
orm_list = [VesselRepository.map_to_sql(port) for port in ports]
session.add_all(orm_list)
return [VesselRepository.map_to_domain(orm) for orm in orm_list]

def load_all_vessel_metadata(self, session: Session) -> list[Vessel]:
stmt = select(sql_model.Vessel).where(sql_model.Vessel.mmsi.isnot(None))
def get_all_vessels_list(self, session: Session) -> list[Vessel]:
"""
Liste l'ensemble des vessels actifs ou inactifs
"""
stmt = select(sql_model.Vessel)
e = session.execute(stmt).scalars()

if not e:
return []
return [VesselRepository.map_to_domain(vessel) for vessel in e]

def batch_create_vessel(self, session: Session, vessels: list[Vessel]) -> list[Vessel]:
orm_list = [VesselRepository.map_to_sql(port) for port in vessels]
session.add_all(orm_list)
return [VesselRepository.map_to_domain(orm) for orm in orm_list]

def batch_update_vessel(self, session: Session, vessels: list[Vessel]) -> None:
updates = [{"id": v.id, "mmsi": v.mmsi, "ship_name": v.ship_name, "width": v.width, "length": v.length,
"country_iso3": v.country_iso3, "type": v.type, "imo": v.imo, "cfr": v.cfr,
"registration_number": v.registration_number, "external_marking": v.external_marking,
"ircs": v.ircs, "tracking_activated": v.tracking_activated, "tracking_status": v.tracking_status,
"home_port_id": v.home_port_id} for v in
vessels]
session.execute(update(sql_model.Vessel), updates)

def set_tracking(self, session: Session, vessel_ids: list[int], tracking_activated: bool,
tracking_status: str) -> None:
updates = [{"id": id, "tracking_activated": tracking_activated, "tracking_status": tracking_status} for id in
vessel_ids]
session.execute(update(sql_model.Vessel), updates)

def check_mmsi_integrity(self, session: Session) -> list[(int, int)]:
# Recherche des valeurs distinctes de MMSI ayant un nombre de résultats actif > 1
stmt = select(sql_model.Vessel.mmsi, func.count(sql_model.Vessel.id).label("count")).group_by(
sql_model.Vessel.mmsi).having(
func.count(sql_model.Vessel.id) > 1).where(
sql_model.Vessel.tracking_activated == True)
return session.execute(stmt).all()

@staticmethod
def map_to_domain(sql_vessel: sql_model.Vessel) -> Vessel:
return Vessel(
Expand All @@ -56,7 +87,8 @@ def map_to_domain(sql_vessel: sql_model.Vessel) -> Vessel:
registration_number=sql_vessel.registration_number,
external_marking=sql_vessel.external_marking,
ircs=sql_vessel.ircs,
mt_activated=sql_vessel.mt_activated,
tracking_activated=sql_vessel.tracking_activated,
tracking_status=sql_vessel.tracking_status,
home_port_id=sql_vessel.home_port_id,
created_at=sql_vessel.created_at,
updated_at=sql_vessel.updated_at,
Expand All @@ -77,7 +109,8 @@ def map_to_sql(vessel: Vessel) -> sql_model.Vessel:
registration_number=vessel.registration_number,
external_marking=vessel.external_marking,
ircs=vessel.ircs,
mt_activated=vessel.mt_activated,
tracking_activated=vessel.tracking_activated,
tracking_status=vessel.tracking_status,
home_port_id=vessel.home_port_id,
created_at=vessel.created_at,
updated_at=vessel.updated_at,
Expand Down
2 changes: 1 addition & 1 deletion src/bloom/tasks/clean_positions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def run():
nb_donnees = 0
nb_au_port = 0
nb_pas_au_port = 0
vessels = vessel_repository.load_vessel_metadata(session)
vessels = vessel_repository.get_vessels_list(session)
logger.info(f"{len(vessels)} bateaux à traiter")
# Foreach vessel
for vessel in vessels:
Expand Down
57 changes: 46 additions & 11 deletions src/bloom/tasks/load_dim_vessel_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@
def map_to_domain(row: pd.Series) -> Vessel:
isna = row.isna()
return Vessel(
id=int(row["id"]) if not isna["id"] else None,
mmsi=int(row["mmsi"]) if not isna["mmsi"] else None,
ship_name=row["ship_name"],
width=None,
length=row["loa"],
width=int(row["width"]) if not isna["width"] else None,
length=int(row["length"]) if not isna["length"] else None,
country_iso3=row["country_iso3"] if not isna["country_iso3"] else "XXX",
type=row["type"],
imo=row["IMO"] if not isna["IMO"] else None,
imo=row["imo"] if not isna["imo"] else None,
cfr=row["cfr"] if not isna["cfr"] else None,
registration_number=row["registration_number"]
if not isna["registration_number"]
else None,
external_marking=row["external_marking"] if not isna["external_marking"] else None,
ircs=row["ircs"] if not isna["ircs"] else None,
mt_activated=row["mt_activated"].strip(),
tracking_activated=row["tracking_activated"],
tracking_status=row["tracking_status"] if not isna["tracking_status"] else None,
)


Expand All @@ -35,21 +37,54 @@ def run(csv_file_name: str) -> None:
vessel_repository = use_cases.vessel_repository()
db = use_cases.db()

ports = []
total = 0
inserted_ports = []
deleted_ports = []
try:
df = pd.read_csv(csv_file_name, sep=";")
vessels = df.apply(map_to_domain, axis=1)
with db.session() as session:
ports = vessel_repository.batch_create_vessel(session, list(vessels))
session.commit()
total = len(ports)
ports_inserts = []
ports_updates = []
# Pour chaque enregistrement du fichier CSV
for vessel in vessels:
if vessel.id and vessel_repository.get_vessel_by_id(session, vessel.id):
# si la valeur du champ id n'est pas vide:
# rechercher l'enregistrement correspondant dans la table dim_vessel
# mettre à jour l'enregistrement à partir des données CSV.
ports_updates.append(vessel)
else:
# sinon:
# insérer les données CSV dans la table dim_vessel;
ports_inserts.append(vessel)
# Insertions / MAJ en batch
inserted_ports = vessel_repository.batch_create_vessel(session, ports_inserts)
vessel_repository.batch_update_vessel(session, ports_updates)

# En fin de traitement:
# les enregistrements de la table dim_vessel pourtant un MMSI absent du fichier CSV sont mis à jour
# avec la valeur tracking_activated=FALSE
csv_mmsi = list(df['mmsi'])
deleted_ports = list(
filter(lambda v: v.mmsi not in csv_mmsi, vessel_repository.get_all_vessels_list(session)))
vessel_repository.set_tracking(session, [v.id for v in deleted_ports], False,
"Suppression logique suite import nouveau fichier CSV")
# le traitement vérifie qu'il n'existe qu'un seul enregistrement à l'état tracking_activated==True
# pour chaque valeur distincte de MMSI.
integrity_errors = vessel_repository.check_mmsi_integrity(session)
if not integrity_errors:
session.commit()
else:
logger.error(
f"Erreur d'intégrité fonctionnelle, plusieurs bateaux actifs avec le même MMSI: {integrity_errors}")
session.rollback()
except ValidationError as e:
logger.error("Erreur de validation des données de bateau")
logger.error(e.errors())
except DBException as e:
except DBException:
logger.error("Erreur d'insertion en base")
logger.info(f"{total} bateau(x) créés")
logger.info(f"{len(inserted_ports)} bateau(x) créés")
logger.info(f"{len(ports_updates)} bateau(x) mise à jour ou inchangés")
logger.info(f"{len(deleted_ports)} bateau(x) désactivés")


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions src/bloom/tasks/load_spire_data_from_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def run(dump_path: str) -> None:
orm_data = []
try:
with db.session() as session:
vessels: list[Vessel] = vessel_repository.load_all_vessel_metadata(session)
vessels: list[Vessel] = vessel_repository.get_vessels_list(session)

raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels)
if dump_path is not None:
Expand All @@ -44,7 +44,7 @@ def run(dump_path: str) -> None:
logger.error(e.errors())
except Exception as e:
logger.error("Echec de l'appel API", exc_info=e)
logger.info(f"{len(orm_data)} vessel data loaded")
logger.info(f"{len(orm_data)} données chargées")


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/bloom/tasks/update_vessel_data_voyage.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def run() -> None:
nb_donnees = 0
nb_insert_data = 0
nb_insert_voyage = 0
vessels = vessel_repository.load_vessel_metadata(session)
vessels = vessel_repository.get_vessels_list(session)
logger.info(f"{len(vessels)} bateaux à traiter")
# Foreach vessel
for vessel in vessels:
Expand Down

0 comments on commit c32a25d

Please sign in to comment.