diff --git a/README.md b/README.md index daa7771..4986d15 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ DD_API_KEY=your-api-key # required DD_SITE=datadoghq.com # optional, defaults to "datadoghq.com" OTEL_PROFILING_AGENT_SERVICE=my-service # optional, defaults to "otel-profiling-agent-dev" OTEL_PROFILING_AGENT_REPORTER_INTERVAL=10s # optional, defaults to 60s +DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD=true # optional, defaults to false ``` Then, you can run the agent with the following command: diff --git a/docker-compose.yml b/docker-compose.yml index dd6d5d0..ee59ee9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,10 +8,15 @@ services: - arch=${ARCH:?error} privileged: true pid: "host" + environment: + DD_SITE: ${DD_SITE:-datadoghq.com} + DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD: ${DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD:-false} volumes: - .:/agent - /var/run/docker.sock:/var/run/docker.sock:ro - command: bash -c "sudo mount -t debugfs none /sys/kernel/debug && make && sudo /agent/otel-profiling-agent -tags 'service:${OTEL_PROFILING_AGENT_SERVICE:-otel-profiling-agent-dev};remote_symbols:yes' -collection-agent "http://datadog-agent:8126" -reporter-interval ${OTEL_PROFILING_AGENT_REPORTER_INTERVAL:-60s} -samples-per-second 20 -save-cpuprofile" + secrets: + - dd-api-key + command: ['/bin/sh', '-c', 'export DD_API_KEY=$$(cat /run/secrets/dd-api-key); sudo mount -t debugfs none /sys/kernel/debug && make && sudo -E /agent/otel-profiling-agent -tags "service:${OTEL_PROFILING_AGENT_SERVICE:-otel-profiling-agent-dev};remote_symbols:yes" -collection-agent "http://datadog-agent:8126" -reporter-interval ${OTEL_PROFILING_AGENT_REPORTER_INTERVAL:-60s} -samples-per-second 20 -save-cpuprofile'] datadog-agent: image: gcr.io/datadoghq/agent:7 @@ -24,7 +29,7 @@ services: - /sys/fs/cgroup/:/host/sys/fs/cgroup:ro secrets: - dd-api-key - entrypoint: [ '/bin/sh', '-c', 'export DD_API_KEY=$$(cat /run/secrets/dd-api-key) ; /bin/entrypoint.sh' ] + entrypoint: ['/bin/sh', 
'-c', 'export DD_API_KEY=$$(cat /run/secrets/dd-api-key) ; /bin/entrypoint.sh'] secrets: dd-api-key: diff --git a/libpf/pfelf/file.go b/libpf/pfelf/file.go index 30ab927..65df7d5 100644 --- a/libpf/pfelf/file.go +++ b/libpf/pfelf/file.go @@ -31,6 +31,7 @@ import ( "io" "os" "path/filepath" + "slices" "sort" "syscall" "unsafe" @@ -49,8 +50,12 @@ const ( // parsed sections (e.g. symbol tables and string tables; libxul // has about 4MB .dynstr) maxBytesLargeSection = 16 * 1024 * 1024 + buildIDSectionName = ".note.gnu.build-id" ) +var debugStrSectionNames = []string{".debug_str", ".zdebug_str", ".debug_str.dwo"} +var debugInfoSectionNames = []string{".debug_info", ".zdebug_info"} + // ErrSymbolNotFound is returned when requested symbol was not found var ErrSymbolNotFound = errors.New("symbol not found") @@ -109,6 +114,11 @@ type File struct { // bias is the load bias for ELF files inside core dump bias libpf.Address + // filePath is the path of the ELF file as opened by os.Open() + // This can be a path to a mapping file, or a path to the original ELF binary. + // This is empty if the file is opened from a coredump. + filePath string + // InsideCore indicates that this ELF is mapped from a coredump ELF InsideCore bool @@ -168,7 +178,7 @@ func Open(name string) (*File, error) { return nil, err } - ff, err := newFile(buffered, f, 0, false) + ff, err := newFile(name, buffered, f, 0, false) if err != nil { f.Close() return nil, err } @@ -186,13 +196,15 @@ func (f *File) Close() (err error) { } // NewFile creates a new ELF file object that borrows the given reader.
-func NewFile(r io.ReaderAt, loadAddress uint64, hasMusl bool) (*File, error) { - return newFile(r, nil, loadAddress, hasMusl) +func NewFile(path string, r io.ReaderAt, loadAddress uint64, hasMusl bool) (*File, error) { + return newFile(path, r, nil, loadAddress, hasMusl) } -func newFile(r io.ReaderAt, closer io.Closer, loadAddress uint64, hasMusl bool) (*File, error) { +func newFile(path string, r io.ReaderAt, closer io.Closer, + loadAddress uint64, hasMusl bool) (*File, error) { f := &File{ elfReader: r, + filePath: path, InsideCore: loadAddress != 0, closer: closer, } @@ -882,3 +894,49 @@ func (f *File) DynString(tag elf.DynTag) ([]string, error) { func (f *File) IsGolang() bool { return f.Section(".go.buildinfo") != nil || f.Section(".gopclntab") != nil } + +func (f *File) FilePath() (string, error) { + if f.InsideCore { + return "", errors.New("file path not available for ELF inside coredump") + } + return f.filePath, nil +} + +// HasDWARFData is a copy of pfelf.HasDWARFData, but for the libpf.File interface. +func (f *File) HasDWARFData() bool { + hasBuildID := false + hasDebugStr := false + for _, section := range f.Sections { + // NOBITS indicates that the section is actually empty, regardless of the size in the + // section header. + if section.Type == elf.SHT_NOBITS { + continue + } + + if section.Name == buildIDSectionName { + hasBuildID = true + } + + if slices.Contains(debugStrSectionNames, section.Name) { + hasDebugStr = section.Size > 0 + } + + // Some files have suspicious near-empty, partially stripped sections; consider them as not + // having DWARF data. + // The simplest binary gcc 10 can generate ("return 0") has >= 48 bytes for each section. + // Let's not worry about executables that may not verify this, as they would not be of + // interest to us. + if section.Size < 32 { + continue + } + + if slices.Contains(debugInfoSectionNames, section.Name) { + return true + } + } + + // Some alternate debug files only have a .debug_str section. 
For these we want to return true. + // Use the absence of program headers and presence of a Build ID as heuristic to identify + // alternate debug files. + return len(f.Progs) == 0 && hasBuildID && hasDebugStr +} diff --git a/libpf/process/coredump.go b/libpf/process/coredump.go index a32cda7..1683817 100644 --- a/libpf/process/coredump.go +++ b/libpf/process/coredump.go @@ -564,5 +564,5 @@ func (cf *CoredumpFile) ReadAt(p []byte, addr int64) (int, error) { // The returned `pfelf.File` is borrowing the coredump file. Closing it will not close the // underlying CoredumpFile. func (cf *CoredumpFile) OpenELF() (*pfelf.File, error) { - return pfelf.NewFile(cf, cf.Base, cf.parent.hasMusl) + return pfelf.NewFile(cf.Name, cf, cf.Base, cf.parent.hasMusl) } diff --git a/libpf/process/process.go b/libpf/process/process.go index 580e740..120fb19 100644 --- a/libpf/process/process.go +++ b/libpf/process/process.go @@ -230,7 +230,7 @@ func (sp *systemProcess) OpenELF(file string) (*pfelf.File, error) { if err != nil { return nil, fmt.Errorf("failed to extract VDSO: %v", err) } - return pfelf.NewFile(vdso, 0, false) + return pfelf.NewFile(file, vdso, 0, false) } ef, err := pfelf.Open(sp.GetMappingFile(m)) if err == nil { diff --git a/main.go b/main.go index 65a45a9..d56924f 100644 --- a/main.go +++ b/main.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/otel-profiling-agent/metrics/agentmetrics" "github.com/elastic/otel-profiling-agent/reporter" + "github.com/elastic/otel-profiling-agent/symbolication" "github.com/elastic/otel-profiling-agent/tracer" log "github.com/sirupsen/logrus" @@ -333,8 +334,22 @@ func mainWithExitCode() exitCode { // Start reporter metric reporting with 60 second intervals. 
defer reportermetrics.Start(mainCtx, rep, 60*time.Second)() + uploader := symbolication.NewNoopUploader() + + ddSymbolUpload := os.Getenv("DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD") + if ddSymbolUpload == "true" { + log.Infof("Enabling Datadog local symbol upload") + ddUploader, err := symbolication.NewDatadogUploader() + if err != nil { + log.Errorf("Failed to create Datadog symbol uploader, "+ + "symbol upload will be disabled: %v", err) + } else { + uploader = ddUploader + } + } + // Load the eBPF code and map definitions - trc, err := tracer.NewTracer(mainCtx, rep, times, includeTracers, !argSendErrorFrames) + trc, err := tracer.NewTracer(mainCtx, rep, uploader, times, includeTracers, !argSendErrorFrames) if err != nil { msg := fmt.Sprintf("Failed to load eBPF tracer: %s", err) log.Error(msg) diff --git a/processmanager/execinfomanager/manager.go b/processmanager/execinfomanager/manager.go index 2c99be2..325817e 100644 --- a/processmanager/execinfomanager/manager.go +++ b/processmanager/execinfomanager/manager.go @@ -7,9 +7,11 @@ package execinfomanager import ( + "context" "errors" "fmt" "os" + "time" "github.com/elastic/otel-profiling-agent/config" "github.com/elastic/otel-profiling-agent/host" @@ -29,6 +31,7 @@ import ( "github.com/elastic/otel-profiling-agent/metrics" pmebpf "github.com/elastic/otel-profiling-agent/processmanager/ebpf" "github.com/elastic/otel-profiling-agent/support" + "github.com/elastic/otel-profiling-agent/symbolication" "github.com/elastic/otel-profiling-agent/tpbase" log "github.com/sirupsen/logrus" "go.uber.org/multierr" @@ -72,6 +75,9 @@ type ExecutableInfoManager struct { // sdp allows fetching stack deltas for executables. sdp nativeunwind.StackDeltaProvider + // uploader is used to upload symbolication data. + uploader symbolication.Uploader + // state bundles up all mutable state of the manager.
state xsync.RWMutex[executableInfoManagerState] } @@ -79,6 +85,7 @@ type ExecutableInfoManager struct { // NewExecutableInfoManager creates a new instance of the executable info manager. func NewExecutableInfoManager( sdp nativeunwind.StackDeltaProvider, + uploader symbolication.Uploader, ebpf pmebpf.EbpfHandler, includeTracers []bool, ) *ExecutableInfoManager { @@ -104,7 +111,8 @@ func NewExecutableInfoManager( } return &ExecutableInfoManager{ - sdp: sdp, + sdp: sdp, + uploader: uploader, state: xsync.NewRWMutex(executableInfoManagerState{ interpreterLoaders: interpreterLoaders, executables: map[host.FileID]*entry{}, @@ -119,7 +127,7 @@ func NewExecutableInfoManager( // // The return value is copied instead of returning a pointer in order to spare us the use // of getters and more complicated locking semantics. -func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, +func (mgr *ExecutableInfoManager) AddOrIncRef(hostFileID host.FileID, fileID libpf.FileID, elfRef *pfelf.Reference) (ExecutableInfo, error) { var ( intervalData sdtypes.IntervalData @@ -131,7 +139,7 @@ func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, // Fast path for executable info that is already present. state := mgr.state.WLock() - info, ok := state.executables[fileID] + info, ok := state.executables[hostFileID] if ok { defer mgr.state.WUnlock(&state) info.rc++ @@ -142,7 +150,7 @@ func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, // so we release the lock before doing this. 
mgr.state.WUnlock(&state) - if err = mgr.sdp.GetIntervalStructuresForFile(fileID, elfRef, &intervalData); err != nil { + if err = mgr.sdp.GetIntervalStructuresForFile(hostFileID, elfRef, &intervalData); err != nil { return ExecutableInfo{}, fmt.Errorf("failed to extract interval data: %w", err) } @@ -156,20 +164,21 @@ // Re-take the lock and check whether another thread beat us to // inserting the data while we were waiting for the write lock. state = mgr.state.WLock() - defer mgr.state.WUnlock(&state) - if info, ok = state.executables[fileID]; ok { + if info, ok = state.executables[hostFileID]; ok { info.rc++ + mgr.state.WUnlock(&state) return info.ExecutableInfo, nil } // Load the data into BPF maps. - ref, gaps, err = state.loadDeltas(fileID, intervalData.Deltas) + ref, gaps, err = state.loadDeltas(hostFileID, intervalData.Deltas) if err != nil { + mgr.state.WUnlock(&state) return ExecutableInfo{}, fmt.Errorf("failed to load deltas: %w", err) } // Create the LoaderInfo for interpreter detection - loaderInfo := interpreter.NewLoaderInfo(fileID, elfRef, gaps) + loaderInfo := interpreter.NewLoaderInfo(hostFileID, elfRef, gaps) // Insert a corresponding record into our map. info = &entry{ @@ -180,7 +189,19 @@ mapRef: ref, rc: 1, } - state.executables[fileID] = info + state.executables[hostFileID] = info + mgr.state.WUnlock(&state) + + // Processing symbols for upload can take a while, so we release the lock + // before doing this. + // We also use a timeout to avoid blocking the process manager for too long.
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err = mgr.uploader.HandleExecutable(ctx, elfRef, fileID) + if err != nil { + log.Errorf("Failed to handle executable %v: %v", elfRef.FileName(), err) + } return info.ExecutableInfo, nil } diff --git a/processmanager/manager.go b/processmanager/manager.go index 47df281..2d04031 100644 --- a/processmanager/manager.go +++ b/processmanager/manager.go @@ -28,6 +28,7 @@ import ( pmebpf "github.com/elastic/otel-profiling-agent/processmanager/ebpf" eim "github.com/elastic/otel-profiling-agent/processmanager/execinfomanager" "github.com/elastic/otel-profiling-agent/reporter" + "github.com/elastic/otel-profiling-agent/symbolication" ) const ( @@ -63,7 +64,8 @@ // the default implementation. func New(ctx context.Context, includeTracers []bool, monitorInterval time.Duration, ebpf pmebpf.EbpfHandler, fileIDMapper FileIDMapper, symbolReporter reporter.SymbolReporter, - sdp nativeunwind.StackDeltaProvider, filterErrorFrames bool) (*ProcessManager, error) { + uploader symbolication.Uploader, sdp nativeunwind.StackDeltaProvider, + filterErrorFrames bool) (*ProcessManager, error) { if fileIDMapper == nil { var err error fileIDMapper, err = newFileIDMapper(lruFileIDCacheSize) @@ -79,7 +81,11 @@ } elfInfoCache.SetLifetime(elfInfoCacheTTL) + if uploader == nil { + // Some callers (e.g. utils/coredump) pass a nil uploader; default to the + // no-op implementation so eim never invokes a method on a nil interface. + uploader = symbolication.NewNoopUploader() + } - em := eim.NewExecutableInfoManager(sdp, ebpf, includeTracers) + em := eim.NewExecutableInfoManager(sdp, uploader, ebpf, includeTracers) interpreters := make(map[libpf.PID]map[libpf.OnDiskFileIdentifier]interpreter.Instance) diff --git a/processmanager/manager_test.go b/processmanager/manager_test.go index 693ada1..a7fb7e4 100644 --- a/processmanager/manager_test.go +++ b/processmanager/manager_test.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/otel-profiling-agent/lpm" "github.com/elastic/otel-profiling-agent/metrics" pmebpf
"github.com/elastic/otel-profiling-agent/processmanager/ebpf" + "github.com/elastic/otel-profiling-agent/symbolication" ) // dummyProcess implements pfelf.Process for testing purposes @@ -302,6 +303,7 @@ func TestInterpreterConvertTrace(t *testing.T) { nil, nil, nil, + nil, true) if err != nil { t.Fatalf("Failed to initialize new process manager: %v", err) @@ -400,6 +402,7 @@ func TestNewMapping(t *testing.T) { ebpfMockup, NewMapFileIDMapper(), nil, + symbolication.NewNoopUploader(), &dummyProvider, true) if err != nil { @@ -598,6 +601,7 @@ func TestProcExit(t *testing.T) { ebpfMockup, NewMapFileIDMapper(), nil, + symbolication.NewNoopUploader(), &dummyProvider, true) if err != nil { diff --git a/processmanager/processinfo.go b/processmanager/processinfo.go index 0bdc2bc..267a8bf 100644 --- a/processmanager/processinfo.go +++ b/processmanager/processinfo.go @@ -216,8 +216,15 @@ func (pm *ProcessManager) handleNewInterpreter(pr process.Process, m *Mapping, // handleNewMapping processes new file backed mappings func (pm *ProcessManager) handleNewMapping(pr process.Process, m *Mapping, elfRef *pfelf.Reference) error { + fileID, ok := pm.FileIDMapper.Get(m.FileID) + if !ok { + log.Debugf("file ID lookup failed for PID %d, file ID %d", + pr.PID(), m.FileID) + fileID = libpf.UnsymbolizedFileID + } + // Resolve executable info first - ei, err := pm.eim.AddOrIncRef(m.FileID, elfRef) + ei, err := pm.eim.AddOrIncRef(m.FileID, fileID, elfRef) if err != nil { return err } diff --git a/symbolication/datadog_uploader.go b/symbolication/datadog_uploader.go new file mode 100644 index 0000000..7aa4db5 --- /dev/null +++ b/symbolication/datadog_uploader.go @@ -0,0 +1,255 @@ +package symbolication + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "mime/multipart" + "net/http" + "net/textproto" + "net/url" + "os" + "os/exec" + "runtime" + "time" + + log "github.com/sirupsen/logrus" + + 
"github.com/elastic/otel-profiling-agent/libpf" + "github.com/elastic/otel-profiling-agent/libpf/pfelf" + "github.com/elastic/otel-profiling-agent/libpf/vc" +) + +const sourceMapEndpoint = "/api/v2/srcmap" + +type DatadogUploader struct { + ddAPIKey string + intakeURL string +} + +var _ Uploader = (*DatadogUploader)(nil) + +func NewDatadogUploader() (Uploader, error) { + err := exec.Command("objcopy", "--version").Run() + if err != nil { + return nil, fmt.Errorf("objcopy is not available: %w", err) + } + + ddAPIKey := os.Getenv("DD_API_KEY") + if ddAPIKey == "" { + return nil, fmt.Errorf("DD_API_KEY is not set") + } + + ddSite := os.Getenv("DD_SITE") + if ddSite == "" { + return nil, fmt.Errorf("DD_SITE is not set") + } + + intakeURL, err := url.JoinPath("https://sourcemap-intake."+ddSite, sourceMapEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse URL: %w", err) + } + + return &DatadogUploader{ + ddAPIKey: ddAPIKey, + intakeURL: intakeURL, + }, nil +} + +func (d *DatadogUploader) HandleExecutable(ctx context.Context, elfRef *pfelf.Reference, + fileID libpf.FileID) error { + fileName := elfRef.FileName() + ef, err := elfRef.GetELF() + // If the ELF file is not found, we ignore it + // This can happen for short-lived processes that are already gone by the time + // we try to upload symbols + if errors.Is(err, fs.ErrNotExist) { + log.Debugf("Skipping executable %s as it does not exist", fileName) + return nil + } + if err != nil { + return fmt.Errorf("could not get ELF: %w", err) + } + + // We only upload symbols for executables that have DWARF data + if !ef.HasDWARFData() { + log.Debugf("Skipping executable %s as it does not have DWARF data", fileName) + return nil + } + + e, err := newExecutableMetadata(fileName, ef, fileID) + if err != nil { + return err + } + + inputFilePath, err := ef.FilePath() + if err != nil { + return fmt.Errorf("failed to get ELF file path: %w", err) + } + + symbolFile, err := os.CreateTemp("", "objcopy-debug") + if err 
!= nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + + err = d.copySymbols(ctx, inputFilePath, symbolFile.Name()) + if err != nil { + return fmt.Errorf("failed to copy symbols: %w", err) + } + + // TODO: + // This will launch a goroutine to upload the symbols, per executable + // which would potentially lead to a large number of goroutines + // if there are many executables. + // Ideally, we should limit the number of concurrent uploads + go func() { + d.uploadSymbols(symbolFile, e) + symbolFile.Close() + os.Remove(symbolFile.Name()) + }() + + return nil +} + +type executableMetadata struct { + Arch string `json:"arch"` + GNUBuildID string `json:"gnu_build_id"` + GoBuildID string `json:"go_build_id"` + FileHash string `json:"file_hash"` + Platform string `json:"platform"` + Type string `json:"type"` + + fileName string +} + +func newExecutableMetadata(fileName string, elf *pfelf.File, + fileID libpf.FileID) (*executableMetadata, error) { + buildID, err := elf.GetBuildID() + if err != nil { + return nil, fmt.Errorf("failed to get build id: %w", err) + } + + goBuildID := "" + if elf.IsGolang() { + goBuildID, err = elf.GetGoBuildID() + if err != nil { + return nil, fmt.Errorf("failed to get go build id: %w", err) + } + } + + return &executableMetadata{ + Arch: runtime.GOARCH, + GNUBuildID: buildID, + GoBuildID: goBuildID, + FileHash: fileID.StringNoQuotes(), + Platform: "elf", + Type: "elf_symbol_file", + + fileName: fileName, + }, nil +} + +func (d *DatadogUploader) copySymbols(ctx context.Context, inputPath, outputPath string) error { + args := []string{ + "--only-keep-debug", + "--remove-section=.gdb_index", + inputPath, + outputPath, + } + err := exec.CommandContext(ctx, "objcopy", args...).Run() + if err != nil { + return fmt.Errorf("failed to extract debug symbols: %w", err) + } + return nil +} + +func (d *DatadogUploader) uploadSymbols(symbolFile *os.File, e *executableMetadata) { + ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second) + defer cancel() + + req, err := d.buildSymbolUploadRequest(ctx, symbolFile, e) + if err != nil { + log.Errorf("Failed to build symbol upload request: %v", err) + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Errorf("Failed to upload symbols: %v", err) + return + } + + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + + log.Errorf("Failed to upload symbols: %s, %s", resp.Status, string(respBody)) + return + } + + log.Infof("Symbols uploaded successfully for executable: %+v", e) +} + +func (d *DatadogUploader) buildSymbolUploadRequest(ctx context.Context, symbolFile *os.File, + e *executableMetadata) (*http.Request, error) { + b := new(bytes.Buffer) + + gzipped := gzip.NewWriter(b) + + mw := multipart.NewWriter(gzipped) + + // Copy the symbol file into the multipart writer + filePart, err := mw.CreateFormFile("elf_symbol_file", "elf_symbol_file") + if err != nil { + return nil, fmt.Errorf("failed to create form file: %w", err) + } + + _, err = io.Copy(filePart, symbolFile) + if err != nil { + return nil, fmt.Errorf("failed to copy symbol file: %w", err) + } + + // Write the event metadata into the multipart writer + eventPart, err := mw.CreatePart(textproto.MIMEHeader{ + "Content-Disposition": []string{`form-data; name="event"; filename="event.json"`}, + "Content-Type": []string{"application/json"}, + }) + if err != nil { + return nil, fmt.Errorf("failed to create event part: %w", err) + } + + err = json.NewEncoder(eventPart).Encode(e) + if err != nil { + return nil, fmt.Errorf("failed to write JSON metadata: %w", err) + } + + // Close the multipart writer then the gzip writer + err = mw.Close() + if err != nil { + return nil, fmt.Errorf("failed to close multipart writer: %w", err) + } + + err = gzipped.Close() + if err != nil { + return nil, fmt.Errorf("failed to close gzip writer: %w", err) + } + + r, err := http.NewRequestWithContext(ctx, 
http.MethodPost, d.intakeURL, b) + if err != nil { + log.Error("Failed to create request", err) + return nil, err + } + + r.Header.Set("DD-API-KEY", d.ddAPIKey) + r.Header.Set("DD-EVP-ORIGIN", "otel-profiling-agent") + r.Header.Set("DD-EVP-ORIGIN-VERSION", vc.Version()) + r.Header.Set("Content-Type", mw.FormDataContentType()) + r.Header.Set("Content-Encoding", "gzip") + return r, nil +} diff --git a/symbolication/iface.go b/symbolication/iface.go new file mode 100644 index 0000000..fdafbce --- /dev/null +++ b/symbolication/iface.go @@ -0,0 +1,11 @@ +package symbolication + +import ( + "context" + "github.com/elastic/otel-profiling-agent/libpf" + "github.com/elastic/otel-profiling-agent/libpf/pfelf" +) + +type Uploader interface { + HandleExecutable(ctx context.Context, elfRef *pfelf.Reference, fileID libpf.FileID) error +} diff --git a/symbolication/uploader.go b/symbolication/uploader.go new file mode 100644 index 0000000..cea1ae9 --- /dev/null +++ b/symbolication/uploader.go @@ -0,0 +1,20 @@ +package symbolication + +import ( + "context" + "github.com/elastic/otel-profiling-agent/libpf" + "github.com/elastic/otel-profiling-agent/libpf/pfelf" +) + +var _ Uploader = (*NoopUploader)(nil) + +type NoopUploader struct{} + +func (n *NoopUploader) HandleExecutable(_ context.Context, _ *pfelf.Reference, + _ libpf.FileID) error { + return nil +} + +func NewNoopUploader() Uploader { + return &NoopUploader{} +} diff --git a/tracer/tracer.go b/tracer/tracer.go index 7ef9a89..7aaeb6d 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -42,6 +42,7 @@ import ( pmebpf "github.com/elastic/otel-profiling-agent/processmanager/ebpf" "github.com/elastic/otel-profiling-agent/reporter" "github.com/elastic/otel-profiling-agent/support" + "github.com/elastic/otel-profiling-agent/symbolication" ) /* @@ -200,8 +201,8 @@ func collectIntervalCacheMetrics(ctx context.Context, cache nativeunwind.Interva // NewTracer loads eBPF code and map definitions from the ELF module at the configured 
// path. -func NewTracer(ctx context.Context, rep reporter.SymbolReporter, intervals Intervals, - includeTracers []bool, filterErrorFrames bool) (*Tracer, error) { +func NewTracer(ctx context.Context, rep reporter.SymbolReporter, uploader symbolication.Uploader, + intervals Intervals, includeTracers []bool, filterErrorFrames bool) (*Tracer, error) { kernelSymbols, err := proc.GetKallsyms("/proc/kallsyms") if err != nil { return nil, fmt.Errorf("failed to read kernel symbols: %v", err) @@ -236,7 +237,7 @@ func NewTracer(ctx context.Context, rep reporter.SymbolReporter, intervals Inter hasBatchOperations := ebpfHandler.SupportsGenericBatchOperations() processManager, err := pm.New(ctx, includeTracers, intervals.MonitorInterval(), ebpfHandler, - nil, rep, localStackDeltaProvider, filterErrorFrames) + nil, rep, uploader, localStackDeltaProvider, filterErrorFrames) if err != nil { return nil, fmt.Errorf("failed to create processManager: %v", err) } diff --git a/utils/coredump/coredump.go b/utils/coredump/coredump.go index 2a70254..2b83da0 100644 --- a/utils/coredump/coredump.go +++ b/utils/coredump/coredump.go @@ -231,7 +231,7 @@ func ExtractTraces(ctx context.Context, pr process.Process, debug bool, } manager, err := pm.New(todo, includeTracers, monitorInterval, &coredumpEbpfMaps, - pm.NewMapFileIDMapper(), symCache, coredumpOpener, false) + pm.NewMapFileIDMapper(), symCache, nil, coredumpOpener, false) if err != nil { return nil, fmt.Errorf("failed to get Interpreter manager: %v", err) } diff --git a/utils/coredump/storecoredump.go b/utils/coredump/storecoredump.go index 0f16077..ee883eb 100644 --- a/utils/coredump/storecoredump.go +++ b/utils/coredump/storecoredump.go @@ -54,7 +54,7 @@ func (scd *StoreCoredump) OpenMappingFile(m *process.Mapping) (process.ReadAtClo func (scd *StoreCoredump) OpenELF(path string) (*pfelf.File, error) { file, err := scd.openFile(path) if err == nil { - return pfelf.NewFile(file, 0, false) + return pfelf.NewFile(path, file, 0, false) } 
if !errors.Is(err, os.ErrNotExist) { return nil, err @@ -70,7 +70,7 @@ func OpenStoreCoredump(store *modulestore.Store, coreFileRef modulestore.ID, mod if err != nil { return nil, fmt.Errorf("failed to open coredump file reader: %w", err) } - coreELF, err := pfelf.NewFile(reader, 0, false) + coreELF, err := pfelf.NewFile("", reader, 0, false) if err != nil { return nil, fmt.Errorf("failed to open coredump ELF: %w", err) }