Skip to content

Commit

Permalink
Merge pull request #4874 from jackchenjc/issue-4834-8th
Browse files Browse the repository at this point in the history
feat: Support Querying ScheduleJob by Labels
  • Loading branch information
cloudxxx8 authored Aug 27, 2024
2 parents 3aa01a4 + f270cd5 commit 872e8ff
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 46 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.52
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.39
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.42
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31
github.com/edgexfoundry/go-mod-secrets/v3 v3.2.0-dev.9
github.com/fxamacker/cbor/v2 v2.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.52 h1:fv78Ky8/i3AOBOevstL
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.52/go.mod h1:oOuvWXdu6YaB2J17pe4X0ey66AZFyTzOmAZDQxPGGmM=
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12 h1:JGZ9fsyCZOgbNkg+qdW9JN63NKIEX95v5zJhCVdlp10=
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12/go.mod h1:v7CvWGVmTh8dKItDNtfdBnYTeLhfZP5YmFiLsGJL9KU=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.39 h1:z6E1KOAzHyFR/87A+0CxYlui0kq6tz7yctS6dXcowk0=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.39/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.42 h1:m31BNRTIod/DV1Rm9GnoO0D6qIgHCxDvwKyG39hPVo4=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.42/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE=
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31 h1:mC0ZguoK8HjVxeD7dIiXRqKswM0y7gnPQJt1fLOh/v4=
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31/go.mod h1:gcHtufkjd6oa3ZLqfzp66bCyCPx8MZe8Pwzh+2ITFnw=
github.com/edgexfoundry/go-mod-registry/v3 v3.2.0-dev.13 h1:LkaF2eOpSz4eUiGpah4a9r+cB/A0Pea3Nh7aTU9hlKs=
Expand Down
9 changes: 6 additions & 3 deletions internal/pkg/infrastructure/postgres/scheduleactionrecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ func (c *Client) DeleteScheduleActionRecordByAge(ctx context.Context, age int64)
}

