From 185908b14c1ed5586e06b5ea99feaac7f2485838 Mon Sep 17 00:00:00 2001
From: Liuxinman
Date: Mon, 30 May 2022 17:48:44 +0800
Subject: [PATCH 1/6] Add spark random split script
---
.../deepfm/tools/split_dataset.scala | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
create mode 100644 RecommenderSystems/deepfm/tools/split_dataset.scala
diff --git a/RecommenderSystems/deepfm/tools/split_dataset.scala b/RecommenderSystems/deepfm/tools/split_dataset.scala
new file mode 100644
index 000000000..04dcafaf1
--- /dev/null
+++ b/RecommenderSystems/deepfm/tools/split_dataset.scala
@@ -0,0 +1,16 @@
+import org.apache.spark.sql.functions.udf
+
+def splitDataset(srcDir: String, dstDir:String) = {
+ val categorical_names = (1 to 26).map{id=>s"C$id"}
+ val dense_names = (1 to 13).map{id=>s"I$id"}
+ val integer_names = Seq("label") ++ dense_names
+ val col_names = integer_names ++ categorical_names
+
+ val df = spark.read.option("delimiter", "\t").csv(s"${srcDir}/train.txt").toDF(col_names: _*)
+
+ val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2018)
+
+ splits(0).write.option("header", "true").csv(s"${dstDir}/train.csv")
+ splits(1).write.option("header", "true").csv(s"${dstDir}/valid.csv")
+ splits(2).write.option("header", "true").csv(s"${dstDir}/test.csv")
+}
From 32a8b5e8f2f05d76a7fd264cdd45e038178896b2 Mon Sep 17 00:00:00 2001
From: Liuxinman
Date: Mon, 30 May 2022 17:49:48 +0800
Subject: [PATCH 2/6] remove old split script
---
.../deepfm/tools/split_criteo_kaggle.py | 55 -------------------
1 file changed, 55 deletions(-)
delete mode 100644 RecommenderSystems/deepfm/tools/split_criteo_kaggle.py
diff --git a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.py b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.py
deleted file mode 100644
index 62e2918ba..000000000
--- a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import numpy as np
-import pandas as pd
-import argparse
-from sklearn.model_selection import StratifiedKFold
-
-RANDOM_SEED = 2018 # Fix seed for reproduction
-
-
-def split_train_val_test(input_dir, output_dir):
- num_dense_fields = 13
- num_sparse_fields = 26
-
- fields = ["Label"]
- fields += [f"I{i+1}" for i in range(num_dense_fields)]
- fields += [f"C{i+1}" for i in range(num_sparse_fields)]
-
- ddf = pd.read_csv(
- f"{input_dir}/train.txt",
- sep="\t",
- header=None,
- names=fields,
- encoding="utf-8",
- dtype=object,
- )
- X = ddf.values
- y = ddf["Label"].map(lambda x: float(x)).values
- print(f"{len(X)} samples in total")
-
- folds = StratifiedKFold(n_splits=10, shuffle=True, random_state=RANDOM_SEED)
-
- fold_indexes = [valid_idx for _, valid_idx in folds.split(X, y)]
- test_index = fold_indexes[0]
- valid_index = fold_indexes[1]
- train_index = np.concatenate(fold_indexes[2:])
-
- ddf.loc[test_index, :].to_csv(f"{output_dir}/test.csv", index=False, encoding="utf-8")
- ddf.loc[valid_index, :].to_csv(f"{output_dir}/valid.csv", index=False, encoding="utf-8")
- ddf.loc[train_index, :].to_csv(f"{output_dir}/train.csv", index=False, encoding="utf-8")
-
- print("Train lines:", len(train_index))
- print("Validation lines:", len(valid_index))
- print("Test lines:", len(test_index))
- print("Postive ratio:", np.sum(y) / len(y))
-
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "--input_dir", type=str, required=True, help="Path to downloaded criteo kaggle dataset",
- )
- parser.add_argument(
- "--output_dir", type=str, required=True, help="Path to splitted criteo kaggle dataset",
- )
- args = parser.parse_args()
- split_train_val_test(args.input_dir, args.output_dir)
From ce4e77dd034b1737c699f0536b09b9636c1754ad Mon Sep 17 00:00:00 2001
From: Liuxinman
Date: Tue, 31 May 2022 15:18:22 +0800
Subject: [PATCH 3/6] Update readme; rename split_dataset to
split_criteo_kaggle
---
RecommenderSystems/deepfm/README.md | 60 +++++++++----------
...ataset.scala => split_criteo_kaggle.scala} | 2 +-
2 files changed, 31 insertions(+), 31 deletions(-)
rename RecommenderSystems/deepfm/tools/{split_dataset.scala => split_criteo_kaggle.scala} (91%)
diff --git a/RecommenderSystems/deepfm/README.md b/RecommenderSystems/deepfm/README.md
index d41f4c4e3..931eac4a5 100644
--- a/RecommenderSystems/deepfm/README.md
+++ b/RecommenderSystems/deepfm/README.md
@@ -6,18 +6,17 @@
-
## Directory description
```txt
.
-├── deepfm_train_eval.py # OneFlow DeepFM train/val/test scripts with OneEmbedding module
-├── README.md # Documentation
+├── deepfm_train_eval.py # OneFlow DeepFM train/val/test scripts with OneEmbedding module
+├── README.md # Documentation
├── tools
-│ ├── deepfm_parquet.scala # Read Criteo Kaggle data and export it as parquet data format
-│ └── launch_spark.sh # Spark launching shell script
-│ └── split_criteo_kaggle.py # Split criteo kaggle dataset to train\val\test set
-├── train_deepfm.sh # DeepFM training shell script
+│ ├── deepfm_parquet.scala # Read Criteo Kaggle data and export it as parquet data format
+│ └── launch_spark.sh # Spark launching shell script
+│ └── split_criteo_kaggle.scala # Split criteo kaggle dataset to train\val\test set
+├── train_deepfm.sh # DeepFM training shell script
```
## Arguments description
@@ -38,7 +37,6 @@ We use exactly the same default values as [the DeepFM_Criteo_x4_001 experiment](
| embedding_vec_size | embedding vector size | 16 |
| dnn | dnn hidden units number | 1000,1000,1000,1000,1000 |
| net_dropout | number of minibatch training interations | 0.2 |
-| embedding_vec_size | embedding vector size | 16 |
| learning_rate | initial learning rate | 0.001 |
| batch_size | training/evaluation batch size | 10000 |
| train_batches | the maximum number of training batches | 75000 |
@@ -59,7 +57,7 @@ The model is evaluated at the end of every epoch. At the end of each epoch, if t
The monitor used for the early stop is `val_auc - val_log_loss`. The mode of the early stop is `max`. You could tune `patience` and `min_delta` as needed.
-If you want to disable early stopping, simply add `--disable_early_stop` in the [train_deepfm.sh](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/train_deepfm.sh).
+If you want to disable early stopping, simply add `--disable_early_stop` in the **train_deepfm.sh**.
## Getting Started
@@ -78,8 +76,6 @@ A hands-on guide to train a DeepFM model.
```json
psutil
petastorm
- pandas
- sklearn
```
### Dataset
@@ -90,17 +86,11 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both
> χ may include categorical fields (e.g., gender, location) and continuous fields (e.g., age). Each categorical field is represented as a vec- tor of one-hot encoding, and each continuous field is repre- sented as the value itself, or a vector of one-hot encoding after discretization.
-1. Download the [Criteo Kaggle dataset](https://www.kaggle.com/c/criteo-display-ad-challenge) and then split it using [split_criteo_kaggle.py](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/tools/split_criteo_kaggle.py).
-
- Note: Same as [the DeepFM_Criteo_x4_001 experiment](https://github.com/openbenchmark/BARS/tree/master/ctr_prediction/benchmarks/DeepFM/DeepFM_criteo_x4_001) in FuxiCTR, only train.txt is used. Also, the dataset is randomly spllitted into 8:1:1 as training set, validation set and test set. The dataset is splitted using StratifiedKFold in sklearn.
-
- ```shell
- python3 split_criteo_kaggle.py --input_dir=/path/to/your/criteo_kaggle --output_dir=/path/to/your/output/dir
- ```
+1. Download the [Criteo Kaggle dataset](https://www.kaggle.com/c/criteo-display-ad-challenge).
2. Download spark from https://spark.apache.org/downloads.html and then uncompress the tar file into the directory where you want to install Spark. Ensure the `SPARK_HOME` environment variable points to the directory where the spark is.
-3. launch a spark shell using [launch_spark.sh](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/tools/launch_spark.sh).
+3. launch a spark shell using **launch_spark.sh**.
- Modify the SPARK_LOCAL_DIRS as needed
@@ -110,9 +100,19 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both
- Run `bash launch_spark.sh`
-4. load [deepfm_parquet.scala](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/tools/deepfm_parquet.scala) to your spark shell by `:load deepfm_parquet.scala`.
+4. Load **split_criteo_kaggle.scala** to your spark shell by `:load split_criteo_kaggle.scala`.
-5. call the `makeDeepfmDataset(srcDir: String, dstDir:String)` function to generate the dataset.
+ Then, call the `splitCriteoKaggle(srcDir: String, dstDir:String)` function to split the dataset.
+
+ ```shell
+ splitCriteoKaggle("/path/to/your/src_dir", "/path/to/your/dst_dir")
+ ```
+
+ Note: Same as [the DeepFM_Criteo_x4_001 experiment](https://github.com/openbenchmark/BARS/tree/master/ctr_prediction/benchmarks/DeepFM/DeepFM_criteo_x4_001) in FuxiCTR, only train.txt is used. Also, the dataset is randomly spllitted into 8:1:1 as training set, validation set and test set. The dataset is splitted using RandomSplit in spark.
+
+5. load **deepfm_parquet.scala** to your spark shell by `:load deepfm_parquet.scala`.
+
+ Then, call the `makeDeepfmDataset(srcDir: String, dstDir:String)` function to generate the dataset.
```shell
makeDeepfmDataset("/path/to/your/src_dir", "/path/to/your/dst_dir")
@@ -121,17 +121,17 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both
After generating parquet dataset, dataset information will also be printed. It contains the information about the number of samples and table size array, which is needed when training.
```txt
- train samples = 36672493
- validation samples = 4584062
- test samples = 4584062
+ train samples = 36673135
+ validation samples = 4584737
+ test samples = 4582745
table size array:
- 649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376
+ 649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376
1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572
```
### Start Training by Oneflow
-1. Modify the [train_deepfm.sh](https://github.com/Oneflow-Inc/models/blob/dev_deepfm_multicol_oneemb/RecommenderSystems/deepfm/train_deepfm.sh) as needed.
+1. Modify the **train_deepfm.sh** as needed.
```shell
#!/bin/bash
@@ -139,7 +139,7 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both
DATA_DIR=/path/to/deepfm_parquet
PERSISTENT_PATH=/path/to/persistent
MODEL_SAVE_DIR=/path/to/model/save/dir
-
+
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes 1 \
@@ -158,9 +158,9 @@ According to [the DeepFM paper](https://arxiv.org/abs/1703.04247), we treat both
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
- --num_train_samples 36672493 \
- --num_val_samples 4584062 \
- --num_test_samples 4584062 \
+ --num_train_samples 36673135 \
+ --num_val_samples 4584737 \
+ --num_test_samples 4582745 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model
```
diff --git a/RecommenderSystems/deepfm/tools/split_dataset.scala b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
similarity index 91%
rename from RecommenderSystems/deepfm/tools/split_dataset.scala
rename to RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
index 04dcafaf1..564f486a0 100644
--- a/RecommenderSystems/deepfm/tools/split_dataset.scala
+++ b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
@@ -1,6 +1,6 @@
import org.apache.spark.sql.functions.udf
-def splitDataset(srcDir: String, dstDir:String) = {
+def splitCriteoKaggle(srcDir: String, dstDir:String) = {
val categorical_names = (1 to 26).map{id=>s"C$id"}
val dense_names = (1 to 13).map{id=>s"I$id"}
val integer_names = Seq("label") ++ dense_names
From fbd35091655822709666f50a8a63e383c04cf0a8 Mon Sep 17 00:00:00 2001
From: Liuxinman
Date: Tue, 31 May 2022 15:54:58 +0800
Subject: [PATCH 4/6] Update sample# in train_deepfm.sh
---
RecommenderSystems/deepfm/train_deepfm.sh | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/RecommenderSystems/deepfm/train_deepfm.sh b/RecommenderSystems/deepfm/train_deepfm.sh
index c4b32d107..321b10152 100644
--- a/RecommenderSystems/deepfm/train_deepfm.sh
+++ b/RecommenderSystems/deepfm/train_deepfm.sh
@@ -22,8 +22,8 @@ python3 -m oneflow.distributed.launch \
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
- --num_train_samples 36672493 \
- --num_val_samples 4584062 \
- --num_test_samples 4584062 \
+ --num_train_samples 36673135 \
+ --num_val_samples 4584737 \
+ --num_test_samples 4582745 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model
From 65e241ef648a123157db656f318a3f849885c7a2 Mon Sep 17 00:00:00 2001
From: Liuxinman
Date: Tue, 14 Jun 2022 15:00:48 +0800
Subject: [PATCH 5/6] Update split to cache dataframe
---
.../deepfm/tools/split_criteo_kaggle.scala | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
index 564f486a0..775cff680 100644
--- a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
+++ b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
@@ -1,4 +1,5 @@
import org.apache.spark.sql.functions.udf
+import org.apache.spark.storage.StorageLevel
def splitCriteoKaggle(srcDir: String, dstDir:String) = {
val categorical_names = (1 to 26).map{id=>s"C$id"}
@@ -6,9 +7,17 @@ def splitCriteoKaggle(srcDir: String, dstDir:String) = {
val integer_names = Seq("label") ++ dense_names
val col_names = integer_names ++ categorical_names
- val df = spark.read.option("delimiter", "\t").csv(s"${srcDir}/train.txt").toDF(col_names: _*)
+ val inputDF = spark.read.option("delimiter", ",").csv(s"${srcDir}/train.txt").toDF(col_names: _*)
- val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2018)
+ val df = inputDF.persist(StorageLevel.MEMORY_AND_DISK)
+
+ val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2020)
+ val train_samples = splits(0).count()
+ println(s"train samples = $train_samples")
+ val valid_samples = splits(1).count()
+ println(s"valid samples = $valid_samples")
+ val test_samples = splits(2).count()
+ println(s"test samples = $test_samples")
splits(0).write.option("header", "true").csv(s"${dstDir}/train.csv")
splits(1).write.option("header", "true").csv(s"${dstDir}/valid.csv")
From 1620448e14eea878378719ebfd278afbbc567159 Mon Sep 17 00:00:00 2001
From: Liuxinman
Date: Fri, 17 Jun 2022 12:30:43 +0800
Subject: [PATCH 6/6] Fix wrong delimiter
---
RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
index 775cff680..424ac79e6 100644
--- a/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
+++ b/RecommenderSystems/deepfm/tools/split_criteo_kaggle.scala
@@ -7,11 +7,11 @@ def splitCriteoKaggle(srcDir: String, dstDir:String) = {
val integer_names = Seq("label") ++ dense_names
val col_names = integer_names ++ categorical_names
- val inputDF = spark.read.option("delimiter", ",").csv(s"${srcDir}/train.txt").toDF(col_names: _*)
+ val inputDF = spark.read.option("delimiter", "\t").csv(s"${srcDir}/train.txt").toDF(col_names: _*)
val df = inputDF.persist(StorageLevel.MEMORY_AND_DISK)
- val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2020)
+ val splits = df.randomSplit(Array(0.8, 0.1, 0.1), seed=2018)
val train_samples = splits(0).count()
println(s"train samples = $train_samples")
val valid_samples = splits(1).count()