Skip to content

Commit

Permalink
integrated scheduling tab
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy Borzenko committed Dec 16, 2024
1 parent 9e5c38c commit c3e2b4e
Show file tree
Hide file tree
Showing 13 changed files with 568 additions and 267 deletions.
6 changes: 6 additions & 0 deletions resources/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,12 @@ type FlowAbortedResult {
message: String!
}

type FlowConfigSnapshotModified implements FlowEvent {
eventId: EventID!
eventTime: DateTime!
configSnapshot: FlowConfigurationSnapshot!
}

type FlowConfiguration {
ingest: FlowConfigurationIngest
compaction: FlowConfigurationCompaction
Expand Down
44 changes: 44 additions & 0 deletions src/app/api/dataset-flow.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ import {
DatasetTriggerFlowMutation,
FlowConfigurationInput,
FlowRunConfiguration,
FlowTriggerInput,
GetDatasetFlowConfigsGQL,
GetDatasetFlowConfigsQuery,
GetDatasetFlowTriggersGQL,
GetDatasetFlowTriggersQuery,
GetDatasetListFlowsGQL,
GetDatasetListFlowsQuery,
GetFlowByIdGQL,
GetFlowByIdQuery,
SetDatasetFlowConfigGQL,
SetDatasetFlowConfigMutation,
SetDatasetFlowTriggersGQL,
SetDatasetFlowTriggersMutation,
} from "./kamu.graphql.interface";
import { Observable, first, map } from "rxjs";
import { ApolloQueryResult } from "@apollo/client";
Expand All @@ -46,6 +51,8 @@ export class DatasetFlowApi {
// private datasetFlowCompactionGQL = inject(DatasetFlowCompactionGQL);
private datasetFlowsInitiatorsGQL = inject(DatasetFlowsInitiatorsGQL);
private setDatasetFlowConfigGQL = inject(SetDatasetFlowConfigGQL);
private setDatasetFlowTriggersGQL = inject(SetDatasetFlowTriggersGQL);
private getDatasetFlowTriggersGQL = inject(GetDatasetFlowTriggersGQL);

public datasetTriggerFlow(params: {
accountId: string;
Expand Down Expand Up @@ -103,6 +110,43 @@ export class DatasetFlowApi {
);
}

public setDatasetFlowTriggers(params: {
datasetId: string;
datasetFlowType: DatasetFlowType;
paused: boolean;
triggerInput: FlowTriggerInput;
}): Observable<SetDatasetFlowTriggersMutation> {
return this.setDatasetFlowTriggersGQL.mutate(params).pipe(
first(),
map((result: MutationResult<SetDatasetFlowTriggersMutation>) => {
/* istanbul ignore else */
if (result.data) {
return result.data;
} else {
throw new DatasetOperationError(result.errors ?? []);
}
}),
);
}

public getDatasetFlowTriggers(params: {
datasetId: string;
datasetFlowType: DatasetFlowType;
}): Observable<GetDatasetFlowTriggersQuery> {
return this.getDatasetFlowTriggersGQL
.watch(params, {
...noCacheFetchPolicy,
context: {
skipLoading: true,
},
})
.valueChanges.pipe(
map((result: ApolloQueryResult<GetDatasetFlowTriggersQuery>) => {
return result.data;
}),
);
}

// public setDatasetFlowSchedule(params: {
// accountId: string;
// datasetId: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ fragment FlowHistoryData on FlowEvent {
... on FlowEventAborted {
__typename
}

... on FlowEventInitiated {
trigger {
__typename
Expand Down Expand Up @@ -90,4 +91,10 @@ fragment FlowHistoryData on FlowEvent {
}
}
}

... on FlowConfigSnapshotModified {
configSnapshot {
__typename
}
}
}
49 changes: 0 additions & 49 deletions src/app/api/gql/scheduling-dataset/dataset-flow-batching.graphql

This file was deleted.

48 changes: 0 additions & 48 deletions src/app/api/gql/scheduling-dataset/dataset-flow-schedule.graphql

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
mutation setDatasetFlowTriggers(
$datasetId: DatasetID!
$datasetFlowType: DatasetFlowType!
$paused: Boolean!
$triggerInput: FlowTriggerInput!
) {
datasets {
byId(datasetId: $datasetId) {
flows {
triggers {
setTrigger(datasetFlowType: $datasetFlowType, paused: $paused, triggerInput: $triggerInput) {
... on SetFlowTriggerSuccess {
message
}

... on FlowIncompatibleDatasetKind {
message
expectedDatasetKind
actualDatasetKind
}

... on FlowPreconditionsNotMet {
message
}

... on FlowTypeIsNotSupported {
message
}

... on FlowInvalidTriggerInputError {
message
reason
}
}
}
}
}
}
}
29 changes: 29 additions & 0 deletions src/app/api/gql/scheduling-dataset/dataset-flow-triggers.graphql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
query getDatasetFlowTriggers($datasetId: DatasetID!, $datasetFlowType: DatasetFlowType!) {
datasets {
byId(datasetId: $datasetId) {
flows {
triggers {
byType(datasetFlowType: $datasetFlowType) {
paused
schedule {
... on TimeDelta {
...TimeDeltaData
}

... on Cron5ComponentExpression {
cron5ComponentExpression
}
}

batching {
maxBatchingInterval {
...TimeDeltaData
}
minRecordsToAwait
}
}
}
}
}
}
}
Loading

0 comments on commit c3e2b4e

Please sign in to comment.