From 185908b14c1ed5586e06b5ea99feaac7f2485838 Mon Sep 17 00:00:00 2001 From: Liuxinman Date: Mon, 30 May 2022 17:48:44 +0800 Subject: [PATCH 1/6] Add spark random split script --- .../deepfm/tools/split_dataset.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 RecommenderSystems/deepfm/tools/split_dataset.scala diff --git a/RecommenderSystems/deepfm/tools/split_dataset.scala b/RecommenderSystems/deepfm/tools/split_dataset.scala new file mode 100644 index 000000000..04dcafaf1 --- /dev/null +++ b/RecommenderSystems/deepfm/tools/split_dataset.scala @@ -0,0 +1,16 @@ +import org.apache.spark.sql.functions.udf + +def splitDataset(srcDir: String, dstDir:String) = { + val categorical_names = (1 to 26).map{id=>s"C$id"} + val dense_names = (1 to 13).map{id=>s"I$id"} + val integer_names = Seq("label") ++ dense_names + val col_names = integer_names ++ categorical_names + + val df = spark.read.option("delimiter", "\t").csv(s"${srcDir}/train.txt").toDF(col_names: _*) + + val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2018) + + splits(0).write.option("header", "true").csv(s"${dstDir}/train.csv") + splits(1).write.option("header", "true").csv(s"${dstDir}/valid.csv") + splits(2).write.option("header", "true").csv(s"${dstDir}/test.csv") +} From 32a8b5e8f2f05d76a7fd264cdd45e038178896b2 Mon Sep 17 00:00:00 2001 From: Liuxinman Date: Mon, 30 May 2022 17:49:48 +0800 Subject: [PATCH 2/6] remove old split script --- .../deepfm/tools/split_criteo_kaggle.py | 55 ------------------- 1 file changed, 55 deletions(-) delete mode 100644 RecommenderSystems/deepfm/tools/split_criteo_kaggle.py diff --git a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.py b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.py deleted file mode 100644 index 62e2918ba..000000000 --- a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.py +++ /dev/null @@ -1,55 +0,0 @@ -import numpy as np -import pandas as pd -import argparse -from sklearn.model_selection import StratifiedKFold - -RANDOM_SEED = 2018 # Fix seed for reproduction - - -def split_train_val_test(input_dir, output_dir): - num_dense_fields = 13 - num_sparse_fields = 26 - - fields = ["Label"] - fields += [f"I{i+1}" for i in range(num_dense_fields)] - fields += [f"C{i+1}" for i in range(num_sparse_fields)] - - ddf = pd.read_csv( - f"{input_dir}/train.txt", - sep="\t", - header=None, - names=fields, - encoding="utf-8", - dtype=object, - ) - X = ddf.values - y = ddf["Label"].map(lambda x: float(x)).values - print(f"{len(X)} samples in total") - - folds = StratifiedKFold(n_splits=10, shuffle=True, random_state=RANDOM_SEED) - - fold_indexes = [valid_idx for _, valid_idx in folds.split(X, y)] - test_index = fold_indexes[0] - valid_index = fold_indexes[1] - train_index = np.concatenate(fold_indexes[2:]) - - ddf.loc[test_index, :].to_csv(f"{output_dir}/test.csv", index=False, encoding="utf-8") - ddf.loc[valid_index, :].to_csv(f"{output_dir}/valid.csv", index=False, encoding="utf-8") - ddf.loc[train_index, :].to_csv(f"{output_dir}/train.csv", index=False, encoding="utf-8") - - print("Train lines:", len(train_index)) - print("Validation lines:", len(valid_index)) - print("Test lines:", len(test_index)) - print("Postive ratio:", np.sum(y) / len(y)) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "--input_dir", type=str, required=True, help="Path to downloaded criteo kaggle dataset", - ) - parser.add_argument( - "--output_dir", type=str, required=True, help="Path to splitted criteo kaggle dataset", - ) - args = parser.parse_args() - split_train_val_test(args.input_dir, args.output_dir) From ce4e77dd034b1737c699f0536b09b9636c1754ad Mon Sep 17 00:00:00 2001 From: Liuxinman Date: Tue, 31 May 2022 15:18:22 +0800 Subject: [PATCH 3/6] Update readme; rename split_dataset to split_criteo_kaggle --- RecommenderSystems/deepfm/README.md | 60 +++++++++---------- ...ataset.scala => split_criteo_kaggle.scala} | 2 +- 2 files changed, 31 insertions(+), 31 deletions(-) rename RecommenderSystems/deepfm/tools/{split_dataset.scala => split_criteo_kaggle.scala} (91%) diff --git a/RecommenderSystems/deepfm/README.md b/RecommenderSystems/deepfm/README.md index d41f4c4e3..931eac4a5 100644 --- a/RecommenderSystems/deepfm/README.md +++ b/RecommenderSystems/deepfm/README.md @@ -6,18 +6,17 @@ Screen Shot 2022-04-01 at 4 45 22 PM

