From 212ee0e1d448cc6afdbb9e802aea5d2b9b428c27 Mon Sep 17 00:00:00 2001 From: JanPeterDatakind Date: Sat, 1 Jun 2024 16:53:06 +0900 Subject: [PATCH] Fixed check out and -in --- recipes-management/create_recipe.py | 3 + recipes-management/recipe_sync.py | 90 ++++++++++------------------- 2 files changed, 34 insertions(+), 59 deletions(-) diff --git a/recipes-management/create_recipe.py b/recipes-management/create_recipe.py index e479cb68..e863e0eb 100644 --- a/recipes-management/create_recipe.py +++ b/recipes-management/create_recipe.py @@ -66,7 +66,10 @@ def create_recipe_folder(recipe_name): "python_packages": ["Please enter the python packages"], "parameters": "Please enter the arguments the recipe expects", "response_text": "Please enter your response text", + "data_attribution": "Please enter the ist of table names that the recipe code queries to determine the answer", "mem_type": "recipe", + "locked_at": "", + "locked_by": "", } # Write metadata.json file diff --git a/recipes-management/recipe_sync.py b/recipes-management/recipe_sync.py index 71103ad0..a87b7dc4 100644 --- a/recipes-management/recipe_sync.py +++ b/recipes-management/recipe_sync.py @@ -59,7 +59,7 @@ def get_memories(force_checkout=False): with conn.connect() as connection: if force_checkout is False: query = text( - "SELECT custom_id, document, cmetadata FROM public.langchain_pg_embedding WHERE locked_by ='' AND locked_at =''" + "SELECT custom_id, document, cmetadata FROM public.langchain_pg_embedding WHERE cmetadata->>'locked_at' = ''" ) else: query = text( @@ -124,10 +124,10 @@ def save_data(df): functions_code = metadata["functions_code"] # if import it not already in the functions_code, add it with a linebreak if imports not in functions_code: - functions_code = imports + "\n\n\n" + functions_code + functions_code = imports + "\n\n" + functions_code # Concatenate functions_code and calling_code into recipe code recipe_code = ( - f"\n\n{functions_code}\n\n" f"# Calling code:\n{calling_code}\n\n" + f"{functions_code}\n\n" f"# Calling code:\n{calling_code}\n\n" ) # Save the recipe code @@ -180,8 +180,8 @@ def format_code_with_black(): def lock_records(df, locker_name): """ - Locks records in the database by setting the 'locked_by' field to the specified name - and the 'locked_at' field to the current timestamp for all 'custom_id' values in the DataFrame. + Locks records in the database by setting the 'locked_by' and 'locked_at' fields + within the 'cmetadata' JSON column for all 'custom_id' values in the DataFrame. Args: df (pandas.DataFrame): DataFrame containing the 'custom_id' values to lock. @@ -194,33 +194,30 @@ def lock_records(df, locker_name): conn = connect_to_db() # Get the current timestamp - current_time = datetime.now() - - # Get a list of custom_ids - custom_ids = df["custom_id"].tolist() + current_time = ( + datetime.now().isoformat() + ) # Use 'now' to get the current timestamp in SQL + + # Prepare the SQL query + query = f""" + UPDATE public.langchain_pg_embedding + SET cmetadata = jsonb_set( + jsonb_set( + cmetadata::jsonb, + '{{locked_by}}', + '\"{locker_name}\"' + ), + '{{locked_at}}', + '\"{current_time}\"' + ); + """ - # Prepare the SQL query using the IN clause - query = text( - """ - UPDATE public.langchain_pg_embedding - SET locked_by = :locker_name, locked_at = :current_time - WHERE custom_id IN :custom_ids - """ - ) + query = text(query) # Execute the query within a transaction context try: with conn.connect() as connection: with connection.begin(): - connection.execute( - query, - { - "locker_name": locker_name, - "current_time": current_time, - "custom_ids": tuple( - custom_ids - ), # Convert list to tuple for SQL IN clause - }, - ) + connection.execute(query) except Exception as e: print(f"Error occurred: {e}") @@ -405,68 +402,44 @@ def update_database(df: pd.DataFrame, approver: str): Returns: None """ - # Connect to the database engine = connect_to_db() - # Get the current timestamp - current_time = datetime.now() - - # Prepare additional columns - df["approval_status"] = "approved" - df["approver"] = approver - df["approval_latest_update"] = current_time - df["locked_by"] = "" - df["locked_at"] = "" - - # Prepare the SQL query template query_template = text( """ UPDATE langchain_pg_embedding SET document = :document, - cmetadata = :metadata, - approval_status = :approval_status, - approver = :approver, - approval_latest_update = :approval_latest_update, - locked_by = :locked_by, - locked_at = :locked_at + cmetadata = :metadata WHERE custom_id = :custom_id """ ) - - # Iterate over each row in the DataFrame try: with engine.connect() as conn: + trans = conn.begin() for index, row in df.iterrows(): try: - # Convert the metadata dictionary to a JSON string if it's a dict + print(row) metadata_json = ( json.dumps(row["metadata"]) if isinstance(row["metadata"], dict) else row["metadata"] ) - # Prepare parameters for the query params = { "document": row["document"], "metadata": metadata_json, - "approval_status": row["approval_status"], - "approver": row["approver"], - "approval_latest_update": row["approval_latest_update"], - "locked_by": row["locked_by"], - "locked_at": row["locked_at"], "custom_id": row["custom_id"], } - # Execute the query conn.execute(query_template, params) except KeyError as ke: - print( + logging.error( f"Skipping record at index {index} due to missing field: {ke}" ) except Exception as e: - print(f"Error updating record at index {index}: {e}") + logging.error(f"Error updating record at index {index}: {e}") + trans.commit() except Exception as e: - print(f"Error updating records: {e}") + logging.error(f"Error updating records: {e}") def check_out(recipe_checker="Mysterious Recipe Checker", force_checkout=False): @@ -515,7 +488,6 @@ def check_in(recipe_checker="Mysterious Recipe Checker"): # Create a DataFrame from the list of records records_to_check_in = pd.DataFrame(records) - # Update database update_database(df=records_to_check_in, approver=recipe_checker)