
Dev deepfm spark random split #343

Open · wants to merge 7 commits into base: main
59 changes: 30 additions & 29 deletions RecommenderSystems/deepfm/README.md
@@ -6,18 +6,17 @@
<img width="539" alt="Screen Shot 2022-04-01 at 4 45 22 PM" src="https://user-images.githubusercontent.com/46690197/161228714-ae9410bb-56db-46b0-8f0b-cb8becb6ee03.png">
</p>


## Directory description

```txt
.
├── deepfm_train_eval.py # OneFlow DeepFM train/val/test scripts with OneEmbedding module
├── README.md # Documentation
├── tools
│   ├── deepfm_parquet.scala      # Read Criteo Kaggle data and export it in parquet format
│   ├── launch_spark.sh           # Spark launching shell script
│   └── split_criteo_kaggle.scala # Split the Criteo Kaggle dataset into train/val/test sets
├── train_deepfm.sh               # DeepFM training shell script
```

## Arguments description
@@ -59,7 +58,7 @@
The model is evaluated at the end of every epoch. At the end of each epoch, if t

The monitor used for the early stop is `val_auc - val_log_loss`. The mode of the early stop is `max`. You could tune `patience` and `min_delta` as needed.

If you want to disable early stopping, simply add `--disable_early_stop` in the **train_deepfm.sh**.
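The early-stop behavior described above can be sketched in a few lines of plain Python. This is a minimal illustration under the stated settings (monitor `val_auc - val_log_loss`, mode `max`), not the repository's actual implementation; the class name `EarlyStopper` is made up for this sketch:

```python
class EarlyStopper:
    """Minimal early-stop monitor in "max" mode: signal a stop when the
    monitored value (val_auc - val_log_loss) fails to improve by at
    least min_delta for `patience` consecutive epochs."""

    def __init__(self, patience=2, min_delta=1e-6):
        self.patience = patience
        self.min_delta = min_delta
        self.best = float("-inf")
        self.bad_epochs = 0

    def step(self, val_auc, val_log_loss):
        monitor = val_auc - val_log_loss
        if monitor > self.best + self.min_delta:
            # Improvement: remember the new best and reset the counter.
            self.best = monitor
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience  # True => stop training
```

Calling `step` once per epoch and breaking out of the training loop when it returns `True` reproduces the behavior the flags `patience` and `min_delta` control.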

## Getting Started

@@ -78,8 +77,6 @@
A hands-on guide to train a DeepFM model.
```txt
psutil
petastorm
```

### Dataset
@@ -90,17 +87,11 @@
According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both

> χ may include categorical fields (e.g., gender, location) and continuous fields (e.g., age). Each categorical field is represented as a vector of one-hot encoding, and each continuous field is represented as the value itself, or a vector of one-hot encoding after discretization.
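The quoted representation can be made concrete with a tiny example. The field names, vocabulary, and bucket boundaries below are hypothetical, chosen only to illustrate the encoding:

```python
def one_hot(value, vocabulary):
    """Encode a value as a one-hot vector over a fixed vocabulary."""
    return [1 if v == value else 0 for v in vocabulary]

# A categorical field becomes a one-hot vector over its vocabulary.
gender_vocab = ["male", "female", "unknown"]
print(one_hot("female", gender_vocab))  # [0, 1, 0]

# A continuous field is either kept as the value itself...
age = 37
# ...or one-hot encoded after discretization into buckets.
boundaries = [18, 30, 45, 60]                    # hypothetical bucket edges
bucket_id = sum(1 for b in boundaries if age >= b)  # age 37 -> bucket 2
print(one_hot(bucket_id, range(len(boundaries) + 1)))  # [0, 0, 1, 0, 0]
```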

1. Download the [Criteo Kaggle dataset](https://www.kaggle.com/c/criteo-display-ad-challenge).

2. Download Spark from https://spark.apache.org/downloads.html and uncompress the tar file into the directory where you want Spark installed. Make sure the `SPARK_HOME` environment variable points to that directory.

3. Launch a spark shell using **launch_spark.sh**.

- Modify the SPARK_LOCAL_DIRS as needed

@@ -110,9 +101,19 @@

- Run `bash launch_spark.sh`

4. Load **split_criteo_kaggle.scala** into your spark shell with `:load split_criteo_kaggle.scala`.

Then, call the `splitCriteoKaggle(srcDir: String, dstDir:String)` function to split the dataset.

```scala
splitCriteoKaggle("/path/to/your/src_dir", "/path/to/your/dst_dir")
```

Note: Same as [the DeepFM_Criteo_x4_001 experiment](https://github.com/openbenchmark/BARS/tree/master/ctr_prediction/benchmarks/DeepFM/DeepFM_criteo_x4_001) in FuxiCTR, only train.txt is used. The dataset is randomly split into training, validation, and test sets at an 8:1:1 ratio using Spark's `randomSplit`.
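The effect of the 8:1:1 random split can be mimicked in plain Python for illustration. This is a sketch of the idea only: Spark's `randomSplit` samples per partition rather than shuffling and slicing, so the exact split sizes differ slightly from the deterministic cut shown here:

```python
import random

def random_split(rows, weights=(0.8, 0.1, 0.1), seed=2018):
    """Shuffle rows and cut them into parts proportional to `weights`,
    roughly mirroring what DataFrame.randomSplit achieves."""
    rng = random.Random(seed)
    shuffled = rows[:]
    rng.shuffle(shuffled)
    total = sum(weights)
    splits, start = [], 0
    for w in weights[:-1]:
        end = start + int(round(len(rows) * w / total))
        splits.append(shuffled[start:end])
        start = end
    splits.append(shuffled[start:])  # last split takes the remainder
    return splits

train, val, test = random_split(list(range(100)))
print(len(train), len(val), len(test))  # 80 10 10
```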

5. Load **deepfm_parquet.scala** into your spark shell with `:load deepfm_parquet.scala`.

Then, call the `makeDeepfmDataset(srcDir: String, dstDir:String)` function to generate the dataset.

```scala
makeDeepfmDataset("/path/to/your/src_dir", "/path/to/your/dst_dir")
```

@@ -121,25 +122,25 @@
After the parquet dataset is generated, its summary information is printed as well: the number of samples in each split and the table size array, both of which are needed for training.

```txt
train samples = 36673135
validation samples = 4584737
test samples = 4582745
table size array:
649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376
1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572
```
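The table size array is, in essence, the per-column cardinality (number of distinct values) of the id features, which tells OneEmbedding how large each embedding table must be. A rough sketch of how such an array could be derived from raw rows (hypothetical helper, not the repository's actual tool, which computes this in Spark):

```python
def table_size_array(rows):
    """Return the number of distinct values per column, given rows of
    id-feature tuples -- one entry per embedding table."""
    if not rows:
        return []
    num_cols = len(rows[0])
    return [len({row[c] for row in rows}) for c in range(num_cols)]

# Toy example: 4 rows with one integer column and one categorical column.
rows = [(3, "a"), (7, "a"), (3, "b"), (9, "c")]
print(table_size_array(rows))  # [3, 3]
```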

### Start Training with OneFlow

1. Modify the **train_deepfm.sh** as needed.

```shell
#!/bin/bash
DEVICE_NUM_PER_NODE=1
DATA_DIR=/path/to/deepfm_parquet
PERSISTENT_PATH=/path/to/persistent
MODEL_SAVE_DIR=/path/to/model/save/dir

python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes 1 \
    ...
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
--num_train_samples 36673135 \
--num_val_samples 4584737 \
--num_test_samples 4582745 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model
```
55 changes: 0 additions & 55 deletions RecommenderSystems/deepfm/tools/split_criteo_kaggle.py

This file was deleted.

25 changes: 25 additions & 0 deletions RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
@@ -0,0 +1,25 @@
import org.apache.spark.sql.functions.udf
import org.apache.spark.storage.StorageLevel

// Split the Criteo Kaggle train.txt into train/valid/test CSV sets at an 8:1:1 ratio
def splitCriteoKaggle(srcDir: String, dstDir: String) = {
val categorical_names = (1 to 26).map{id=>s"C$id"}
val dense_names = (1 to 13).map{id=>s"I$id"}
val integer_names = Seq("label") ++ dense_names
val col_names = integer_names ++ categorical_names

val inputDF = spark.read.option("delimiter", "\t").csv(s"${srcDir}/train.txt").toDF(col_names: _*)

val df = inputDF.persist(StorageLevel.MEMORY_AND_DISK)

val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2018)
val train_samples = splits(0).count()
println(s"train samples = $train_samples")
val valid_samples = splits(1).count()
println(s"valid samples = $valid_samples")
val test_samples = splits(2).count()
println(s"test samples = $test_samples")

splits(0).write.option("header", "true").csv(s"${dstDir}/train.csv")
splits(1).write.option("header", "true").csv(s"${dstDir}/valid.csv")
splits(2).write.option("header", "true").csv(s"${dstDir}/test.csv")
}
6 changes: 3 additions & 3 deletions RecommenderSystems/deepfm/train_deepfm.sh
Original file line number Diff line number Diff line change
@@ -22,8 +22,8 @@
python3 -m oneflow.distributed.launch \
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
--num_train_samples 36673135 \
--num_val_samples 4584737 \
--num_test_samples 4582745 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model