import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/** Computes, per calendar day, the share of consent events whose user token
  * carries at least one enabled purpose ("positive consent").
  *
  * Input: `events.json`, one JSON event per line. Fields read here:
  *   - `type`      : event kind; only pageview / consent.given / consent.asked are kept
  *   - `datetime`  : event timestamp, truncated to a date
  *   - `user.token`: JSON string of shape { "purposes": { "enabled": [ ... ] } }
  *     (assumed — confirm against the producer of events.json)
  */
object EventAnalyzer {

  // Schema used to parse the `user.token` JSON string.
  private val tokenSchema: StructType = StructType(Array(
    StructField("purposes", StructType(Array(
      StructField("enabled", ArrayType(StringType, containsNull = true), nullable = true)
    )), nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Event Analyzer")
      .getOrCreate()

    val eventsDF: DataFrame = spark.read.json("events.json")

    // Keep only the event types relevant to the consent funnel.
    val filteredDF = eventsDF.filter(
      col("type").isin("pageview", "consent.given", "consent.asked")
    )

    // Truncate the event timestamp to a calendar date for daily grouping.
    val eventsWithDateDF = filteredDF.withColumn("date", to_date(col("datetime")))

    // Parse the user token and extract the list of enabled purposes.
    // FIX: there is no top-level getField() in spark.sql.functions; field
    // access is a Column method on the struct returned by from_json.
    // (The original also had a malformed StructType literal with a stray
    // trailing `true` argument — fixed in tokenSchema above.)
    val extractedDF = eventsWithDateDF.withColumn(
      "enabled_purposes",
      from_json(col("user.token"), tokenSchema).getField("purposes").getField("enabled")
    )

    // A consent is "positive" when at least one purpose is enabled.
    // size() yields -1 for a null array (missing/unparseable token), so the
    // comparison correctly evaluates to false in that case.
    val withPurposeDF = extractedDF.withColumn(
      "positive_consent",
      size(col("enabled_purposes")) > 0
    )

    // Daily event counts per consent outcome.
    val groupedDF = withPurposeDF
      .groupBy("date", "positive_consent")
      .agg(count("*").alias("count"))

    // Pivot to one row per date with "false"/"true" count columns, then
    // compute the positive-consent share.
    // FIX: col() takes a column *name* (String), not a Boolean — after
    // pivot("positive_consent", Seq(false, true)) the columns are named
    // "false" and "true". coalesce guards against a day where only one
    // outcome occurred (the other pivot cell is null).
    val resultDF = groupedDF
      .groupBy("date")
      .pivot("positive_consent", Seq(false, true))
      .agg(sum("count"))
      .withColumn(
        "percentage",
        coalesce(col("true"), lit(0L)) /
          (coalesce(col("false"), lit(0L)) + coalesce(col("true"), lit(0L)))
      )

    resultDF.show()

    spark.stop()
  }
}