Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

corrections 1 position par mmsi & pas de last_segment en base #400

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 153 additions & 104 deletions backend/bloom/tasks/create_update_excursions_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,64 +124,111 @@ def run():
for vessel_id in batch["vessel_id"].unique():
df_end = batch.loc[batch["vessel_id"] == vessel_id].copy()
df_end.rename(columns={"timestamp": "timestamp_end",
"heading": "heading_at_end",
"speed": "speed_at_end",
"longitude": "end_longitude",
"latitude": "end_latitude"
}, inplace=True)
"heading": "heading_at_end",
"speed": "speed_at_end",
"longitude": "end_longitude",
"latitude": "end_latitude"
}, inplace=True)
df_end.sort_values("timestamp_end", inplace=True)
df_end.reset_index(drop=True, inplace=True)
# get every end entry but the last one ; each one of them will be the start point of a segment
if len(df_end)>1:
df_start = df_end.iloc[0:-1, :].copy()
else:
df_start=df_end.copy()
for col in df_start.columns:
df_start.rename(columns={col: col.replace("end", "start")}, inplace=True)
vessel_last_segment = pd.DataFrame()
if last_segment.shape[0] > 0:
vessel_last_segment = last_segment.loc[
last_segment["vessel_id"] == vessel_id, ["mmsi", "timestamp_end", "heading_at_end", "speed_at_end",
"end_position", "excursion_id", "arrival_port_id"]]
vessel_last_segment["start_latitude"] = vessel_last_segment["end_position"].apply(lambda x: x.y)
vessel_last_segment["start_longitude"] = vessel_last_segment["end_position"].apply(lambda x: x.x)
vessel_last_segment.drop("end_position", inplace=True, axis=1)
vessel_last_segment.rename(columns={"timestamp_end": "timestamp_start",
"heading_at_end": "heading_at_start",
"speed_at_end": "speed_at_start",
}, inplace=True)

if vessel_last_segment.shape[0] > 0:
# if there's a last segment for this vessel, then it's not the first time a position for this vessel is received
is_new_vessel = False
# and it becomes the start point of the first new segment
df_start.loc[-1] = vessel_last_segment.iloc[0]
df_start.sort_index(inplace=True)
df_start.reset_index(drop=True, inplace=True)
# checks if the excursion of the last segment is closed or not
if vessel_last_segment["arrival_port_id"].iloc[0] >= 0:
for col in df_start.columns:
df_start.rename(columns={col: col.replace("end", "start")}, inplace=True)
vessel_last_segment = pd.DataFrame()
if last_segment.shape[0] > 0:
vessel_last_segment = last_segment.loc[
last_segment["vessel_id"] == vessel_id, ["mmsi", "timestamp_end", "heading_at_end", "speed_at_end",
"end_position", "excursion_id", "arrival_port_id"]]
vessel_last_segment["start_latitude"] = vessel_last_segment["end_position"].apply(lambda x: x.y)
vessel_last_segment["start_longitude"] = vessel_last_segment["end_position"].apply(lambda x: x.x)
vessel_last_segment.drop("end_position", inplace=True, axis=1)
vessel_last_segment.rename(columns={"timestamp_end": "timestamp_start",
"heading_at_end": "heading_at_start",
"speed_at_end": "speed_at_start",
}, inplace=True)
if vessel_last_segment.shape[0] > 0:
# if there's a last segment for this vessel, then it's not the first time a position for this vessel is received
is_new_vessel = False
# and it becomes the start point of the first new segment
df_start.loc[-1] = vessel_last_segment.iloc[0]
df_start.sort_index(inplace=True)
df_start.reset_index(drop=True, inplace=True)
# checks if the excursion of the last segment is closed or not
if vessel_last_segment["arrival_port_id"].iloc[0] >= 0:
open_ongoing_excursion = False
else:
open_ongoing_excursion = True
ongoing_excursion_id = int(vessel_last_segment["excursion_id"].iloc[0])

