
Commit

Update Readme (#12)
* Make spark session an explicit argument

* Update Readme
Tom-Newton authored Jan 18, 2024
1 parent 40ef962 commit d098f14
Showing 4 changed files with 41 additions and 167 deletions.
101 changes: 12 additions & 89 deletions README.md
@@ -6,7 +6,6 @@ Apart from utilising existing high-level implementations, a couple of implementa

The thesis that this project laid the foundation for can be found here: http://www.diva-portal.org/smash/get/diva2:1695672/FULLTEXT01.pdf.

# This project is not being actively developed or maintained. If you wish to continue development on this project, feel free to make a fork!

## Table of contents

@@ -15,11 +14,7 @@ The thesis that this project laid the foundation for can be found here: http://w
- [Scala (Spark)](#scala-spark)
- [Python (PySpark)](#python-pyspark)
- [Quickstart (Python)](#quickstart-python)
1. [Creating the context](#1-creating-the-context)
2. [Performing a PIT join](#2-performing-a-pit-join)
1. [Early stop sort merge](#21-early-stop-sort-merge)
2. [Union merge](#22-union-merge)
3. [Exploding PIT join](#23-exploding-pit-join)
- [Early stop sort merge](#early-stop-sort-merge)
- [QuickStart (Scala)](#quickstart-scala)
- [Early stop sort merge](#early-stop-sort-merge)
- [Union ASOF merge](#union-asof-merge)
@@ -29,7 +24,7 @@ The thesis that this project laid the foundation for can be found here: http://w

| Dependency | Version |
| --------------- | ------- |
| Spark & PySpark | 3.2 |
| Spark & PySpark | 3.5.0 |
| Scala | 2.12 |
| Python | >=3.6 |

@@ -45,6 +40,9 @@ To make the artifacts available for the executors, set the configuration propert

Alternatively, set the configuration property `spark.jars` to include the path to the jar-file to make it available for both the driver and executors.

Additionally, set `spark.sql.extensions` to include `io.github.ackuq.pit.SparkPIT`. This config
is a comma-separated string.

### Python (PySpark)

Configure Spark using the instructions as observed in [the previous section](#scala-spark).
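
For reference, a minimal PySpark sketch of the configuration described above — the Maven coordinate and jar path below are illustrative placeholders, not the project's published values:

```py
from pyspark.sql import SparkSession

# Minimal sketch of a session configured for the PIT extensions.
# The coordinate and path are placeholders; substitute the artifact you
# actually built or installed.
spark = (
    SparkSession.builder.appName("pit-quickstart")
    # Either resolve the Scala artifact by Maven coordinates ...
    .config("spark.jars.packages", "io.github.ackuq:spark-pit_2.12:<version>")
    # ... or point at a local jar for both the driver and the executors:
    # .config("spark.jars", "/path/to/spark-pit.jar")
    # Register the PIT extensions (spark.sql.extensions is a comma-separated list).
    .config("spark.sql.extensions", "io.github.ackuq.pit.SparkPIT")
    .getOrCreate()
)
```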
@@ -57,72 +55,28 @@ pip install spark-pit

## Quickstart (Python)

### 1. Creating the context

The object `PitContext` is the entrypoint for all of the functionality of the library. You can initialize this context with the following code:

```py
from ackuq.pit import PitContext

spark = <your spark session>
pit_context = PitContext(spark)
```

### 2. Performing a PIT join

There are currently 3 ways of executing a PIT join: using an early stop sort merge, a union merge algorithm, or exploding intermediate tables.

#### 2.1. Early stop sort merge

```py
pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts) & (df1.id == df2.id))
```

#### 2.2. Union merge
### Early stop sort merge

```py
pit_join = pit_context.union(
left=df1,
right=df2,
left_prefix="df1_",
right_prefix="df2_",
left_ts_column = "ts",
right_ts_column = "ts",
partition_cols=["id"],
)
```
from ackuq.pit.joinPIT import joinPIT

#### 2.3. Exploding PIT join

```py
pit_join = pit_context.exploding(
left=df1,
right=df2,
left_ts_column=df1["ts"],
right_ts_column=df2["ts"],
partition_cols = [df1["id"], df2["id"]],
)
pit_join = joinPIT(spark, df1, df2, df1.ts, df2.ts, (df1.id == df2.id))
```
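
The Python `joinPIT` also takes the join type and a tolerance as trailing arguments (its full signature appears in `python/ackuq/pit/joinPIT.py` further down in this commit); a small sketch:

```py
# Left outer PIT join that only accepts matches whose timestamps differ by
# at most 1 time unit; unmatched left rows keep null right-hand columns.
pit_join_outer = joinPIT(spark, df1, df2, df1.ts, df2.ts, (df1.id == df2.id), "left", 1)
```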

## Quickstart (Scala)

Instead of using a context, which is done in the Python implementation, all of the functionality is divided into objects.

### Early stop sort merge

```scala
import io.github.ackuq.pit.EarlyStopSortMerge.{pit, init}
import org.apache.spark.sql.functions.lit
import io.github.ackuq.pit.EarlyStopSortMerge.joinPIT

// Pass the spark session; this will register the required strategies and optimizer rules.
init(spark)

val pitJoin = df1.join(df2, pit(df1("ts"), df2("ts"), lit(0)) && df1("id") === df2("id"))
val pitJoin = joinPIT(df1, df2, df1("ts"), df2("ts"), df1("id") === df2("id"), "inner", 0)
```

#### Adding tolerance

The UDF takes a third (required) argument for tolerance; when this argument is set to a non-null value, the PIT join does not return matches where the timestamps differ by more than that value. E.g. setting the third argument to `lit(3)` would only accept PIT matches that differ by at most 3 time units.
The final argument is the tolerance; when it is set to a non-zero value, the PIT join does not return matches where the timestamps differ by more than the specified value. E.g. setting the tolerance to `3` would only accept PIT matches that differ by at most 3 time units.

#### Left outer join

@@ -131,36 +85,5 @@ The default join type for PIT joins is an inner join, but if you'd like to keep a
Usage:

```scala
val pitJoin = df1.join(
df2, pit(df1("ts"), df2("ts"), lit(0)) && df1("id") === df2("id"),
"left"
)
```

### Union merge

```scala
import io.github.ackuq.pit.Union

val pitJoin = Union.join(
df1,
df2,
leftPrefix = Some("df1_"),
rightPrefix = "df2_",
partitionCols = Seq("id")
)
```

### Exploding PIT join

```scala
import io.github.ackuq.pit.Exploding

val pitJoin = Exploding.join(
df1,
df2,
leftTSColumn = df1("ts"),
rightTSColumn = df2("ts"),
partitionCols = Seq((df1("id"), df2("id")))
)
val pitJoin = joinPIT(df1, df2, df1("ts"), df2("ts"), df1("id") === df2("id"), "left")
```
70 changes: 0 additions & 70 deletions python/README.md
@@ -8,76 +8,6 @@ This project aims to expose different ways of executing PIT-joins, also called

In order to run this project in PySpark, you will need to have the JAR file of the Scala implementation available inside your Spark session.

## Quickstart

### 1. Creating the context

The object `PitContext` is the entrypoint for all of the functionality of the library. You can initialize this context with the following code:

```py
from pyspark import SQLContext
from ackuq.pit import PitContext

sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)
```

### 2. Performing a PIT join

There are currently 3 ways of executing a PIT join: using an early stop sort merge, a union asof algorithm, or exploding intermediate tables.

#### 2.1. Early stop sort merge

```py
pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts) & (df1.id == df2.id))
```

##### 2.1.2. Adding tolerance

In this implementation, a tolerance can be added so that matches whose timestamps differ by more than some value are not allowed. To utilize this, set the third argument of the UDF to the desired integer value of the maximum difference between two timestamps.

```py
pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts, 100) & (df1.id == df2.id))
```

##### 2.1.3. Left outer join

Left outer joins are supported in this implementation; the main difference between a regular inner join and a left outer join is that, whether or not a left row gets matched with a right row, it will still be part of the resulting table. In the resulting table, all the left rows that did not find a match have the values of the right columns set to `null`.

```py
pit_join = df1.join(
df2,
pit_context.pit_udf(df1.ts, df2.ts, 100) & (df1.id == df2.id),
"left"
)
```

#### 2.2. Union merge

```py
pit_join = pit_context.union(
left=df1,
right=df2,
left_prefix="df1_",
right_prefix="df2_",
left_ts_column = "ts",
right_ts_column = "ts",
partition_cols=["id"],
)
```

#### 2.3. Exploding PIT join

```py
pit_join = pit_context.exploding(
left=df1,
right=df2,
left_ts_column=df1["ts"],
right_ts_column=df2["ts"],
partition_cols = [df1["id"], df2["id"]],
)
```

## Development

### Testing
7 changes: 4 additions & 3 deletions python/ackuq/pit/joinPIT.py
@@ -22,10 +22,11 @@
# SOFTWARE.
#

from pyspark.sql import Column, DataFrame
from pyspark.sql import Column, DataFrame, SparkSession


def joinPIT(
spark: SparkSession,
left: DataFrame,
right: DataFrame,
leftPitKey: Column,
@@ -34,7 +35,7 @@ def joinPIT(
how: str = "inner",
tolerance: int = 0,
) -> DataFrame:
jdf = left.sparkSession.sparkContext._jvm.io.github.ackuq.pit.EarlyStopSortMerge.joinPIT(
jdf = spark.sparkContext._jvm.io.github.ackuq.pit.EarlyStopSortMerge.joinPIT(
left._jdf,
right._jdf,
leftPitKey._jc,
Expand All @@ -43,4 +44,4 @@ def joinPIT(
how,
tolerance,
)
return DataFrame(jdf, left.sparkSession)
return DataFrame(jdf, spark)
30 changes: 25 additions & 5 deletions python/tests/test_sort_merge.py
@@ -37,7 +37,9 @@ def test_two_aligned(self):
fg1 = self.small_data.fg1
fg2 = self.small_data.fg2

pit_join = joinPIT(fg1, fg2, fg1["ts"], fg2["ts"], (fg1["id"] == fg2["id"]))
pit_join = joinPIT(
self.spark, fg1, fg2, fg1["ts"], fg2["ts"], (fg1["id"] == fg2["id"])
)

self.assertSchemaEqual(pit_join.schema, self.small_data.PIT_1_2.schema)
self.assertEqual(pit_join.collect(), self.small_data.PIT_1_2.collect())
@@ -46,7 +48,9 @@ def test_two_misaligned(self):
fg1 = self.small_data.fg1
fg2 = self.small_data.fg3

pit_join = joinPIT(fg1, fg2, fg1["ts"], fg2["ts"], (fg1["id"] == fg2["id"]))
pit_join = joinPIT(
self.spark, fg1, fg2, fg1["ts"], fg2["ts"], (fg1["id"] == fg2["id"])
)

self.assertSchemaEqual(pit_join.schema, self.small_data.PIT_1_3.schema)
self.assertEqual(pit_join.collect(), self.small_data.PIT_1_3.collect())
@@ -57,14 +61,17 @@ def test_three_misaligned(self):
fg3 = self.small_data.fg3

left = joinPIT(
self.spark,
fg1,
fg2,
fg1["ts"],
fg2["ts"],
(fg1["id"] == fg2["id"]),
)

pit_join = joinPIT(left, fg3, fg1["ts"], fg3["ts"], (fg1["id"] == fg3["id"]))
pit_join = joinPIT(
self.spark, left, fg3, fg1["ts"], fg3["ts"], (fg1["id"] == fg3["id"])
)

self.assertSchemaEqual(pit_join.schema, self.small_data.PIT_1_2_3.schema)
self.assertEqual(pit_join.collect(), self.small_data.PIT_1_2_3.collect())
@@ -74,7 +81,13 @@ def test_two_tolerance(self):
fg2 = self.small_data.fg3

pit_join = joinPIT(
fg1, fg2, fg1["ts"], fg2["ts"], (fg1["id"] == fg2["id"]), tolerance=1
self.spark,
fg1,
fg2,
fg1["ts"],
fg2["ts"],
(fg1["id"] == fg2["id"]),
tolerance=1,
)
self.assertSchemaEqual(pit_join.schema, self.small_data.PIT_1_3_T1.schema)
self.assertEqual(pit_join.collect(), self.small_data.PIT_1_3_T1.collect())
@@ -84,7 +97,14 @@ def test_two_tolerance_outer(self):
fg2 = self.small_data.fg3

pit_join = joinPIT(
fg1, fg2, fg1["ts"], fg2["ts"], (fg1["id"] == fg2["id"]), "left", 1
self.spark,
fg1,
fg2,
fg1["ts"],
fg2["ts"],
(fg1["id"] == fg2["id"]),
"left",
1,
)
self.assertSchemaEqual(pit_join.schema, self.small_data.PIT_1_3_T1_OUTER.schema)
self.assertEqual(pit_join.collect(), self.small_data.PIT_1_3_T1_OUTER.collect())

