Skip to content

Commit

Permalink
s3store: Add metrics for demand on upload semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
Acconut committed Aug 11, 2022
1 parent 5d8c2be commit 870c434
Showing 1 changed file with 50 additions and 17 deletions.
67 changes: 50 additions & 17 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ type S3Store struct {

// diskWriteDurationMetric holds the prometheus instance for storing the time it takes to write chunks to disk.
diskWriteDurationMetric prometheus.Summary

// uploadSemaphoreDemandMetric holds the prometheus instance for storing the demand on the upload semaphore
uploadSemaphoreDemandMetric prometheus.Gauge

// uploadSemaphoreLimitMetric holds the prometheus instance for storing the limit on the upload semaphore
uploadSemaphoreLimitMetric prometheus.Gauge
}

// The labels to use for observing and storing request duration. One label per operation.
Expand Down Expand Up @@ -216,25 +222,40 @@ func New(bucket string, service S3API) S3Store {
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})

return S3Store{
Bucket: bucket,
Service: service,
MaxPartSize: 5 * 1024 * 1024 * 1024,
MinPartSize: 5 * 1024 * 1024,
PreferredPartSize: 50 * 1024 * 1024,
MaxMultipartParts: 10000,
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
MaxBufferedParts: 20,
TemporaryDirectory: "",
uploadSemaphore: semaphore.New(10),
requestDurationMetric: requestDurationMetric,
diskWriteDurationMetric: diskWriteDurationMetric,
uploadSemaphoreDemandMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tusd_s3_upload_semaphore_demand",
Help: "Number of goroutines wanting to acquire the upload lock or having it acquired",
})

uploadSemaphoreLimitMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tusd_s3_upload_semaphore_limit",
Help: "Limit of concurrent acquisitions of upload semaphore",
})

store := S3Store{
Bucket: bucket,
Service: service,
MaxPartSize: 5 * 1024 * 1024 * 1024,
MinPartSize: 5 * 1024 * 1024,
PreferredPartSize: 50 * 1024 * 1024,
MaxMultipartParts: 10000,
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
MaxBufferedParts: 20,
TemporaryDirectory: "",
requestDurationMetric: requestDurationMetric,
diskWriteDurationMetric: diskWriteDurationMetric,
uploadSemaphoreDemandMetric: uploadSemaphoreDemandMetric,
uploadSemaphoreLimitMetric: uploadSemaphoreLimitMetric,
}

store.SetConcurrentPartUploads(10)
return store
}

// SetConcurrentPartUploads changes the limit on how many concurrent part uploads to S3 are allowed.
func (store *S3Store) SetConcurrentPartUploads(limit int) {
store.uploadSemaphore = semaphore.New(limit)
store.uploadSemaphoreLimitMetric.Set(float64(limit))
}

// UseIn sets this store as the core data store in the passed composer and adds
Expand All @@ -249,6 +270,8 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) {
func (store S3Store) RegisterMetrics(registry prometheus.Registerer) {
registry.MustRegister(store.requestDurationMetric)
registry.MustRegister(store.diskWriteDurationMetric)
registry.MustRegister(store.uploadSemaphoreDemandMetric)
registry.MustRegister(store.uploadSemaphoreLimitMetric)
}

func (store S3Store) observeRequestDuration(start time.Time, label string) {
Expand Down Expand Up @@ -455,10 +478,10 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
// starting many goroutines, most of which are just waiting for the lock.
// We also acquire the semaphore before reading from the channel to reduce
// the number of part files are laying around on disk without being used.
upload.store.uploadSemaphore.Acquire()
upload.store.acquireUploadSemaphore()
fileChunk, more := <-fileChan
if !more {
upload.store.uploadSemaphore.Release()
upload.store.releaseUploadSemaphore()
break
}

Expand All @@ -477,7 +500,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re

wg.Add(1)
go func(file io.ReadSeeker, part *s3Part, closePart func()) {
defer upload.store.uploadSemaphore.Release()
defer upload.store.releaseUploadSemaphore()
defer wg.Done()
defer closePart()

Expand All @@ -499,7 +522,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
} else {
wg.Add(1)
go func(file io.ReadSeeker, closePart func()) {
defer upload.store.uploadSemaphore.Release()
defer upload.store.releaseUploadSemaphore()
defer wg.Done()
defer closePart()

Expand Down Expand Up @@ -1214,3 +1237,13 @@ func (store S3Store) metadataKeyWithPrefix(key string) *string {

return aws.String(prefix + key)
}

func (store S3Store) acquireUploadSemaphore() {
store.uploadSemaphoreDemandMetric.Inc()
store.uploadSemaphore.Acquire()
}

func (store S3Store) releaseUploadSemaphore() {
store.uploadSemaphore.Release()
store.uploadSemaphoreDemandMetric.Dec()
}

0 comments on commit 870c434

Please sign in to comment.