else:
# if there's no last segment for this vessel, then it's the first time a position for this vessel is received
is_new_vessel = True
# so we duplicate the starting point to have the first segment while setting it up 1s behind in time (so timestamp_start != timestamp_end)
df_start.loc[-1] = df_start.loc[0]
df_start["timestamp_start"].iloc[-1] += timedelta(0, -1)
df_start.sort_index(inplace=True)
df_start.reset_index(drop=True, inplace=True)
open_ongoing_excursion = False
# concat start and end point together
df = pd.concat([df_start, df_end], axis=1)
# removing segment with same timestamp_start and timestamp_end (no update)
df = df[df["timestamp_start"] != df["timestamp_end"]].copy()
# resetting index
df.reset_index(inplace=True, drop=True)
if (df.shape[0] > 0):
# calculate distance
def get_distance_in_miles(x) -> float:
    """Return the distance, in miles, between the start and end points of ``x``.

    ``x`` must expose ``start_latitude``, ``start_longitude``,
    ``end_latitude`` and ``end_longitude``. Both points are handed to
    ``distance.distance`` as (latitude, longitude) pairs and the ``miles``
    attribute of its result is returned.
    """
    origin = (x.start_latitude, x.start_longitude)
    destination = (x.end_latitude, x.end_longitude)
    span = distance.distance(origin, destination)
    return span.miles

df["distance"] = df.apply(get_distance_in_miles, axis=1)
else:
#df_start=df_end.copy()
vessel_last_segment = pd.DataFrame()
if last_segment.shape[0] > 0:
vessel_last_segment = last_segment.loc[
last_segment["vessel_id"] == vessel_id, ["mmsi", "timestamp_end", "heading_at_end", "speed_at_end",
"end_position", "excursion_id", "arrival_port_id"]]
vessel_last_segment["start_latitude"] = vessel_last_segment["end_position"].apply(lambda x: x.y)
vessel_last_segment["start_longitude"] = vessel_last_segment["end_position"].apply(lambda x: x.x)
vessel_last_segment.drop("end_position", inplace=True, axis=1)
vessel_last_segment.rename(columns={"timestamp_end": "timestamp_start",
"heading_at_end": "heading_at_start",
"speed_at_end": "speed_at_start",
}, inplace=True)
if vessel_last_segment.shape[0] > 0:
# if there's a last segment for this vessel, then it's not the first time a position for this vessel is received
is_new_vessel = False
df_start= vessel_last_segment
df_start.reset_index(drop=True, inplace=True)
# checks if the excursion of the last segment is closed or not
if vessel_last_segment["arrival_port_id"].iloc[0] >= 0:
open_ongoing_excursion = False
else:
open_ongoing_excursion = True
ongoing_excursion_id = int(vessel_last_segment["excursion_id"].iloc[0])

else:
open_ongoing_excursion = True
ongoing_excursion_id = int(vessel_last_segment["excursion_id"].iloc[0])
# if there's no last segment for this vessel, then it's the first time a position for this vessel is received
is_new_vessel = True
# so we duplicate the starting point to have the first segment while setting it up 1s behind in time (so timestamp_start != timestamp_end)
df_start = df_end.copy()
for col in df_start.columns:
df_start.rename(columns={col: col.replace("end", "start")}, inplace=True)
df_start["timestamp_start"] += timedelta(0, -1)
df_start.sort_index(inplace=True)
df_start.reset_index(drop=True, inplace=True)
open_ongoing_excursion = False
# concat start and end point together
df = pd.concat([df_start, df_end], axis=1)
# removing segment with same timestamp_start and timestamp_end (no update)
df = df[df["timestamp_start"] != df["timestamp_end"]].copy()
# resetting index
df.reset_index(inplace=True, drop=True)

else:
# if there's no last segment for this vessel, then it's the first time a position for this vessel is received
is_new_vessel = True
# so we duplicate the starting point to have the first segment while setting it up 1s behind in time (so timestamp_start != timestamp_end)
df_start.loc[-1] = df_start.loc[0]
df_start["timestamp_start"].iloc[-1] += timedelta(0, -1)
df_start.sort_index(inplace=True)
df_start.reset_index(drop=True, inplace=True)
open_ongoing_excursion = False

