Skip to content

Commit

Permalink
Fixed check out and -in
Browse files Browse the repository at this point in the history
  • Loading branch information
JanPeterDatakind committed Jun 1, 2024
1 parent 7fe660d commit 212ee0e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 59 deletions.
3 changes: 3 additions & 0 deletions recipes-management/create_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def create_recipe_folder(recipe_name):
"python_packages": ["Please enter the python packages"],
"parameters": "Please enter the arguments the recipe expects",
"response_text": "Please enter your response text",
"data_attribution": "Please enter the ist of table names that the recipe code queries to determine the answer",
"mem_type": "recipe",
"locked_at": "",
"locked_by": "",
}

# Write metadata.json file
Expand Down
90 changes: 31 additions & 59 deletions recipes-management/recipe_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def get_memories(force_checkout=False):
with conn.connect() as connection:
if force_checkout is False:
query = text(
"SELECT custom_id, document, cmetadata FROM public.langchain_pg_embedding WHERE locked_by ='' AND locked_at =''"
"SELECT custom_id, document, cmetadata FROM public.langchain_pg_embedding WHERE cmetadata->>'locked_at' = ''"
)
else:
query = text(
Expand Down Expand Up @@ -124,10 +124,10 @@ def save_data(df):
functions_code = metadata["functions_code"]
# if import it not already in the functions_code, add it with a linebreak
if imports not in functions_code:
functions_code = imports + "\n\n\n" + functions_code
functions_code = imports + "\n\n" + functions_code
# Concatenate functions_code and calling_code into recipe code
recipe_code = (
f"\n\n{functions_code}\n\n" f"# Calling code:\n{calling_code}\n\n"
f"{functions_code}\n\n" f"# Calling code:\n{calling_code}\n\n"
)

# Save the recipe code
Expand Down Expand Up @@ -180,8 +180,8 @@ def format_code_with_black():

def lock_records(df, locker_name):
"""
Locks records in the database by setting the 'locked_by' field to the specified name
and the 'locked_at' field to the current timestamp for all 'custom_id' values in the DataFrame.
Locks records in the database by setting the 'locked_by' and 'locked_at' fields
within the 'cmetadata' JSON column for all 'custom_id' values in the DataFrame.
Args:
df (pandas.DataFrame): DataFrame containing the 'custom_id' values to lock.
Expand All @@ -194,33 +194,30 @@ def lock_records(df, locker_name):
conn = connect_to_db()

# Get the current timestamp
current_time = datetime.now()

# Get a list of custom_ids
custom_ids = df["custom_id"].tolist()
current_time = (
datetime.now().isoformat()
) # Use 'now' to get the current timestamp in SQL

# Prepare the SQL query
query = f"""
UPDATE public.langchain_pg_embedding
SET cmetadata = jsonb_set(
jsonb_set(
cmetadata::jsonb,
'{{locked_by}}',
'\"{locker_name}\"'
),
'{{locked_at}}',
'\"{current_time}\"'
);
"""

# Prepare the SQL query using the IN clause
query = text(
"""
UPDATE public.langchain_pg_embedding
SET locked_by = :locker_name, locked_at = :current_time
WHERE custom_id IN :custom_ids
"""
)
query = text(query)
# Execute the query within a transaction context
try:
with conn.connect() as connection:
with connection.begin():
connection.execute(
query,
{
"locker_name": locker_name,
"current_time": current_time,
"custom_ids": tuple(
custom_ids
), # Convert list to tuple for SQL IN clause
},
)
connection.execute(query)
except Exception as e:
print(f"Error occurred: {e}")

Expand Down Expand Up @@ -405,68 +402,44 @@ def update_database(df: pd.DataFrame, approver: str):
Returns:
None
"""
# Connect to the database
engine = connect_to_db()

# Get the current timestamp
current_time = datetime.now()

# Prepare additional columns
df["approval_status"] = "approved"
df["approver"] = approver
df["approval_latest_update"] = current_time
df["locked_by"] = ""
df["locked_at"] = ""

# Prepare the SQL query template
query_template = text(
"""
UPDATE langchain_pg_embedding
SET document = :document,
cmetadata = :metadata,
approval_status = :approval_status,
approver = :approver,
approval_latest_update = :approval_latest_update,
locked_by = :locked_by,
locked_at = :locked_at
cmetadata = :metadata
WHERE custom_id = :custom_id
"""
)

# Iterate over each row in the DataFrame
try:
with engine.connect() as conn:
trans = conn.begin()
for index, row in df.iterrows():
try:
# Convert the metadata dictionary to a JSON string if it's a dict
print(row)
metadata_json = (
json.dumps(row["metadata"])
if isinstance(row["metadata"], dict)
else row["metadata"]
)

# Prepare parameters for the query
params = {
"document": row["document"],
"metadata": metadata_json,
"approval_status": row["approval_status"],
"approver": row["approver"],
"approval_latest_update": row["approval_latest_update"],
"locked_by": row["locked_by"],
"locked_at": row["locked_at"],
"custom_id": row["custom_id"],
}

# Execute the query
conn.execute(query_template, params)
except KeyError as ke:
print(
logging.error(
f"Skipping record at index {index} due to missing field: {ke}"
)
except Exception as e:
print(f"Error updating record at index {index}: {e}")
logging.error(f"Error updating record at index {index}: {e}")
trans.commit()
except Exception as e:
print(f"Error updating records: {e}")
logging.error(f"Error updating records: {e}")


def check_out(recipe_checker="Mysterious Recipe Checker", force_checkout=False):
Expand Down Expand Up @@ -515,7 +488,6 @@ def check_in(recipe_checker="Mysterious Recipe Checker"):

# Create a DataFrame from the list of records
records_to_check_in = pd.DataFrame(records)

# Update database
update_database(df=records_to_check_in, approver=recipe_checker)

Expand Down

0 comments on commit 212ee0e

Please sign in to comment.