diff --git a/README.md b/README.md index 2a914cd..5aa4dd1 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,18 @@ sudo otel-profiling-agent -tags 'service:myservice;remote_symbols:yes' -collecti For this to work you need to run a Datadog agent that listens for APM traffic at `localhost:8126`. If your agent is reachable under a different address, you can modify the `-collection-agent` parameter accordingly. +## Configuration + +### Local symbol upload (Experimental) + +For compiled languages (C/C++/Rust/Go), the profiling-agent can upload local symbols (when available) to Datadog for symbolication. Symbols need to be available locally (unstripped binaries). + +To enable local symbol upload: +1. Set the `DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD` environment variable to `true`. +2. Provide a Datadog API key through the `DD_API_KEY` environment variable. +3. Set the `DD_SITE` environment variable to [your Datadog site](https://docs.datadoghq.com/getting_started/site/#access-the-datadog-site) (e.g. `datadoghq.com`). + + ## Development A `docker-compose.yml` file is provided to help run the agent in a container for local development. @@ -45,6 +57,7 @@ DD_API_KEY=your-api-key # required DD_SITE=datadoghq.com # optional, defaults to "datadoghq.com" OTEL_PROFILING_AGENT_SERVICE=my-service # optional, defaults to "otel-profiling-agent-dev" OTEL_PROFILING_AGENT_REPORTER_INTERVAL=10s # optional, defaults to 60s +DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD=true # optional, defaults to false ``` Then, you can run the agent with the following command: diff --git a/docker-compose.yml b/docker-compose.yml index 8eba0f4..0afcac8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,9 @@ services: - arch=${ARCH:?error} privileged: true pid: "host" + environment: + DD_SITE: ${DD_SITE:-datadoghq.com} + DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD: ${DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD:-false} volumes: - .:/agent - /var/run/docker.sock:/var/run/docker.sock:ro @@ -26,7 +29,7 @@ services: - /sys/fs/cgroup/:/host/sys/fs/cgroup:ro secrets: - dd-api-key - entrypoint: [ '/bin/sh', '-c', 'export DD_API_KEY=$$(cat /run/secrets/dd-api-key) ; /bin/entrypoint.sh' ] + entrypoint: ['/bin/sh', '-c', 'export DD_API_KEY=$$(cat /run/secrets/dd-api-key) ; /bin/entrypoint.sh'] secrets: dd-api-key: diff --git a/libpf/pfelf/file.go b/libpf/pfelf/file.go index af62439..f3cef1c 100644 --- a/libpf/pfelf/file.go +++ b/libpf/pfelf/file.go @@ -31,6 +31,7 @@ import ( "io" "os" "path/filepath" + "slices" "sort" "syscall" "unsafe" @@ -49,8 +50,12 @@ const ( // parsed sections (e.g. symbol tables and string tables; libxul // has about 4MB .dynstr) maxBytesLargeSection = 16 * 1024 * 1024 + buildIDSectionName = ".note.gnu.build-id" ) +var debugStrSectionNames = []string{".debug_str", ".zdebug_str", ".debug_str.dwo"} +var debugInfoSectionNames = []string{".debug_info", ".zdebug_info"} + // ErrSymbolNotFound is returned when requested symbol was not found var ErrSymbolNotFound = errors.New("symbol not found") @@ -109,6 +114,11 @@ type File struct { // bias is the load bias for ELF files inside core dump bias libpf.Address + // filePath is the path of the ELF file as opened by os.Open() + // This can be a path to a mapping file, or a path to the original ELF binary. + // This is empty is the file is opened from a coredump. + filePath string + // InsideCore indicates that this ELF is mapped from a coredump ELF InsideCore bool @@ -168,7 +178,7 @@ func Open(name string) (*File, error) { return nil, err } - ff, err := newFile(buffered, f, 0, false) + ff, err := newFile(name, buffered, f, 0, false) if err != nil { f.Close() return nil, err @@ -186,13 +196,15 @@ func (f *File) Close() (err error) { } // NewFile creates a new ELF file object that borrows the given reader. -func NewFile(r io.ReaderAt, loadAddress uint64, hasMusl bool) (*File, error) { - return newFile(r, nil, loadAddress, hasMusl) +func NewFile(path string, r io.ReaderAt, loadAddress uint64, hasMusl bool) (*File, error) { + return newFile(path, r, nil, loadAddress, hasMusl) } -func newFile(r io.ReaderAt, closer io.Closer, loadAddress uint64, hasMusl bool) (*File, error) { +func newFile(path string, r io.ReaderAt, closer io.Closer, + loadAddress uint64, hasMusl bool) (*File, error) { f := &File{ elfReader: r, + filePath: path, InsideCore: loadAddress != 0, closer: closer, } @@ -971,3 +983,49 @@ func (f *File) DynString(tag elf.DynTag) ([]string, error) { func (f *File) IsGolang() bool { return f.Section(".go.buildinfo") != nil || f.Section(".gopclntab") != nil } + +func (f *File) FilePath() (string, error) { + if f.InsideCore { + return "", errors.New("file path not available for ELF inside coredump") + } + return f.filePath, nil +} + +// HasDWARFData is a copy of pfelf.HasDWARFData, but for the libpf.File interface. +func (f *File) HasDWARFData() bool { + hasBuildID := false + hasDebugStr := false + for _, section := range f.Sections { + // NOBITS indicates that the section is actually empty, regardless of the size in the + // section header. + if section.Type == elf.SHT_NOBITS { + continue + } + + if section.Name == buildIDSectionName { + hasBuildID = true + } + + if slices.Contains(debugStrSectionNames, section.Name) { + hasDebugStr = section.Size > 0 + } + + // Some files have suspicious near-empty, partially stripped sections; consider them as not + // having DWARF data. + // The simplest binary gcc 10 can generate ("return 0") has >= 48 bytes for each section. + // Let's not worry about executables that may not verify this, as they would not be of + // interest to us. + if section.Size < 32 { + continue + } + + if slices.Contains(debugInfoSectionNames, section.Name) { + return true + } + } + + // Some alternate debug files only have a .debug_str section. For these we want to return true. + // Use the absence of program headers and presence of a Build ID as heuristic to identify + // alternate debug files. + return len(f.Progs) == 0 && hasBuildID && hasDebugStr +} diff --git a/main.go b/main.go index 2fd9742..913d93d 100644 --- a/main.go +++ b/main.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/otel-profiling-agent/metrics/agentmetrics" "github.com/elastic/otel-profiling-agent/reporter" + "github.com/elastic/otel-profiling-agent/symbolication" "github.com/elastic/otel-profiling-agent/tracer" log "github.com/sirupsen/logrus" @@ -343,8 +344,23 @@ func mainWithExitCode() exitCode { // Start reporter metric reporting with 60 second intervals. defer reportermetrics.Start(mainCtx, rep, 60*time.Second)() + uploader := symbolication.NewNoopUploader() + + ddSymbolUpload := os.Getenv("DD_EXPERIMENTAL_LOCAL_SYMBOL_UPLOAD") + if ddSymbolUpload == "true" { + log.Infof("Enabling Datadog local symbol upload") + uploader, err = symbolication.NewDatadogUploader() + if err != nil { + log.Errorf( + "Failed to create Datadog symbol uploader, symbol upload will be disabled: %v", + err, + ) + uploader = symbolication.NewNoopUploader() + } + } + // Load the eBPF code and map definitions - trc, err := tracer.NewTracer(mainCtx, rep, times, includeTracers, !argSendErrorFrames) + trc, err := tracer.NewTracer(mainCtx, rep, uploader, times, includeTracers, !argSendErrorFrames) if err != nil { msg := fmt.Sprintf("Failed to load eBPF tracer: %s", err) log.Error(msg) diff --git a/process/coredump.go b/process/coredump.go index 80534ed..3215c80 100644 --- a/process/coredump.go +++ b/process/coredump.go @@ -564,5 +564,5 @@ func (cf *CoredumpFile) ReadAt(p []byte, addr int64) (int, error) { // The returned `pfelf.File` is borrowing the coredump file. Closing it will not close the // underlying CoredumpFile. func (cf *CoredumpFile) OpenELF() (*pfelf.File, error) { - return pfelf.NewFile(cf, cf.Base, cf.parent.hasMusl) + return pfelf.NewFile(cf.Name, cf, cf.Base, cf.parent.hasMusl) } diff --git a/process/process.go b/process/process.go index 3b37200..d2d6d81 100644 --- a/process/process.go +++ b/process/process.go @@ -248,7 +248,7 @@ func (sp *systemProcess) OpenELF(file string) (*pfelf.File, error) { if err != nil { return nil, fmt.Errorf("failed to extract VDSO: %v", err) } - return pfelf.NewFile(vdso, 0, false) + return pfelf.NewFile(file, vdso, 0, false) } return pfelf.Open(sp.getMappingFile(m)) } diff --git a/processmanager/execinfomanager/manager.go b/processmanager/execinfomanager/manager.go index fa2a72f..cafeef7 100644 --- a/processmanager/execinfomanager/manager.go +++ b/processmanager/execinfomanager/manager.go @@ -7,6 +7,7 @@ package execinfomanager import ( + "context" "errors" "fmt" "os" @@ -35,6 +36,7 @@ import ( sdtypes "github.com/elastic/otel-profiling-agent/nativeunwind/stackdeltatypes" pmebpf "github.com/elastic/otel-profiling-agent/processmanager/ebpf" "github.com/elastic/otel-profiling-agent/support" + "github.com/elastic/otel-profiling-agent/symbolication" "github.com/elastic/otel-profiling-agent/tpbase" "github.com/elastic/otel-profiling-agent/util" ) @@ -90,6 +92,9 @@ type ExecutableInfoManager struct { // sdp allows fetching stack deltas for executables. sdp nativeunwind.StackDeltaProvider + // uploader is used to upload symbolication data. + uploader symbolication.Uploader + // state bundles up all mutable state of the manager. state xsync.RWMutex[executableInfoManagerState] @@ -101,6 +106,7 @@ type ExecutableInfoManager struct { // NewExecutableInfoManager creates a new instance of the executable info manager. func NewExecutableInfoManager( sdp nativeunwind.StackDeltaProvider, + uploader symbolication.Uploader, ebpf pmebpf.EbpfHandler, includeTracers config.IncludedTracers, ) (*ExecutableInfoManager, error) { @@ -138,7 +144,8 @@ func NewExecutableInfoManager( deferredFileIDs.SetLifetime(deferredFileIDTimeout) return &ExecutableInfoManager{ - sdp: sdp, + sdp: sdp, + uploader: uploader, state: xsync.NewRWMutex(executableInfoManagerState{ interpreterLoaders: interpreterLoaders, executables: map[host.FileID]*entry{}, @@ -154,9 +161,9 @@ func NewExecutableInfoManager( // // The return value is copied instead of returning a pointer in order to spare us the use // of getters and more complicated locking semantics. -func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, +func (mgr *ExecutableInfoManager) AddOrIncRef(hostFileID host.FileID, fileID libpf.FileID, elfRef *pfelf.Reference) (ExecutableInfo, error) { - if _, exists := mgr.deferredFileIDs.Get(fileID); exists { + if _, exists := mgr.deferredFileIDs.Get(hostFileID); exists { return ExecutableInfo{}, ErrDeferredFileID } var ( @@ -169,7 +176,7 @@ func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, // Fast path for executable info that is already present. state := mgr.state.WLock() - info, ok := state.executables[fileID] + info, ok := state.executables[hostFileID] if ok { defer mgr.state.WUnlock(&state) info.rc++ @@ -180,10 +187,11 @@ func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, // so we release the lock before doing this. mgr.state.WUnlock(&state) - if err = mgr.sdp.GetIntervalStructuresForFile(fileID, elfRef, &intervalData); err != nil { + if err = mgr.sdp.GetIntervalStructuresForFile(hostFileID, elfRef, &intervalData); err != nil { if !errors.Is(err, os.ErrNotExist) { - mgr.deferredFileIDs.Add(fileID, libpf.Void{}) + mgr.deferredFileIDs.Add(hostFileID, libpf.Void{}) } + return ExecutableInfo{}, fmt.Errorf("failed to extract interval data: %w", err) } @@ -197,21 +205,20 @@ func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, // Re-take the lock and check whether another thread beat us to // inserting the data while we were waiting for the write lock. state = mgr.state.WLock() - defer mgr.state.WUnlock(&state) - if info, ok = state.executables[fileID]; ok { + if info, ok = state.executables[hostFileID]; ok { info.rc++ return info.ExecutableInfo, nil } // Load the data into BPF maps. - ref, gaps, err = state.loadDeltas(fileID, intervalData.Deltas) + ref, gaps, err = state.loadDeltas(hostFileID, intervalData.Deltas) if err != nil { - mgr.deferredFileIDs.Add(fileID, libpf.Void{}) + mgr.deferredFileIDs.Add(hostFileID, libpf.Void{}) return ExecutableInfo{}, fmt.Errorf("failed to load deltas: %w", err) } // Create the LoaderInfo for interpreter detection - loaderInfo := interpreter.NewLoaderInfo(fileID, elfRef, gaps) + loaderInfo := interpreter.NewLoaderInfo(hostFileID, elfRef, gaps) // Insert a corresponding record into our map. info = &entry{ @@ -222,7 +229,19 @@ func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, mapRef: ref, rc: 1, } - state.executables[fileID] = info + state.executables[hostFileID] = info + mgr.state.WUnlock(&state) + + // Processing symbols for upload can take a while, so we release the lock + // before doing this. + // We also use a timeout to avoid blocking the process manager for too long. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err = mgr.uploader.HandleExecutable(ctx, elfRef, fileID) + if err != nil { + log.Errorf("Failed to handle executable %v: %v", elfRef.FileName(), err) + } return info.ExecutableInfo, nil } diff --git a/processmanager/manager.go b/processmanager/manager.go index bc506cf..62088a4 100644 --- a/processmanager/manager.go +++ b/processmanager/manager.go @@ -29,6 +29,7 @@ import ( pmebpf "github.com/elastic/otel-profiling-agent/processmanager/ebpf" eim "github.com/elastic/otel-profiling-agent/processmanager/execinfomanager" "github.com/elastic/otel-profiling-agent/reporter" + "github.com/elastic/otel-profiling-agent/symbolication" "github.com/elastic/otel-profiling-agent/tracehandler" "github.com/elastic/otel-profiling-agent/traceutil" "github.com/elastic/otel-profiling-agent/util" @@ -68,7 +69,8 @@ var ( // implementation. func New(ctx context.Context, includeTracers config.IncludedTracers, monitorInterval time.Duration, ebpf pmebpf.EbpfHandler, fileIDMapper FileIDMapper, symbolReporter reporter.SymbolReporter, - sdp nativeunwind.StackDeltaProvider, filterErrorFrames bool) (*ProcessManager, error) { + uploader symbolication.Uploader, sdp nativeunwind.StackDeltaProvider, + filterErrorFrames bool) (*ProcessManager, error) { if fileIDMapper == nil { var err error fileIDMapper, err = newFileIDMapper(lruFileIDCacheSize) @@ -84,7 +86,7 @@ func New(ctx context.Context, includeTracers config.IncludedTracers, monitorInte } elfInfoCache.SetLifetime(elfInfoCacheTTL) - em, err := eim.NewExecutableInfoManager(sdp, ebpf, includeTracers) + em, err := eim.NewExecutableInfoManager(sdp, uploader, ebpf, includeTracers) if err != nil { return nil, fmt.Errorf("unable to create ExecutableInfoManager: %v", err) } diff --git a/processmanager/manager_test.go b/processmanager/manager_test.go index 7c6bca6..45df32d 100644 --- a/processmanager/manager_test.go +++ b/processmanager/manager_test.go @@ -31,6 +31,7 @@ import ( pmebpf "github.com/elastic/otel-profiling-agent/processmanager/ebpf" "github.com/elastic/otel-profiling-agent/remotememory" "github.com/elastic/otel-profiling-agent/reporter" + "github.com/elastic/otel-profiling-agent/symbolication" "github.com/elastic/otel-profiling-agent/traceutil" "github.com/elastic/otel-profiling-agent/util" @@ -322,6 +323,7 @@ func TestInterpreterConvertTrace(t *testing.T) { nil, &symbolReporterMockup{}, nil, + nil, true) require.NoError(t, err) @@ -416,6 +418,7 @@ func TestNewMapping(t *testing.T) { ebpfMockup, NewMapFileIDMapper(), symRepMockup, + symbolication.NewNoopUploader(), &dummyProvider, true) require.NoError(t, err) @@ -606,6 +609,7 @@ func TestProcExit(t *testing.T) { ebpfMockup, NewMapFileIDMapper(), repMockup, + symbolication.NewNoopUploader(), &dummyProvider, true) require.NoError(t, err) diff --git a/processmanager/processinfo.go b/processmanager/processinfo.go index f3b50b5..81070cd 100644 --- a/processmanager/processinfo.go +++ b/processmanager/processinfo.go @@ -214,8 +214,15 @@ func (pm *ProcessManager) handleNewInterpreter(pr process.Process, m *Mapping, // handleNewMapping processes new file backed mappings func (pm *ProcessManager) handleNewMapping(pr process.Process, m *Mapping, elfRef *pfelf.Reference) error { + fileID, ok := pm.FileIDMapper.Get(m.FileID) + if !ok { + log.Debugf("file ID lookup failed for PID %d, file ID %d", + pr.PID(), m.FileID) + fileID = libpf.UnsymbolizedFileID + } + // Resolve executable info first - ei, err := pm.eim.AddOrIncRef(m.FileID, elfRef) + ei, err := pm.eim.AddOrIncRef(m.FileID, fileID, elfRef) if err != nil { return err } diff --git a/symbolication/datadog_uploader.go b/symbolication/datadog_uploader.go new file mode 100644 index 0000000..f166c55 --- /dev/null +++ b/symbolication/datadog_uploader.go @@ -0,0 +1,253 @@ +package symbolication + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/textproto" + "net/url" + "os" + "os/exec" + "runtime" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/elastic/otel-profiling-agent/libpf" + "github.com/elastic/otel-profiling-agent/libpf/pfelf" + "github.com/elastic/otel-profiling-agent/vc" +) + +const sourceMapEndpoint = "/api/v2/srcmap" + +type DatadogUploader struct { + ddAPIKey string + intakeURL string +} + +var _ Uploader = (*DatadogUploader)(nil) + +func NewDatadogUploader() (Uploader, error) { + err := exec.Command("objcopy", "--version").Run() + if err != nil { + return nil, fmt.Errorf("objcopy is not available: %w", err) + } + + ddAPIKey := os.Getenv("DD_API_KEY") + if ddAPIKey == "" { + return nil, errors.New("DD_API_KEY is not set") + } + + ddSite := os.Getenv("DD_SITE") + if ddSite == "" { + return nil, errors.New("DD_SITE is not set") + } + + intakeURL, err := url.JoinPath("https://sourcemap-intake."+ddSite, sourceMapEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse URL: %w", err) + } + + return &DatadogUploader{ + ddAPIKey: ddAPIKey, + intakeURL: intakeURL, + }, nil +} + +func (d *DatadogUploader) HandleExecutable(ctx context.Context, elfRef *pfelf.Reference, + fileID libpf.FileID) error { + fileName := elfRef.FileName() + ef, err := elfRef.GetELF() + // If the ELF file is not found, we ignore it + // This can happen for short-lived processes that are already gone by the time + // we try to upload symbols + if err != nil { + log.Debugf("Skipping symbol upload for executable %s: %v", + fileName, err) + return nil + } + + // We only upload symbols for executables that have DWARF data + if !ef.HasDWARFData() { + log.Debugf("Skipping symbol upload for executable %s as it does not have DWARF data", + fileName) + return nil + } + + e, err := newExecutableMetadata(fileName, ef, fileID) + if err != nil { + return err + } + + inputFilePath, err := ef.FilePath() + if err != nil { + return fmt.Errorf("failed to get ELF file path: %w", err) + } + + symbolFile, err := os.CreateTemp("", "objcopy-debug") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + + err = d.copySymbols(ctx, inputFilePath, symbolFile.Name()) + if err != nil { + return fmt.Errorf("failed to copy symbols: %w", err) + } + + // TODO: + // This will launch a goroutine to upload the symbols, per executable + // which would potentially lead to a large number of goroutines + // if there are many executables. + // Ideally, we should limit the number of concurrent uploads + go func() { + d.uploadSymbols(symbolFile, e) + symbolFile.Close() + os.Remove(symbolFile.Name()) + }() + + return nil +} + +type executableMetadata struct { + Arch string `json:"arch"` + GNUBuildID string `json:"gnu_build_id"` + GoBuildID string `json:"go_build_id"` + FileHash string `json:"file_hash"` + Platform string `json:"platform"` + Type string `json:"type"` + + fileName string +} + +func newExecutableMetadata(fileName string, elf *pfelf.File, + fileID libpf.FileID) (*executableMetadata, error) { + buildID, err := elf.GetBuildID() + if err != nil { + return nil, fmt.Errorf("failed to get build id: %w", err) + } + + goBuildID := "" + if elf.IsGolang() { + goBuildID, err = elf.GetGoBuildID() + if err != nil { + return nil, fmt.Errorf("failed to get go build id: %w", err) + } + } + + return &executableMetadata{ + Arch: runtime.GOARCH, + GNUBuildID: buildID, + GoBuildID: goBuildID, + FileHash: fileID.StringNoQuotes(), + Platform: "elf", + Type: "elf_symbol_file", + + fileName: fileName, + }, nil +} + +func (d *DatadogUploader) copySymbols(ctx context.Context, inputPath, outputPath string) error { + args := []string{ + "--only-keep-debug", + "--remove-section=.gdb_index", + inputPath, + outputPath, + } + err := exec.CommandContext(ctx, "objcopy", args...).Run() + if err != nil { + return fmt.Errorf("failed to extract debug symbols: %w", err) + } + return nil +} + +func (d *DatadogUploader) uploadSymbols(symbolFile *os.File, e *executableMetadata) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + req, err := d.buildSymbolUploadRequest(ctx, symbolFile, e) + if err != nil { + log.Errorf("Failed to build symbol upload request: %v", err) + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Errorf("Failed to upload symbols: %v", err) + return + } + + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + + log.Errorf("Failed to upload symbols: %s, %s", resp.Status, string(respBody)) + return + } + + log.Infof("Symbols uploaded successfully for executable: %+v", e) +} + +func (d *DatadogUploader) buildSymbolUploadRequest(ctx context.Context, symbolFile *os.File, + e *executableMetadata) (*http.Request, error) { + b := new(bytes.Buffer) + + gzipped := gzip.NewWriter(b) + + mw := multipart.NewWriter(gzipped) + + // Copy the symbol file into the multipart writer + filePart, err := mw.CreateFormFile("elf_symbol_file", "elf_symbol_file") + if err != nil { + return nil, fmt.Errorf("failed to create form file: %w", err) + } + + _, err = io.Copy(filePart, symbolFile) + if err != nil { + return nil, fmt.Errorf("failed to copy symbol file: %w", err) + } + + // Write the event metadata into the multipart writer + eventPart, err := mw.CreatePart(textproto.MIMEHeader{ + "Content-Disposition": []string{`form-data; name="event"; filename="event.json"`}, + "Content-Type": []string{"application/json"}, + }) + if err != nil { + return nil, fmt.Errorf("failed to create event part: %w", err) + } + + err = json.NewEncoder(eventPart).Encode(e) + if err != nil { + return nil, fmt.Errorf("failed to write JSON metadata: %w", err) + } + + // Close the multipart writer then the gzip writer + err = mw.Close() + if err != nil { + return nil, fmt.Errorf("failed to close multipart writer: %w", err) + } + + err = gzipped.Close() + if err != nil { + return nil, fmt.Errorf("failed to close gzip writer: %w", err) + } + + r, err := http.NewRequestWithContext(ctx, http.MethodPost, d.intakeURL, b) + if err != nil { + log.Error("Failed to create request", err) + return nil, err + } + + r.Header.Set("DD-API-KEY", d.ddAPIKey) + r.Header.Set("DD-EVP-ORIGIN", "otel-profiling-agent") + r.Header.Set("DD-EVP-ORIGIN-VERSION", vc.Version()) + r.Header.Set("Content-Type", mw.FormDataContentType()) + r.Header.Set("Content-Encoding", "gzip") + return r, nil +} diff --git a/symbolication/iface.go b/symbolication/iface.go new file mode 100644 index 0000000..e5b1513 --- /dev/null +++ b/symbolication/iface.go @@ -0,0 +1,12 @@ +package symbolication + +import ( + "context" + + "github.com/elastic/otel-profiling-agent/libpf" + "github.com/elastic/otel-profiling-agent/libpf/pfelf" +) + +type Uploader interface { + HandleExecutable(ctx context.Context, elfRef *pfelf.Reference, fileID libpf.FileID) error +} diff --git a/symbolication/uploader.go b/symbolication/uploader.go new file mode 100644 index 0000000..1df34da --- /dev/null +++ b/symbolication/uploader.go @@ -0,0 +1,21 @@ +package symbolication + +import ( + "context" + + "github.com/elastic/otel-profiling-agent/libpf" + "github.com/elastic/otel-profiling-agent/libpf/pfelf" +) + +var _ Uploader = (*NoopUploader)(nil) + +type NoopUploader struct{} + +func (n *NoopUploader) HandleExecutable(_ context.Context, _ *pfelf.Reference, + _ libpf.FileID) error { + return nil +} + +func NewNoopUploader() Uploader { + return &NoopUploader{} +} diff --git a/tools/coredump/coredump.go b/tools/coredump/coredump.go index 056c700..794e38c 100644 --- a/tools/coredump/coredump.go +++ b/tools/coredump/coredump.go @@ -202,7 +202,7 @@ func ExtractTraces(ctx context.Context, pr process.Process, debug bool, includeTracers, _ := config.ParseTracers("all") manager, err := pm.New(todo, includeTracers, monitorInterval, &coredumpEbpfMaps, - pm.NewMapFileIDMapper(), symCache, elfunwindinfo.NewStackDeltaProvider(), false) + pm.NewMapFileIDMapper(), symCache, nil, elfunwindinfo.NewStackDeltaProvider(), false) if err != nil { return nil, fmt.Errorf("failed to get Interpreter manager: %v", err) } diff --git a/tools/coredump/storecoredump.go b/tools/coredump/storecoredump.go index 5c9bdcc..5204302 100644 --- a/tools/coredump/storecoredump.go +++ b/tools/coredump/storecoredump.go @@ -54,7 +54,7 @@ func (scd *StoreCoredump) OpenMappingFile(m *process.Mapping) (process.ReadAtClo func (scd *StoreCoredump) OpenELF(path string) (*pfelf.File, error) { file, err := scd.openFile(path) if err == nil { - return pfelf.NewFile(file, 0, false) + return pfelf.NewFile(path, file, 0, false) } if !errors.Is(err, os.ErrNotExist) { return nil, err @@ -70,7 +70,7 @@ func OpenStoreCoredump(store *modulestore.Store, coreFileRef modulestore.ID, mod if err != nil { return nil, fmt.Errorf("failed to open coredump file reader: %w", err) } - coreELF, err := pfelf.NewFile(reader, 0, false) + coreELF, err := pfelf.NewFile("", reader, 0, false) if err != nil { return nil, fmt.Errorf("failed to open coredump ELF: %w", err) } diff --git a/tracer/ebpf_integration_test.go b/tracer/ebpf_integration_test.go index cec28e9..c4475c0 100644 --- a/tracer/ebpf_integration_test.go +++ b/tracer/ebpf_integration_test.go @@ -19,10 +19,10 @@ import ( cebpf "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" - "github.com/elastic/otel-profiling-agent/libpf" "github.com/elastic/otel-profiling-agent/config" "github.com/elastic/otel-profiling-agent/host" hostmeta "github.com/elastic/otel-profiling-agent/hostmetadata/host" + "github.com/elastic/otel-profiling-agent/libpf" "github.com/elastic/otel-profiling-agent/rlimit" "github.com/elastic/otel-profiling-agent/support" "github.com/elastic/otel-profiling-agent/util" @@ -137,7 +137,7 @@ func TestTraceTransmissionAndParsing(t *testing.T) { enabledTracers, _ := config.ParseTracers("") enabledTracers.Enable(config.PythonTracer) - tracer, err := NewTracer(ctx, &mockReporter{}, &mockIntervals{}, enabledTracers, false) + tracer, err := NewTracer(ctx, &mockReporter{}, nil, &mockIntervals{}, enabledTracers, false) require.NoError(t, err) traceChan := make(chan *host.Trace, 16) diff --git a/tracer/tracer.go b/tracer/tracer.go index 546e718..1962f20 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -43,6 +43,7 @@ import ( "github.com/elastic/otel-profiling-agent/reporter" "github.com/elastic/otel-profiling-agent/rlimit" "github.com/elastic/otel-profiling-agent/support" + "github.com/elastic/otel-profiling-agent/symbolication" "github.com/elastic/otel-profiling-agent/tracehandler" "github.com/elastic/otel-profiling-agent/util" ) @@ -229,8 +230,9 @@ func calcFallbackModuleID(moduleSym libpf.Symbol, kernelSymbols *libpf.SymbolMap } // NewTracer loads eBPF code and map definitions from the ELF module at the configured path. -func NewTracer(ctx context.Context, rep reporter.SymbolReporter, intervals Intervals, - includeTracers config.IncludedTracers, filterErrorFrames bool) (*Tracer, error) { +func NewTracer(ctx context.Context, rep reporter.SymbolReporter, uploader symbolication.Uploader, + intervals Intervals, includeTracers config.IncludedTracers, + filterErrorFrames bool) (*Tracer, error) { kernelSymbols, err := proc.GetKallsyms("/proc/kallsyms") if err != nil { return nil, fmt.Errorf("failed to read kernel symbols: %v", err) @@ -251,7 +253,7 @@ func NewTracer(ctx context.Context, rep reporter.SymbolReporter, intervals Inter hasBatchOperations := ebpfHandler.SupportsGenericBatchOperations() processManager, err := pm.New(ctx, includeTracers, intervals.MonitorInterval(), ebpfHandler, - nil, rep, elfunwindinfo.NewStackDeltaProvider(), filterErrorFrames) + nil, rep, uploader, elfunwindinfo.NewStackDeltaProvider(), filterErrorFrames) if err != nil { return nil, fmt.Errorf("failed to create processManager: %v", err) }