func addScheduleActionRecord(ctx context.Context, connPool *pgxpool.Pool, scheduleActionRecord model.ScheduleActionRecord) (model.ScheduleActionRecord, errors.EdgeX) {
actionId := scheduleActionRecord.Action.GetBaseScheduleAction().Id
// Remove the payload from the action before storing it in the database to reduce the size of the record
copiedScheduleAction := scheduleActionRecord.Action.WithEmptyPayload()
copiedScheduleAction := scheduleActionRecord.Action.WithEmptyPayloadAndId()

// Marshal the action to store it in the database
actionJSONBytes, err := json.Marshal(copiedScheduleAction)
Expand All @@ -178,7 +179,7 @@ func addScheduleActionRecord(ctx context.Context, connPool *pgxpool.Pool, schedu
ctx,
sqlInsert(scheduleActionRecordTable, idCol, actionIdCol, jobNameCol, actionCol, statusCol, scheduledAtCol),
scheduleActionRecord.Id,
copiedScheduleAction.GetBaseScheduleAction().Id,
actionId,
scheduleActionRecord.JobName,
actionJSONBytes,
scheduleActionRecord.Status,
Expand Down Expand Up @@ -213,8 +214,10 @@ func queryScheduleActionRecords(ctx context.Context, connPool *pgxpool.Pool, sql
if err != nil {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON unmarshal schedule action record", err)
}
// Set the action ID back to models.ScheduleAction
actionWithId := action.WithId(actionId)

record.Action = action
record.Action = actionWithId
record.Created = created.UnixMilli()
record.ScheduledAt = scheduledAt.UnixMilli()
scheduleActionRecords = append(scheduleActionRecords, record)
Expand Down
29 changes: 24 additions & 5 deletions internal/pkg/infrastructure/postgres/schedulejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,23 @@ func (c *Client) AddScheduleJob(ctx context.Context, scheduleJob model.ScheduleJ
}

// AllScheduleJobs queries the schedule jobs with the given range, offset, and limit
func (c *Client) AllScheduleJobs(ctx context.Context, offset, limit int) ([]model.ScheduleJob, errors.EdgeX) {
func (c *Client) AllScheduleJobs(ctx context.Context, labels []string, offset, limit int) (jobs []model.ScheduleJob, err errors.EdgeX) {
offset, limit = getValidOffsetAndLimit(offset, limit)
jobs, err := queryScheduleJobs(ctx, c.ConnPool, sqlQueryAllWithPagination(scheduleJobTable), offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all schedule jobs", err)
if len(labels) > 0 {
c.loggingClient.Debugf("Querying schedule jobs by labels: %v", labels)
labelsJSON, err := json.Marshal(labels)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON marshal labels", err)
}
jobs, err = queryScheduleJobs(ctx, c.ConnPool, sqlQueryAllByContentLabelsWithPagination(scheduleJobTable), labelsJSON, offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all schedule jobs by labels", err)
}
} else {
jobs, err = queryScheduleJobs(ctx, c.ConnPool, sqlQueryAllWithPagination(scheduleJobTable), offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all schedule jobs", err)
}
}

return jobs, nil
Expand Down Expand Up @@ -88,7 +100,14 @@ func (c *Client) ScheduleJobByName(ctx context.Context, name string) (model.Sche
}

// ScheduleJobTotalCount returns the total count of schedule jobs
func (c *Client) ScheduleJobTotalCount(ctx context.Context) (uint32, errors.EdgeX) {
func (c *Client) ScheduleJobTotalCount(ctx context.Context, labels []string) (uint32, errors.EdgeX) {
if len(labels) > 0 {
labelsJSON, err := json.Marshal(labels)
if err != nil {
return 0, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON marshal labels", err)
}
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountContentLabels(scheduleJobTable), labelsJSON)
}
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCount(scheduleJobTable))
}

Expand Down
15 changes: 14 additions & 1 deletion internal/pkg/infrastructure/postgres/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ import (
"strings"
)

// Constants for common column names in the database.
const (
// Constants for common column names in the database.
contentCol = "content"
createdCol = "created"
idCol = "id"
modifiedCol = "modified"
nameCol = "name"
statusCol = "status"

// Constants for common json field names in the database.
labelsField = "Labels"
)

// ----------------------------------------------------------------------------------
Expand Down Expand Up @@ -92,6 +95,11 @@ func sqlQueryAllByColWithPagination(table string, columns ...string) string {
return fmt.Sprintf("SELECT * FROM %s WHERE %s OFFSET $%d LIMIT $%d", table, whereCondition, columnCount+1, columnCount+2)
}

// sqlQueryAllByLabelsWithPagination returns the SQL statement for selecting all rows from the table by the given labels from content with pagination
func sqlQueryAllByContentLabelsWithPagination(table string) string {
return fmt.Sprintf("SELECT * FROM %s WHERE content->'%s' @> $1::jsonb ORDER BY %s OFFSET $2 LIMIT $3", table, labelsField, createdCol)
}

// sqlQueryAllWithPaginationAndTimeRange returns the SQL statement for selecting all rows from the table with pagination and a time range.
func sqlQueryAllWithPaginationAndTimeRange(table string) string {
return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2 ORDER BY %s OFFSET $3 LIMIT $4", table, createdCol, createdCol, createdCol)
Expand Down Expand Up @@ -177,6 +185,11 @@ func sqlQueryCountByCol(table string, columns ...string) string {
return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s", table, whereCondition)
}

// sqlQueryCountContentLabels returns the SQL statement for counting the number of rows in the table by the given labels.
func sqlQueryCountContentLabels(table string) string {
return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE content->'%s' @> $1::jsonb", table, labelsField)
}

// sqlQueryCountByTimeRangeCol returns the SQL statement for counting the number of rows in the table
// by the given time range of the specified column
func sqlQueryCountByTimeRangeCol(table string, timeRangeCol string, arrayColNames []string, columns ...string) string {
Expand Down
10 changes: 5 additions & 5 deletions internal/support/cronscheduler/application/schedulejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func AddScheduleJob(ctx context.Context, job models.ScheduleJob, dic *di.Contain

// Add the ID for each action
for i, action := range job.Actions {
job.Actions[i] = action.WithId()
job.Actions[i] = action.WithId("")
}

err := schedulerManager.AddScheduleJob(job, correlationId)
Expand Down Expand Up @@ -86,11 +86,11 @@ func ScheduleJobByName(ctx context.Context, name string, dic *di.Container) (dto
}

// AllScheduleJobs queries all the schedule jobs with offset and limit
func AllScheduleJobs(ctx context.Context, offset, limit int, dic *di.Container) (scheduleJobDTOs []dtos.ScheduleJob, totalCount uint32, err errors.EdgeX) {
func AllScheduleJobs(ctx context.Context, labels []string, offset, limit int, dic *di.Container) (scheduleJobDTOs []dtos.ScheduleJob, totalCount uint32, err errors.EdgeX) {
dbClient := container.DBClientFrom(dic.Get)
jobs, err := dbClient.AllScheduleJobs(ctx, offset, limit)
jobs, err := dbClient.AllScheduleJobs(ctx, labels, offset, limit)
if err == nil {
totalCount, err = dbClient.ScheduleJobTotalCount(ctx)
totalCount, err = dbClient.ScheduleJobTotalCount(ctx, labels)
}
if err != nil {
return scheduleJobDTOs, totalCount, errors.NewCommonEdgeXWrapper(err)
Expand Down Expand Up @@ -182,7 +182,7 @@ func LoadScheduleJobsToSchedulerManager(ctx context.Context, dic *di.Container)
ctx, correlationId := correlation.FromContextOrNew(ctx)
config := container.ConfigurationFrom(dic.Get)

jobs, err := dbClient.AllScheduleJobs(context.Background(), 0, config.Service.MaxResultCount)
jobs, err := dbClient.AllScheduleJobs(context.Background(), nil, 0, config.Service.MaxResultCount)
if err != nil {
return errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to load all existing scheduled jobs", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/support/cronscheduler/controller/http/schedulejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (jc *ScheduleJobController) AllScheduleJobs(c echo.Context) error {
config := schedulerContainer.ConfigurationFrom(jc.dic.Get)

// parse URL query string for offset and limit
offset, limit, _, err := utils.ParseGetAllObjectsRequestQueryString(c, 0, math.MaxInt32, -1, config.Service.MaxResultCount)
offset, limit, labels, err := utils.ParseGetAllObjectsRequestQueryString(c, 0, math.MaxInt32, -1, config.Service.MaxResultCount)
if err != nil {
return utils.WriteErrorResponse(w, ctx, lc, err, "")
}
jobs, totalCount, err := application.AllScheduleJobs(ctx, offset, limit, jc.dic)
jobs, totalCount, err := application.AllScheduleJobs(ctx, labels, offset, limit, jc.dic)
if err != nil {
return utils.WriteErrorResponse(w, ctx, lc, err, "")
}
Expand Down
22 changes: 15 additions & 7 deletions internal/support/cronscheduler/controller/http/schedulejob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,13 @@ func TestScheduleJobByName(t *testing.T) {
func TestAllScheduleJobs(t *testing.T) {
expectedTotalScheduleJobsCount := uint32(0)
dic := mockDic()
var emptyLabels []string
dbClientMock := &csMock.DBClient{}
dbClientMock.On("ScheduleJobTotalCount", context.Background()).Return(expectedTotalScheduleJobsCount, nil)
dbClientMock.On("AllScheduleJobs", context.Background(), 0, 20).Return([]models.ScheduleJob{}, nil)
dbClientMock.On("AllScheduleJobs", context.Background(), 0, 1).Return([]models.ScheduleJob{}, nil)
dbClientMock.On("ScheduleJobTotalCount", context.Background(), emptyLabels).Return(expectedTotalScheduleJobsCount, nil)
dbClientMock.On("ScheduleJobTotalCount", context.Background(), testScheduleJobLabels).Return(expectedTotalScheduleJobsCount, nil)
dbClientMock.On("AllScheduleJobs", context.Background(), emptyLabels, 0, 20).Return([]models.ScheduleJob{}, nil)
dbClientMock.On("AllScheduleJobs", context.Background(), emptyLabels, 0, 1).Return([]models.ScheduleJob{}, nil)
dbClientMock.On("AllScheduleJobs", context.Background(), testScheduleJobLabels, 0, 1).Return([]models.ScheduleJob{}, nil)
dic.Update(di.ServiceConstructorMap{
container.DBClientInterfaceName: func(get di.Get) any {
return dbClientMock
Expand All @@ -267,22 +270,27 @@ func TestAllScheduleJobs(t *testing.T) {

tests := []struct {
name string
labels string
offset string
limit string
errorExpected bool
expectedTotalCount uint32
expectedStatusCode int
}{
{"Valid - get scheduled jobs without offset and limit", "", "", false, expectedTotalScheduleJobsCount, http.StatusOK},
{"Valid - get scheduled jobs with offset and limit", "0", "1", false, expectedTotalScheduleJobsCount, http.StatusOK},
{"Invalid - invalid offset format", "aaa", "1", true, expectedTotalScheduleJobsCount, http.StatusBadRequest},
{"Invalid - invalid limit format", "1", "aaa", true, expectedTotalScheduleJobsCount, http.StatusBadRequest},
{"Valid - get scheduled jobs without offset and limit", "", "", "", false, expectedTotalScheduleJobsCount, http.StatusOK},
{"Valid - get scheduled jobs with offset and limit", "", "0", "1", false, expectedTotalScheduleJobsCount, http.StatusOK},
{"Valid - get scheduled jobs by labels", strings.Join(testScheduleJobLabels, ","), "0", "1", false, expectedTotalScheduleJobsCount, http.StatusOK},
{"Invalid - invalid offset format", "", "aaa", "1", true, expectedTotalScheduleJobsCount, http.StatusBadRequest},
{"Invalid - invalid limit format", "", "1", "aaa", true, expectedTotalScheduleJobsCount, http.StatusBadRequest},
}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
e := echo.New()
req, err := http.NewRequest(http.MethodGet, common.ApiAllScheduleJobRoute, http.NoBody)
query := req.URL.Query()
if testCase.labels != "" {
query.Add(common.Labels, testCase.labels)
}
if testCase.offset != "" {
query.Add(common.Offset, testCase.offset)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ type DBClient interface {
CloseSession()

AddScheduleJob(ctx context.Context, scheduleJob model.ScheduleJob) (model.ScheduleJob, errors.EdgeX)
AllScheduleJobs(ctx context.Context, offset, limit int) ([]model.ScheduleJob, errors.EdgeX)
AllScheduleJobs(ctx context.Context, labels []string, offset, limit int) ([]model.ScheduleJob, errors.EdgeX)
UpdateScheduleJob(ctx context.Context, scheduleJob model.ScheduleJob) errors.EdgeX
DeleteScheduleJobByName(ctx context.Context, name string) errors.EdgeX
ScheduleJobById(ctx context.Context, id string) (model.ScheduleJob, errors.EdgeX)
ScheduleJobByName(ctx context.Context, name string) (model.ScheduleJob, errors.EdgeX)
ScheduleJobTotalCount(ctx context.Context) (uint32, errors.EdgeX)
ScheduleJobTotalCount(ctx context.Context, labels []string) (uint32, errors.EdgeX)

AddScheduleActionRecord(ctx context.Context, scheduleActionRecord model.ScheduleActionRecord) (model.ScheduleActionRecord, errors.EdgeX)
AddScheduleActionRecords(ctx context.Context, scheduleActionRecord []model.ScheduleActionRecord) ([]model.ScheduleActionRecord, errors.EdgeX)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions openapi/v3/support-cron-scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,13 @@ components:
type: integer
description: "The creation timestamp of the last item in the result set, in milliseconds."
example: 1722479650
labelsParam:
in: query
name: labels
required: false
schema:
type: string
description: "Allows for querying a given object by associated user-defined label. More than one label may be specified via a comma-delimited list."
limitParam:
in: query
name: limit
Expand Down Expand Up @@ -800,6 +807,7 @@ paths:
/job/all:
parameters:
- $ref: '#/components/parameters/correlatedRequestHeader'
- $ref: '#/components/parameters/labelsParam'
- $ref: '#/components/parameters/offsetParam'
- $ref: '#/components/parameters/limitParam'
get:
Expand Down

0 comments on commit 872e8ff

Please sign in to comment.