Skip to content

Commit

Permalink
feat: add pipeline_definition_cleanup provider (#6213)
Browse files Browse the repository at this point in the history
* feat: add pipeline_definition_cleanup provider

* feat: add unit-test

* add method example

* add dryrun

* rename field
  • Loading branch information
Malyue authored Jan 18, 2024
1 parent d8eadd6 commit f9316bd
Show file tree
Hide file tree
Showing 17 changed files with 2,500 additions and 13 deletions.
4 changes: 2 additions & 2 deletions cmd/pipeline/bootstrap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ app: {}
cancel: {}
resource: {}
pprof: {}
definition-cleanup: {}
edge-reporter:
target:
url: "${OPENAPI_PUBLIC_URL}"
Expand Down Expand Up @@ -158,5 +159,4 @@ erda.core.pipeline.aop.plugins.pipeline.definition-report:
erda.core.pipeline.aop.plugins.task.autotest-cookie-keep-before:
erda.core.pipeline.aop.plugins.task.unit-test-report:
erda.core.pipeline.aop.plugins.task.autotest-cookie-keep-after:
erda.core.pipeline.aop.plugins.task.definition-report:

erda.core.pipeline.aop.plugins.task.definition-report:
1 change: 1 addition & 0 deletions cmd/pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
_ "github.com/erda-project/erda/internal/tools/pipeline/providers/cron/compensator"
_ "github.com/erda-project/erda/internal/tools/pipeline/providers/cron/daemon"
_ "github.com/erda-project/erda/internal/tools/pipeline/providers/dbgc"
_ "github.com/erda-project/erda/internal/tools/pipeline/providers/dbgc/definition_cleanup"
_ "github.com/erda-project/erda/internal/tools/pipeline/providers/definition"
_ "github.com/erda-project/erda/internal/tools/pipeline/providers/edgepipeline_register"
_ "github.com/erda-project/erda/internal/tools/pipeline/providers/graph"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ require (
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f
github.com/libgit2/git2go/v33 v33.0.9
github.com/magiconair/properties v1.8.6
github.com/mattn/go-sqlite3 v2.0.1+incompatible
github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2
github.com/mholt/archiver v2.1.0+incompatible
github.com/minio/md5-simd v1.1.2
Expand Down Expand Up @@ -351,7 +352,6 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.10 // indirect
github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.43 // indirect
github.com/mitchellh/copystructure v1.1.1 // indirect
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ github.com/Chronokeeper/anyxml v0.0.0-20160530174208-54457d8e98c6 h1:Etfj2lhXyrY
github.com/Chronokeeper/anyxml v0.0.0-20160530174208-54457d8e98c6/go.mod h1:YzuYAe2hrrwKXkM9kqjbkTLlkbA+/xw2MA46f1+ENxc=
github.com/ClickHouse/ch-go v0.53.0 h1:gD9oP15FW+1oTTYyVzmuVfM+bk5cB5wqdscBIIw/mRA=
github.com/ClickHouse/ch-go v0.53.0/go.mod h1:B9htMJ0hii/zrC2hljUKdnagRBuLqtRG/GrU3jqCwRk=
github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/ClickHouse/clickhouse-go/v2 v2.10.1 h1:WCnusqEeCO/9sLFVIv57le/O1ydUb+x9+SYYhJ11fsY=
github.com/ClickHouse/clickhouse-go/v2 v2.10.1/go.mod h1:teXfZNM90iQ99Jnuht+dxQXCuhDZ8nvvMoTJOFrcmcg=
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 h1:sR+/8Yb4slttB4vD+b9btVEnWgL3Q00OBTzVT8B9C0c=
Expand Down Expand Up @@ -297,7 +296,6 @@ github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -563,8 +561,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrp
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/erda-project/elastic v0.0.1-ex h1:5ajfxQ5S5YjpzFqY9LzL9hiKWCn6q/JDT4n8sNv7+pU=
github.com/erda-project/elastic v0.0.1-ex/go.mod h1:iAVsas6fcmt9pxtge1+dErMhecv+RLSXlD4rnZRJVW0=
github.com/erda-project/erda-infra v1.0.9-0.20231213094429-67b1657b3593 h1:foLm8eCYTpjz2yDccu3+oz8fXv536wXn0TR/xhqWmCo=
github.com/erda-project/erda-infra v1.0.9-0.20231213094429-67b1657b3593/go.mod h1:TS2BPw1crvFCMRqIi3gxZSaJvWw/XIjYmISiPJljFPU=
github.com/erda-project/erda-infra v1.0.10-0.20240112020002-e4c6c25a3fcf h1:s15eIENYaTZW6ome6IxfQCSMPsIJPRz7x0SLnSjsV+w=
github.com/erda-project/erda-infra v1.0.10-0.20240112020002-e4c6c25a3fcf/go.mod h1:TS2BPw1crvFCMRqIi3gxZSaJvWw/XIjYmISiPJljFPU=
github.com/erda-project/erda-infra/tools v0.0.0-20231213094429-67b1657b3593 h1:yqny9kVbick5hUsDQJU4yp9rpZBteNf2FKkUxce/2GU=
Expand Down Expand Up @@ -1315,7 +1311,6 @@ github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQ
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.1 h1:FVzMWA5RllMAKIdUSC8mdWo3XtwoecrH79BY70sEEpE=
github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mkevac/debugcharts v0.0.0-20191222103121-ae1c48aa8615/go.mod h1:Ad7oeElCZqA1Ufj0U9/liOF4BtVepxRcTvr2ey7zTvM=
github.com/moby/ipvs v1.0.1/go.mod h1:2pngiyseZbIKXNv7hsKj3O9UEz30c53MT9005gt2hxQ=
github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
Expand Down Expand Up @@ -2013,9 +2008,6 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mI
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489 h1:1JFLBqwIgdyHN1ZtgjTBwO+blA6gVOmZurpiMEsETKo=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3CSBatqGNg7GRmsnfLWtoW60w4eDYfh7vHDg=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ=
go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
Expand Down
93 changes: 93 additions & 0 deletions internal/tools/pipeline/dbclient/op_pipeline_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package dbclient

import (
"fmt"
"time"

"github.com/erda-project/erda/apistructs"
definitiondb "github.com/erda-project/erda/internal/tools/pipeline/providers/definition/db"
Expand All @@ -24,6 +25,23 @@ import (
"github.com/erda-project/erda/pkg/crypto/uuid"
)

type PipelineBaseFilter struct {
ID []uint64
PipelineSource []apistructs.PipelineSource
PipelineYmName []string
ClusterName []string
Status []apistructs.PipelineStatus
Type []apistructs.PipelineType
TriggerMode []apistructs.PipelineTriggerMode
CronID []uint64
IsSnippet []uint64
StartTimeCreated *time.Time
EndTimeCreated *time.Time
StartTimeUpdated *time.Time
EndTimeUpdated *time.Time
PipelineDefinitionID []string
}

func (client *Client) ListPipelineBasesByIDs(pipelineIDs []uint64, ops ...SessionOption) (map[uint64]spec.PipelineBase, error) {
session := client.NewSession(ops...)
defer session.Close()
Expand Down Expand Up @@ -88,6 +106,72 @@ func (client *Client) GetPipelineBase(id uint64, ops ...SessionOption) (spec.Pip
return base, found, nil
}

func (client *Client) GetPipelineBaseByFilter(filter *PipelineBaseFilter, ops ...SessionOption) ([]spec.PipelineBase, error) {
session := client.NewSession(ops...)
defer session.Close()

var baseList []spec.PipelineBase

if len(filter.ID) > 0 {
session.In(string(spec.FieldID), filter.ID)
}
if len(filter.PipelineSource) > 0 {
session.In(string(spec.FieldPipelineSource), filter.PipelineSource)
}

if len(filter.PipelineYmName) > 0 {
session.In(string(spec.FieldPipelineYmlName), filter.PipelineYmName)
}

if len(filter.ClusterName) > 0 {
session.In(string(spec.FieldClusterName), filter.ClusterName)
}

if len(filter.Status) > 0 {
session.In(string(spec.FieldStatus), filter.Status)
}

if len(filter.Type) > 0 {
session.In(string(spec.FieldType), filter.Type)
}

if len(filter.TriggerMode) > 0 {
session.In(string(spec.FieldTriggerMode), filter.TriggerMode)
}

if len(filter.CronID) > 0 {
session.In(string(spec.FieldCronID), filter.CronID)
}

if len(filter.IsSnippet) > 0 {
session.In(string(spec.FieldIsSnippet), filter.IsSnippet)
}

if filter.StartTimeCreated != nil {
session.Where(string(spec.FieldTimeCreated)+" >= ?", filter.StartTimeCreated)
}

if filter.EndTimeCreated != nil {
session.Where(string(spec.FieldTimeCreated)+" <= ?", filter.StartTimeCreated)
}

if filter.StartTimeUpdated != nil {
session.Where(string(spec.FieldTimeUpdated)+" >= ?", filter.StartTimeCreated)
}

if filter.EndTimeUpdated != nil {
session.Where(string(spec.FieldTimeUpdated)+" <= ?", filter.StartTimeCreated)
}

if len(filter.PipelineDefinitionID) > 0 {
session.In(string(spec.FieldPipelineDefinitionID), filter.PipelineDefinitionID)
}

err := session.Desc(string(spec.FieldTimeCreated)).Find(&baseList)

return baseList, err
}

func (client *Client) GetPipelineStatus(id uint64, ops ...SessionOption) (apistructs.PipelineStatus, error) {
session := client.NewSession(ops...)
defer session.Close()
Expand All @@ -110,3 +194,12 @@ func (client *Client) UpdatePipelineBaseStatus(id uint64, status apistructs.Pipe
_, err := session.ID(id).Cols("status").Update(&spec.PipelineBase{Status: status})
return err
}

func (client *Client) BatchUpdatePipelineBaseByDefinitionIDs(ids []string, updateMap map[spec.Field]interface{}, ops ...SessionOption) error {
session := client.NewSession(ops...)
defer session.Close()

_, err := session.Table(&spec.PipelineBase{}).In(string(spec.FieldPipelineDefinitionID), ids).Update(updateMap)

return err
}
23 changes: 23 additions & 0 deletions internal/tools/pipeline/providers/cron/db/define_extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ func (client *Client) GetPipelineCron(id interface{}, ops ...mysqlxorm.SessionOp
return cron, true, nil
}

func (client *Client) BatchGetPipelineCronByDefinitionID(definitionIDs []string, ops ...mysqlxorm.SessionOption) (cronList []PipelineCron, err error) {
session := client.NewSession(ops...)
defer session.Close()

err = session.Table(&PipelineCron{}).In("pipeline_definition_id", definitionIDs).Desc("time_updated").Find(&cronList)

return cronList, err
}

func (client *Client) UpdatePipelineCron(id interface{}, cron *PipelineCron, ops ...mysqlxorm.SessionOption) error {
session := client.NewSession(ops...)
defer session.Close()
Expand Down Expand Up @@ -190,6 +199,20 @@ func (client *Client) DeletePipelineCron(id interface{}, ops ...mysqlxorm.Sessio
return nil
}

func (client *Client) BatchDeletePipelineCron(ids []uint64, ops ...mysqlxorm.SessionOption) error {
session := client.NewSession(ops...)
defer session.Close()

if len(ids) <= 0 {
return nil
}

if _, err := session.In("id", ids).Delete(&PipelineCron{}); err != nil {
return errors.Errorf("failed to delete pipeline cron, ids: %d, err: %v", ids, err)
}
return nil
}

func (client *Client) UpdatePipelineCronWillUseDefault(id interface{}, cron *PipelineCron, columns []string, ops ...mysqlxorm.SessionOption) error {
session := client.NewSession(ops...)
defer session.Close()
Expand Down
Loading

0 comments on commit f9316bd

Please sign in to comment.