Skip to content

Commit

Permalink
feat: upgrade gocron to v2 & support sentry cron
Browse files Browse the repository at this point in the history
  • Loading branch information
MuZhou233 committed Feb 23, 2024
1 parent 23ea8ab commit 019993b
Show file tree
Hide file tree
Showing 14 changed files with 187 additions and 53 deletions.
7 changes: 6 additions & 1 deletion app/sephirah/cmd/sephirah/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions app/sephirah/internal/biz/bizchesed/chesed.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewChesed(
modelbinah.DownloadEmpty,
nil,
)
err := cron.BySeconds(60, c.ScanImage, context.Background()) //nolint:gomnd //TODO
err := cron.BySeconds("ChesedScanImage", 60, c.ScanImage, context.Background()) //nolint:gomnd //TODO
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -140,27 +140,27 @@ func (c *Chesed) UploadImageCallback(ctx context.Context, id model.InternalID) e
return nil
}

func (c *Chesed) ScanImage(ctx context.Context) {
func (c *Chesed) ScanImage(ctx context.Context) error {
if c.muScanImage.TryLock() {
defer c.muScanImage.Unlock()
} else {
return
return nil
}
images, err0 := c.repo.ListImageNeedScan(ctx)
if err0 != nil {
return
return err0
}
if len(images) == 0 {
return
return nil
}
for _, image := range images {
data, err := c.b.PresignedGetObject(ctx, bizbinah.BucketDefault, strconv.FormatInt(int64(image.ID), 10), libtime.Day)
if err != nil {
return
return err
}
results, err := c.miner.RecognizeImageURL(ctx, &miner.RecognizeImageURLRequest{Url: data})
if err != nil {
return
return err
}
var desReq string
for _, r := range results.GetResults() {
Expand All @@ -172,12 +172,13 @@ func (c *Chesed) ScanImage(ctx context.Context) {
searcherpb.DescribeIDRequest_DESCRIBE_MODE_APPEND,
searcherpb.Index_INDEX_CHESED_IMAGE,
); err != nil {
return
return err
}
if err = c.repo.SetImageStatus(ctx, image.ID, modelchesed.ImageStatusScanned); err != nil {
return
return err
}
}
return nil
}

func (c *Chesed) ListImages(ctx context.Context, paging model.Paging) ([]model.InternalID, int64, *errors.Error) {
Expand Down
13 changes: 7 additions & 6 deletions app/sephirah/internal/biz/biztiphereth/porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,30 @@ import (
"github.com/go-kratos/kratos/v2/errors"
)

func (t *Tiphereth) updatePorters(ctx context.Context) {
func (t *Tiphereth) updatePorters(ctx context.Context) error {
if t.supv.KnownInstancesRequireUpdate() {
porters, _, err := t.repo.ListPorters(ctx, model.Paging{
PageSize: 1000, //nolint:gomnd // TODO
PageNum: 1,
})
if err != nil {
logger.Errorf("list porters failed: %s", err.Error())
return
return err
}
t.supv.UpdateKnownInstances(porters)
}
newPorters, err := t.supv.RefreshAliveInstances(ctx)
if err != nil {
logger.Errorf("refresh alive instances failed: %s", err.Error())
return
return err
}
if len(newPorters) == 0 {
return
return nil
}
ids, err := t.searcher.NewBatchIDs(ctx, len(newPorters))
if err != nil {
logger.Errorf("new batch ids failed: %s", err.Error())
return
return err
}
for i, porter := range newPorters {
porter.ID = ids[i]
Expand All @@ -45,8 +45,9 @@ func (t *Tiphereth) updatePorters(ctx context.Context) {
err = t.repo.UpsertPorters(ctx, newPorters)
if err != nil {
logger.Errorf("upsert porters failed: %s", err.Error())
return
return err
}
return nil
}

func (t *Tiphereth) ListPorters(
Expand Down
6 changes: 5 additions & 1 deletion app/sephirah/internal/biz/biztiphereth/tiphereth.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func NewTiphereth(
searcher: sClient,
pullAccount: pullAccount,
}
err := cron.BySeconds(60, t.updatePorters, context.Background()) //nolint:gomnd // hard code min interval
err := cron.BySeconds(
"TipherethUpdatePorter",
60, //nolint:gomnd // hard code min interval
t.updatePorters, context.Background(),
)
if err != nil {
return nil, err
}
Expand Down
10 changes: 7 additions & 3 deletions app/sephirah/internal/biz/bizyesod/yesod.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,22 @@ func NewYesod(
searcher: sClient,
pullFeed: pullFeed,
}
err := cron.BySeconds(60, y.PullFeeds, context.Background()) //nolint:gomnd // hard code min interval
err := cron.BySeconds("YesodPullFeeds", 60, y.PullFeeds, context.Background()) //nolint:gomnd // hard code min interval
if err != nil {
return nil, err
}
return y, nil
}

func (y *Yesod) PullFeeds(ctx context.Context) {
func (y *Yesod) PullFeeds(ctx context.Context) error {
configs, err := y.repo.ListFeedConfigNeedPull(ctx, nil,
[]modelyesod.FeedConfigStatus{modelyesod.FeedConfigStatusActive},
modelyesod.ListFeedOrderNextPull, time.Now(), 32) //nolint:gomnd // TODO
if err != nil {
logger.Errorf("%s", err.Error())
return
return err
}
var errRes error
for _, c := range configs {
err = y.pullFeed.Publish(ctx, modelyesod.PullFeed{
InternalID: c.ID,
Expand All @@ -79,11 +80,14 @@ func (y *Yesod) PullFeeds(ctx context.Context) {
})
if err != nil {
logger.Errorf("%s", err.Error())
errRes = err
continue
}
err = y.repo.UpdateFeedConfigAsInQueue(ctx, c.ID)
if err != nil {
logger.Errorf("%s", err.Error())
errRes = err
}
}
return errRes
}
6 changes: 5 additions & 1 deletion cmd/librarian/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/fullstorydev/grpchan v1.1.1
github.com/getsentry/sentry-go v0.27.0
github.com/getsentry/sentry-go/otel v0.27.0
github.com/go-co-op/gocron v1.37.0
github.com/go-co-op/gocron/v2 v2.2.4
github.com/go-kratos/kratos/contrib/log/zap/v2 v2.0.0-20240119085030-a556a2b53120
github.com/go-kratos/kratos/contrib/registry/consul/v2 v2.0.0-20240119085030-a556a2b53120
github.com/go-kratos/kratos/v2 v2.7.2
Expand Down Expand Up @@ -113,6 +113,7 @@ require (
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jhump/protoreflect v1.14.1 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
Expand Down
15 changes: 6 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.8.1 h1:4+fr/el88TOO3ewCmQr8cx/CtZ/umlIRIs5M4NTNjf8=
github.com/gin-gonic/gin v1.8.1/go.mod h1:ji8BvRH1azfM+SYow9zQ6SZMvR8qOMZHmsCuWR9tTTk=
github.com/go-co-op/gocron v1.37.0 h1:ZYDJGtQ4OMhTLKOKMIch+/CY70Brbb1dGdooLEhh7b0=
github.com/go-co-op/gocron v1.37.0/go.mod h1:3L/n6BkO7ABj+TrfSVXLRzsP26zmikL4ISkLQ0O8iNY=
github.com/go-co-op/gocron/v2 v2.2.4 h1:fL6a8/U+BJQ9UbaeqKxua8wY02w4ftKZsxPzLSNOCKk=
github.com/go-co-op/gocron/v2 v2.2.4/go.mod h1:igssOwzZkfcnu3m2kwnCf/mYj4SmhP9ecSgmYjCOHkk=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
Expand Down Expand Up @@ -460,7 +460,6 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.6.0 h1:HBkoIh4BdSxoyo9PveV8giw7ZsaBOvzWKfcg/6MrVwI=
Expand Down Expand Up @@ -595,6 +594,8 @@ github.com/johannesboyne/gofakes3 v0.0.0-20240117152127-f7e9c41d81b2 h1:SRQawDd/
github.com/johannesboyne/gofakes3 v0.0.0-20240117152127-f7e9c41d81b2/go.mod h1:AxgWC4DDX54O2WDoQO1Ceabtn6IbktjU/7bigor+66g=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
Expand Down Expand Up @@ -633,8 +634,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -844,9 +843,7 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.3.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.4.0/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.5.0/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
Expand Down Expand Up @@ -1020,8 +1017,8 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
20 changes: 15 additions & 5 deletions internal/conf/base.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/conf/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ message Consul {

message Sentry {
string dsn = 1;
string environment = 2;
}
42 changes: 29 additions & 13 deletions internal/lib/libcron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,55 @@ import (
"context"
"time"

"github.com/go-co-op/gocron"
"github.com/go-co-op/gocron/v2"
"github.com/google/wire"
)

var ProviderSet = wire.NewSet(NewCron)

type Cron struct {
scheduler *gocron.Scheduler
scheduler gocron.Scheduler
sentryListener *sentryListener
}

func NewCron() *Cron {
s := gocron.NewScheduler(time.UTC)
func NewCron() (*Cron, error) {
sl := newSentryListener()
s, err := gocron.NewScheduler(
gocron.WithLocation(time.UTC),
gocron.WithLogger(newCronLogger()),
gocron.WithGlobalJobOptions(
gocron.WithEventListeners(sl.EventListeners()...),
),
)

if err != nil {
return nil, err
}

return &Cron{
s,
}
sl,
}, nil
}

func (c *Cron) Start(ctx context.Context) error {
c.scheduler.StartBlocking()
c.scheduler.Start()
return nil
}
func (c *Cron) Stop(ctx context.Context) error {
c.scheduler.Stop()
return nil
return c.scheduler.StopJobs()
}

func (c *Cron) ByCronExpr(expr string, jobFunc interface{}, params ...interface{}) error {
_, err := c.scheduler.Cron(expr).Do(jobFunc, params...)
return err
func (c *Cron) BySeconds(name string, seconds int, jobFunc interface{}, params ...interface{}) error {
return c.Duration(name, time.Duration(seconds)*time.Second, jobFunc, params...)
}

func (c *Cron) BySeconds(seconds int, jobFunc interface{}, params ...interface{}) error {
_, err := c.scheduler.Every(seconds).Seconds().Do(jobFunc, params...)
func (c *Cron) Duration(name string, duration time.Duration, jobFunc interface{}, params ...interface{}) error {
c.sentryListener.NewDurationJob(name, duration)
_, err := c.scheduler.NewJob(
gocron.DurationJob(duration),
gocron.NewTask(jobFunc, params...),
gocron.WithName(name),
)
return err
}
Loading

0 comments on commit 019993b

Please sign in to comment.