- ## Directory description ```txt . -├── deepfm_train_eval.py # OneFlow DeepFM train/val/test scripts with OneEmbedding module -├── README.md # Documentation +├── deepfm_train_eval.py # OneFlow DeepFM train/val/test scripts with OneEmbedding module +├── README.md # Documentation ├── tools -│ ├── deepfm_parquet.scala # Read Criteo Kaggle data and export it as parquet data format -│ └── launch_spark.sh # Spark launching shell script -│ └── split_criteo_kaggle.py # Split criteo kaggle dataset to train\val\test set -├── train_deepfm.sh # DeepFM training shell script +│ ├── deepfm_parquet.scala # Read Criteo Kaggle data and export it as parquet data format +│ └── launch_spark.sh # Spark launching shell script +│ └── split_criteo_kaggle.scala # Split criteo kaggle dataset to train\val\test set +├── train_deepfm.sh # DeepFM training shell script ``` ## Arguments description @@ -38,7 +37,6 @@ We use exactly the same default values as [the DeepFM_Criteo_x4_001 experiment]( | embedding_vec_size | embedding vector size | 16 | | dnn | dnn hidden units number | 1000,1000,1000,1000,1000 | | net_dropout | number of minibatch training interations | 0.2 | -| embedding_vec_size | embedding vector size | 16 | | learning_rate | initial learning rate | 0.001 | | batch_size | training/evaluation batch size | 10000 | | train_batches | the maximum number of training batches | 75000 | @@ -59,7 +57,7 @@ The model is evaluated at the end of every epoch. At the end of each epoch, if t The monitor used for the early stop is `val_auc - val_log_loss`. The mode of the early stop is `max`. You could tune `patience` and `min_delta` as needed. -If you want to disable early stopping, simply add `--disable_early_stop` in the [train_deepfm.sh](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/train_deepfm.sh). +If you want to disable early stopping, simply add `--disable_early_stop` in the **train_deepfm.sh**. ## Getting Started @@ -78,8 +76,6 @@ A hands-on guide to train a DeepFM model. ```json psutil petastorm - pandas - sklearn ``` ### Dataset @@ -90,17 +86,11 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both > χ may include categorical fields (e.g., gender, location) and continuous fields (e.g., age). Each categorical field is represented as a vec- tor of one-hot encoding, and each continuous field is repre- sented as the value itself, or a vector of one-hot encoding after discretization. -1. Download the [Criteo Kaggle dataset](https://www.kaggle.com/c/criteo-display-ad-challenge) and then split it using [split_criteo_kaggle.py](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/tools/split_criteo_kaggle.py). - - Note: Same as [the DeepFM_Criteo_x4_001 experiment](https://github.com/openbenchmark/BARS/tree/master/ctr_prediction/benchmarks/DeepFM/DeepFM_criteo_x4_001) in FuxiCTR, only train.txt is used. Also, the dataset is randomly spllitted into 8:1:1 as training set, validation set and test set. The dataset is splitted using StratifiedKFold in sklearn. - - ```shell - python3 split_criteo_kaggle.py --input_dir=/path/to/your/criteo_kaggle --output_dir=/path/to/your/output/dir - ``` +1. Download the [Criteo Kaggle dataset](https://www.kaggle.com/c/criteo-display-ad-challenge). 2. Download spark from https://spark.apache.org/downloads.html and then uncompress the tar file into the directory where you want to install Spark. Ensure the `SPARK_HOME` environment variable points to the directory where the spark is. -3. launch a spark shell using [launch_spark.sh](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/tools/launch_spark.sh). +3. launch a spark shell using **launch_spark.sh**. - Modify the SPARK_LOCAL_DIRS as needed @@ -110,9 +100,19 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both - Run `bash launch_spark.sh` -4. load [deepfm_parquet.scala](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/tools/deepfm_parquet.scala) to your spark shell by `:load deepfm_parquet.scala`. +4. Load **split_criteo_kaggle.scala** to your spark shell by `:load split_criteo_kaggle.scala`. -5. call the `makeDeepfmDataset(srcDir: String, dstDir:String)` function to generate the dataset. + Then, call the `splitCriteoKaggle(srcDir: String, dstDir:String)` function to split the dataset. + + ```shell + splitCriteoKaggle("/path/to/your/src_dir", "/path/to/your/dst_dir") + ``` + + Note: Same as [the DeepFM_Criteo_x4_001 experiment](https://github.com/openbenchmark/BARS/tree/master/ctr_prediction/benchmarks/DeepFM/DeepFM_criteo_x4_001) in FuxiCTR, only train.txt is used. Also, the dataset is randomly spllitted into 8:1:1 as training set, validation set and test set. The dataset is splitted using RandomSplit in spark. + +5. load **deepfm_parquet.scala** to your spark shell by `:load deepfm_parquet.scala`. + + Then, call the `makeDeepfmDataset(srcDir: String, dstDir:String)` function to generate the dataset. ```shell makeDeepfmDataset("/path/to/your/src_dir", "/path/to/your/dst_dir") @@ -121,17 +121,17 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both After generating parquet dataset, dataset information will also be printed. It contains the information about the number of samples and table size array, which is needed when training. ```txt - train samples = 36672493 - validation samples = 4584062 - test samples = 4584062 + train samples = 36673135 + validation samples = 4584737 + test samples = 4582745 table size array: - 649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376 + 649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376 1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572 ``` ### Start Training by Oneflow -1. Modify the [train_deepfm.sh](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/train_deepfm.sh) as needed. +1. Modify the **train_deepfm.sh** as needed. ```shell #!/bin/bash @@ -139,7 +139,7 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both DATA_DIR=/path/to/deepfm_parquet PERSISTENT_PATH=/path/to/persistent MODEL_SAVE_DIR=/path/to/model/save/dir - + python3 -m oneflow.distributed.launch \ --nproc_per_node $DEVICE_NUM_PER_NODE \ --nnodes 1 \ @@ -158,9 +158,9 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both --net_dropout 0.2 \ --learning_rate 0.001 \ --embedding_vec_size 16 \ - --num_train_samples 36672493 \ - --num_val_samples 4584062 \ - --num_test_samples 4584062 \ + --num_train_samples 36673135 \ + --num_val_samples 4584737 \ + --num_test_samples 4582745 \ --model_save_dir $MODEL_SAVE_DIR \ --save_best_model ``` diff --git a/RecommenderSystems/deepfm/tools/split_dataset.scala b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala similarity index 91% rename from RecommenderSystems/deepfm/tools/split_dataset.scala rename to RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala index 04dcafaf1..564f486a0 100644 --- a/RecommenderSystems/deepfm/tools/split_dataset.scala +++ b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala @@ -1,6 +1,6 @@ import org.apache.spark.sql.functions.udf -def splitDataset(srcDir: String, dstDir:String) = { +def splitCriteoKaggle(srcDir: String, dstDir:String) = { val categorical_names = (1 to 26).map{id=>s"C$id"} val dense_names = (1 to 13).map{id=>s"I$id"} val integer_names = Seq("label") ++ dense_names From fbd35091655822709666f50a8a63e383c04cf0a8 Mon Sep 17 00:00:00 2001 From: Liuxinman Date: Tue, 31 May 2022 15:54:58 +0800 Subject: [PATCH 4/6] Update sample# in train_deepfm.sh --- RecommenderSystems/deepfm/train_deepfm.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/RecommenderSystems/deepfm/train_deepfm.sh b/RecommenderSystems/deepfm/train_deepfm.sh index c4b32d107..321b10152 100644 --- a/RecommenderSystems/deepfm/train_deepfm.sh +++ b/RecommenderSystems/deepfm/train_deepfm.sh @@ -22,8 +22,8 @@ python3 -m oneflow.distributed.launch \ --net_dropout 0.2 \ --learning_rate 0.001 \ --embedding_vec_size 16 \ - --num_train_samples 36672493 \ - --num_val_samples 4584062 \ - --num_test_samples 4584062 \ + --num_train_samples 36673135 \ + --num_val_samples 4584737 \ + --num_test_samples 4582745 \ --model_save_dir $MODEL_SAVE_DIR \ --save_best_model From 65e241ef648a123157db656f318a3f849885c7a2 Mon Sep 17 00:00:00 2001 From: Liuxinman Date: Tue, 14 Jun 2022 15:00:48 +0800 Subject: [PATCH 5/6] Update split to cache dataframe --- .../deepfm/tools/split_criteo_kaggle.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala index 564f486a0..775cff680 100644 --- a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala +++ b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala @@ -1,4 +1,5 @@ import org.apache.spark.sql.functions.udf +import org.apache.spark.storage.StorageLevel def splitCriteoKaggle(srcDir: String, dstDir:String) = { val categorical_names = (1 to 26).map{id=>s"C$id"} @@ -6,9 +7,17 @@ def splitCriteoKaggle(srcDir: String, dstDir:String) = { val integer_names = Seq("label") ++ dense_names val col_names = integer_names ++ categorical_names - val df = spark.read.option("delimiter", "\t").csv(s"${srcDir}/train.txt").toDF(col_names: _*) + val inputDF = spark.read.option("delimiter", ",").csv(s"${srcDir}/train.txt").toDF(col_names: _*) - val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2018) + val df = inputDF.persist(StorageLevel.MEMORY_AND_DISK) + + val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2020) + val train_samples = splits(0).count() + println(s"train samples = $train_samples") + val valid_samples = splits(1).count() + println(s"valid samples = $valid_samples") + val test_samples = splits(2).count() + println(s"test samples = $test_samples") splits(0).write.option("header", "true").csv(s"${dstDir}/train.csv") splits(1).write.option("header", "true").csv(s"${dstDir}/valid.csv") From 1620448e14eea878378719ebfd278afbbc567159 Mon Sep 17 00:00:00 2001 From: Liuxinman Date: Fri, 17 Jun 2022 12:30:43 +0800 Subject: [PATCH 6/6] Fix wrong delimiter --- RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala index 775cff680..424ac79e6 100644 --- a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala +++ b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala @@ -7,11 +7,11 @@ def splitCriteoKaggle(srcDir: String, dstDir:String) = { val integer_names = Seq("label") ++ dense_names val col_names = integer_names ++ categorical_names - val inputDF = spark.read.option("delimiter", ",").csv(s"${srcDir}/train.txt").toDF(col_names: _*) + val inputDF = spark.read.option("delimiter", "\t").csv(s"${srcDir}/train.txt").toDF(col_names: _*) val df = inputDF.persist(StorageLevel.MEMORY_AND_DISK) - val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2020) + val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2018) val train_samples = splits(0).count() println(s"train samples = $train_samples") val valid_samples = splits(1).count()