Commit

move update offsets
bubriks committed Dec 4, 2024
1 parent eb6fa03 commit e4bc357
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions utils/python/hsfs_utils.py
@@ -301,28 +301,26 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
.option("includeHeaders", "true")
.option("failOnDataLoss", "false")
.load()
.limit(5000000)
)

# update offsets
df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
offset_dict = json.loads(offset_string)
for offset_row in df_offsets:
offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1

# filter only the necassary entries
df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))

# limit the number of records ingested
df = df.limit(5000000)

# deserialize dataframe so that it can be properly saved
deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)

# insert data
entity.stream = False # to make sure we dont write to kafka
entity.insert(deserialized_df, storage="offline")

# update offsets
df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
offset_dict = json.loads(offset_string)
for offset_row in df_offsets:
offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1

# save offsets
offset_df = spark.createDataFrame([offset_dict])
offset_df.coalesce(1).write.mode("overwrite").json(offset_location)
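
The block this commit relocates is the Kafka offset bookkeeping: take the batch that was just read, find the highest consumed offset per partition, add one, and record it under the topic so the next run resumes after the last ingested record. Below is a minimal, self-contained sketch of that pattern, not the project's code: the topic name, the sample offsets, and the local SparkSession are invented for illustration, whereas the real job derives them from entity._online_topic_name, the offset_string argument, and the cluster session.

# Minimal sketch of per-partition Kafka offset bookkeeping (illustration only).
# The topic name, partitions, and offsets below are made up for this example.
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import max as spark_max

spark = SparkSession.builder.master("local[1]").appName("offset-sketch").getOrCreate()

# Stand-in for the batch read from Kafka: each row exposes the partition and
# offset columns that Spark's Kafka source provides.
records = spark.createDataFrame(
    [(0, 10), (0, 12), (1, 7), (1, 9), (2, 3)],
    ["partition", "offset"],
)

# Offsets committed by the previous run (what json.loads(offset_string) yields).
offset_dict = {"example_topic": {"0": 0, "1": 0, "2": 0}}

# Keep the highest offset seen per partition and advance by one, so the next
# run starts right after the last record ingested here.
for row in records.groupBy("partition").agg(spark_max("offset").alias("offset")).collect():
    offset_dict["example_topic"][str(row.partition)] = row.offset + 1

print(json.dumps(offset_dict))  # {"example_topic": {"0": 13, "1": 10, "2": 4}}

Storing max offset plus one is consistent with Spark's Kafka source treating the stored value as the first offset to read, so the following run begins at the first record that has not been ingested yet.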