# concat start and end point together
df = pd.concat([df_start, df_end], axis=1)

# removing segment with same timestamp_start and timestamp_end (no update)
df = df[df["timestamp_start"] != df["timestamp_end"]].copy()
# resetting index
df.reset_index(inplace=True, drop=True)
if (df.shape[0] > 0):
# calculate distance
def get_distance_in_miles(x) -> float:
Expand All @@ -208,66 +255,68 @@ def get_duration(x) -> float:

# set last_vessel_segment
df["last_vessel_segment"] = 0
df["last_vessel_segment"].iloc[-1] = 1
if len(df) >1 :
df["last_vessel_segment"].iloc[-1] = 1
else :
df["last_vessel_segment"] = 1

# check if segment ends in a port (only for segment with average_speed < maximal_speed_to_check_if_in_port or with type 'DEFAULT_AIS')
def get_port(x, session):
    """Return the id of the port in which segment ``x`` ends, or ``None``.

    The lookup is only attempted when the segment type is 'DEFAULT_AIS' or
    its ``average_speed`` is below ``maximal_speed_to_check_if_in_port``.
    The end point of the segment is then passed to
    ``port_repository.get_closest_port_in_range`` together with
    ``threshold_distance_to_port``; a falsy result yields ``None``.
    """
    should_check = x.type == 'DEFAULT_AIS' or x.average_speed < maximal_speed_to_check_if_in_port
    if not should_check:
        return None

    closest = port_repository.get_closest_port_in_range(
        session,
        x.end_longitude,
        x.end_latitude,
        threshold_distance_to_port,
    )
    if not closest:
        return None

    # The repository returns a (port_id, distance) pair; only the id is used.
    port_id, _port_distance = closest
    return port_id
# Return the id of the port in which segment `x` ends, or None.
# The lookup only runs when the segment type is 'DEFAULT_AIS' or its
# average_speed is below maximal_speed_to_check_if_in_port.
# NOTE(review): in this diff view a further `else: return None` appears a few
# lines below, outside this span; confirm where this definition really ends.
def get_port(x, session):
if x.type == 'DEFAULT_AIS' or x.average_speed < maximal_speed_to_check_if_in_port:
# query the closest port around the segment end point, within threshold_distance_to_port
res = port_repository.get_closest_port_in_range(session, x.end_longitude, x.end_latitude,
threshold_distance_to_port)
if res:
# res unpacks into a (port_id, distance) pair; only port_id is used
(port_id, distance) = res
return port_id
else:
# falsy lookup result: no port to report
return None

df["port"] = df.apply(get_port, axis=1, args=(session,))

# get or create new excursion
# logic :
# if segment ends in a port while ongoing excursion is open, then we close the excursion
# else, if the ongoing excursion is open, then we use the ongoing excursion_id for the segment
# else, we create a new excursion whose id will become the ongoing excursion_id for this segment and the future ones
# additionally, when we create a new excursion, if the vessel is 'new' then we create an 'empty' excursion
# else, if the first segment of this new excursion is of type 'DEFAULT_AIS', we estimate the time of departure based
# on its ending position, distance traveled and a given average exit speed
df["excursion_id"] = np.NaN
for a in df.index:
if df["port"].iloc[a] is not None and df["port"].iloc[a] >= 0:
if (open_ongoing_excursion):
close_excursion(session, ongoing_excursion_id, int(df["port"].iloc[a]),
df["end_latitude"].iloc[a],
df["end_longitude"].iloc[a],
df["timestamp_end"].iloc[a]) # put the close excursion function here
df["excursion_id"].iloc[a] = ongoing_excursion_id
open_ongoing_excursion = False
nb_closed_excursion += 1
elif open_ongoing_excursion:
else:
return None
df["port"] = df.apply(get_port, axis=1, args=(session,))

