diff --git a/.github/workflows/python-package-conda.yml b/.github/workflows/python-package-conda.yml index 2c17d76..8f38b1c 100644 --- a/.github/workflows/python-package-conda.yml +++ b/.github/workflows/python-package-conda.yml @@ -23,4 +23,4 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics --exclude working_dirs - name: Test cpu-only metrics run: | - python calculate_metrics.py --datasets test --schemes duped --models 70m + python calculate_metrics.py --datasets pile_test --schemes duped --models 70m diff --git a/calculate_metrics.py b/calculate_metrics.py index dc46315..1ab1ac6 100644 --- a/calculate_metrics.py +++ b/calculate_metrics.py @@ -2,15 +2,16 @@ import os from argparse import ArgumentParser from datetime import datetime -from typing import Dict, Optional +from typing import Dict, Optional, List from datasets import load_dataset as hf_load_dataset from pyspark.sql import SparkSession, DataFrame +from pyspark.sql import functions as F from utils import initialize_logger, initialize_formatter from filters import PIPELINE from filters.constants import PrecomputedFeatureName -from spark.constants import NUM_PARTITIONS, SPARK_CACHE_DIR +from spark.constants import NUM_SPARK_PARTITIONS, NUM_OUTPUT_PARTITIONS, SPARK_CACHE_DIR from spark.utils import initialize_spark LOGGER: logging.Logger = initialize_logger() @@ -54,7 +55,7 @@ def parse_cli_args(): ) dataset_args_help = "The dataset in which to get inference responses for. Valid options are: memories, pile." - datasets_args_default = ["pile", "memories", "test"] + datasets_args_default = ["pile", "memories", "pile_test"] parser.add_argument( "--datasets", type=str, @@ -82,9 +83,58 @@ def parse_cli_args(): return parser.parse_args() -def load_dataset(dataset_name: str, scheme: str, model_size: str) -> DataFrame: +def load_pile_dataset(scheme: str) -> DataFrame: """ - Load the dataset from HuggingFace datasets. If the dataset is not locally available, then + Load the Pile dataset from HuggingFace. If the dataset is not locally available, then + download it from HuggingFace datasets and cache it as a Spark DataFrame in Parquet format. + + Args: + scheme (str): Data scheme used for Pythia model training. + + Returns: + DataFrame: Spark DataFrame containing the dataset. + """ + hf_dataset_name = f"EleutherAI/pile-{scheme}-pythia-random-sampled" + cache_path = f"{SPARK_CACHE_DIR}/{hf_dataset_name}" + + if os.path.isdir(cache_path): + LOGGER.info(f"Dataset {hf_dataset_name} already exists, skipping the download.") + return SPARK.read.parquet(cache_path) + + LOGGER.info(f"Downloading dataset {hf_dataset_name}...") + + # The original dataset has a different capitalization for the column names, so we'll rename them along + # with other columns for clarity and consistency. 
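+    # The per-model-size columns ("70m" through "12b") hold precomputed memorization scores,
+    # which run_pile_pipeline later joins back onto the transformed features one model size at a time.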
+ dataset = ( + hf_load_dataset(hf_dataset_name, split="train") + .to_pandas() + .rename( + columns={ + "Index": "sequence_id", + "Tokens": "tokens", + "70M": "70m", + "160M": "160m", + "410M": "410m", + "1B": "1b", + "1.4B": "1.4b", + "2.8B": "2.8b", + "6.9B": "6.9b", + "12B": "12b", + } + ) + ) + dataset.tokens = dataset.tokens.map(lambda x: x.tolist()) + + LOGGER.info(f"Converting and caching the dataset {hf_dataset_name} as Spark DataFrame in {cache_path}...") + # Convert the Pandas DataFrame dataset to Spark DataFrame in Parquet + SPARK.createDataFrame(dataset).repartition(NUM_SPARK_PARTITIONS).write.parquet(cache_path) + + return SPARK.read.parquet(cache_path) + + +def load_non_pile_dataset(dataset_name: str, scheme: str, model_size: str) -> DataFrame: + """ + Load the non-Pile dataset from HuggingFace. If the dataset is not locally available, then download it from HuggingFace datasets and cache it as a Spark DataFrame in Parquet format. Args: @@ -97,17 +147,15 @@ def load_dataset(dataset_name: str, scheme: str, model_size: str) -> DataFrame: """ split_name = f"{scheme}.{model_size}" required_columns = ["sequence_id", "tokens"] - is_pile = dataset_name.split("-")[0] == "pile" - is_test = dataset_name.split("-")[0] == "test" + is_test = dataset_name == "pile_test" + is_memorized = dataset_name == "memories" - if is_pile and not is_test: - hf_dataset_name = f"EleutherAI/pile-{scheme}-pythia-random-sampled" - elif is_test: - hf_dataset_name = f"usvsnsp/pile-test-sampled" - else: + if is_memorized: hf_dataset_name = f"EleutherAI/pythia-memorized-evals" + else: + hf_dataset_name = f"usvsnsp/pile-test-sampled" - cache_name = hf_dataset_name if is_pile or is_test else f"{hf_dataset_name}-{split_name}" + cache_name = hf_dataset_name if is_test else f"{hf_dataset_name}-{split_name}" cache_path = f"{SPARK_CACHE_DIR}/{cache_name}" if os.path.isdir(cache_path): @@ -115,104 +163,80 @@ def load_dataset(dataset_name: str, scheme: str, model_size: str) -> DataFrame: return SPARK.read.parquet(cache_path) LOGGER.info(f"Downloading dataset {hf_dataset_name}...") - if is_pile: - # The original dataset has a different capitalization for the column names, so we'll rename them along - # with other columns for clarity and consistency. - dataset = ( - hf_load_dataset(hf_dataset_name, split="train") - .to_pandas() - .rename( - columns={ - "Index": "sequence_id", - "Tokens": "tokens", - "70M": "70m", - "160M": "160m", - "410M": "410m", - "1B": "1b", - "1.4B": "1.4b", - "2.8B": "2.8b", - "6.9B": "6.9b", - "12B": "12b", - } - ) - ) - dataset.tokens = dataset.tokens.map(lambda x: x.tolist()) - # This dataset already contains the memorization score, we'll fetch it by the model parameter size. - required_columns.append(model_size) - # We'll also rename the memorization score column for consistency. - dataset = dataset[required_columns].rename(columns={model_size: "memorization_score"}) - elif is_test: - dataset = hf_load_dataset(hf_dataset_name, split="train").to_pandas() - dataset.tokens = dataset.tokens.map(lambda x: x.tolist()) - else: + if is_memorized: dataset = hf_load_dataset(hf_dataset_name, split=split_name).to_pandas().rename(columns={"index": "sequence_id"}) dataset.tokens = dataset.tokens.map(lambda x: x.tolist()) dataset = dataset[required_columns] - # This dataset already indicates all sequences are memorized. 
- dataset["memorization_score"] = 1.0 + else: + dataset = hf_load_dataset(hf_dataset_name, split="train").to_pandas() + dataset.tokens = dataset.tokens.map(lambda x: x.tolist()) LOGGER.info(f"Converting and caching the dataset {hf_dataset_name} as Spark DataFrame in {cache_path}...") # Convert the Pandas DataFrame dataset to Spark DataFrame in Parquet - SPARK.createDataFrame(dataset).repartition(NUM_PARTITIONS).write.parquet(cache_path) + SPARK.createDataFrame(dataset).repartition(NUM_SPARK_PARTITIONS).write.parquet(cache_path) return SPARK.read.parquet(cache_path) -def load_precomputed_features(schema: str, is_test=False) -> Dict[PrecomputedFeatureName, DataFrame]: +def load_precomputed_features(scheme: str, is_test=False) -> Dict[PrecomputedFeatureName, DataFrame]: """ Load the pre-computed features from HuggingFace datasets. If the features are not locally available, then download them from HuggingFace datasets and cache them as Spark DataFrames in Parquet format. Args: - schema (str): Data scheme used for Pythia model training. + scheme (str): Data scheme used for Pythia model training. is_test (bool): Load a sampled versions if required in case of testing Returns: Dict[PrecomputedFeatureName, DataFrame]: Dictionary of pre-computed features. """ + num_test_rows = 3000 features = {} hf_dataset_names = [ - (PrecomputedFeatureName.SEQUENCE_FREQUENCIES, f"usvsnsp/{schema}-num-duplicates", "train", {"Index": "sequence_id", "Counts": "frequency"}), + # (enum, hf_name, hf_split_name, column_mapping) + (PrecomputedFeatureName.SEQUENCE_FREQUENCIES, f"usvsnsp/{scheme}-num-duplicates", "train", {"Index": "sequence_id", "Counts": "frequency"}), ( PrecomputedFeatureName.MEMORIZED_TOKEN_FREQUENCIES, - f"usvsnsp/{schema}-num-frequencies", + f"usvsnsp/{scheme}-num-frequencies", "memorized", {"TokenID": "token_id", "Frequency": "frequency"}, ), ( PrecomputedFeatureName.NON_MEMORIZED_TOKEN_FREQUENCIES, - f"usvsnsp/{schema}-num-frequencies", + f"usvsnsp/{scheme}-num-frequencies", "non_memorized", {"TokenID": "token_id", "Frequency": "frequency"}, ), + (PrecomputedFeatureName.EMBEDDINGS, f"usvsnsp/{scheme}-embeddings", "train", {}), ] for enum, name, split, column_mapping in hf_dataset_names: cache_path = f"{SPARK_CACHE_DIR}/{name}-{split}" + adjusted_split = f"{split}-test" if is_test else split + adjusted_hf_split = f"{split}[:{num_test_rows}]" if is_test else split + adjusted_cache_path = f"{cache_path}-test" if is_test else cache_path - if is_test: - cache_path = cache_path + "-test" - - if os.path.isdir(cache_path): - LOGGER.info(f"Dataset {name}-{split} already exists, skipping the download.") - features[enum] = SPARK.read.parquet(cache_path) + if os.path.isdir(adjusted_cache_path): + LOGGER.info(f"Dataset {name}-{adjusted_split} already exists, skipping the download.") + features[enum] = SPARK.read.parquet(adjusted_cache_path) continue - LOGGER.info(f"Downloading dataset {name}-{split}...") - if is_test: - split = split + "[:3000]" - dataset = hf_load_dataset(name, split=split).to_pandas().rename(columns=column_mapping) + LOGGER.info(f"Downloading dataset {name}-{adjusted_split}...") + dataset = hf_load_dataset(name, split=adjusted_hf_split).to_pandas().rename(columns=column_mapping) + + if enum == PrecomputedFeatureName.EMBEDDINGS: + dataset.embeddings = dataset.embeddings.map(lambda x: x.tolist()) - LOGGER.info(f"Converting and caching the dataset {name}-{split} as Spark DataFrame {cache_path}...") + LOGGER.info(f"Converting and caching the dataset {name}-{adjusted_split} as Spark DataFrame 
{adjusted_cache_path}...") # Convert the Pandas DataFrame dataset to Spark DataFrame in Parquet - SPARK.createDataFrame(dataset).repartition(NUM_PARTITIONS).write.parquet(cache_path) + SPARK.createDataFrame(dataset).repartition(NUM_SPARK_PARTITIONS).write.parquet(adjusted_cache_path) - features[enum] = SPARK.read.parquet(cache_path).cache() + features[enum] = SPARK.read.parquet(adjusted_cache_path).cache() return features -def run_pipeline( +def run_non_pile_pipeline( dataset: DataFrame, dataset_name: str, split_name: str, @@ -220,14 +244,81 @@ def run_pipeline( sample_size: Optional[int] = None, sample_seed: Optional[int] = None, ) -> None: + """ + Run the pipeline for non-Pile datasets. + + Args: + dataset (DataFrame): Spark DataFrame containing the dataset. + dataset_name (str): Name of the dataset. + split_name (str): Name of the split. + run_id (str): ID of the run. + sample_size (Optional[int]): Number of samples to take from the dataset. + sample_seed (Optional[int]): Seed to use for sampling the dataset. + + Returns: + None + """ if sample_size is not None: dataset = dataset.sample(1.0, seed=sample_seed).limit(sample_size) transformed_dataset = PIPELINE.transform(dataset) + # Non-pile datasets already indicate that all sequences are memorized. + transformed_dataset = transformed_dataset.withColumn("memorization_score", F.lit(1.0)) LOGGER.info(f"Transformed Dataset {dataset_name}-{split_name} Schema:") transformed_dataset.printSchema() + LOGGER.info(f"{transformed_dataset.schema.simpleString()}") file_name = split_name.replace(".", "_", 1) - transformed_dataset.coalesce(1).write.parquet(f"datasets/{run_id}/{dataset_name}_{file_name}") + transformed_dataset.coalesce(NUM_OUTPUT_PARTITIONS).write.parquet(f"datasets/{run_id}/{dataset_name}_{file_name}") + + +def run_pile_pipeline( + dataset: DataFrame, + dataset_name: str, + data_scheme: str, + model_sizes: List[str], + run_id: str, + sample_size: Optional[int] = None, + sample_seed: Optional[int] = None, +) -> None: + """ + Run the pipeline for Pile datasets. + + Args: + dataset (DataFrame): Spark DataFrame containing the dataset. + dataset_name (str): Name of the dataset. + data_scheme (str): Data scheme used for Pythia model training. + model_sizes (List[str]): List of Pythia model sizes. + run_id (str): ID of the run. + sample_size (Optional[int]): Number of samples to take from the dataset. + sample_seed (Optional[int]): Seed to use for sampling the dataset. + + Returns: + None + """ + if sample_size is not None: + dataset = dataset.sample(1.0, seed=sample_seed).limit(sample_size) + + main = dataset.alias("main") + no_scores = main.select("sequence_id", "tokens") + transformed_dataset = PIPELINE.transform(no_scores).alias("transformed") + + # Memorization score already exists per model size, we'll perform the join to export + # each dataset by model size separately. 
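+    # Each entry in `model_sizes` is expected to match a memorization-score column in the
+    # Pile dataset (e.g. "70m"), hence the backtick-escaped column reference below.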
+ for model_size in model_sizes: + memorization_scores = main.select( + "main.sequence_id", + F.col(f"main.`{model_size}`").alias("memorization_score"), + ).alias("score") + joined_dataset = transformed_dataset.join(memorization_scores, on="sequence_id", how="left").select( + "transformed.*", + "score.memorization_score", + ) + split_name = f"{data_scheme}.{model_size}" + LOGGER.info(f"Transformed Dataset {dataset_name}-{split_name} Schema:") + joined_dataset.printSchema() + LOGGER.info(f"{joined_dataset.schema.simpleString()}") + file_name = split_name.replace(".", "_", 1) + joined_dataset.coalesce(NUM_OUTPUT_PARTITIONS).write.parquet(f"datasets/{run_id}/{dataset_name}_{file_name}") def main(): @@ -254,22 +345,37 @@ def main(): LOGGER.info(f"Sample seed: {args.sample_seed}") LOGGER.info("---------------------------------------------------------------------------") - for model_size in args.models if isinstance(args.models, list) else args.models.split(","): - for dataset_name in args.datasets if isinstance(args.datasets, list) else args.datasets.split(","): - is_test = dataset_name == "test" - for data_scheme in args.schemes if isinstance(args.schemes, list) else args.schemes.split(","): - LOGGER.info("Loading pre-computed features...") - precomputed_features = load_precomputed_features(data_scheme, is_test=is_test) - PIPELINE.register_features(precomputed_features) - - split_name = f"{data_scheme}.{model_size}" - LOGGER.info(f"Loading dataset {dataset_name} and split {split_name}...") - dataset = load_dataset(dataset_name, data_scheme, model_size) - LOGGER.info(f"Calculating metrics for {split_name} on dataset {dataset_name}...") - run_pipeline(dataset, dataset_name, split_name, args.run_id, args.sample_size, args.sample_seed) - - # Reset before caching the next set of pre-computed features - SPARK.catalog.clearCache() + model_sizes = args.models if isinstance(args.models, list) else args.models.split(",") + dataset_names = args.datasets if isinstance(args.datasets, list) else args.datasets.split(",") + data_schemes = args.schemes if isinstance(args.schemes, list) else args.schemes.split(",") + + for dataset_name in dataset_names: + is_test = dataset_name == "pile_test" + is_memorized = dataset_name == "memories" + is_pile = dataset_name == "pile" + + for data_scheme in data_schemes: + LOGGER.info("Loading pre-computed features...") + precomputed_features = load_precomputed_features(data_scheme, is_test=is_test) + PIPELINE.register_features(precomputed_features) + + if is_memorized or is_test: + # The memorized dataset has multiple splits by the model size + for model_size in model_sizes: + split_name = f"{data_scheme}.{model_size}" + LOGGER.info(f"Loading dataset {dataset_name} and split {split_name}...") + dataset = load_non_pile_dataset(dataset_name, data_scheme, model_size) + LOGGER.info(f"Calculating metrics for {split_name} on dataset {dataset_name}...") + run_non_pile_pipeline(dataset, dataset_name, split_name, args.run_id, args.sample_size, args.sample_seed) + elif is_pile: + LOGGER.info(f"Loading dataset {dataset_name}...") + # The pile dataset contains all model sizes in a single split + dataset = load_pile_dataset(data_scheme) + LOGGER.info(f"Calculating metrics for {data_scheme} on dataset {dataset_name}...") + run_pile_pipeline(dataset, dataset_name, data_scheme, model_sizes, args.run_id, args.sample_size, args.sample_seed) + + # Clear the cache because pre-computed features are differentiated based on the data scheme + SPARK.catalog.clearCache() if __name__ == "__main__": 
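A minimal sketch (outside the diff) of the per-model-size export that run_pile_pipeline now performs, assuming a local SparkSession and the renamed Pile columns ("sequence_id", "tokens", "70m", ...); the toy rows and the single "70m" size are illustrative only:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[1]").appName("per-size-join-sketch").getOrCreate()

    # Toy stand-in for the cached Pile DataFrame; scores per model size live in their own columns.
    pile = spark.createDataFrame(
        [(0, [5, 6, 7], 0.25), (1, [8, 9, 10], 1.0)],
        ["sequence_id", "tokens", "70m"],
    )

    transformed = pile.select("sequence_id", "tokens")  # stand-in for PIPELINE.transform(...)
    for model_size in ["70m"]:
        scores = pile.select("sequence_id", F.col(f"`{model_size}`").alias("memorization_score"))
        transformed.join(scores, on="sequence_id", how="left").show()

Each iteration produces one DataFrame per model size with the score attached, mirroring the per-size Parquet outputs written by the real pipeline.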
diff --git a/filters/__init__.py b/filters/__init__.py index 6ef5a75..4433824 100644 --- a/filters/__init__.py +++ b/filters/__init__.py @@ -1,7 +1,13 @@ from .base import PIPELINE_SINGLETON as PIPELINE -# The import here determines the order of the pipeline -from .detokenize import detokenize -from .highly_duplicated_filter import sequence_duplicates_filter -from .token_frequency_statistics_filter import token_frequency_statistics_filter -from .pattern_incrementing import incrementing_sequences_filter -from .highly_repetitive import highly_repetitive_filter + +_has_registered_all_filters = False + +if not _has_registered_all_filters: + # The import here determines the order of the pipeline + from .detokenize import detokenize + from .highly_duplicated_filter import sequence_duplicates_filter + from .token_frequency_statistics_filter import token_frequency_statistics_filter + from .pattern_incrementing import incrementing_sequences_filter + from .highly_repetitive import highly_repetitive_filter + + _has_registered_all_filters = True diff --git a/filters/base.py b/filters/base.py index 3b75f8c..66d0f07 100644 --- a/filters/base.py +++ b/filters/base.py @@ -5,7 +5,7 @@ from filters.constants import PrecomputedFeatureName from utils import initialize_logger -from spark.constants import NUM_PARTITIONS, SPARK_CACHE_DIR +from spark.constants import NUM_OUTPUT_PARTITIONS, SPARK_CACHE_DIR FilterFunc: TypeAlias = Callable[..., Any] PrecomputedFeatures: TypeAlias = Dict[PrecomputedFeatureName, DataFrame] @@ -15,11 +15,21 @@ class MetricFilterPipeline: def __init__(self): + """ + Pipeline for applying filters to a dataset. + """ self.filters: List[FilterFunc] = [] self.features: PrecomputedFeatures = {} self.spark: SparkSession def register_filter(self) -> FilterFunc: + """ + Decorator for registering a filter function to the pipeline. + + Returns: + FilterFunc: Decorated filter function + """ + def decorator(filter_func: FilterFunc) -> FilterFunc: def wrapper(*args, **kwargs) -> Any: return filter_func(*args, **kwargs) @@ -32,17 +42,45 @@ def wrapper(*args, **kwargs) -> Any: return decorator def register_features(self, features: PrecomputedFeatures) -> None: + """ + Register precomputed features to the pipeline. + + Args: + features (PrecomputedFeatures): Precomputed features + + Returns: + None + """ LOGGER.info(f"Registering features {features.keys()}...") self.features.update(features) def register_spark_session(self, spark: SparkSession) -> None: + """ + Register Spark session to the pipeline. + + Args: + spark (SparkSession): Spark session + + Returns: + None + """ self.spark = spark def transform(self, original: DataFrame) -> DataFrame: + """ + Apply all filters to the dataset. 
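+        Filters run in the order they were registered, and each intermediate result is
+        checkpointed to side-step potential OOM issues from long Spark lineages.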
+ + Args: + original (DataFrame): Original dataset + + Returns: + DataFrame: Filtered dataset + """ current_dataset = original for filter_func in self.filters: # Checkpointing each filter to side-step potential OOM issues + LOGGER.info(f"Running filter {filter_func.__name__}...") current_dataset: DataFrame = filter_func(current_dataset, self.features).checkpoint() return current_dataset diff --git a/filters/constants.py b/filters/constants.py index 2748225..51683ad 100644 --- a/filters/constants.py +++ b/filters/constants.py @@ -5,3 +5,4 @@ class PrecomputedFeatureName(Enum): SEQUENCE_FREQUENCIES = "sequence_frequencies" MEMORIZED_TOKEN_FREQUENCIES = "memorized_token_frequencies" NON_MEMORIZED_TOKEN_FREQUENCIES = "non_memorized_token_frequencies" + EMBEDDINGS = "embeddings" diff --git a/filters/detokenize.py b/filters/detokenize.py index 5dbe968..faf20a8 100644 --- a/filters/detokenize.py +++ b/filters/detokenize.py @@ -9,7 +9,8 @@ @PIPELINE_SINGLETON.register_filter() def detokenize(dataset: DataFrame, _) -> DataFrame: - """Detokenizes tokens into text as a preprocessing step. + """ + Detokenizes tokens into text as a preprocessing step. Args: dataset (DataFrame): Dataset containing sequences of tokens diff --git a/filters/highly_duplicated_filter.py b/filters/highly_duplicated_filter.py index d2bc672..d4c2a93 100644 --- a/filters/highly_duplicated_filter.py +++ b/filters/highly_duplicated_filter.py @@ -7,11 +7,12 @@ @PIPELINE_SINGLETON.register_filter() def sequence_duplicates_filter(dataset: DataFrame, features: PrecomputedFeatures) -> DataFrame: - """Compute the number of duplicates (frequency) of a sequence. + """ + Compute the number of duplicates (frequency) of a sequence. Args: dataset (DataFrame): Dataset containing sequences of tokens - features (PrecomputedFeatures): + features (PrecomputedFeatures): Precomputed features Returns: DataFrame: Dataframe with additional columns of `sequence_duplicates`, number of times that @@ -21,7 +22,7 @@ def sequence_duplicates_filter(dataset: DataFrame, features: PrecomputedFeatures sequence_frequencies = features[PrecomputedFeatureName.SEQUENCE_FREQUENCIES].alias("sequence_frequencies") # Join on `sequence_id` to extract the sequence frequency - final = main.join(sequence_frequencies, on="sequence_id", how="inner").select( + final = main.join(sequence_frequencies, on="sequence_id", how="left").select( "main.*", F.col("sequence_frequencies.frequency").alias("sequence_duplicates"), ) diff --git a/filters/highly_repetitive.py b/filters/highly_repetitive.py index d579bb3..2cd35ff 100644 --- a/filters/highly_repetitive.py +++ b/filters/highly_repetitive.py @@ -1,3 +1,5 @@ +from typing import List, Tuple, Union + from pyspark.sql import DataFrame from pyspark.sql import functions as F from pyspark.sql import types as T @@ -5,15 +7,16 @@ from .base import PIPELINE_SINGLETON -def break_and_compare(ls: list, k: int) -> list: +def break_and_compare(ls: List, k: int) -> List: """ This function takes a list ls and an integer k as input and returns a list which is the first chunk of ls that is repeated k times. If no such chunk exists, it returns an empty list. - Parameters: - - ls (list): The input list. + Args: + ls (List): The input list. k (int): The integer value used for splitting and comparing the list. + Returns: + List: The first chunk of ls that is repeated k times. If no such chunk exists, it returns an empty list. 
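+
+    Example (illustrative, based on the documented behavior):
+        break_and_compare([7, 8, 7, 8], 2)  # -> [7, 8]
+        break_and_compare([1, 2, 3], 2)     # -> []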
""" n = len(ls) while n % k != 0: @@ -41,17 +44,19 @@ def break_and_compare(ls: list, k: int) -> list: return [] -def break_and_compare_wrapper(ls: list, start_k: int, end_k: int) -> list: +def break_and_compare_wrapper(ls: List, start_k: int, end_k: int) -> Union[Tuple[List, int, int], Tuple[List, int]]: """ This function serves as a wrapper for the `break_and_compare` function. It takes an additional two integer parameters `start_k` and `end_k` to define a range of values for `k`. It iterates over this range and calls `break_and_compare` for each value of `k` within the range. - Parameters: - - `ls` (list): The input list. - - `start_k` (int): The starting value of `k` for the range (inclusive). - - `end_k` (int): The ending value of `k` for the range (inclusive). + Args: + ls (List): The input list. + start_k (int): The starting value of `k` for the range (inclusive). + end_k (int): The ending value of `k` for the range (inclusive). + Returns: + Union[Tuple[List, int, int], Tuple[List, int]]: A tuple containing the result of `break_and_compare` and the values of `i` and `k` for which the result was obtained. """ # end_k is inclusive ls = list(ls) @@ -74,14 +79,23 @@ def break_and_compare_wrapper(ls: list, start_k: int, end_k: int) -> list: return result, i, k result = break_and_compare(ls[i:], k) if result: - return result, k + return result, i, k result = break_and_compare(ls, k) if result: return result, 0, k return [], 0, -1 -def find_smallest_repeating_unit(lst): +def find_smallest_repeating_unit(lst) -> List: + """ + This function takes a list as input and returns the smallest repeating unit of the list. If no such unit exists, it returns the list itself. + + Args: + lst (List): The input list. + + Returns: + List: The smallest repeating unit of the list. If no such unit exists, it returns the list itself. + """ if lst is None: return [] n = len(lst) @@ -94,15 +108,16 @@ def find_smallest_repeating_unit(lst): # Check if the entire list can be formed by repeating the unit if all(lst[i : i + unit_length] == unit for i in range(0, n, unit_length)): - return unit + return unit, n // unit_length # If no repeating unit is found, the list itself is the smallest repeating unit - return lst + return lst, 1 @PIPELINE_SINGLETON.register_filter() def highly_repetitive_filter(dataset: DataFrame, _) -> DataFrame: - """Returns the repeating chunk and the number of times a sequence is repeating + """ + Returns the repeating chunk and the number of times a sequence is repeating. 
Args: dataset (DataFrame): Dataset containing sequences of tokens @@ -111,32 +126,47 @@ def highly_repetitive_filter(dataset: DataFrame, _) -> DataFrame: Outputs Include: - `num_repeating`: Number of times a sequence is repeating - `smallest_repeating_chunk`: Smallest repeating token sequence + Returns: - DataFrame: with additional column of `is_incrementing` + DataFrame: with additional columns + `repeating_chunk`: Repeating Chunk + `num_repeating`: Number of times the chunk is repeating + `repeating_offset`: Offset of repeating sequence """ main = dataset.alias("main") repetitive_schema = T.StructType( [ - T.StructField("num_repeating", T.IntegerType()), - T.StructField("repeating_offset", T.IntegerType()), T.StructField("repeating_chunk", T.ArrayType(T.LongType())), + T.StructField("repeating_offset", T.IntegerType()), + T.StructField("num_repeating", T.IntegerType()) + ] + ) + + start_k = 2 + end_k = 5 + repetitiveUDF = F.udf(lambda seq: break_and_compare_wrapper(seq, start_k, end_k), repetitive_schema) + + smallest_repeating_chunk_schema = T.StructType( + [ + T.StructField("smallest_repeating_chunk", T.ArrayType(T.LongType())), + T.StructField("num_times", T.IntegerType()) ] ) - repetitiveUDF = F.udf(lambda seq: break_and_compare_wrapper(seq, 2, 5), repetitive_schema) - smallest_repeating_chunkUDF = F.udf(lambda seq: find_smallest_repeating_unit(seq), T.ArrayType(T.LongType())) + smallest_repeating_chunkUDF = F.udf(lambda seq: find_smallest_repeating_unit(seq), smallest_repeating_chunk_schema) - repetitive_counts = main.select("sequence_id", "text").withColumn("repetitive", repetitiveUDF("text")) - repetitive_counts = repetitive_counts.withColumn("smallest_repeating_chunk", smallest_repeating_chunkUDF("repetitive.repeating_chunk")) + repetitive_counts = main.select("sequence_id", "tokens").withColumn("repetitive", repetitiveUDF("tokens")) + repetitive_counts = repetitive_counts.withColumn("smallest_repeating", smallest_repeating_chunkUDF("repetitive.repeating_chunk")) final = ( - repetitive_counts.join(main, on="sequence_id", how="left") + main.join(repetitive_counts, on="sequence_id", how="left") .drop(repetitive_counts.sequence_id) - .drop(repetitive_counts.text) + .drop(repetitive_counts.tokens) .drop(repetitive_counts.repetitive.repeating_chunk) .select( "main.*", - "repetitive.*", - "smallest_repeating_chunk", + F.col("repetitive.repeating_offset").alias("repeating_offset"), + (F.col("repetitive.num_repeating")*F.col("smallest_repeating.num_times")).alias("num_repeating"), + F.col("smallest_repeating.smallest_repeating_chunk").alias("smallest_repeating_chunk") ) ) diff --git a/filters/pattern_incrementing.py b/filters/pattern_incrementing.py index 1a36513..787129c 100644 --- a/filters/pattern_incrementing.py +++ b/filters/pattern_incrementing.py @@ -1,13 +1,23 @@ +import re +import unicodedata + from pyspark.sql import DataFrame from pyspark.sql import functions as F from pyspark.sql import types as T from .base import PIPELINE_SINGLETON -import unicodedata -import re def replace_non_numeric_with_whitespace(text: str) -> str: + """ + Replaces non-numeric characters with whitespace. 
+ + Args: + text (str): Sequence of tokens + + Returns: + str: Sequence of tokens with non-numeric characters replaced with whitespace + """ # Replace non-numeric characters with whitespace # cleaned_text = re.sub(r'[^0-9]', ' ', text) new_text = "" @@ -48,6 +58,15 @@ def replace_non_numeric_with_whitespace(text: str) -> str: def incrementing_sequences_filter_wrapper(text: str) -> bool: + """ + Returns if a sequence is incrementing. + + Args: + text (str): Sequence of tokens + + Returns: + bool: True if the sequence is incrementing, False otherwise + """ # count number of numeric and non-numeric characters num_numeric = 0 num_non_numeric = 0 @@ -317,7 +336,8 @@ def incrementing_sequences_filter_wrapper(text: str) -> bool: @PIPELINE_SINGLETON.register_filter() def incrementing_sequences_filter(dataset: DataFrame, _) -> DataFrame: - """Returns if a sequence is incrementing + """ + Returns if a sequence is incrementing. Args: dataset (DataFrame): Dataset containing sequences of tokens @@ -338,4 +358,4 @@ def incrementing_sequences_filter(dataset: DataFrame, _) -> DataFrame: samp = r""" "A.1 , A.2 , A.3 , A.4, B.1 , B.2, B.3, C.1" """ - print(incrementing_sequences_filter(samp)) + print(incrementing_sequences_filter_wrapper(samp)) diff --git a/filters/token_frequency_statistics_filter.py b/filters/token_frequency_statistics_filter.py index a4a9c03..1476e19 100644 --- a/filters/token_frequency_statistics_filter.py +++ b/filters/token_frequency_statistics_filter.py @@ -7,7 +7,8 @@ @PIPELINE_SINGLETON.register_filter() def token_frequency_statistics_filter(dataset: DataFrame, features: PrecomputedFeatures) -> DataFrame: - """Compute token frequency statistics of a list of token frequencies ordered by the token index (not ID) in the sequence. + """ + Compute token frequency statistics of a list of token frequencies ordered by the token index (not ID) in the sequence. Statistics include: - `max_frequency`: maximum frequency of token frequencies in the sequence @@ -28,8 +29,7 @@ def token_frequency_statistics_filter(dataset: DataFrame, features: PrecomputedF memorized_frequencies = features[PrecomputedFeatureName.MEMORIZED_TOKEN_FREQUENCIES].alias("memorized") non_memorized_frequencies = features[PrecomputedFeatureName.NON_MEMORIZED_TOKEN_FREQUENCIES].alias("non_memorized") - # First, we expand the token indices, then join to extract the frequencies - # Note that we dropped the memorization score, we'll re-join it later. + # First, we expand the token indices, then join to extract the frequencies. 
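+    # The memorization score no longer needs to be dropped and re-attached here: the final
+    # join below starts from `main`, so every original column is carried through unchanged.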
flattened_main = main.select("sequence_id", F.posexplode("tokens").alias("token_index", "token_id")) token_frequencies = ( flattened_main.join(memorized_frequencies, on="token_id", how="left") @@ -69,6 +69,6 @@ def token_frequency_statistics_filter(dataset: DataFrame, features: PrecomputedF F.transform(F.col("frequencies"), lambda x: x.frequency).alias("frequencies"), ).alias("filtered") - # Finally, re-attach the memorization score from the original dataset - final = filtered_frequencies.join(main, on="sequence_id", how="left").drop(filtered_frequencies.sequence_id).select("main.*", "filtered.*") + final = main.join(filtered_frequencies, on="sequence_id", how="left").drop(filtered_frequencies.sequence_id).select("main.*", "filtered.*") + return final diff --git a/spark/constants.py b/spark/constants.py index 5c69f5c..6b86764 100644 --- a/spark/constants.py +++ b/spark/constants.py @@ -1,2 +1,4 @@ SPARK_CACHE_DIR = "spark_cache" -NUM_PARTITIONS = 1 +NUM_CPU_COUNT = 64 +NUM_OUTPUT_PARTITIONS = 1 +NUM_SPARK_PARTITIONS = 4096 diff --git a/spark/utils.py b/spark/utils.py index d5b3507..069882a 100644 --- a/spark/utils.py +++ b/spark/utils.py @@ -2,7 +2,7 @@ from pyspark.conf import SparkConf from pyspark.sql import SparkSession -from spark.constants import NUM_PARTITIONS, SPARK_CACHE_DIR +from spark.constants import NUM_CPU_COUNT, NUM_SPARK_PARTITIONS, SPARK_CACHE_DIR def initialize_spark() -> SparkSession: @@ -18,13 +18,13 @@ def initialize_spark() -> SparkSession: SparkConf() .setMaster("local[*]") .setAppName("semantic-memorization") - .set("spark.driver.cores", "128") + .set("spark.driver.cores", f"{NUM_CPU_COUNT}") .set("spark.driver.memory", "80g") .set("spark.driver.memoryOverhead", "16g") - .set("spark.sql.shuffle.partitions", f"{NUM_PARTITIONS}") + .set("spark.sql.shuffle.partitions", f"{NUM_SPARK_PARTITIONS}") .set("spark.sql.execution.arrow.pyspark.enabled", "true") .set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true") - .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC") + .set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000") .set("spark.memory.offHeap.enabled", "true") .set("spark.memory.offHeap.size", "16g") .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
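A minimal sketch (not part of the diff) of how a new filter plugs into the pipeline defined in filters/base.py; the filter name and the added `sequence_length` column are hypothetical:

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F

    from filters.base import PIPELINE_SINGLETON


    @PIPELINE_SINGLETON.register_filter()
    def sequence_length_filter(dataset: DataFrame, _) -> DataFrame:
        """Adds a `sequence_length` column holding the token count of each sequence (illustrative)."""
        return dataset.withColumn("sequence_length", F.size("tokens"))

As with the existing filters, importing the defining module (mirroring the guarded imports in filters/__init__.py) is what registers it, so PIPELINE.transform will checkpoint and apply it in import order.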