-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.py
36 lines (28 loc) · 1.2 KB
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from simulator import *
import pandas as pd
from datetime import datetime, timedelta
import json
from kafka import KafkaProducer
if __name__ == "__main__":
    # Script entry point: run the simulation, then publish one JSON message
    # per remaining DataFrame row to the "vehicle_positions" Kafka topic.
    # NOTE(review): run_simulation presumably comes from the wildcard
    # ``simulator`` import; the columns read below are t, link, name, orig,
    # dest, x, s and v.
    df = run_simulation()
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    try:
        # Wall-clock anchor: simulation second ``t`` is reported as start + t.
        start = datetime.now()

        # Rows that are never published. The mask does not depend on the time
        # step, so it is computed once instead of on every iteration.
        #  - "waiting_at_origin_node": nothing is happening at those moments.
        #  - "trip_end": link data already exists for the same car and
        #    timestamp, so the link that was its destination is already known.
        skipped_links = ["waiting_at_origin_node", "trip_end"]
        published_rows = df.loc[~df["link"].isin(skipped_links)]

        # Time steps 5, 10, ..., 3600.
        for t in range(5, 3605, 5):
            timestamp = (start + timedelta(seconds=t)).strftime("%Y-%m-%d %H:%M:%S")
            step_rows = published_rows.loc[published_rows["t"] == t]
            for _, row in step_rows.iterrows():
                message = {
                    "name": row["name"],
                    "origin": row["orig"],
                    "destination": row["dest"],
                    "time": timestamp,
                    "link": row["link"],
                    "position": row["x"],
                    "spacing": row["s"],
                    "speed": row["v"],
                }
                producer.send("vehicle_positions", json.dumps(message).encode("utf-8"))
            # Push out everything queued for this time step before moving on.
            producer.flush()
    finally:
        # Always release the producer's resources, even if building or
        # sending a message raised part-way through the run.
        producer.close()