Skip to content

Commit

Permalink
Create journeys for offers
Browse files Browse the repository at this point in the history
This edit allows to define offers with return flight because
a single interest can have two flights (latter is the return one).
  • Loading branch information
boozec committed Apr 28, 2024
1 parent 903b3b1 commit f99fb73
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 96 deletions.
77 changes: 64 additions & 13 deletions bpmn/acmesky.bpmn
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:modeler="http://camunda.org/schema/modeler/1.0" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" id="Definitions_1" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.20.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.4.0" camunda:diagramRelationId="c3ef9488-c58c-45d0-a430-d837852e5d78">
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:bioc="http://bpmn.io/schema/bpmn/biocolor/1.0" xmlns:color="http://www.omg.org/spec/BPMN/non-normative/color/1.0" xmlns:modeler="http://camunda.org/schema/modeler/1.0" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" id="Definitions_1" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.20.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.4.0" camunda:diagramRelationId="c3ef9488-c58c-45d0-a430-d837852e5d78">
<bpmn:collaboration id="ACMESky" name="BPMN Diagram">
<bpmn:participant id="Participant_ACME" name="ACME Sky" processRef="Process_ACME" />
<bpmn:participant id="Participant_Prontogram" name="Prontogram" processRef="Process_Prontogram" />
Expand Down Expand Up @@ -46,7 +46,7 @@
</bpmn:textAnnotation>
<bpmn:association id="Association_1nkvtkr" associationDirection="None" sourceRef="TM_Journey_Rent_Error" targetRef="TextAnnotation_0id9qv6" />
</bpmn:collaboration>
<bpmn:process id="Process_ACME" name="sd_acmesky" isExecutable="false">
<bpmn:process id="Process_ACME" name="sd_acmesky" isExecutable="true">
<bpmn:laneSet id="LaneSet_1dms957">
<bpmn:lane id="Lane_1s7v4pg" name="Flights manager">
<bpmn:flowNodeRef>Start_Flight_Manager_New_Offer</bpmn:flowNodeRef>
Expand All @@ -59,8 +59,8 @@
</bpmn:lane>
<bpmn:lane id="Lane_0nrkxij" name="Interests management">
<bpmn:flowNodeRef>End_Check_Interests</bpmn:flowNodeRef>
<bpmn:flowNodeRef>Activity_Foreach_Interest</bpmn:flowNodeRef>
<bpmn:flowNodeRef>ST_Get_Available_Flights</bpmn:flowNodeRef>
<bpmn:flowNodeRef>Activity_Foreach_Journey</bpmn:flowNodeRef>
<bpmn:flowNodeRef>ST_Create_Journeys</bpmn:flowNodeRef>
<bpmn:flowNodeRef>TS_Check_Interests</bpmn:flowNodeRef>
<bpmn:childLaneSet id="LaneSet_146b69j" />
</bpmn:lane>
Expand All @@ -85,9 +85,12 @@
<bpmn:flowNodeRef>Activity_Rent_Service</bpmn:flowNodeRef>
<bpmn:flowNodeRef>Event_13jciaj</bpmn:flowNodeRef>
<bpmn:flowNodeRef>CM_Check_Offer</bpmn:flowNodeRef>
<bpmn:flowNodeRef>Event_1vdq7fa</bpmn:flowNodeRef>
<bpmn:flowNodeRef>Activity_0caef9i</bpmn:flowNodeRef>
<bpmn:flowNodeRef>Event_0em1zuc</bpmn:flowNodeRef>
</bpmn:lane>
</bpmn:laneSet>
<bpmn:sequenceFlow id="Flow_066ca6d" sourceRef="Activity_Foreach_Interest" targetRef="End_Check_Interests" />
<bpmn:sequenceFlow id="Flow_066ca6d" sourceRef="Activity_Foreach_Journey" targetRef="End_Check_Interests" />
<bpmn:sequenceFlow id="Flow_0ckkzu4" sourceRef="CM_Check_Offer" targetRef="ST_Retrieve_Offer" />
<bpmn:sequenceFlow id="Flow_1ggiirr" sourceRef="TS_Check_Airline" targetRef="ST_Get_User_Interests" />
<bpmn:sequenceFlow id="Flow_0uv1725" sourceRef="Activity_Foreach_AirlineService" targetRef="End_Check_Airline" />
Expand Down Expand Up @@ -160,12 +163,12 @@
<bpmn:outgoing>Flow_1htessf</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_1qn70rz" messageRef="Message_3o41756" />
</bpmn:startEvent>
<bpmn:subProcess id="Activity_Foreach_Interest" name="For each available flight">
<bpmn:subProcess id="Activity_Foreach_Journey" name="For each journey">
<bpmn:incoming>Flow_15s8hw1</bpmn:incoming>
<bpmn:outgoing>Flow_066ca6d</bpmn:outgoing>
<bpmn:multiInstanceLoopCharacteristics isSequential="true">
<bpmn:extensionElements>
<zeebe:loopCharacteristics inputCollection="=available_flights" />
<zeebe:loopCharacteristics inputCollection="=journeys" />
</bpmn:extensionElements>
</bpmn:multiInstanceLoopCharacteristics>
<bpmn:startEvent id="Start_Check_Each_Interest">
Expand Down Expand Up @@ -553,11 +556,11 @@
<bpmn:endEvent id="End_Check_Airline">
<bpmn:incoming>Flow_0uv1725</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_15s8hw1" sourceRef="ST_Get_Available_Flights" targetRef="Activity_Foreach_Interest" />
<bpmn:sequenceFlow id="Flow_0rpwl97" sourceRef="TS_Check_Interests" targetRef="ST_Get_Available_Flights" />
<bpmn:serviceTask id="ST_Get_Available_Flights" name="Get available flights">
<bpmn:sequenceFlow id="Flow_15s8hw1" sourceRef="ST_Create_Journeys" targetRef="Activity_Foreach_Journey" />
<bpmn:sequenceFlow id="Flow_0rpwl97" sourceRef="TS_Check_Interests" targetRef="ST_Create_Journeys" />
<bpmn:serviceTask id="ST_Create_Journeys" name="Create journeys from available flights">
<bpmn:extensionElements>
<zeebe:taskDefinition type="ST_Get_Available_Flights" />
<zeebe:taskDefinition type="ST_Create_Journeys" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0rpwl97</bpmn:incoming>
<bpmn:outgoing>Flow_15s8hw1</bpmn:outgoing>
Expand All @@ -566,6 +569,10 @@
<bpmn:sourceRef>DB_Available_Flights</bpmn:sourceRef>
<bpmn:targetRef>Property_1sjrqe5</bpmn:targetRef>
</bpmn:dataInputAssociation>
<bpmn:dataInputAssociation id="DataInputAssociation_0zzoevr">
<bpmn:sourceRef>DataStoreReference_1bsvx5n</bpmn:sourceRef>
<bpmn:targetRef>Property_1sjrqe5</bpmn:targetRef>
</bpmn:dataInputAssociation>
</bpmn:serviceTask>
<bpmn:startEvent id="TS_Check_Interests" name="Every 1 hour">
<bpmn:outgoing>Flow_0rpwl97</bpmn:outgoing>
Expand All @@ -577,6 +584,22 @@
<bpmn:outgoing>Flow_0ckkzu4</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_0ejjs91" messageRef="Message_2v61lip" />
</bpmn:startEvent>
<bpmn:startEvent id="Event_1vdq7fa">
<bpmn:outgoing>Flow_0epiiu9</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0epiiu9" sourceRef="Event_1vdq7fa" targetRef="Activity_0caef9i" />
<bpmn:serviceTask id="Activity_0caef9i">
<bpmn:extensionElements>
<zeebe:taskDefinition type="TEST" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0epiiu9</bpmn:incoming>
<bpmn:outgoing>Flow_0hvq3dy</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:endEvent id="Event_0em1zuc">
<bpmn:incoming>Flow_0hvq3dy</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0hvq3dy" sourceRef="Activity_0caef9i" targetRef="Event_0em1zuc" />
<bpmn:dataStoreReference id="DataStoreReference_1bsvx5n" name="Journeys" />
</bpmn:process>
<bpmn:process id="Process_Prontogram" name="sd_prontogram" isExecutable="false">
<bpmn:sequenceFlow id="Flow_0a080if" sourceRef="TM_Propagate_Message_From_Prontogram" targetRef="End_Prontogram" />
Expand Down Expand Up @@ -1110,7 +1133,22 @@
<dc:Bounds x="256" y="1438" width="87" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="BPMNShape_16hdy8m" bpmnElement="Activity_Foreach_Interest" isExpanded="true">
<bpmndi:BPMNShape id="Event_1vdq7fa_di" bpmnElement="Event_1vdq7fa" bioc:stroke="#831311" bioc:fill="#ffcdd2" color:background-color="#ffcdd2" color:border-color="#831311">
<dc:Bounds x="779" y="632" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_15gzw5g_di" bpmnElement="Activity_0caef9i" bioc:stroke="#831311" bioc:fill="#ffcdd2" color:background-color="#ffcdd2" color:border-color="#831311">
<dc:Bounds x="870" y="610" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0em1zuc_di" bpmnElement="Event_0em1zuc" bioc:stroke="#831311" bioc:fill="#ffcdd2" color:background-color="#ffcdd2" color:border-color="#831311">
<dc:Bounds x="1032" y="632" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="DataStoreReference_1bsvx5n_di" bpmnElement="DataStoreReference_1bsvx5n">
<dc:Bounds x="385" y="1025" width="50" height="50" />
<bpmndi:BPMNLabel>
<dc:Bounds x="387" y="1003" width="46" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="BPMNShape_16hdy8m" bpmnElement="Activity_Foreach_Journey" isExpanded="true">
<dc:Bounds x="800" y="1025" width="520" height="250" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
Expand Down Expand Up @@ -1515,7 +1553,7 @@
<bpmndi:BPMNShape id="Event_1xh39ie_di" bpmnElement="End_Check_Airline">
<dc:Bounds x="1632" y="1457" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="BPMNShape_10ve1sc" bpmnElement="ST_Get_Available_Flights">
<bpmndi:BPMNShape id="BPMNShape_10ve1sc" bpmnElement="ST_Create_Journeys">
<dc:Bounds x="650" y="1110" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
Expand Down Expand Up @@ -1637,6 +1675,14 @@
<di:waypoint x="588" y="1150" />
<di:waypoint x="650" y="1150" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0epiiu9_di" bpmnElement="Flow_0epiiu9">
<di:waypoint x="815" y="650" />
<di:waypoint x="870" y="650" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0hvq3dy_di" bpmnElement="Flow_0hvq3dy">
<di:waypoint x="970" y="650" />
<di:waypoint x="1032" y="650" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Participant_1iv87gq_di" bpmnElement="Participant_Prontogram" isHorizontal="true">
<dc:Bounds x="900" y="400" width="700" height="163" />
<bpmndi:BPMNLabel />
Expand Down Expand Up @@ -2050,6 +2096,11 @@
<di:waypoint x="1150" y="1110" />
<di:waypoint x="1150" y="498" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="DataInputAssociation_0zzoevr_di" bpmnElement="DataInputAssociation_0zzoevr">
<di:waypoint x="435" y="1050" />
<di:waypoint x="700" y="1050" />
<di:waypoint x="700" y="1110" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
{Name: "TM_Ack_Flight_Request_Save", Handler: acmeskyHandlers.TMAckFlightRequestSave, Message: &acmejob.MessageCommand{Name: "CM_Ack_Flight_Request_Save", CorrelationKey: "0"}},

