From c3e2b4eb48e796c9f7cc81732e2d2d970e3a953b Mon Sep 17 00:00:00 2001 From: Dmitriy Borzenko Date: Mon, 16 Dec 2024 23:19:36 +0200 Subject: [PATCH] integrated scheduling tab --- resources/schema.graphql | 6 + src/app/api/dataset-flow.api.ts | 44 ++++ .../fragment-flow-history-data.graphql | 7 + .../dataset-flow-batching.graphql | 49 ---- .../dataset-flow-schedule.graphql | 48 ---- .../dataset-flow-set-triggers.graphql | 39 +++ .../dataset-flow-triggers.graphql | 29 +++ src/app/api/kamu.graphql.interface.ts | 189 ++++++++++++++ .../flow-details-history-tab.helpers.ts | 19 ++ .../services/dataset-scheduling.service.ts | 35 +++ ...set-settings-scheduling-tab.component.html | 134 +++++----- ...taset-settings-scheduling-tab.component.ts | 235 ++++++++++-------- ...settings-scheduling-tab.component.types.ts | 1 + 13 files changed, 568 insertions(+), 267 deletions(-) delete mode 100644 src/app/api/gql/scheduling-dataset/dataset-flow-batching.graphql delete mode 100644 src/app/api/gql/scheduling-dataset/dataset-flow-schedule.graphql create mode 100644 src/app/api/gql/scheduling-dataset/dataset-flow-set-triggers.graphql create mode 100644 src/app/api/gql/scheduling-dataset/dataset-flow-triggers.graphql diff --git a/resources/schema.graphql b/resources/schema.graphql index c8d2c19f..b725fa1a 100644 --- a/resources/schema.graphql +++ b/resources/schema.graphql @@ -1012,6 +1012,12 @@ type FlowAbortedResult { message: String! } +type FlowConfigSnapshotModified implements FlowEvent { + eventId: EventID! + eventTime: DateTime! + configSnapshot: FlowConfigurationSnapshot! +} + type FlowConfiguration { ingest: FlowConfigurationIngest compaction: FlowConfigurationCompaction diff --git a/src/app/api/dataset-flow.api.ts b/src/app/api/dataset-flow.api.ts index eebf6d4b..683ea9e8 100644 --- a/src/app/api/dataset-flow.api.ts +++ b/src/app/api/dataset-flow.api.ts @@ -17,14 +17,19 @@ import { DatasetTriggerFlowMutation, FlowConfigurationInput, FlowRunConfiguration, + FlowTriggerInput, GetDatasetFlowConfigsGQL, GetDatasetFlowConfigsQuery, + GetDatasetFlowTriggersGQL, + GetDatasetFlowTriggersQuery, GetDatasetListFlowsGQL, GetDatasetListFlowsQuery, GetFlowByIdGQL, GetFlowByIdQuery, SetDatasetFlowConfigGQL, SetDatasetFlowConfigMutation, + SetDatasetFlowTriggersGQL, + SetDatasetFlowTriggersMutation, } from "./kamu.graphql.interface"; import { Observable, first, map } from "rxjs"; import { ApolloQueryResult } from "@apollo/client"; @@ -46,6 +51,8 @@ export class DatasetFlowApi { // private datasetFlowCompactionGQL = inject(DatasetFlowCompactionGQL); private datasetFlowsInitiatorsGQL = inject(DatasetFlowsInitiatorsGQL); private setDatasetFlowConfigGQL = inject(SetDatasetFlowConfigGQL); + private setDatasetFlowTriggersGQL = inject(SetDatasetFlowTriggersGQL); + private getDatasetFlowTriggersGQL = inject(GetDatasetFlowTriggersGQL); public datasetTriggerFlow(params: { accountId: string; @@ -103,6 +110,43 @@ export class DatasetFlowApi { ); } + public setDatasetFlowTriggers(params: { + datasetId: string; + datasetFlowType: DatasetFlowType; + paused: boolean; + triggerInput: FlowTriggerInput; + }): Observable { + return this.setDatasetFlowTriggersGQL.mutate(params).pipe( + first(), + map((result: MutationResult) => { + /* istanbul ignore else */ + if (result.data) { + return result.data; + } else { + throw new DatasetOperationError(result.errors ?? []); + } + }), + ); + } + + public getDatasetFlowTriggers(params: { + datasetId: string; + datasetFlowType: DatasetFlowType; + }): Observable { + return this.getDatasetFlowTriggersGQL + .watch(params, { + ...noCacheFetchPolicy, + context: { + skipLoading: true, + }, + }) + .valueChanges.pipe( + map((result: ApolloQueryResult) => { + return result.data; + }), + ); + } + // public setDatasetFlowSchedule(params: { // accountId: string; // datasetId: string; diff --git a/src/app/api/gql/flows-dataset/fragments/fragment-flow-history-data.graphql b/src/app/api/gql/flows-dataset/fragments/fragment-flow-history-data.graphql index a343f6d7..aef0b53c 100644 --- a/src/app/api/gql/flows-dataset/fragments/fragment-flow-history-data.graphql +++ b/src/app/api/gql/flows-dataset/fragments/fragment-flow-history-data.graphql @@ -5,6 +5,7 @@ fragment FlowHistoryData on FlowEvent { ... on FlowEventAborted { __typename } + ... on FlowEventInitiated { trigger { __typename @@ -90,4 +91,10 @@ fragment FlowHistoryData on FlowEvent { } } } + + ... on FlowConfigSnapshotModified { + configSnapshot { + __typename + } + } } diff --git a/src/app/api/gql/scheduling-dataset/dataset-flow-batching.graphql b/src/app/api/gql/scheduling-dataset/dataset-flow-batching.graphql deleted file mode 100644 index 2b8dc50f..00000000 --- a/src/app/api/gql/scheduling-dataset/dataset-flow-batching.graphql +++ /dev/null @@ -1,49 +0,0 @@ -# mutation datasetFlowBatching( -# $datasetId: DatasetID! -# $datasetFlowType: DatasetFlowType! -# $paused: Boolean! -# $transform: TransformConditionInput! -# ) { -# datasets { -# byId(datasetId: $datasetId) { -# flows { -# configs { -# setConfigTransform(datasetFlowType: $datasetFlowType, paused: $paused, transform: $transform) { -# __typename -# ... on SetFlowConfigSuccess { -# message -# config { -# transform { -# maxBatchingInterval { -# ...TimeDeltaData -# } -# minRecordsToAwait -# } -# } -# } - -# ... on FlowIncompatibleDatasetKind { -# message -# expectedDatasetKind -# actualDatasetKind -# } - -# ... on FlowInvalidTransformConfig { -# message -# reason -# } - -# ... on FlowPreconditionsNotMet { -# message -# preconditions -# } - -# ... on FlowTypeIsNotSupported { -# message -# } -# } -# } -# } -# } -# } -# } diff --git a/src/app/api/gql/scheduling-dataset/dataset-flow-schedule.graphql b/src/app/api/gql/scheduling-dataset/dataset-flow-schedule.graphql deleted file mode 100644 index 00020c04..00000000 --- a/src/app/api/gql/scheduling-dataset/dataset-flow-schedule.graphql +++ /dev/null @@ -1,48 +0,0 @@ -# mutation DatasetFlowSchedule( -# $datasetId: DatasetID! -# $datasetFlowType: DatasetFlowType! -# $paused: Boolean! -# $ingest: IngestConditionInput! -# ) { -# datasets { -# byId(datasetId: $datasetId) { -# flows { -# configs { -# setConfigIngest(datasetFlowType: $datasetFlowType, paused: $paused, ingest: $ingest) { -# __typename -# ... on SetFlowConfigSuccess { -# message -# config { -# ingest { -# schedule { -# ... on TimeDelta { -# ...TimeDeltaData -# } -# ... on Cron5ComponentExpression { -# cron5ComponentExpression -# } -# } -# fetchUncacheable -# } -# } -# } - -# ... on FlowIncompatibleDatasetKind { -# message -# expectedDatasetKind -# actualDatasetKind -# } - -# ... on FlowPreconditionsNotMet { -# message -# } - -# ... on FlowTypeIsNotSupported { -# message -# } -# } -# } -# } -# } -# } -# } diff --git a/src/app/api/gql/scheduling-dataset/dataset-flow-set-triggers.graphql b/src/app/api/gql/scheduling-dataset/dataset-flow-set-triggers.graphql new file mode 100644 index 00000000..c7de8785 --- /dev/null +++ b/src/app/api/gql/scheduling-dataset/dataset-flow-set-triggers.graphql @@ -0,0 +1,39 @@ +mutation setDatasetFlowTriggers( + $datasetId: DatasetID! + $datasetFlowType: DatasetFlowType! + $paused: Boolean! + $triggerInput: FlowTriggerInput! +) { + datasets { + byId(datasetId: $datasetId) { + flows { + triggers { + setTrigger(datasetFlowType: $datasetFlowType, paused: $paused, triggerInput: $triggerInput) { + ... on SetFlowTriggerSuccess { + message + } + + ... on FlowIncompatibleDatasetKind { + message + expectedDatasetKind + actualDatasetKind + } + + ... on FlowPreconditionsNotMet { + message + } + + ... on FlowTypeIsNotSupported { + message + } + + ... on FlowInvalidTriggerInputError { + message + reason + } + } + } + } + } + } +} diff --git a/src/app/api/gql/scheduling-dataset/dataset-flow-triggers.graphql b/src/app/api/gql/scheduling-dataset/dataset-flow-triggers.graphql new file mode 100644 index 00000000..106afc88 --- /dev/null +++ b/src/app/api/gql/scheduling-dataset/dataset-flow-triggers.graphql @@ -0,0 +1,29 @@ +query getDatasetFlowTriggers($datasetId: DatasetID!, $datasetFlowType: DatasetFlowType!) { + datasets { + byId(datasetId: $datasetId) { + flows { + triggers { + byType(datasetFlowType: $datasetFlowType) { + paused + schedule { + ... on TimeDelta { + ...TimeDeltaData + } + + ... on Cron5ComponentExpression { + cron5ComponentExpression + } + } + + batching { + maxBatchingInterval { + ...TimeDeltaData + } + minRecordsToAwait + } + } + } + } + } + } +} diff --git a/src/app/api/kamu.graphql.interface.ts b/src/app/api/kamu.graphql.interface.ts index e5f19d55..700ba1f6 100644 --- a/src/app/api/kamu.graphql.interface.ts +++ b/src/app/api/kamu.graphql.interface.ts @@ -1111,6 +1111,13 @@ export type FlowAbortedResult = { message: Scalars["String"]; }; +export type FlowConfigSnapshotModified = FlowEvent & { + __typename?: "FlowConfigSnapshotModified"; + configSnapshot: FlowConfigurationSnapshot; + eventId: Scalars["EventID"]; + eventTime: Scalars["DateTime"]; +}; + export type FlowConfiguration = { __typename?: "FlowConfiguration"; compaction?: Maybe; @@ -3033,6 +3040,9 @@ export type GetFlowByIdQuery = { flow: { __typename?: "Flow"; history: Array< + | ({ + __typename?: "FlowConfigSnapshotModified"; + } & FlowHistoryData_FlowConfigSnapshotModified_Fragment) | ({ __typename?: "FlowEventAborted"; } & FlowHistoryData_FlowEventAborted_Fragment) @@ -3319,6 +3329,16 @@ export type FlowConnectionDataFragment = { edges: Array<{ __typename?: "FlowEdge"; node: { __typename?: "Flow" } & FlowSummaryDataFragment }>; }; +type FlowHistoryData_FlowConfigSnapshotModified_Fragment = { + __typename: "FlowConfigSnapshotModified"; + eventId: string; + eventTime: string; + configSnapshot: + | { __typename: "FlowConfigurationCompactionRule" } + | { __typename: "FlowConfigurationIngest" } + | { __typename: "FlowConfigurationReset" }; +}; + type FlowHistoryData_FlowEventAborted_Fragment = { __typename: "FlowEventAborted"; eventId: string; eventTime: string }; type FlowHistoryData_FlowEventInitiated_Fragment = { @@ -3390,6 +3410,7 @@ type FlowHistoryData_FlowEventTriggerAdded_Fragment = { }; export type FlowHistoryDataFragment = + | FlowHistoryData_FlowConfigSnapshotModified_Fragment | FlowHistoryData_FlowEventAborted_Fragment | FlowHistoryData_FlowEventInitiated_Fragment | FlowHistoryData_FlowEventScheduledForActivation_Fragment @@ -4127,6 +4148,74 @@ export type SetDatasetFlowConfigMutation = { }; }; +export type SetDatasetFlowTriggersMutationVariables = Exact<{ + datasetId: Scalars["DatasetID"]; + datasetFlowType: DatasetFlowType; + paused: Scalars["Boolean"]; + triggerInput: FlowTriggerInput; +}>; + +export type SetDatasetFlowTriggersMutation = { + __typename?: "Mutation"; + datasets: { + __typename?: "DatasetsMut"; + byId?: { + __typename?: "DatasetMut"; + flows: { + __typename?: "DatasetFlowsMut"; + triggers: { + __typename?: "DatasetFlowTriggersMut"; + setTrigger: + | { + __typename?: "FlowIncompatibleDatasetKind"; + message: string; + expectedDatasetKind: DatasetKind; + actualDatasetKind: DatasetKind; + } + | { __typename?: "FlowInvalidTriggerInputError"; message: string; reason: string } + | { __typename?: "FlowPreconditionsNotMet"; message: string } + | { __typename?: "FlowTypeIsNotSupported"; message: string } + | { __typename?: "SetFlowTriggerSuccess"; message: string }; + }; + }; + } | null; + }; +}; + +export type GetDatasetFlowTriggersQueryVariables = Exact<{ + datasetId: Scalars["DatasetID"]; + datasetFlowType: DatasetFlowType; +}>; + +export type GetDatasetFlowTriggersQuery = { + __typename?: "Query"; + datasets: { + __typename?: "Datasets"; + byId?: { + __typename?: "Dataset"; + flows: { + __typename?: "DatasetFlows"; + triggers: { + __typename?: "DatasetFlowTriggers"; + byType?: { + __typename?: "FlowTrigger"; + paused: boolean; + schedule?: + | { __typename?: "Cron5ComponentExpression"; cron5ComponentExpression: string } + | ({ __typename?: "TimeDelta" } & TimeDeltaDataFragment) + | null; + batching?: { + __typename?: "FlowTriggerBatchingRule"; + minRecordsToAwait: number; + maxBatchingInterval: { __typename?: "TimeDelta" } & TimeDeltaDataFragment; + } | null; + } | null; + }; + }; + } | null; + }; +}; + export type TimeDeltaDataFragment = { __typename?: "TimeDelta"; every: number; unit: TimeUnit }; export type SearchDatasetsAutocompleteQueryVariables = Exact<{ @@ -4582,6 +4671,11 @@ export const FlowHistoryDataFragmentDoc = gql` } } } + ... on FlowConfigSnapshotModified { + configSnapshot { + __typename + } + } } ${AccountFragmentDoc} ${DatasetBasicsFragmentDoc} @@ -6911,6 +7005,101 @@ export class SetDatasetFlowConfigGQL extends Apollo.Mutation< super(apollo); } } +export const SetDatasetFlowTriggersDocument = gql` + mutation setDatasetFlowTriggers( + $datasetId: DatasetID! + $datasetFlowType: DatasetFlowType! + $paused: Boolean! + $triggerInput: FlowTriggerInput! + ) { + datasets { + byId(datasetId: $datasetId) { + flows { + triggers { + setTrigger(datasetFlowType: $datasetFlowType, paused: $paused, triggerInput: $triggerInput) { + ... on SetFlowTriggerSuccess { + message + } + ... on FlowIncompatibleDatasetKind { + message + expectedDatasetKind + actualDatasetKind + } + ... on FlowPreconditionsNotMet { + message + } + ... on FlowTypeIsNotSupported { + message + } + ... on FlowInvalidTriggerInputError { + message + reason + } + } + } + } + } + } + } +`; + +@Injectable({ + providedIn: "root", +}) +export class SetDatasetFlowTriggersGQL extends Apollo.Mutation< + SetDatasetFlowTriggersMutation, + SetDatasetFlowTriggersMutationVariables +> { + document = SetDatasetFlowTriggersDocument; + + constructor(apollo: Apollo.Apollo) { + super(apollo); + } +} +export const GetDatasetFlowTriggersDocument = gql` + query getDatasetFlowTriggers($datasetId: DatasetID!, $datasetFlowType: DatasetFlowType!) { + datasets { + byId(datasetId: $datasetId) { + flows { + triggers { + byType(datasetFlowType: $datasetFlowType) { + paused + schedule { + ... on TimeDelta { + ...TimeDeltaData + } + ... on Cron5ComponentExpression { + cron5ComponentExpression + } + } + batching { + maxBatchingInterval { + ...TimeDeltaData + } + minRecordsToAwait + } + } + } + } + } + } + } + ${TimeDeltaDataFragmentDoc} +`; + +@Injectable({ + providedIn: "root", +}) +export class GetDatasetFlowTriggersGQL extends Apollo.Query< + GetDatasetFlowTriggersQuery, + GetDatasetFlowTriggersQueryVariables +> { + document = GetDatasetFlowTriggersDocument; + + constructor(apollo: Apollo.Apollo) { + super(apollo); + } +} export const SearchDatasetsAutocompleteDocument = gql` query searchDatasetsAutocomplete($query: String!, $perPage: Int, $page: Int) { search { diff --git a/src/app/dataset-flow/dataset-flow-details/tabs/flow-details-history-tab/flow-details-history-tab.helpers.ts b/src/app/dataset-flow/dataset-flow-details/tabs/flow-details-history-tab/flow-details-history-tab.helpers.ts index 989b355e..e78d0e34 100644 --- a/src/app/dataset-flow/dataset-flow-details/tabs/flow-details-history-tab/flow-details-history-tab.helpers.ts +++ b/src/app/dataset-flow/dataset-flow-details/tabs/flow-details-history-tab/flow-details-history-tab.helpers.ts @@ -1,5 +1,6 @@ import moment from "moment"; import { + FlowConfigSnapshotModified, FlowEventInitiated, FlowEventScheduledForActivation, FlowEventStartConditionUpdated, @@ -42,6 +43,8 @@ export class DatasetFlowDetailsHelpers { } case "FlowEventScheduledForActivation": return "Flow scheduled for activation"; + case "FlowConfigSnapshotModified": + return "Flow configuration was modified"; /* istanbul ignore next */ default: throw new Error("Unknown event typename"); @@ -64,6 +67,8 @@ export class DatasetFlowDetailsHelpers { return { icon: "downloading", class: "text-muted" }; case "FlowEventScheduledForActivation": return { icon: "timer", class: "text-muted" }; + case "FlowConfigSnapshotModified": + return { icon: "downloading", class: "text-muted" }; case "FlowEventTaskChanged": { const event = flowEvent as FlowEventTaskChanged; switch (event.taskStatus) { @@ -108,6 +113,20 @@ export class DatasetFlowDetailsHelpers { case "FlowEventInitiated": case "FlowEventTriggerAdded": return this.describeTriggerDetails((flowEvent as FlowEventInitiated).trigger); + case "FlowConfigSnapshotModified": { + const event = flowEvent as FlowConfigSnapshotModified; + switch (event.configSnapshot.__typename) { + case "FlowConfigurationCompactionRule": + return "Modified by compaction rule"; + case "FlowConfigurationIngest": + return "Modified by ingest rule"; + case "FlowConfigurationReset": + return "Modified by reset rule"; + /* istanbul ignore next */ + default: + throw new Error("Unknown configSnapshot typename"); + } + } case "FlowEventAborted": return ""; case "FlowEventScheduledForActivation": { diff --git a/src/app/dataset-view/additional-components/dataset-settings-component/services/dataset-scheduling.service.ts b/src/app/dataset-view/additional-components/dataset-settings-component/services/dataset-scheduling.service.ts index f2b6dd16..37cb7661 100644 --- a/src/app/dataset-view/additional-components/dataset-settings-component/services/dataset-scheduling.service.ts +++ b/src/app/dataset-view/additional-components/dataset-settings-component/services/dataset-scheduling.service.ts @@ -6,8 +6,11 @@ import { DatasetFlowApi } from "src/app/api/dataset-flow.api"; import { DatasetFlowType, FlowConfigurationInput, + FlowTriggerInput, GetDatasetFlowConfigsQuery, + GetDatasetFlowTriggersQuery, SetDatasetFlowConfigMutation, + SetDatasetFlowTriggersMutation, } from "src/app/api/kamu.graphql.interface"; import AppValues from "src/app/common/app.values"; import { DatasetViewTypeEnum } from "src/app/dataset-view/dataset-view.interface"; @@ -45,6 +48,38 @@ export class DatasetSchedulingService { ); } + public fetchDatasetFlowTriggers( + datasetId: string, + datasetFlowType: DatasetFlowType, + ): Observable { + return this.datasetFlowApi.getDatasetFlowTriggers({ datasetId, datasetFlowType }); + } + + public setDatasetTriggers(params: { + datasetId: string; + datasetFlowType: DatasetFlowType; + paused: boolean; + triggerInput: FlowTriggerInput; + datasetInfo: DatasetInfo; + }): Observable { + return this.datasetFlowApi.setDatasetFlowTriggers(params).pipe( + map((data: SetDatasetFlowTriggersMutation) => { + const triggers = data.datasets.byId?.flows.triggers.setTrigger; + if (triggers?.__typename === "SetFlowTriggerSuccess") { + setTimeout(() => { + this.navigationService.navigateToDatasetView({ + accountName: params.datasetInfo.accountName, + datasetName: params.datasetInfo.datasetName, + tab: DatasetViewTypeEnum.Flows, + }); + }, AppValues.SIMULATION_START_CONDITION_DELAY_MS); + } else { + this.toastrService.error(triggers?.message); + } + }), + ); + } + // public setDatasetFlowSchedule(params: { // accountId: string; // datasetId: string; diff --git a/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.html b/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.html index ff59918f..a5534ea0 100644 --- a/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.html +++ b/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.html @@ -5,7 +5,7 @@

Scheduled updates

-
+
Triggers
@@ -45,12 +45,12 @@

Scheduled updates

/>
- {{ pollingGroup.get("every")?.errors?.range.message }} + {{ pollingForm.get("every")?.errors?.range.message }}
@@ -71,9 +71,9 @@

Scheduled updates

@@ -85,7 +85,7 @@

Scheduled updates

- Next time: {{ nextTime }} @@ -107,7 +107,7 @@

Scheduled updates

@@ -141,60 +141,72 @@

Scheduled updates

-
-
- Max batching interval: - - - -
- {{ batchingForm.get("every")?.errors?.range.message }} -
-
-
-
- Min records to await: - - -
- Minimum value 1 -
-
+
+ Triggers +
+
+ + Enable/Disable update schedule + +
+
+ Max batching interval: + + + +
+ {{ batchingForm.get("every")?.errors?.range.message }} +
+
+
+
+ Min records to await: + + +
+ Minimum value 1 +
+
+
-
-
diff --git a/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.ts b/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.ts index 3b66c03b..bb68617c 100644 --- a/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.ts +++ b/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.ts @@ -8,6 +8,9 @@ import { DatasetFlowType, DatasetKind, DatasetPermissionsFragment, + FlowTriggerInput, + GetDatasetFlowConfigsQuery, + GetDatasetFlowTriggersQuery, IngestConditionInput, TimeUnit, } from "src/app/api/kamu.graphql.interface"; @@ -17,7 +20,6 @@ import { cronExpressionNextTime, logError } from "src/app/common/app.helpers"; import { BatchingFormType, IngestConfigurationFormType, - PollingFormType, PollingGroupType, } from "./dataset-settings-scheduling-tab.component.types"; import { takeUntilDestroyed } from "@angular/core/rxjs-interop"; @@ -34,30 +36,28 @@ export class DatasetSettingsSchedulingTabComponent extends BaseComponent impleme public readonly pollingGroupEnum: typeof PollingGroupEnum = PollingGroupEnum; public readonly throttlingGroupEnum: typeof ThrottlingGroupEnum = ThrottlingGroupEnum; public readonly timeUnit: typeof TimeUnit = TimeUnit; - private scheduleOptions: IngestConditionInput; private everyTimeMapperValidators: Record = everyTimeMapperValidators; public ingestConfigurationForm = new FormGroup({ fetchUncacheable: new FormControl(false, { nonNullable: true }), }); - public pollingForm = new FormGroup({ - pollingGroup: new FormGroup({ - updatesState: new FormControl(false, { nonNullable: true }), - __typename: new FormControl(PollingGroupEnum.TIME_DELTA, [Validators.required]), - every: new FormControl>({ value: null, disabled: false }, [ - Validators.required, - Validators.min(1), - ]), - unit: new FormControl>({ value: null, disabled: false }, [Validators.required]), - cronExpression: new FormControl>({ value: "", disabled: true }, [ - Validators.required, - cronExpressionValidator(), - ]), - }), + public pollingForm = new FormGroup({ + updatesState: new FormControl(false, { nonNullable: true }), + __typename: new FormControl(PollingGroupEnum.TIME_DELTA, [Validators.required]), + every: new FormControl>({ value: null, disabled: false }, [ + Validators.required, + Validators.min(1), + ]), + unit: new FormControl>({ value: null, disabled: false }, [Validators.required]), + cronExpression: new FormControl>({ value: "", disabled: true }, [ + Validators.required, + cronExpressionValidator(), + ]), }); public batchingForm = new FormGroup({ + updatesState: new FormControl(false, { nonNullable: true }), every: new FormControl>({ value: null, disabled: false }, [ Validators.required, Validators.min(1), @@ -71,12 +71,8 @@ export class DatasetSettingsSchedulingTabComponent extends BaseComponent impleme private datasetSchedulingService = inject(DatasetSchedulingService); - public get pollingGroup(): FormGroup { - return this.pollingForm.get("pollingGroup") as FormGroup; - } - public get pollingType(): AbstractControl { - return this.pollingGroup.controls.__typename; + return this.pollingForm.controls.__typename; } public get batchingEveryTime(): AbstractControl { @@ -91,16 +87,20 @@ export class DatasetSettingsSchedulingTabComponent extends BaseComponent impleme return this.batchingForm.controls.minRecordsToAwait; } + public get batchingUpdateState(): AbstractControl { + return this.batchingForm.controls.updatesState; + } + public get pollingEveryTime(): AbstractControl { - return this.pollingGroup.controls.every; + return this.pollingForm.controls.every; } public get pollingUnitTime(): AbstractControl { - return this.pollingGroup.controls.unit; + return this.pollingForm.controls.unit; } public get cronExpression(): AbstractControl { - return this.pollingGroup.controls.cronExpression; + return this.pollingForm.controls.cronExpression; } public get nextTime(): string { @@ -115,6 +115,10 @@ export class DatasetSettingsSchedulingTabComponent extends BaseComponent impleme return this.ingestConfigurationForm.controls.fetchUncacheable; } + public get pollingUpdateState(): AbstractControl { + return this.pollingForm.controls.updatesState; + } + public ngOnInit() { if (!this.datasetPermissions.permissions.canSchedule) { this.pollingForm.disable(); @@ -187,104 +191,117 @@ export class DatasetSettingsSchedulingTabComponent extends BaseComponent impleme private checkStatusSection(): void { if (this.datasetBasics.kind === DatasetKind.Root) { - this.batchingForm.disable(); - this.pollingGroup.enable(); - this.cronExpression.disable(); + // this.cronExpression.disable(); + //Init configs this.datasetSchedulingService .fetchDatasetFlowConfigs(this.datasetBasics.id, DatasetFlowType.Ingest) .pipe(takeUntilDestroyed(this.destroyRef)) - .subscribe((data) => { + .subscribe((data: GetDatasetFlowConfigsQuery) => { const flowConfiguration = data.datasets.byId?.flows.configs.byType?.ingest; this.ingestConfigurationForm.patchValue({ fetchUncacheable: flowConfiguration?.fetchUncacheable }); - console.log("==>", flowConfiguration); - // const paused = data.datasets.byId?.flows.configs.byType?.paused; - // if (flowConfiguration?.schedule) { - // this.pollingForm.patchValue({ updatesState: !paused }); - // this.pollingGroup.patchValue({ - // ...flowConfiguration.schedule, - // fetchUncacheable: flowConfiguration.fetchUncacheable, - // }); - // if (flowConfiguration.schedule.__typename === "Cron5ComponentExpression") { - // this.pollingGroup.patchValue({ - // // splice for sync with cron parser - // cronExpression: flowConfiguration.schedule.cron5ComponentExpression, - // }); - // } - // } + }); + //Init triggers + this.datasetSchedulingService + .fetchDatasetFlowTriggers(this.datasetBasics.id, DatasetFlowType.Ingest) + .pipe(takeUntilDestroyed(this.destroyRef)) + .subscribe((data: GetDatasetFlowTriggersQuery) => { + const flowTriggers = data.datasets.byId?.flows.triggers.byType; + const schedule = flowTriggers?.schedule; + if (schedule && schedule.__typename === PollingGroupEnum.TIME_DELTA) { + this.pollingForm.patchValue({ + updatesState: !flowTriggers.paused, + __typename: schedule?.__typename as PollingGroupEnum, + every: schedule.every, + unit: schedule.unit, + }); + } + if (schedule && schedule.__typename === PollingGroupEnum.CRON_5_COMPONENT_EXPRESSION) { + this.pollingForm.patchValue({ + updatesState: !flowTriggers.paused, + __typename: schedule.__typename as PollingGroupEnum, + cronExpression: schedule.cron5ComponentExpression, + }); + } + }); + } else { + this.datasetSchedulingService + .fetchDatasetFlowTriggers(this.datasetBasics.id, DatasetFlowType.ExecuteTransform) + .pipe(takeUntilDestroyed(this.destroyRef)) + .subscribe((data: GetDatasetFlowTriggersQuery) => { + const flowTriggers = data.datasets.byId?.flows.triggers.byType; + const batching = flowTriggers?.batching; + if (batching) { + this.batchingForm.patchValue({ + ...batching.maxBatchingInterval, + minRecordsToAwait: batching.minRecordsToAwait, + updatesState: !flowTriggers.paused, + }); + } }); } - // else { - // this.pollingGroup.disable(); - // this.batchingForm.enable(); - // this.datasetSchedulingService - // .fetchDatasetFlowConfigs(this.datasetBasics.id, DatasetFlowType.ExecuteTransform) - // .pipe(takeUntilDestroyed(this.destroyRef)) - // .subscribe((data) => { - // const flowConfiguration = data.datasets.byId?.flows.configs.byType; - // const paused = data.datasets.byId?.flows.configs.byType?.paused; - // if (flowConfiguration?.transform) { - // const batchingConfig = flowConfiguration.transform; - // this.pollingForm.patchValue({ updatesState: !paused }); - // this.batchingForm.patchValue({ - // ...batchingConfig.maxBatchingInterval, - // minRecordsToAwait: batchingConfig.minRecordsToAwait, - // }); - // } - // }); - // } } - public onSubmit(): void { - // if (this.datasetBasics.kind === DatasetKind.Root) { - // this.setScheduleOptions(); - // this.datasetSchedulingService - // .setDatasetFlowSchedule({ - // accountId: this.datasetBasics.owner.id, - // datasetId: this.datasetBasics.id, - // datasetFlowType: DatasetFlowType.Ingest, - // paused: !(this.updateState.value as boolean), - // ingest: this.scheduleOptions, - // datasetInfo: { - // accountName: this.datasetBasics.owner.accountName, - // datasetName: this.datasetBasics.name, - // }, - // }) - // .pipe(takeUntilDestroyed(this.destroyRef)) - // .subscribe(); - // } else { - // this.datasetSchedulingService - // .setDatasetFlowBatching({ - // accountId: this.datasetBasics.owner.id, - // datasetId: this.datasetBasics.id, - // datasetFlowType: DatasetFlowType.ExecuteTransform, - // paused: !(this.updateState.value as boolean), - // transform: { - // minRecordsToAwait: this.batchingMinRecordsToAwait.value as number, - // maxBatchingInterval: { - // every: this.batchingEveryTime.value as number, - // unit: this.batchingUnitTime.value as TimeUnit, - // }, - // }, - // datasetInfo: { - // accountName: this.datasetBasics.owner.accountName, - // datasetName: this.datasetBasics.name, - // }, - // }) - // .pipe(takeUntilDestroyed(this.destroyRef)) - // .subscribe(); - // } + public saveBatchingTriggers(): void { + this.datasetSchedulingService + .setDatasetTriggers({ + datasetId: this.datasetBasics.id, + datasetFlowType: DatasetFlowType.ExecuteTransform, + paused: !(this.batchingUpdateState.value as boolean), + triggerInput: this.setBatchingTriggerInput(), + datasetInfo: { + accountName: this.datasetBasics.owner.accountName, + datasetName: this.datasetBasics.name, + }, + }) + .pipe(takeUntilDestroyed(this.destroyRef)) + .subscribe(); } - private setScheduleOptions(): void { - if (this.pollingGroup.controls.__typename.value === PollingGroupEnum.TIME_DELTA) { - this.scheduleOptions = { - fetchUncacheable: this.fetchUncacheable.value, + public savePollingTriggers(): void { + this.datasetSchedulingService + .setDatasetTriggers({ + datasetId: this.datasetBasics.id, + datasetFlowType: DatasetFlowType.Ingest, + paused: !(this.pollingUpdateState.value as boolean), + triggerInput: this.setPollingTriggerInput(), + datasetInfo: { + accountName: this.datasetBasics.owner.accountName, + datasetName: this.datasetBasics.name, + }, + }) + .pipe(takeUntilDestroyed(this.destroyRef)) + .subscribe(); + } + + private setPollingTriggerInput(): FlowTriggerInput { + if (this.pollingForm.controls.__typename.value === PollingGroupEnum.TIME_DELTA) { + return { + schedule: { + timeDelta: { + every: this.pollingEveryTime.value as number, + unit: this.pollingUnitTime.value as TimeUnit, + }, + }, }; - } - if (this.pollingGroup.controls.__typename.value === PollingGroupEnum.CRON_5_COMPONENT_EXPRESSION) { - this.scheduleOptions = { - fetchUncacheable: this.fetchUncacheable.value, + } else { + return { + schedule: { + // sync with server validator + cron5ComponentExpression: this.cronExpression.value as string, + }, }; } } + + private setBatchingTriggerInput(): FlowTriggerInput { + return { + batching: { + minRecordsToAwait: this.batchingMinRecordsToAwait.value as number, + maxBatchingInterval: { + every: this.batchingEveryTime.value as number, + unit: this.batchingUnitTime.value as TimeUnit, + }, + }, + }; + } } diff --git a/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.types.ts b/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.types.ts index dfd21e7b..629fd69f 100644 --- a/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.types.ts +++ b/src/app/dataset-view/additional-components/dataset-settings-component/tabs/scheduling/dataset-settings-scheduling-tab.component.types.ts @@ -16,6 +16,7 @@ export interface PollingGroupType { } export interface BatchingFormType { + updatesState: FormControl; every: FormControl>; unit: FormControl>; minRecordsToAwait: FormControl>;