# get or create new excursion
# logic :
# if segment ends in a port while ongoing excursion is open, then we close the excursion
# else, if the ongoing excursion is open, then we use the ongoing excursion_id for the segment
# else, we create a new excursion whose id will become the ongoing excursion_id for this segment and the future ones
# additionally, when we create a new excursion, if the vessel is 'new' then we create an 'empty' excursion
# else, if the first segment of this new excursion is of type 'DEFAULT_AIS', we estimate the time of departure based
# on its ending position, distance traveled and a given average exit speed
df["excursion_id"] = np.NaN
for a in df.index:
if df["port"].iloc[a] is not None and df["port"].iloc[a] >= 0:
if (open_ongoing_excursion):
close_excursion(session, ongoing_excursion_id, int(df["port"].iloc[a]),
df["end_latitude"].iloc[a],
df["end_longitude"].iloc[a],
df["timestamp_end"].iloc[a]) # put the close excursion function here
df["excursion_id"].iloc[a] = ongoing_excursion_id
open_ongoing_excursion = False
nb_closed_excursion += 1
elif open_ongoing_excursion:
df["excursion_id"].iloc[a] = ongoing_excursion_id
else:
if is_new_vessel:
ongoing_excursion_id = add_excursion(session, int(vessel_id),
df["timestamp_end"].iloc[a],
Point(df["end_longitude"].iloc[a],
df["end_latitude"].iloc[
a]))
is_new_vessel = False
nb_created_excursion += 1
else:
if is_new_vessel:
ongoing_excursion_id = add_excursion(session, int(vessel_id),
df["timestamp_end"].iloc[a],
Point(df["end_longitude"].iloc[a],
df["end_latitude"].iloc[
a]))
is_new_vessel = False
nb_created_excursion += 1
else:
def get_time_of_departure():
    """Return the departure time to record for the excursion opened at row ``a``.

    For a 'DEFAULT_AIS' segment the departure is estimated by subtracting
    ``3600 * distance / average_exit_speed`` seconds from the segment's
    ``timestamp_end``; for any other type the segment's ``timestamp_start``
    is used as is. Reads ``df``, ``a`` and ``average_exit_speed`` from the
    enclosing scope.
    """
    if df['type'].iloc[a] == 'DEFAULT_AIS':
        travel_seconds = 3600 * df['distance'].iloc[a] / average_exit_speed
        return df['timestamp_end'].iloc[a] - timedelta(0, travel_seconds)
    return df["timestamp_start"].iloc[a]

ongoing_excursion_id = add_excursion(session, int(vessel_id),
get_time_of_departure()) # put the create new excursion function here
nb_created_excursion += 1
open_ongoing_excursion = True
df["excursion_id"].iloc[a] = ongoing_excursion_id
def get_time_of_departure():
    """Return the departure time to record for the excursion opened at row ``a``.

    For a 'DEFAULT_AIS' segment the departure is estimated by subtracting
    ``3600 * distance / average_exit_speed`` seconds from the segment's
    ``timestamp_end``; for any other type the segment's ``timestamp_start``
    is used as is. Reads ``df``, ``a`` and ``average_exit_speed`` from the
    enclosing scope.
    """
    if df['type'].iloc[a] == 'DEFAULT_AIS':
        travel_seconds = 3600 * df['distance'].iloc[a] / average_exit_speed
        return df['timestamp_end'].iloc[a] - timedelta(0, travel_seconds)
    return df["timestamp_start"].iloc[a]

ongoing_excursion_id = add_excursion(session, int(vessel_id),
get_time_of_departure()) # put the create new excursion function here
nb_created_excursion += 1
open_ongoing_excursion = True
df["excursion_id"].iloc[a] = ongoing_excursion_id
# concat the result for current vessel in the result dataframe
if (df.shape[0] > 0):
result = pd.concat([result, df[df["excursion_id"] >= 0]], axis=0)
Expand Down
Loading