diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 1764f885c..7ea6038ac 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -168,6 +168,12 @@ type S3Store struct { // diskWriteDurationMetric holds the prometheus instance for storing the time it takes to write chunks to disk. diskWriteDurationMetric prometheus.Summary + + // uploadSemaphoreDemandMetric holds the prometheus instance for storing the demand on the upload semaphore + uploadSemaphoreDemandMetric prometheus.Gauge + + // uploadSemaphoreLimitMetric holds the prometheus instance for storing the limit on the upload semaphore + uploadSemaphoreLimitMetric prometheus.Gauge } // The labels to use for observing and storing request duration. One label per operation. @@ -216,25 +222,40 @@ func New(bucket string, service S3API) S3Store { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) - return S3Store{ - Bucket: bucket, - Service: service, - MaxPartSize: 5 * 1024 * 1024 * 1024, - MinPartSize: 5 * 1024 * 1024, - PreferredPartSize: 50 * 1024 * 1024, - MaxMultipartParts: 10000, - MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, - MaxBufferedParts: 20, - TemporaryDirectory: "", - uploadSemaphore: semaphore.New(10), - requestDurationMetric: requestDurationMetric, - diskWriteDurationMetric: diskWriteDurationMetric, + uploadSemaphoreDemandMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tusd_s3_upload_semaphore_demand", + Help: "Number of goroutines wanting to acquire the upload lock or having it acquired", + }) + + uploadSemaphoreLimitMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tusd_s3_upload_semaphore_limit", + Help: "Limit of concurrent acquisitions of upload semaphore", + }) + + store := S3Store{ + Bucket: bucket, + Service: service, + MaxPartSize: 5 * 1024 * 1024 * 1024, + MinPartSize: 5 * 1024 * 1024, + PreferredPartSize: 50 * 1024 * 1024, + MaxMultipartParts: 10000, + MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, + MaxBufferedParts: 20, + TemporaryDirectory: "", + requestDurationMetric: requestDurationMetric, + diskWriteDurationMetric: diskWriteDurationMetric, + uploadSemaphoreDemandMetric: uploadSemaphoreDemandMetric, + uploadSemaphoreLimitMetric: uploadSemaphoreLimitMetric, } + + store.SetConcurrentPartUploads(10) + return store } // SetConcurrentPartUploads changes the limit on how many concurrent part uploads to S3 are allowed. func (store *S3Store) SetConcurrentPartUploads(limit int) { store.uploadSemaphore = semaphore.New(limit) + store.uploadSemaphoreLimitMetric.Set(float64(limit)) } // UseIn sets this store as the core data store in the passed composer and adds @@ -249,6 +270,8 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) { func (store S3Store) RegisterMetrics(registry prometheus.Registerer) { registry.MustRegister(store.requestDurationMetric) registry.MustRegister(store.diskWriteDurationMetric) + registry.MustRegister(store.uploadSemaphoreDemandMetric) + registry.MustRegister(store.uploadSemaphoreLimitMetric) } func (store S3Store) observeRequestDuration(start time.Time, label string) { @@ -455,10 +478,10 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re // starting many goroutines, most of which are just waiting for the lock. // We also acquire the semaphore before reading from the channel to reduce // the number of part files are laying around on disk without being used. - upload.store.uploadSemaphore.Acquire() + upload.store.acquireUploadSemaphore() fileChunk, more := <-fileChan if !more { - upload.store.uploadSemaphore.Release() + upload.store.releaseUploadSemaphore() break } @@ -477,7 +500,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re wg.Add(1) go func(file io.ReadSeeker, part *s3Part, closePart func()) { - defer upload.store.uploadSemaphore.Release() + defer upload.store.releaseUploadSemaphore() defer wg.Done() defer closePart() @@ -499,7 +522,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re } else { wg.Add(1) go func(file io.ReadSeeker, closePart func()) { - defer upload.store.uploadSemaphore.Release() + defer upload.store.releaseUploadSemaphore() defer wg.Done() defer closePart() @@ -1214,3 +1237,13 @@ func (store S3Store) metadataKeyWithPrefix(key string) *string { return aws.String(prefix + key) } + +func (store S3Store) acquireUploadSemaphore() { + store.uploadSemaphoreDemandMetric.Inc() + store.uploadSemaphore.Acquire() +} + +func (store S3Store) releaseUploadSemaphore() { + store.uploadSemaphore.Release() + store.uploadSemaphoreDemandMetric.Dec() +}