spark_consumer.py
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import avg, col, count, from_json, window
from pyspark.sql.types import FloatType, StringType, StructField, StructType, TimestampType
#==============================================================================================
def mdb_write(batch, batch_id):
    """Append one raw micro-batch to the 'raw' collection."""
    batch.write\
        .format("mongodb")\
        .mode("append")\
        .option("spark.mongodb.connection.uri", "mongodb://localhost:27017/")\
        .option("database", "big_data_project_2024")\
        .option("collection", "raw")\
        .save()
def mdb_processed_write(batch, batch_id):
    """Append one aggregated micro-batch to the 'processed' collection."""
    batch.write\
        .format("mongodb")\
        .mode("append")\
        .option("spark.mongodb.connection.uri", "mongodb://localhost:27017/")\
        .option("database", "big_data_project_2024")\
        .option("collection", "processed")\
        .save()
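# foreachBatch hands each micro-batch to the callback together with a batch_id that is
# stable across replays after a failure. With plain append writes as above the sink sees
# at-least-once delivery; deduplicating on batch_id would be needed for exactly-once.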
#==============================================================================================
spark = SparkSession.builder.appName("Traffic Data").getOrCreate()
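# NOTE: this job assumes the Kafka source and the MongoDB Spark connector are on the
# classpath, e.g. via spark-submit --packages (version numbers below are illustrative
# assumptions, not pinned by this repo):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.mongodb.spark:mongo-spark-connector_2.12:10.2.1 spark_consumer.py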
# Read from the Kafka stream
#==============================================================================================
df = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "vehicle_positions")\
    .load()
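# Every record from the Kafka source arrives with the fixed columns
# key and value (both binary), topic, partition, offset, timestamp and timestampType.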
# Hint for IDE type inference: readStream.load() returns a streaming DataFrame
assert isinstance(df, DataFrame)
# Schema of the JSON payload carried in the Kafka "value" field
#==============================================================================================
schema = StructType([
    StructField("name", StringType()),
    StructField("origin", StringType()),
    StructField("destination", StringType()),
    StructField("time", TimestampType()),
    StructField("link", StringType()),
    StructField("position", FloatType()),
    StructField("spacing", FloatType()),
    StructField("speed", FloatType())
])
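# An illustrative message matching this schema (field values are made up):
#   {"name": "veh_42", "origin": "A", "destination": "B",
#    "time": "2024-05-01T12:00:00", "link": "link_3",
#    "position": 120.5, "spacing": 8.2, "speed": 13.9}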
# The Kafka "value" field is binary: cast it to a JSON string,
# then parse it into typed columns using the schema above
#==============================================================================================
df = df.select(from_json(col("value").cast("string"), schema).alias("data"))
df = df.select("data.*")
# Write the raw records to MongoDB
#==============================================================================================
mdb_raw_query = df.writeStream.outputMode("append").foreachBatch(mdb_write).start()
# Aggregate per link: vehicle count and average speed over 1-second tumbling windows
#==============================================================================================
df = df.select("time", "link", "name", "speed")
df = df.withWatermark("time", "1 milliseconds").groupby(window("time", "1 seconds"), "link").agg(
count("name").alias("vcount"),
avg("speed").alias("vspeed"),
)
df = df.select(col("window.start").alias("time"), col("link"), col("vcount"), col("vspeed"))
# Write the windowed aggregates to MongoDB
#==============================================================================================
mdb_processed_query = df.writeStream.outputMode("append").foreachBatch(mdb_processed_write).start()
mdb_raw_query.awaitTermination()
mdb_processed_query.awaitTermination()
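# Both queries run concurrently; blocking on each in turn keeps the driver alive.
# An alternative is spark.streams.awaitAnyTermination(), which returns as soon as
# either query stops, e.g. if one of them fails.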