From c2f87244a48127ce71a703e75ac4188af2a4c7a1 Mon Sep 17 00:00:00 2001
From: rmn-boiko
Date: Thu, 19 Dec 2024 19:23:05 +0100
Subject: [PATCH] Modify dataset env var API to upsert style

---
 CHANGELOG.md                                  |   4 +
 resources/schema.gql                          |  50 +++-----
 .../src/mutations/dataset_env_vars_mut.rs     | 118 ++++++++----------
 .../tests/tests/test_gql_dataset_env_vars.rs  |  69 +++-------
 .../src/repos/dataset_env_var_repository.rs   |  47 ++-----
 .../src/services/dataset_env_var_service.rs   |  18 ++-
 .../src/dataset_env_var_service_impl.rs       |  51 +++-----
 .../src/dataset_env_var_service_null.rs       |  15 +--
 .../repos/inmem_dataset_env_var_repository.rs |  77 ++++++------
 .../test_inmem_dataset_env_var_repository.rs  |   2 +-
 ...2c6c174cc3cbd5eaf76a865e8baa92ca999fb.json |  39 ++++++
 ...35ae09b6fb859545d821ea52f492f5f3168e2.json |  16 ---
 ...67cbeb2c89c9db598a30d703bf0a8fadc8d87.json |  19 ---
 .../postgres_dataset_env_var_repository.rs    |  87 +++++--------
 ...est_postgres_dataset_env_var_repository.rs |   2 +-
 .../dataset_env_var_repository_test_suite.rs  |  44 ++++---
 ...ec7ea26b2e7e8a920332e6676abff09192f87.json |  12 ++
 ...35ae09b6fb859545d821ea52f492f5f3168e2.json |  12 --
 ...67cbeb2c89c9db598a30d703bf0a8fadc8d87.json |  12 --
 src/infra/datasets/sqlite/src/lib.rs          |   2 +
 .../sqlite_dataset_env_var_repository.rs      | 117 ++++++++---------
 .../test_sqlite_dataset_env_var_repository.rs |   4 +-
 22 files changed, 331 insertions(+), 486 deletions(-)
 create mode 100644 src/infra/datasets/postgres/.sqlx/query-1f81f06928fca92051bcaa7aaf42c6c174cc3cbd5eaf76a865e8baa92ca999fb.json
 delete mode 100644 src/infra/datasets/postgres/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json
 delete mode 100644 src/infra/datasets/postgres/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json
 create mode 100644 src/infra/datasets/sqlite/.sqlx/query-5e5b7fb1fefebae2c03722d043aec7ea26b2e7e8a920332e6676abff09192f87.json
 delete mode 100644 src/infra/datasets/sqlite/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json
 delete mode 100644 src/infra/datasets/sqlite/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 28ee74aa64..e4fa2df724 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,10 @@ Recommendation: for ease of reading, use the following order:
 - Fixed
 -->
 
+## [Unreleased]
+### Changed
+- Merged two methods (`saveEnvVariable` and `modifyEnvVariable`) from `DatasetEnvVarsMut` into one `upsertEnvVariable`
+
 ## [0.213.1] - 2024-12-18
 ### Fixed
 - Removed all occurrences of `DataWriterMetadataState` from telemetry spans (too much pressure)
diff --git a/resources/schema.gql b/resources/schema.gql
index 1dfc334042..647d87db40 100644
--- a/resources/schema.gql
+++ b/resources/schema.gql
@@ -555,9 +555,8 @@ type DatasetEnvVars {
 }
 
 type DatasetEnvVarsMut {
-  saveEnvVariable(key: String!, value: String!, isSecret: Boolean!): SaveDatasetEnvVarResult!
+  upsertEnvVariable(key: String!, value: String!, isSecret: Boolean!): UpsertDatasetEnvVarResult!
   deleteEnvVariable(id: DatasetEnvVarID!): DeleteDatasetEnvVarResult!
-  modifyEnvVariable(id: DatasetEnvVarID!, newValue: String!, isSecret: Boolean!): ModifyDatasetEnvVarResult!
 }
 
 type DatasetFlowConfigs {
@@ -1418,20 +1417,6 @@ type MetadataManifestUnsupportedVersion implements CommitResult & CreateDatasetFromSnapshotResult {
   message: String!
 }
 
-interface ModifyDatasetEnvVarResult {
-  message: String!
-}
-
-type ModifyDatasetEnvVarResultNotFound implements ModifyDatasetEnvVarResult {
-  envVarId: DatasetEnvVarID!
-  message: String!
-}
-
-type ModifyDatasetEnvVarResultSuccess implements ModifyDatasetEnvVarResult {
-  envVarId: DatasetEnvVarID!
-  message: String!
-}
-
 enum MqttQos {
   AT_MOST_ONCE
   AT_LEAST_ONCE
@@ -1662,21 +1647,6 @@ type RevokeResultSuccess implements RevokeResult {
   message: String!
 }
 
-interface SaveDatasetEnvVarResult {
-  message: String!
-}
-
-type SaveDatasetEnvVarResultDuplicate implements SaveDatasetEnvVarResult {
-  datasetEnvVarKey: String!
-  datasetName: DatasetName!
-  message: String!
-}
-
-type SaveDatasetEnvVarResultSuccess implements SaveDatasetEnvVarResult {
-  envVar: ViewDatasetEnvVar!
-  message: String!
-}
-
 input ScheduleInput @oneOf {
   timeDelta: TimeDeltaInput
   """
@@ -1956,6 +1926,24 @@ interface UpdateReadmeResult {
   message: String!
 }
 
+interface UpsertDatasetEnvVarResult {
+  message: String!
+}
+
+type UpsertDatasetEnvVarResultCreated implements UpsertDatasetEnvVarResult {
+  envVar: ViewDatasetEnvVar!
+  message: String!
+}
+
+type UpsertDatasetEnvVarResultUpdated implements UpsertDatasetEnvVarResult {
+  envVar: ViewDatasetEnvVar!
+  message: String!
+}
+
+type UpsertDatasetEnvVarUpToDate implements UpsertDatasetEnvVarResult {
+  message: String!
+}
+
 type ViewAccessToken {
   """
   Unique identifier of the access token
diff --git a/src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs b/src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs
index 52d2e3726c..664a217a56 100644
--- a/src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs
+++ b/src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs
@@ -11,8 +11,7 @@ use kamu_datasets::{
     DatasetEnvVarService,
     DatasetEnvVarValue,
     DeleteDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
-    SaveDatasetEnvVarError,
+    UpsertDatasetEnvVarStatus,
 };
 use opendatafabric as odf;
 use secrecy::SecretString;
@@ -34,13 +33,13 @@ impl DatasetEnvVarsMut {
         Self { dataset_handle }
     }
 
-    async fn save_env_variable(
+    async fn upsert_env_variable(
         &self,
         ctx: &Context<'_>,
         key: String,
         value: String,
         is_secret: bool,
-    ) -> Result<SaveDatasetEnvVarResult> {
+    ) -> Result<UpsertDatasetEnvVarResult> {
         utils::check_dataset_write_access(ctx, &self.dataset_handle).await?;
 
         let dataset_env_var_service = from_catalog_n!(ctx, dyn DatasetEnvVarService);
@@ -51,31 +50,30 @@ impl DatasetEnvVarsMut {
             DatasetEnvVarValue::Regular(value)
         };
 
-        match dataset_env_var_service
-            .create_dataset_env_var(
+        let upsert_result = dataset_env_var_service
+            .upsert_dataset_env_var(
                 key.as_str(),
                 &dataset_env_var_value,
                 &self.dataset_handle.id,
             )
             .await
-        {
-            Ok(created_dataset_env_var) => Ok(SaveDatasetEnvVarResult::Success(
-                SaveDatasetEnvVarResultSuccess {
-                    env_var: ViewDatasetEnvVar::new(created_dataset_env_var),
-                },
-            )),
-            Err(err) => match err {
-                SaveDatasetEnvVarError::Duplicate(_) => Ok(SaveDatasetEnvVarResult::Duplicate(
-                    SaveDatasetEnvVarResultDuplicate {
-                        dataset_env_var_key: key,
-                        dataset_name: self.dataset_handle.alias.dataset_name.clone().into(),
-                    },
-                )),
-                SaveDatasetEnvVarError::Internal(internal_err) => {
-                    Err(GqlError::Internal(internal_err))
-                }
-            },
-        }
+            .map_err(GqlError::Internal)?;
+
+        Ok(match upsert_result.status {
+            UpsertDatasetEnvVarStatus::Created => {
+                UpsertDatasetEnvVarResult::Created(UpsertDatasetEnvVarResultCreated {
+                    env_var: ViewDatasetEnvVar::new(upsert_result.dataset_env_var),
+                })
+            }
+            UpsertDatasetEnvVarStatus::Updated => {
+                UpsertDatasetEnvVarResult::Updated(UpsertDatasetEnvVarResultUpdated {
+                    env_var:
ViewDatasetEnvVar::new(upsert_result.dataset_env_var),
+                })
+            }
+            UpsertDatasetEnvVarStatus::UpToDate => {
+                UpsertDatasetEnvVarResult::UpToDate(UpsertDatasetEnvVarUpToDate)
+            }
+        })
     }
 
     async fn delete_env_variable(
@@ -108,65 +106,51 @@ impl DatasetEnvVarsMut {
             },
         }
     }
+}
 
-    async fn modify_env_variable(
-        &self,
-        ctx: &Context<'_>,
-        id: DatasetEnvVarID,
-        new_value: String,
-        is_secret: bool,
-    ) -> Result<ModifyDatasetEnvVarResult> {
-        utils::check_dataset_write_access(ctx, &self.dataset_handle).await?;
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-        let dataset_env_var_service = from_catalog_n!(ctx, dyn DatasetEnvVarService);
-        let dataset_env_var_value = if is_secret {
-            DatasetEnvVarValue::Secret(SecretString::from(new_value))
-        } else {
-            DatasetEnvVarValue::Regular(new_value)
-        };
+#[derive(Interface, Debug, Clone)]
+#[graphql(field(name = "message", ty = "String"))]
+pub enum UpsertDatasetEnvVarResult {
+    Created(UpsertDatasetEnvVarResultCreated),
+    Updated(UpsertDatasetEnvVarResultUpdated),
+    UpToDate(UpsertDatasetEnvVarUpToDate),
+}
 
-        match dataset_env_var_service
-            .modify_dataset_env_var(&id.clone().into(), &dataset_env_var_value)
-            .await
-        {
-            Ok(_) => Ok(ModifyDatasetEnvVarResult::Success(
-                ModifyDatasetEnvVarResultSuccess {
-                    env_var_id: id.clone(),
-                },
-            )),
-            Err(err) => match err {
-                ModifyDatasetEnvVarError::NotFound(_) => Ok(ModifyDatasetEnvVarResult::NotFound(
-                    ModifyDatasetEnvVarResultNotFound {
-                        env_var_id: id.clone(),
-                    },
-                )),
-                ModifyDatasetEnvVarError::Internal(internal_err) => {
-                    Err(GqlError::Internal(internal_err))
-                }
-            },
-        }
+#[derive(Debug, Clone)]
+pub struct UpsertDatasetEnvVarUpToDate;
+
+#[Object]
+impl UpsertDatasetEnvVarUpToDate {
+    pub async fn message(&self) -> String {
+        "Dataset env var is up to date".to_string()
     }
 }
 
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+#[derive(SimpleObject, Debug, Clone)]
+#[graphql(complex)]
+pub struct UpsertDatasetEnvVarResultCreated {
+    pub env_var: ViewDatasetEnvVar,
+}
 
-#[derive(Interface, Debug, Clone)]
-#[graphql(field(name = "message", ty = "String"))]
-pub enum SaveDatasetEnvVarResult {
-    Success(SaveDatasetEnvVarResultSuccess),
-    Duplicate(SaveDatasetEnvVarResultDuplicate),
+#[ComplexObject]
+impl UpsertDatasetEnvVarResultCreated {
+    async fn message(&self) -> String {
+        "Created".to_string()
+    }
 }
 
 #[derive(SimpleObject, Debug, Clone)]
 #[graphql(complex)]
-pub struct SaveDatasetEnvVarResultSuccess {
+pub struct UpsertDatasetEnvVarResultUpdated {
     pub env_var: ViewDatasetEnvVar,
 }
 
 #[ComplexObject]
-impl SaveDatasetEnvVarResultSuccess {
+impl UpsertDatasetEnvVarResultUpdated {
     async fn message(&self) -> String {
-        "Success".to_string()
+        "Updated".to_string()
     }
 }
diff --git a/src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs b/src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs
index 69d74b3da7..2ee6de5ed7 100644
--- a/src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs
+++ b/src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs
@@ -41,7 +41,7 @@ async fn test_create_and_get_dataset_env_var() {
     let harness = DatasetEnvVarsHarness::new().await;
     let created_dataset = harness.create_dataset().await;
 
-    let mutation_code = DatasetEnvVarsHarness::create_dataset_env(
+    let mutation_code = DatasetEnvVarsHarness::upsert_dataset_env(
         created_dataset.dataset_handle.id.to_string().as_str(),
         "foo",
         "foo_value",
@@ -63,8 +63,8 @@ async fn
test_create_and_get_dataset_env_var() {
             "datasets": {
                 "byId": {
                     "envVars": {
-                        "saveEnvVariable": {
-                            "message": "Success"
+                        "upsertEnvVariable": {
+                            "message": "Created"
                         }
                     }
                 }
@@ -147,7 +147,7 @@ async fn test_delete_dataset_env_var() {
     let harness = DatasetEnvVarsHarness::new().await;
     let created_dataset = harness.create_dataset().await;
 
-    let mutation_code = DatasetEnvVarsHarness::create_dataset_env(
+    let mutation_code = DatasetEnvVarsHarness::upsert_dataset_env(
         created_dataset.dataset_handle.id.to_string().as_str(),
         "foo",
         "foo_value",
@@ -170,7 +170,7 @@ async fn test_delete_dataset_env_var() {
                 "byId": {
                     "envVars": {
-                        "saveEnvVariable": {
-                            "message": "Success"
+                        "upsertEnvVariable": {
+                            "message": "Created"
                         }
                     }
                 }
@@ -227,7 +227,7 @@ async fn test_modify_dataset_env_var() {
     let harness = DatasetEnvVarsHarness::new().await;
     let created_dataset = harness.create_dataset().await;
 
-    let mutation_code = DatasetEnvVarsHarness::create_dataset_env(
+    let mutation_code = DatasetEnvVarsHarness::upsert_dataset_env(
         created_dataset.dataset_handle.id.to_string().as_str(),
         "foo",
         "foo_value",
@@ -249,8 +249,8 @@ async fn test_modify_dataset_env_var() {
             "datasets": {
                 "byId": {
                     "envVars": {
-                        "saveEnvVariable": {
-                            "message": "Success"
+                        "upsertEnvVariable": {
+                            "message": "Created"
                         }
                     }
                 }
@@ -258,23 +258,9 @@ async fn test_modify_dataset_env_var() {
         })
     );
 
-    let query_code = DatasetEnvVarsHarness::get_dataset_env_vars_with_id(
-        created_dataset.dataset_handle.id.to_string().as_str(),
-    );
-    let res = schema
-        .execute(
-            async_graphql::Request::new(query_code.clone())
-                .data(harness.catalog_authorized.clone()),
-        )
-        .await;
-    let json = serde_json::to_string(&res.data).unwrap();
-    let json = serde_json::from_str::<serde_json::Value>(&json).unwrap();
-    let created_dataset_env_var_id =
-        json["datasets"]["byId"]["envVars"]["listEnvVariables"]["nodes"][0]["id"].clone();
-
-    let mutation_code = DatasetEnvVarsHarness::modify_dataset_env(
+    let mutation_code = DatasetEnvVarsHarness::upsert_dataset_env(
         created_dataset.dataset_handle.id.to_string().as_str(),
-        &created_dataset_env_var_id.to_string(),
+        "foo",
         "new_foo_value",
         false,
     );
@@ -292,8 +278,8 @@ async fn test_modify_dataset_env_var() {
             "datasets": {
                 "byId": {
                     "envVars": {
-                        "modifyEnvVariable": {
-                            "message": "Success"
+                        "upsertEnvVariable": {
+                            "message": "Updated"
                         }
                     }
                 }
@@ -465,7 +451,7 @@ impl DatasetEnvVarsHarness {
         .replace("<dataset_env_var_id>", dataset_env_var_id)
     }
 
-    fn create_dataset_env(
+    fn upsert_dataset_env(
        dataset_id: &str,
        env_var_key: &str,
        env_var_value: &str,
@@ -477,7 +463,7 @@ impl DatasetEnvVarsHarness {
                 datasets {
                     byId(datasetId: "<dataset_id>") {
                         envVars {
-                            saveEnvVariable(key: "<env_var_key>", value: "<env_var_value>", isSecret: <is_secret>) {
+                            upsertEnvVariable(key: "<env_var_key>", value: "<env_var_value>", isSecret: <is_secret>) {
                                 message
                             }
                         }
@@ -511,31 +497,4 @@ impl DatasetEnvVarsHarness {
         .replace("<dataset_id>", dataset_id)
         .replace("<env_var_id>", env_var_id)
     }
-
-    fn modify_dataset_env(
-        dataset_id: &str,
-        env_var_id: &str,
-        new_value: &str,
-        is_secret: bool,
-    ) -> String {
-        indoc!(
-            r#"
-            mutation {
-                datasets {
-                    byId(datasetId: "<dataset_id>") {
-                        envVars {
-                            modifyEnvVariable(id: <env_var_id>, newValue: "<new_value>", isSecret: <is_secret>) {
-                                message
-                            }
-                        }
-                    }
-                }
-            }
-            "#
-        )
-        .replace("<dataset_id>", dataset_id)
-        .replace("<env_var_id>", env_var_id)
-        .replace("<new_value>", new_value)
-        .replace("<is_secret>", if is_secret { "true" } else { "false" })
-    }
 }
diff --git a/src/domain/datasets/domain/src/repos/dataset_env_var_repository.rs b/src/domain/datasets/domain/src/repos/dataset_env_var_repository.rs
index 775e1bf1dc..17e1c10879 100644
--- a/src/domain/datasets/domain/src/repos/dataset_env_var_repository.rs
+++
b/src/domain/datasets/domain/src/repos/dataset_env_var_repository.rs
@@ -19,10 +19,10 @@ use crate::DatasetEnvVar;
 
 #[async_trait::async_trait]
 pub trait DatasetEnvVarRepository: Send + Sync {
-    async fn save_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var: &DatasetEnvVar,
-    ) -> Result<(), SaveDatasetEnvVarError>;
+    ) -> Result<UpsertDatasetEnvVarResult, InternalError>;
 
     async fn get_all_dataset_env_vars_count_by_dataset_id(
         &self,
@@ -50,31 +50,6 @@ pub trait DatasetEnvVarRepository: Send + Sync {
         &self,
         dataset_env_var_id: &Uuid,
     ) -> Result<(), DeleteDatasetEnvVarError>;
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        new_value: Vec<u8>,
-        secret_nonce: Option<Vec<u8>>,
-    ) -> Result<(), ModifyDatasetEnvVarError>;
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[derive(Error, Debug)]
-pub enum SaveDatasetEnvVarError {
-    #[error(transparent)]
-    Duplicate(SaveDatasetEnvVarErrorDuplicate),
-
-    #[error(transparent)]
-    Internal(#[from] InternalError),
-}
-
-#[derive(Error, Debug)]
-#[error("Dataset env var not saved, duplicate {dataset_env_var_key} for dataset {dataset_id}")]
-pub struct SaveDatasetEnvVarErrorDuplicate {
-    pub dataset_env_var_key: String,
-    pub dataset_id: DatasetID,
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -107,13 +82,15 @@ pub enum DeleteDatasetEnvVarError {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-#[derive(Error, Debug)]
-pub enum ModifyDatasetEnvVarError {
-    #[error(transparent)]
-    NotFound(DatasetEnvVarNotFoundError),
-
-    #[error(transparent)]
-    Internal(#[from] InternalError),
+#[derive(Debug)]
+pub struct UpsertDatasetEnvVarResult {
+    pub id: Uuid,
+    pub status: UpsertDatasetEnvVarStatus,
 }
 
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+#[derive(Debug, PartialEq, Eq)]
+pub enum UpsertDatasetEnvVarStatus {
+    Created,
+    Updated,
+    UpToDate,
+}
diff --git a/src/domain/datasets/domain/src/services/dataset_env_var_service.rs b/src/domain/datasets/domain/src/services/dataset_env_var_service.rs
index ceae4a6ff4..6c6b209841 100644
--- a/src/domain/datasets/domain/src/services/dataset_env_var_service.rs
+++ b/src/domain/datasets/domain/src/services/dataset_env_var_service.rs
@@ -17,20 +17,19 @@ use crate::{
     DatasetEnvVarValue,
     DeleteDatasetEnvVarError,
     GetDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
-    SaveDatasetEnvVarError,
+    UpsertDatasetEnvVarStatus,
 };
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 #[async_trait::async_trait]
 pub trait DatasetEnvVarService: Sync + Send {
-    async fn create_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var_key: &str,
         dataset_env_var_value: &DatasetEnvVarValue,
         dataset_id: &DatasetID,
-    ) -> Result<DatasetEnvVar, SaveDatasetEnvVarError>;
+    ) -> Result<DatasetEnvVarUpsertResult, InternalError>;
 
     async fn get_dataset_env_var_by_id(
         &self,
@@ -52,12 +51,6 @@ pub trait DatasetEnvVarService: Sync + Send {
         &self,
         dataset_env_var_id: &Uuid,
     ) -> Result<(), DeleteDatasetEnvVarError>;
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        dataset_env_var_new_value: &DatasetEnvVarValue,
-    ) -> Result<(), ModifyDatasetEnvVarError>;
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -67,4 +60,9 @@ pub struct
DatasetEnvVarListing {
     pub total_count: usize,
 }
 
+pub struct DatasetEnvVarUpsertResult {
+    pub dataset_env_var: DatasetEnvVar,
+    pub status: UpsertDatasetEnvVarStatus,
+}
+
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/domain/datasets/services/src/dataset_env_var_service_impl.rs b/src/domain/datasets/services/src/dataset_env_var_service_impl.rs
index c78a25cb7d..14b277f829 100644
--- a/src/domain/datasets/services/src/dataset_env_var_service_impl.rs
+++ b/src/domain/datasets/services/src/dataset_env_var_service_impl.rs
@@ -11,18 +11,17 @@
 use std::sync::Arc;
 
 use database_common::PaginationOpts;
 use dill::*;
-use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal};
+use internal_error::{InternalError, ResultIntoInternal};
 use kamu_datasets::{
     DatasetEnvVar,
     DatasetEnvVarListing,
     DatasetEnvVarRepository,
     DatasetEnvVarService,
+    DatasetEnvVarUpsertResult,
     DatasetEnvVarValue,
     DatasetEnvVarsConfig,
     DeleteDatasetEnvVarError,
     GetDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
-    SaveDatasetEnvVarError,
 };
 use opendatafabric::DatasetID;
 use secrecy::{ExposeSecret, SecretString};
@@ -66,24 +65,30 @@ impl DatasetEnvVarServiceImpl {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarService for DatasetEnvVarServiceImpl {
-    async fn create_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var_key: &str,
         dataset_env_var_value: &DatasetEnvVarValue,
         dataset_id: &DatasetID,
-    ) -> Result<DatasetEnvVar, SaveDatasetEnvVarError> {
-        let dataset_env_var = DatasetEnvVar::new(
+    ) -> Result<DatasetEnvVarUpsertResult, InternalError> {
+        let mut dataset_env_var = DatasetEnvVar::new(
             dataset_env_var_key,
             self.time_source.now(),
             dataset_env_var_value,
             dataset_id,
             self.dataset_env_var_encryption_key.expose_secret(),
         )
-        .map_err(|err| SaveDatasetEnvVarError::Internal(err.int_err()))?;
-        self.dataset_env_var_repository
-            .save_dataset_env_var(&dataset_env_var)
+        .int_err()?;
+        let upsert_result = self
+            .dataset_env_var_repository
+            .upsert_dataset_env_var(&dataset_env_var)
             .await?;
-        Ok(dataset_env_var)
+        dataset_env_var.id = upsert_result.id;
+
+        Ok(DatasetEnvVarUpsertResult {
+            dataset_env_var,
+            status: upsert_result.status,
+        })
     }
 
     async fn get_exposed_value(
@@ -142,30 +147,4 @@ impl DatasetEnvVarService for DatasetEnvVarServiceImpl {
             .delete_dataset_env_var(dataset_env_var_id)
             .await
     }
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        dataset_env_var_new_value: &DatasetEnvVarValue,
-    ) -> Result<(), ModifyDatasetEnvVarError> {
-        let existing_dataset_env_var = self
-            .dataset_env_var_repository
-            .get_dataset_env_var_by_id(dataset_env_var_id)
-            .await
-            .map_err(|err| match err {
-                GetDatasetEnvVarError::NotFound(e) => ModifyDatasetEnvVarError::NotFound(e),
-                GetDatasetEnvVarError::Internal(e) => ModifyDatasetEnvVarError::Internal(e),
-            })?;
-
-        let (new_value, nonce) = existing_dataset_env_var
-            .generate_new_value(
-                dataset_env_var_new_value,
-                self.dataset_env_var_encryption_key.expose_secret(),
-            )
-            .int_err()
-            .map_err(ModifyDatasetEnvVarError::Internal)?;
-        self.dataset_env_var_repository
-            .modify_dataset_env_var(dataset_env_var_id, new_value, nonce)
-            .await
-    }
 }
diff --git a/src/domain/datasets/services/src/dataset_env_var_service_null.rs b/src/domain/datasets/services/src/dataset_env_var_service_null.rs
index c8e4c922b4..0fefd46d6d 100644
--- a/src/domain/datasets/services/src/dataset_env_var_service_null.rs
+++ b/src/domain/datasets/services/src/dataset_env_var_service_null.rs
@@ -14,11 +14,10 @@ use kamu_datasets::{
DatasetEnvVar,
     DatasetEnvVarListing,
     DatasetEnvVarService,
+    DatasetEnvVarUpsertResult,
     DatasetEnvVarValue,
     DeleteDatasetEnvVarError,
     GetDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
-    SaveDatasetEnvVarError,
 };
 use opendatafabric::DatasetID;
 use uuid::Uuid;
@@ -41,12 +40,12 @@ impl DatasetEnvVarServiceNull {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarService for DatasetEnvVarServiceNull {
-    async fn create_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         _dataset_env_var_key: &str,
         _dataset_env_var_value: &DatasetEnvVarValue,
         _dataset_id: &DatasetID,
-    ) -> Result<DatasetEnvVar, SaveDatasetEnvVarError> {
+    ) -> Result<DatasetEnvVarUpsertResult, InternalError> {
         unreachable!()
     }
 
@@ -81,12 +80,4 @@ impl DatasetEnvVarService for DatasetEnvVarServiceNull {
     ) -> Result<(), DeleteDatasetEnvVarError> {
         unreachable!()
     }
-
-    async fn modify_dataset_env_var(
-        &self,
-        _dataset_env_var_id: &Uuid,
-        _dataset_env_var_new_value: &DatasetEnvVarValue,
-    ) -> Result<(), ModifyDatasetEnvVarError> {
-        unreachable!()
-    }
 }
diff --git a/src/infra/datasets/inmem/src/repos/inmem_dataset_env_var_repository.rs b/src/infra/datasets/inmem/src/repos/inmem_dataset_env_var_repository.rs
index 65bd9719ab..8e7d1b4b00 100644
--- a/src/infra/datasets/inmem/src/repos/inmem_dataset_env_var_repository.rs
+++ b/src/infra/datasets/inmem/src/repos/inmem_dataset_env_var_repository.rs
@@ -62,28 +62,49 @@ impl InMemoryDatasetEnvVarRepository {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarRepository for InMemoryDatasetEnvVarRepository {
-    async fn save_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var: &DatasetEnvVar,
-    ) -> Result<(), SaveDatasetEnvVarError> {
+    ) -> Result<UpsertDatasetEnvVarResult, InternalError> {
         let mut guard = self.state.lock().unwrap();
-        if let Some(existing_dataset_env_var_key_id) =
-            guard.dataset_env_var_ids_by_keys.get(&dataset_env_var.key)
-            && guard
+
+        // Modify env var if such exists
+        if let Some(existing_dataset_env_var_key_id) = guard
+            .dataset_env_var_ids_by_keys
+            .get(&dataset_env_var.key)
+            .copied()
+            && let Some(existing_dataset_env_var) = guard
                 .dataset_env_vars_by_ids
-                .get(existing_dataset_env_var_key_id)
-                .unwrap()
-                .dataset_id
-                == dataset_env_var.dataset_id
+                .get_mut(&existing_dataset_env_var_key_id)
+            && existing_dataset_env_var.dataset_id == dataset_env_var.dataset_id
         {
-            return Err(SaveDatasetEnvVarError::Duplicate(
-                SaveDatasetEnvVarErrorDuplicate {
-                    dataset_env_var_key: dataset_env_var.key.clone(),
-                    dataset_id: dataset_env_var.dataset_id.clone(),
-                },
-            ));
+            let mut upsert_status = UpsertDatasetEnvVarStatus::UpToDate;
+            if existing_dataset_env_var.value != dataset_env_var.value {
+                existing_dataset_env_var
+                    .value
+                    .clone_from(&dataset_env_var.value);
+                upsert_status = UpsertDatasetEnvVarStatus::Updated;
+            }
+            if (existing_dataset_env_var.secret_nonce.is_none()
+                && dataset_env_var.secret_nonce.is_some())
+                || (existing_dataset_env_var.secret_nonce.is_some()
+                    && dataset_env_var.secret_nonce.is_none())
+            {
+                existing_dataset_env_var
+                    .secret_nonce
+                    .clone_from(&dataset_env_var.secret_nonce);
+                upsert_status = UpsertDatasetEnvVarStatus::Updated;
+            }
+            existing_dataset_env_var
+                .secret_nonce
+                .clone_from(&dataset_env_var.secret_nonce);
+            return Ok(UpsertDatasetEnvVarResult {
+                id: existing_dataset_env_var.id,
+                status: upsert_status,
+            });
         }
 
+        // Create a new env var
         guard
             .dataset_env_vars_by_ids
             .insert(dataset_env_var.id, dataset_env_var.clone());
@@ -99,7 +120,10 @@ impl DatasetEnvVarRepository for InMemoryDatasetEnvVarRepository {
         };
         dataset_env_vars_entries.push(dataset_env_var.id);
 
-        Ok(())
+        return Ok(UpsertDatasetEnvVarResult {
+            id:
dataset_env_var.id,
+            status: UpsertDatasetEnvVarStatus::Created,
+        });
     }
 
     async fn get_all_dataset_env_vars_by_dataset_id(
         &self,
@@ -208,27 +232,6 @@ impl DatasetEnvVarRepository for InMemoryDatasetEnvVarRepository {
             },
         ))
     }
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        new_value: Vec<u8>,
-        secret_nonce: Option<Vec<u8>>,
-    ) -> Result<(), ModifyDatasetEnvVarError> {
-        let mut guard = self.state.lock().unwrap();
-        if let Some(existing_dataset_env_var) =
-            guard.dataset_env_vars_by_ids.get_mut(dataset_env_var_id)
-        {
-            existing_dataset_env_var.value = new_value;
-            existing_dataset_env_var.secret_nonce = secret_nonce;
-            return Ok(());
-        }
-        return Err(ModifyDatasetEnvVarError::NotFound(
-            DatasetEnvVarNotFoundError {
-                dataset_env_var_key: dataset_env_var_id.to_string(),
-            },
-        ));
-    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/infra/datasets/inmem/tests/repos/test_inmem_dataset_env_var_repository.rs b/src/infra/datasets/inmem/tests/repos/test_inmem_dataset_env_var_repository.rs
index 466385806f..55921dfc63 100644
--- a/src/infra/datasets/inmem/tests/repos/test_inmem_dataset_env_var_repository.rs
+++ b/src/infra/datasets/inmem/tests/repos/test_inmem_dataset_env_var_repository.rs
@@ -49,7 +49,7 @@ database_transactional_test!(
 
 database_transactional_test!(
     storage = inmem,
-    fixture = dataset_env_var_repo::test_modify_dataset_env_vars,
+    fixture = dataset_env_var_repo::test_upsert_dataset_env_vars,
     harness = InMemoryDatasetEnvVarRepositoryHarness
 );
 
diff --git a/src/infra/datasets/postgres/.sqlx/query-1f81f06928fca92051bcaa7aaf42c6c174cc3cbd5eaf76a865e8baa92ca999fb.json b/src/infra/datasets/postgres/.sqlx/query-1f81f06928fca92051bcaa7aaf42c6c174cc3cbd5eaf76a865e8baa92ca999fb.json
new file mode 100644
index 0000000000..5ecc2a383b
--- /dev/null
+++ b/src/infra/datasets/postgres/.sqlx/query-1f81f06928fca92051bcaa7aaf42c6c174cc3cbd5eaf76a865e8baa92ca999fb.json
@@ -0,0 +1,39 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n            INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)\n                VALUES ($1, $2, $3, $4, $5, $6)\n                ON CONFLICT (key, dataset_id)\n                DO UPDATE SET\n                    value = EXCLUDED.value,\n                    secret_nonce = CASE\n                        WHEN dataset_env_vars.secret_nonce IS NULL AND EXCLUDED.secret_nonce IS NOT NULL THEN EXCLUDED.secret_nonce\n                        WHEN dataset_env_vars.secret_nonce IS NOT NULL AND EXCLUDED.secret_nonce IS NULL THEN NULL\n                        ELSE dataset_env_vars.secret_nonce\n                    END\n                RETURNING xmax = 0 AS is_inserted,\n                    id,\n                    (\n                        SELECT value FROM dataset_env_vars WHERE key = $2 and dataset_id = $6\n                    ) as \"value: Vec<u8>\";\n            ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "is_inserted",
+        "type_info": "Bool"
+      },
+      {
+        "ordinal": 1,
+        "name": "id",
+        "type_info": "Uuid"
+      },
+      {
+        "ordinal": 2,
+        "name": "value: Vec<u8>",
+        "type_info": "Bytea"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Uuid",
+        "Varchar",
+        "Bytea",
+        "Bytea",
+        "Timestamptz",
+        "Varchar"
+      ]
+    },
+    "nullable": [
+      null,
+      false,
+      null
+    ]
+  },
+  "hash": "1f81f06928fca92051bcaa7aaf42c6c174cc3cbd5eaf76a865e8baa92ca999fb"
+}
diff --git a/src/infra/datasets/postgres/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json b/src/infra/datasets/postgres/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json
deleted file mode 100644
index 353fd3ee5a..0000000000
--- a/src/infra/datasets/postgres/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n            UPDATE dataset_env_vars SET value = $1, secret_nonce = $2 where id = $3\n            ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "Bytea",
-        "Bytea",
-        "Uuid"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2"
-}
diff --git a/src/infra/datasets/postgres/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json b/src/infra/datasets/postgres/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json
deleted file mode 100644
index cafbdc785a..0000000000
--- a/src/infra/datasets/postgres/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n            INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)\n                VALUES ($1, $2, $3, $4, $5, $6)\n            ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "Uuid",
-        "Varchar",
-        "Bytea",
-        "Bytea",
-        "Timestamptz",
-        "Varchar"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87"
-}
diff --git a/src/infra/datasets/postgres/src/repos/postgres_dataset_env_var_repository.rs b/src/infra/datasets/postgres/src/repos/postgres_dataset_env_var_repository.rs
index 4650146ced..4b78bb36fd 100644
--- a/src/infra/datasets/postgres/src/repos/postgres_dataset_env_var_repository.rs
+++ b/src/infra/datasets/postgres/src/repos/postgres_dataset_env_var_repository.rs
@@ -9,7 +9,7 @@
 
 use database_common::{PaginationOpts, TransactionRef, TransactionRefT};
 use dill::{component, interface};
-use internal_error::{ErrorIntoInternal, ResultIntoInternal};
+use internal_error::{InternalError, ResultIntoInternal};
 use opendatafabric::DatasetID;
 use uuid::Uuid;
 
@@ -33,19 +33,31 @@ impl PostgresDatasetEnvVarRepository {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarRepository for PostgresDatasetEnvVarRepository {
-    async fn save_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var: &DatasetEnvVar,
-    ) -> Result<(), SaveDatasetEnvVarError> {
+    ) -> Result<UpsertDatasetEnvVarResult, InternalError> {
         let mut tr = self.transaction.lock().await;
-
         let connection_mut = tr.connection_mut().await?;
 
-        sqlx::query!(
+        let result = sqlx::query!(
             r#"
             INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)
-                VALUES ($1, $2, $3, $4, $5, $6)
-            "#,
+                VALUES ($1, $2, $3, $4, $5, $6)
+                ON CONFLICT (key, dataset_id)
+                DO UPDATE SET
+                    value = EXCLUDED.value,
+                    secret_nonce = CASE
+                        WHEN dataset_env_vars.secret_nonce IS NULL AND EXCLUDED.secret_nonce IS NOT NULL THEN EXCLUDED.secret_nonce
+                        WHEN dataset_env_vars.secret_nonce IS NOT NULL AND EXCLUDED.secret_nonce IS NULL THEN NULL
+                        ELSE dataset_env_vars.secret_nonce
+                    END
+                RETURNING xmax = 0 AS is_inserted,
+                    id,
+                    (
+                        SELECT value FROM dataset_env_vars WHERE key = $2 and dataset_id = $6
+                    ) as "value: Vec<u8>";
+            "#,
             dataset_env_var.id,
             dataset_env_var.key,
             dataset_env_var.value,
@@ -53,23 +65,22 @@ impl DatasetEnvVarRepository for PostgresDatasetEnvVarRepository {
             dataset_env_var.created_at,
             dataset_env_var.dataset_id.to_string(),
         )
-        .execute(connection_mut)
+        .fetch_one(connection_mut)
         .await
-        .map_err(|e: sqlx::Error| match e {
-            sqlx::Error::Database(e) => {
-                if e.is_unique_violation() {
-                    SaveDatasetEnvVarError::Duplicate(SaveDatasetEnvVarErrorDuplicate {
-                        dataset_env_var_key: dataset_env_var.key.clone(),
-                        dataset_id: dataset_env_var.dataset_id.clone(),
-                    })
-                } else {
-                    SaveDatasetEnvVarError::Internal(e.int_err())
-                }
-            }
-            _ => SaveDatasetEnvVarError::Internal(e.int_err()),
-        })?;
+        .int_err()?;
 
-        Ok(())
+        let status = if result.is_inserted.unwrap_or(false) {
+            UpsertDatasetEnvVarStatus::Created
+        } else if dataset_env_var.value == result.value.unwrap() {
+            UpsertDatasetEnvVarStatus::UpToDate
+        } else {
+            UpsertDatasetEnvVarStatus::Updated
+        };
+
+        Ok(UpsertDatasetEnvVarResult {
+            id: result.id,
+            status,
+        })
     }
 
     async fn get_all_dataset_env_vars_by_dataset_id(
@@ -234,38 +245,6 @@ impl DatasetEnvVarRepository for PostgresDatasetEnvVarRepository {
         }
         Ok(())
     }
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        new_value: Vec<u8>,
-        secret_nonce: Option<Vec<u8>>,
-    ) -> Result<(), ModifyDatasetEnvVarError> {
-        let mut tr = self.transaction.lock().await;
-
-        let connection_mut = tr.connection_mut().await?;
-
-        let update_result = sqlx::query!(
-            r#"
-            UPDATE dataset_env_vars SET value = $1, secret_nonce = $2 where id = $3
-            "#,
-            new_value,
-            secret_nonce,
-            dataset_env_var_id,
-        )
-        .execute(&mut *connection_mut)
-        .await
-        .int_err()?;
-
-        if update_result.rows_affected() == 0 {
-            return Err(ModifyDatasetEnvVarError::NotFound(
-                DatasetEnvVarNotFoundError {
-                    dataset_env_var_key: dataset_env_var_id.to_string(),
-                },
-            ));
-        }
-        Ok(())
-    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/infra/datasets/postgres/tests/repos/test_postgres_dataset_env_var_repository.rs b/src/infra/datasets/postgres/tests/repos/test_postgres_dataset_env_var_repository.rs
index 548d6f432e..29d3c79eeb 100644
--- a/src/infra/datasets/postgres/tests/repos/test_postgres_dataset_env_var_repository.rs
+++ b/src/infra/datasets/postgres/tests/repos/test_postgres_dataset_env_var_repository.rs
@@ -51,7 +51,7 @@ database_transactional_test!(
 
 database_transactional_test!(
     storage = postgres,
-    fixture = dataset_env_var_repo::test_modify_dataset_env_vars,
+    fixture = dataset_env_var_repo::test_upsert_dataset_env_vars,
     harness = PostgresDatasetEnvVarRepositoryHarness
 );
 
diff --git a/src/infra/datasets/repo-tests/src/dataset_env_var_repository_test_suite.rs b/src/infra/datasets/repo-tests/src/dataset_env_var_repository_test_suite.rs
index ba8510411c..b05fbf19ff 100644
--- a/src/infra/datasets/repo-tests/src/dataset_env_var_repository_test_suite.rs
+++ b/src/infra/datasets/repo-tests/src/dataset_env_var_repository_test_suite.rs
@@ -20,7 +20,7 @@ use kamu_datasets::{
     DatasetEnvVarValue,
     DeleteDatasetEnvVarError,
     GetDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
+    UpsertDatasetEnvVarStatus,
     SAMPLE_DATASET_ENV_VAR_ENCRYPTION_KEY,
 };
 use opendatafabric::DatasetID;
@@ -90,7 +90,7 @@ pub async fn test_insert_and_get_dataset_env_var(catalog: &Catalog) {
     )
     .unwrap();
     let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_dataset_env_var)
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
 
     assert!(save_result.is_ok());
@@ -164,11 +164,11 @@ pub async fn test_insert_and_get_multiple_dataset_env_vars(catalog: &Catalog) {
     .unwrap();
 
     let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_dataset_env_var)
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
     let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_secret_dataset_env_var)
+        .upsert_dataset_env_var(&new_secret_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
 
@@ -229,11 +229,11 @@ pub async fn test_delete_dataset_env_vars(catalog: &Catalog) {
     )
     .unwrap();
     let
save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_dataset_env_var)
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
     let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_bar_dataset_env_var)
+        .upsert_dataset_env_var(&new_bar_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
 
@@ -265,7 +265,7 @@ pub async fn test_delete_dataset_env_vars(catalog: &Catalog) {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-pub async fn test_modify_dataset_env_vars(catalog: &Catalog) {
+pub async fn test_upsert_dataset_env_vars(catalog: &Catalog) {
     let dataset_env_var_repo = catalog.get_one::<dyn DatasetEnvVarRepository>().unwrap();
     let dataset_entry_repo = catalog.get_one::<dyn DatasetEntryRepository>().unwrap();
     let account_repo = catalog.get_one::<dyn AccountRepository>().unwrap();
@@ -278,7 +278,7 @@ pub async fn test_modify_dataset_env_vars(catalog: &Catalog) {
         .await
         .unwrap();
 
-    let new_dataset_env_var = DatasetEnvVar::new(
+    let mut new_dataset_env_var = DatasetEnvVar::new(
         "foo",
         Utc::now().round_subsecs(6),
         &DatasetEnvVarValue::Regular("foo".to_string()),
@@ -286,32 +286,30 @@ pub async fn test_modify_dataset_env_vars(catalog: &Catalog) {
         SAMPLE_DATASET_ENV_VAR_ENCRYPTION_KEY,
     )
     .unwrap();
-    let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_dataset_env_var)
+    let upsert_result = dataset_env_var_repo
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
-    assert!(save_result.is_ok());
+    assert_matches!(upsert_result, Ok(res) if res.status == UpsertDatasetEnvVarStatus::Created);
 
-    let modify_result = dataset_env_var_repo
-        .modify_dataset_env_var(&Uuid::new_v4(), vec![], None)
+    let upsert_result = dataset_env_var_repo
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
+    assert_matches!(upsert_result, Ok(res) if res.status == UpsertDatasetEnvVarStatus::UpToDate);
 
-    assert_matches!(modify_result, Err(ModifyDatasetEnvVarError::NotFound(_)));
     let (new_value, new_nonce) = new_dataset_env_var
         .generate_new_value(
             &DatasetEnvVarValue::Regular("new_foo".to_string()),
             SAMPLE_DATASET_ENV_VAR_ENCRYPTION_KEY,
         )
         .unwrap();
+    new_dataset_env_var.value.clone_from(&new_value);
+    new_dataset_env_var.secret_nonce.clone_from(&new_nonce);
 
-    let modify_result = dataset_env_var_repo
-        .modify_dataset_env_var(
-            &new_dataset_env_var.id,
-            new_value.clone(),
-            new_nonce.clone(),
-        )
+    let upsert_result = dataset_env_var_repo
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
-    assert!(modify_result.is_ok());
+    assert_matches!(upsert_result, Ok(res) if res.status == UpsertDatasetEnvVarStatus::Updated);
 
     let db_dataset_env_var = dataset_env_var_repo
         .get_dataset_env_var_by_id(&new_dataset_env_var.id)
         .await
@@ -358,11 +356,11 @@ pub async fn test_delete_all_dataset_env_vars(catalog: &Catalog) {
     )
     .unwrap();
     let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_dataset_env_var)
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
     let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_bar_dataset_env_var)
+        .upsert_dataset_env_var(&new_bar_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
 
diff --git a/src/infra/datasets/sqlite/.sqlx/query-5e5b7fb1fefebae2c03722d043aec7ea26b2e7e8a920332e6676abff09192f87.json b/src/infra/datasets/sqlite/.sqlx/query-5e5b7fb1fefebae2c03722d043aec7ea26b2e7e8a920332e6676abff09192f87.json
new file mode 100644
index 0000000000..f2e2e1a93d
--- /dev/null
+++ 
b/src/infra/datasets/sqlite/.sqlx/query-5e5b7fb1fefebae2c03722d043aec7ea26b2e7e8a920332e6676abff09192f87.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT (key, dataset_id)\n DO UPDATE SET\n value = EXCLUDED.value,\n secret_nonce = CASE\n WHEN dataset_env_vars.secret_nonce IS NULL AND excluded.secret_nonce IS NOT NULL THEN excluded.secret_nonce\n WHEN dataset_env_vars.secret_nonce IS NOT NULL AND excluded.secret_nonce IS NULL THEN NULL\n ELSE dataset_env_vars.secret_nonce\n END\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 6 + }, + "nullable": [] + }, + "hash": "5e5b7fb1fefebae2c03722d043aec7ea26b2e7e8a920332e6676abff09192f87" +} diff --git a/src/infra/datasets/sqlite/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json b/src/infra/datasets/sqlite/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json deleted file mode 100644 index 9760ae9d6c..0000000000 --- a/src/infra/datasets/sqlite/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n UPDATE dataset_env_vars SET value = $1, secret_nonce = $2 where id = $3\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2" -} diff --git a/src/infra/datasets/sqlite/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json b/src/infra/datasets/sqlite/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json deleted file mode 100644 index aa56d7440f..0000000000 --- a/src/infra/datasets/sqlite/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)\n VALUES ($1, $2, $3, $4, $5, $6)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 6 - }, - "nullable": [] - }, - "hash": "e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87" -} diff --git a/src/infra/datasets/sqlite/src/lib.rs b/src/infra/datasets/sqlite/src/lib.rs index 3a5b60937e..5fba9310d8 100644 --- a/src/infra/datasets/sqlite/src/lib.rs +++ b/src/infra/datasets/sqlite/src/lib.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+#![feature(let_chains)]
+
 // Re-exports
 pub use kamu_datasets as domain;
 
diff --git a/src/infra/datasets/sqlite/src/repos/sqlite_dataset_env_var_repository.rs b/src/infra/datasets/sqlite/src/repos/sqlite_dataset_env_var_repository.rs
index 229d0b9387..7001b13018 100644
--- a/src/infra/datasets/sqlite/src/repos/sqlite_dataset_env_var_repository.rs
+++ b/src/infra/datasets/sqlite/src/repos/sqlite_dataset_env_var_repository.rs
@@ -9,7 +9,7 @@
 
 use database_common::{PaginationOpts, TransactionRef, TransactionRefT};
 use dill::{component, interface};
-use internal_error::{ErrorIntoInternal, ResultIntoInternal};
+use internal_error::{InternalError, ResultIntoInternal};
 use opendatafabric::DatasetID;
 use uuid::Uuid;
 
@@ -33,50 +33,73 @@ impl SqliteDatasetEnvVarRepository {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarRepository for SqliteDatasetEnvVarRepository {
-    async fn save_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var: &DatasetEnvVar,
-    ) -> Result<(), SaveDatasetEnvVarError> {
+    ) -> Result<UpsertDatasetEnvVarResult, InternalError> {
         let mut tr = self.transaction.lock().await;
-
         let connection_mut = tr.connection_mut().await?;
 
-        let dataset_env_var_id = dataset_env_var.id;
-        let dataset_env_var_key = &dataset_env_var.key;
-        let dataset_env_var_value = &dataset_env_var.value;
-        let dataset_env_var_secret_nonce = &dataset_env_var.secret_nonce;
-        let dataset_env_var_created_at = dataset_env_var.created_at;
-        let dataset_env_var_dataset_id = dataset_env_var.dataset_id.to_string();
+        let old_record = sqlx::query_as!(
+            DatasetEnvVarRowModel,
+            r#"
+            SELECT
+                id as "id: Uuid",
+                key,
+                value as "value: _",
+                secret_nonce as "secret_nonce: _",
+                created_at as "created_at: _",
+                dataset_id as "dataset_id: _"
+            FROM dataset_env_vars
+            WHERE id = $1
+            "#,
+            dataset_env_var.id,
+        )
+        .fetch_optional(&mut *connection_mut)
+        .await
+        .int_err()?;
+
+        if let Some(record) = &old_record
+            && dataset_env_var.value == record.value
+        {
+            return Ok(UpsertDatasetEnvVarResult {
+                id: record.id,
+                status: UpsertDatasetEnvVarStatus::UpToDate,
+            });
+        }
 
+        let dataset_env_var_dataset_id = dataset_env_var.dataset_id.to_string();
         sqlx::query!(
             r#"
             INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)
-                VALUES ($1, $2, $3, $4, $5, $6)
-            "#,
-            dataset_env_var_id,
-            dataset_env_var_key,
-            dataset_env_var_value,
-            dataset_env_var_secret_nonce,
-            dataset_env_var_created_at,
+                VALUES ($1, $2, $3, $4, $5, $6)
+                ON CONFLICT (key, dataset_id)
+                DO UPDATE SET
+                    value = EXCLUDED.value,
+                    secret_nonce = CASE
+                        WHEN dataset_env_vars.secret_nonce IS NULL AND excluded.secret_nonce IS NOT NULL THEN excluded.secret_nonce
+                        WHEN dataset_env_vars.secret_nonce IS NOT NULL AND excluded.secret_nonce IS NULL THEN NULL
+                        ELSE dataset_env_vars.secret_nonce
+                    END
+            "#,
+            dataset_env_var.id,
+            dataset_env_var.key,
+            dataset_env_var.value,
+            dataset_env_var.secret_nonce,
+            dataset_env_var.created_at,
             dataset_env_var_dataset_id,
         )
         .execute(connection_mut)
         .await
-        .map_err(|e: sqlx::Error| match e {
-            sqlx::Error::Database(e) => {
-                if e.is_unique_violation() {
-                    SaveDatasetEnvVarError::Duplicate(SaveDatasetEnvVarErrorDuplicate {
-                        dataset_env_var_key: dataset_env_var.key.clone(),
-                        dataset_id: dataset_env_var.dataset_id.clone(),
-                    })
-                } else {
-                    SaveDatasetEnvVarError::Internal(e.int_err())
-                }
-            }
-            _ => SaveDatasetEnvVarError::Internal(e.int_err()),
-        })?;
+        .int_err()?;
 
-        Ok(())
+        let (id, status) = if let Some(record) = old_record {
+            (record.id, UpsertDatasetEnvVarStatus::Updated)
+        } else {
+            (dataset_env_var.id,
UpsertDatasetEnvVarStatus::Created)
+        };
+
+        Ok(UpsertDatasetEnvVarResult { id, status })
     }
 
     async fn get_all_dataset_env_vars_by_dataset_id(
@@ -251,38 +274,6 @@ impl DatasetEnvVarRepository for SqliteDatasetEnvVarRepository {
         }
         Ok(())
     }
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        new_value: Vec<u8>,
-        secret_nonce: Option<Vec<u8>>,
-    ) -> Result<(), ModifyDatasetEnvVarError> {
-        let mut tr = self.transaction.lock().await;
-
-        let connection_mut = tr.connection_mut().await?;
-
-        let update_result = sqlx::query!(
-            r#"
-            UPDATE dataset_env_vars SET value = $1, secret_nonce = $2 where id = $3
-            "#,
-            new_value,
-            secret_nonce,
-            dataset_env_var_id,
-        )
-        .execute(&mut *connection_mut)
-        .await
-        .int_err()?;
-
-        if update_result.rows_affected() == 0 {
-            return Err(ModifyDatasetEnvVarError::NotFound(
-                DatasetEnvVarNotFoundError {
-                    dataset_env_var_key: dataset_env_var_id.to_string(),
-                },
-            ));
-        }
-        Ok(())
-    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/infra/datasets/sqlite/tests/repos/test_sqlite_dataset_env_var_repository.rs b/src/infra/datasets/sqlite/tests/repos/test_sqlite_dataset_env_var_repository.rs
index 2117dc000a..56722e0ef7 100644
--- a/src/infra/datasets/sqlite/tests/repos/test_sqlite_dataset_env_var_repository.rs
+++ b/src/infra/datasets/sqlite/tests/repos/test_sqlite_dataset_env_var_repository.rs
@@ -51,7 +51,7 @@ database_transactional_test!(
 
 database_transactional_test!(
     storage = sqlite,
-    fixture = dataset_env_var_repo::test_modify_dataset_env_vars,
+    fixture = dataset_env_var_repo::test_delete_all_dataset_env_vars,
    harness = SqliteDatasetEnvVarRepositoryHarness
 );
 
@@ -59,7 +59,7 @@ database_transactional_test!(
 
 database_transactional_test!(
     storage = sqlite,
-    fixture = dataset_env_var_repo::test_delete_all_dataset_env_vars,
+    fixture = dataset_env_var_repo::test_upsert_dataset_env_vars,
     harness = SqliteDatasetEnvVarRepositoryHarness
 );
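
Usage note (illustrative sketch, not part of the diff): with this change a client no longer chooses between saveEnvVariable and modifyEnvVariable; it issues a single upsertEnvVariable call and branches on the result. A minimal invocation against the schema above, where "<dataset_id>" is a placeholder for a real dataset ID and the selection sticks to the `message` field that every UpsertDatasetEnvVarResult variant implements:

mutation {
  datasets {
    byId(datasetId: "<dataset_id>") {
      envVars {
        upsertEnvVariable(key: "foo", value: "foo_value", isSecret: false) {
          message
        }
      }
    }
  }
}

Depending on repository state, `message` comes back as "Created", "Updated", or "Dataset env var is up to date". On the Postgres side the Created/Updated distinction rides on the `RETURNING xmax = 0` check (a freshly inserted row has xmax = 0, while a conflict-updated row does not), whereas the SQLite implementation pre-reads the existing record before running the INSERT ... ON CONFLICT statement.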