// Interests manager lane
{Name: "ST_Get_Available_Flights", Handler: acmeskyHandlers.STGetAvailableFlights},
{Name: "ST_Create_Journeys", Handler: acmeskyHandlers.STCreateJourneys},
{Name: "ST_Prepare_Offer", Handler: acmeskyHandlers.STPrepareOffer},
{Name: "TM_Send_Offer", Handler: acmeskyHandlers.TMSendOffer, Message: &acmejob.MessageCommand{Name: "CM_New_Message_For_Prontogram", CorrelationKey: "0"}},

Expand Down
1 change: 1 addition & 0 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func InitDb(dsn string) (*gorm.DB, error) {
&models.Interest{},
&models.AvailableFlight{},
&models.Offer{},
&models.Journey{},
)
}

Expand Down
121 changes: 121 additions & 0 deletions internal/handlers/acmesky/st_create_journeys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package handlers

import (
"context"

"github.com/charmbracelet/log"

"github.com/acme-sky/workers/internal/db"
acmejob "github.com/acme-sky/workers/internal/job"
"github.com/acme-sky/workers/internal/models"
"github.com/camunda/zeebe/clients/go/v8/pkg/entities"
"github.com/camunda/zeebe/clients/go/v8/pkg/worker"
)

// Service Task raised by ACMESky Interests Manager lame every 1 hour.
// Get available flights info from the database and create journeys.
// by "Activity_Foreach_Journey".
func STCreateJourneys(client worker.JobClient, job entities.Job) {
jobKey := job.GetKey()

variables, err := job.GetVariablesAsMap()
if err != nil {
acmejob.FailJob(client, job)
return
}

db, _ := db.GetDb()
var available_flights []models.AvailableFlight

if found := db.Where("departaure_time::date >= now()::date AND offer_sent = false").Preload("User").Preload("Interest").Find(&available_flights); found == nil {
log.Errorf("[%s] [%d] Interests not found", job.Type, jobKey)
acmejob.FailJob(client, job)
return
}

interests := make(map[int][]models.AvailableFlight)

var journeys []uint

for _, flight := range available_flights {
if flight.InterestId != nil {
interests[*flight.InterestId] = append(interests[*flight.InterestId], flight)
} else {
in := map[string]interface{}{
"flight1_id": flight.Id,
"user_id": flight.UserId,
"cost": flight.Cost,
}
input, err := models.ValidateJourney(db, in)

if err != nil {
log.Errorf("[%s] [%d] Error creating journey: %s", job.Type, jobKey, err.Error())
acmejob.FailJob(client, job)
return
}

journey := models.NewJourney(*input)
if err := db.Create(&journey).Error; err != nil {
log.Errorf("[%s] [%d] Journey not saved: %s", job.Type, jobKey, err.Error())
} else {
log.Infof("[%s] [%d] Journey saved", job.Type, jobKey)
journeys = append(journeys, journey.Id)
}
}
}

for _, flights := range interests {
var in map[string]interface{}

if len(flights) == 2 {
in = map[string]interface{}{
"flight1_id": flights[0].Id,
"flight2_id": flights[1].Id,
"user_id": flights[0].UserId,
"cost": flights[0].Cost + flights[1].Cost,
}
} else {
in = map[string]interface{}{
"flight1_id": flights[0].Id,
"user_id": flights[0].UserId,
"cost": flights[0].Cost,
}
}

input, err := models.ValidateJourney(db, in)

if err != nil {
log.Errorf("[%s] [%d] Error creating journey: %s", job.Type, jobKey, err.Error())
acmejob.FailJob(client, job)
return
}

journey := models.NewJourney(*input)
if err := db.Preload("AvailableFlight").Preload("User").Create(&journey).Error; err != nil {
log.Errorf("[%s] [%d] Journey not saved: %s", job.Type, jobKey, err.Error())
} else {
log.Infof("[%s] [%d] Journey saved", job.Type, jobKey)
journeys = append(journeys, journey.Id)
}
}

variables["journeys"] = journeys

request, err := client.NewCompleteJobCommand().JobKey(jobKey).VariablesFromMap(variables)
if err != nil {
acmejob.FailJob(client, job)
return
}

ctx := context.Background()
_, err = request.Send(ctx)
if err != nil {
acmejob.FailJob(client, job)
return
}

log.Infof("[%s] [%d] Successfully completed job", job.Type, jobKey)

acmejob.JobVariables[job.Type] <- variables
acmejob.JobStatuses.Close(job.Type, 0)
}
55 changes: 0 additions & 55 deletions internal/handlers/acmesky/st_get_available_flights.go

This file was deleted.

Loading

0 comments on commit f99fb73

Please sign in to comment.