From 1e4ada9834a1a7bd6dfb007ad5de282ee4c633be Mon Sep 17 00:00:00 2001 From: Nayef Ghattas Date: Fri, 21 Jun 2024 22:06:24 +0000 Subject: [PATCH] uploader: avoid re-uploading already successfully uploaded file --- symbolication/datadog_uploader.go | 50 ++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/symbolication/datadog_uploader.go b/symbolication/datadog_uploader.go index 24384d4..262fec2 100644 --- a/symbolication/datadog_uploader.go +++ b/symbolication/datadog_uploader.go @@ -16,6 +16,7 @@ import ( "runtime" "time" + lru "github.com/elastic/go-freelru" log "github.com/sirupsen/logrus" "github.com/elastic/otel-profiling-agent/libpf" @@ -23,11 +24,15 @@ import ( "github.com/elastic/otel-profiling-agent/libpf/vc" ) +const binaryCacheSize = 1000 + const sourceMapEndpoint = "/api/v2/srcmap" type DatadogUploader struct { ddAPIKey string intakeURL string + + uploadCache *lru.SyncedLRU[libpf.FileID, struct{}] } var _ Uploader = (*DatadogUploader)(nil) @@ -53,14 +58,27 @@ func NewDatadogUploader() (Uploader, error) { return nil, fmt.Errorf("failed to parse URL: %w", err) } + uploadCache, err := lru.NewSynced[libpf.FileID, struct{}](binaryCacheSize, libpf.FileID.Hash32) + if err != nil { + return nil, fmt.Errorf("failed to create cache: %w", err) + } + return &DatadogUploader{ ddAPIKey: ddAPIKey, intakeURL: intakeURL, + + uploadCache: uploadCache, }, nil } func (d *DatadogUploader) HandleExecutable(ctx context.Context, elfRef *pfelf.Reference, fileID libpf.FileID) error { + _, ok := d.uploadCache.Peek(fileID) + if ok { + log.Debugf("Skipping symbol upload for executable %s: already uploaded", + elfRef.FileName()) + return nil + } fileName := elfRef.FileName() ef, err := elfRef.GetELF() // If the ELF file is not found, we ignore it @@ -99,13 +117,20 @@ func (d *DatadogUploader) HandleExecutable(ctx context.Context, elfRef *pfelf.Re return fmt.Errorf("failed to copy symbols: %w", err) } + d.uploadCache.Add(fileID, struct{}{}) // TODO: // This will launch a goroutine to upload the symbols, per executable // which would potentially lead to a large number of goroutines // if there are many executables. // Ideally, we should limit the number of concurrent uploads go func() { - d.uploadSymbols(symbolFile, e) + err = d.uploadSymbols(symbolFile, e) + if err != nil { + log.Errorf("Failed to upload symbols: %v for executable: %s", err, e) + d.uploadCache.Remove(fileID) + } else { + log.Infof("Symbols uploaded successfully for executable: %s", e) + } symbolFile.Close() os.Remove(symbolFile.Name()) }() @@ -151,6 +176,13 @@ func newExecutableMetadata(fileName string, elf *pfelf.File, }, nil } +func (e *executableMetadata) String() string { + return fmt.Sprintf( + "%s, arch=%s, gnu_build_id=%s, go_build_id=%s, file_hash=%s, platform=%s, type=%s", + e.fileName, e.Arch, e.GNUBuildID, e.GoBuildID, e.FileHash, e.Platform, e.Type, + ) +} + func (d *DatadogUploader) copySymbols(ctx context.Context, inputPath, outputPath string) error { args := []string{ "--only-keep-debug", @@ -165,20 +197,18 @@ func (d *DatadogUploader) copySymbols(ctx context.Context, inputPath, outputPath return nil } -func (d *DatadogUploader) uploadSymbols(symbolFile *os.File, e *executableMetadata) { +func (d *DatadogUploader) uploadSymbols(symbolFile *os.File, e *executableMetadata) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() req, err := d.buildSymbolUploadRequest(ctx, symbolFile, e) if err != nil { - log.Errorf("Failed to build symbol upload request: %v", err) - return + return fmt.Errorf("failed to build symbol upload request: %w", err) } resp, err := http.DefaultClient.Do(req) if err != nil { - log.Errorf("Failed to upload symbols: %v", err) - return + return err } defer resp.Body.Close() @@ -186,11 +216,10 @@ func (d *DatadogUploader) uploadSymbols(symbolFile *os.File, e *executableMetada if resp.StatusCode < 200 || resp.StatusCode >= 300 { respBody, _ := io.ReadAll(resp.Body) - log.Errorf("Failed to upload symbols: %s, %s", resp.Status, string(respBody)) - return + return fmt.Errorf("error while uploading symbols: %s, %s", resp.Status, string(respBody)) } - log.Infof("Symbols uploaded successfully for executable: %+v", e) + return nil } func (d *DatadogUploader) buildSymbolUploadRequest(ctx context.Context, symbolFile *os.File, @@ -239,8 +268,7 @@ func (d *DatadogUploader) buildSymbolUploadRequest(ctx context.Context, symbolFi r, err := http.NewRequestWithContext(ctx, http.MethodPost, d.intakeURL, b) if err != nil { - log.Error("Failed to create request", err) - return nil, err + return nil, fmt.Errorf("failed to create request: %w", err) } r.Header.Set("DD-API-KEY", d.ddAPIKey)