diff --git a/libpf/libpf.go b/libpf/libpf.go index ffe3660..ae08b83 100644 --- a/libpf/libpf.go +++ b/libpf/libpf.go @@ -476,6 +476,7 @@ type TraceAndCounts struct { Comm string PodName string ContainerName string + PID PID } type FrameMetadata struct { diff --git a/libpf/process/coredump.go b/libpf/process/coredump.go index 1b408b8..a32cda7 100644 --- a/libpf/process/coredump.go +++ b/libpf/process/coredump.go @@ -282,6 +282,13 @@ func (cd *CoredumpProcess) GetMappingFile(_ *Mapping) string { return "" } +func (cd *CoredumpProcess) GetExecutablePath() (string, error) { + if cd.MainExecutable() == "" { + return "", errors.New("no main executable found") + } + return cd.MainExecutable(), nil +} + // CalculateMappingFileID implements the Process interface func (cd *CoredumpProcess) CalculateMappingFileID(m *Mapping) (libpf.FileID, error) { // It is not possible to calculate the real FileID as the section headers diff --git a/libpf/process/process.go b/libpf/process/process.go index 26fe8bc..580e740 100644 --- a/libpf/process/process.go +++ b/libpf/process/process.go @@ -166,6 +166,10 @@ func (sp *systemProcess) GetThreads() ([]ThreadInfo, error) { return nil, errors.New("not implemented") } +func (sp *systemProcess) GetExecutablePath() (string, error) { + return os.Readlink(fmt.Sprintf("/proc/%d/exe", sp.pid)) +} + func (sp *systemProcess) Close() error { return nil } diff --git a/libpf/process/types.go b/libpf/process/types.go index de75522..a88c56b 100644 --- a/libpf/process/types.go +++ b/libpf/process/types.go @@ -105,6 +105,9 @@ type Process interface { // GetMapping reads and parses process memory mappings GetMappings() ([]Mapping, error) + // GetExecutablePath returns the path to the executable of the process + GetExecutablePath() (string, error) + // GetThread reads the process thread states GetThreads() ([]ThreadInfo, error) diff --git a/processmanager/manager_test.go b/processmanager/manager_test.go index 7379855..693ada1 100644 --- a/processmanager/manager_test.go +++ b/processmanager/manager_test.go @@ -63,6 +63,8 @@ func (d *dummyProcess) GetMappingFile(_ *process.Mapping) string { return "" } +func (d *dummyProcess) GetExecutablePath() (string, error) { return "", nil } + func (d *dummyProcess) CalculateMappingFileID(m *process.Mapping) (libpf.FileID, error) { return pfelf.CalculateID(m.Path) } diff --git a/processmanager/processinfo.go b/processmanager/processinfo.go index c6a534e..0bdc2bc 100644 --- a/processmanager/processinfo.go +++ b/processmanager/processinfo.go @@ -614,6 +614,9 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) { // Also see: Unified PID Events design doc pm.ebpf.RemoveReportedPID(pid) } + + execPath, _ := pr.GetExecutablePath() + pm.reporter.ProcessMetadata(context.TODO(), pid, execPath) } // CleanupPIDs executes a periodic synchronization of pidToProcessInfo table with system processes. diff --git a/reporter/datadog_reporter.go b/reporter/datadog_reporter.go index 15a65be..c24d4c3 100644 --- a/reporter/datadog_reporter.go +++ b/reporter/datadog_reporter.go @@ -66,6 +66,9 @@ type DatadogReporter struct { // frames maps frame information to its source location. frames *lru.SyncedLRU[libpf.FileID, map[libpf.AddressOrLineno]sourceInfo] + + // execPathes stores the last known execPath for a PID. + execPathes *lru.SyncedLRU[libpf.PID, string] } // ReportFramesForTrace accepts a trace with the corresponding frames @@ -94,7 +97,7 @@ func (r *DatadogReporter) ReportFramesForTrace(trace *libpf.Trace) { // caches this information. // nolint: dupl func (r *DatadogReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32, - count uint16, comm, podName, containerName string) { + count uint16, comm, podName, containerName string, pid libpf.PID) { if v, exists := r.traces.Peek(traceHash); exists { // As traces is filled from two different API endpoints, // some information for the trace might be available already. @@ -103,6 +106,7 @@ func (r *DatadogReporter) ReportCountForTrace(traceHash libpf.TraceHash, timesta v.comm = comm v.podName = podName v.containerName = containerName + v.pid = pid r.traces.Add(traceHash, v) } else { @@ -110,6 +114,7 @@ func (r *DatadogReporter) ReportCountForTrace(traceHash libpf.TraceHash, timesta comm: comm, podName: podName, containerName: containerName, + pid: pid, }) } @@ -144,6 +149,10 @@ func (r *DatadogReporter) ExecutableMetadata(_ context.Context, }) } +func (r *DatadogReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) { + r.execPathes.Add(pid, execPath) +} + // FrameMetadata accepts metadata associated with a frame and caches this information. func (r *DatadogReporter) FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno, lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string) { @@ -241,6 +250,11 @@ func StartDatadog(mainCtx context.Context, c *Config) (Reporter, error) { return nil, err } + execPathes, err := lru.NewSynced[libpf.PID, string](cacheSize, libpf.PID.Hash32) + if err != nil { + return nil, err + } + // Next step: Dynamically configure the size of this LRU. // Currently we use the length of the JSON array in // hostmetadata/hostmetadata.json. @@ -261,6 +275,7 @@ func StartDatadog(mainCtx context.Context, c *Config) (Reporter, error) { executables: executables, frames: frames, hostmetadata: hostmetadata, + execPathes: execPathes, } // Create a child context for reporting features @@ -511,6 +526,23 @@ func (r *DatadogReporter) getPprofProfile() (profile *pprofile.Profile, sample.Location = append(sample.Location, loc) } + execPath, _ := r.execPathes.Get(trace.pid) + + // Check if the last frame is a kernel frame. + if trace.frameTypes[len(trace.frameTypes)-1] == libpf.KernelFrame { + // If the last frame is a kernel frame, we need to add a dummy + // location with the kernel as the function name. + execPath = "kernel" + } + + if execPath != "" { + lastLocation := sample.Location[len(sample.Location)-1] + loc := createPProfLocation(profile, lastLocation.Address) + m := createPprofFunctionEntry(funcMap, profile, execPath, execPath) + loc.Line = append(loc.Line, pprofile.Line{Function: m}) + sample.Location = append(sample.Location, loc) + } + sample.Label = make(map[string][]string) addTraceLabels(sample.Label, trace) diff --git a/reporter/iface.go b/reporter/iface.go index 1828120..4d7360a 100644 --- a/reporter/iface.go +++ b/reporter/iface.go @@ -48,7 +48,7 @@ type TraceReporter interface { // ReportCountForTrace accepts a hash of a trace with a corresponding count and // caches this information before a periodic reporting to the backend. ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32, - count uint16, comm, podName, containerName string) + count uint16, comm, podName, containerName string, pid libpf.PID) } type SymbolReporter interface { @@ -63,6 +63,10 @@ type SymbolReporter interface { // a periodic reporting to the backend. FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno, lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string) + + // ProcessMetadata accepts metadata associated with a process and caches this information + // before a periodic reporting to the backend. + ProcessMetadata(ctx context.Context, pid libpf.PID, exe string) } type HostMetadataReporter interface { diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index e8c8b9e..ffd06c3 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -39,6 +39,7 @@ type traceInfo struct { podName string containerName string apmServiceName string + pid libpf.PID } // sample holds dynamic information about traces. @@ -101,6 +102,9 @@ type OTLPReporter struct { // frames maps frame information to its source location. frames *lru.SyncedLRU[libpf.FileID, map[libpf.AddressOrLineno]sourceInfo] + + // execPathes stores the last known execPath for a PID. + execPathes *lru.SyncedLRU[libpf.PID, string] } // hashString is a helper function for LRUs that use string as a key. @@ -136,7 +140,7 @@ func (r *OTLPReporter) ReportFramesForTrace(trace *libpf.Trace) { // caches this information. // nolint: dupl func (r *OTLPReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32, - count uint16, comm, podName, containerName string) { + count uint16, comm, podName, containerName string, pid libpf.PID) { if v, exists := r.traces.Peek(traceHash); exists { // As traces is filled from two different API endpoints, // some information for the trace might be available already. @@ -145,6 +149,7 @@ func (r *OTLPReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp v.comm = comm v.podName = podName v.containerName = containerName + v.pid = pid r.traces.Add(traceHash, v) } else { @@ -152,6 +157,7 @@ func (r *OTLPReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp comm: comm, podName: podName, containerName: containerName, + pid: pid, }) } @@ -186,6 +192,10 @@ func (r *OTLPReporter) ExecutableMetadata(_ context.Context, }) } +func (r *OTLPReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) { + r.execPathes.Add(pid, execPath) +} + // FrameMetadata accepts metadata associated with a frame and caches this information. func (r *OTLPReporter) FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno, lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string) { @@ -732,6 +742,16 @@ func getTraceLabels(stringMap map[string]uint32, i traceInfo) []*pprofextended.L }) } + if i.pid != 0 { + pidIdx := getStringMapIndex(stringMap, "pid") + pidValueIdx := getStringMapIndex(stringMap, string(i.pid)) + + labels = append(labels, &pprofextended.Label{ + Key: int64(pidIdx), + Str: int64(pidValueIdx), + }) + } + return labels } diff --git a/reporter/reporter.go b/reporter/reporter.go index a37a159..d9fe144 100644 --- a/reporter/reporter.go +++ b/reporter/reporter.go @@ -93,6 +93,8 @@ type GRPCReporter struct { hostMetadataQueue fifoRingBuffer[*HostMetadata] // fallbackSymbolsQueue is a ring buffer based FIFO for *fallbackSymbol fallbackSymbolsQueue fifoRingBuffer[*fallbackSymbol] + // execPathesQueue stores the last known execPath for a PID. + execPathesQueue fifoRingBuffer[*processMetadata] } // Assert that we implement the full Reporter interface. @@ -109,6 +111,11 @@ type executableMetadata struct { buildID string } +type processMetadata struct { + pid libpf.PID + execPath string +} + // ExecutableMetadata implements the SymbolReporter interface. func (r *GRPCReporter) ExecutableMetadata(ctx context.Context, fileID libpf.FileID, fileName, buildID string) { @@ -124,6 +131,13 @@ func (r *GRPCReporter) ExecutableMetadata(ctx context.Context, fileID libpf.File } } +func (r *GRPCReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) { + r.execPathesQueue.append(&processMetadata{ + pid: pid, + execPath: execPath, + }) +} + // FrameMetadata implements the SymbolReporter interface. func (r *GRPCReporter) FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno, lineNumber libpf.SourceLineno, functionOffset uint32, @@ -140,7 +154,7 @@ func (r *GRPCReporter) FrameMetadata(fileID libpf.FileID, // ReportCountForTrace implements the TraceReporter interface. func (r *GRPCReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32, - count uint16, comm, podName, containerName string) { + count uint16, comm, podName, containerName string, pid libpf.PID) { r.countsForTracesQueue.append(&libpf.TraceAndCounts{ Hash: traceHash, Timestamp: timestamp, @@ -148,6 +162,7 @@ func (r *GRPCReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp Comm: comm, PodName: podName, ContainerName: containerName, + PID: pid, }) } diff --git a/tracehandler/tracehandler.go b/tracehandler/tracehandler.go index fbf63a9..ea0a946 100644 --- a/tracehandler/tracehandler.go +++ b/tracehandler/tracehandler.go @@ -145,7 +145,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { if traceKnown { m.bpfTraceCacheHit++ m.reporter.ReportCountForTrace(postConvHash, timestamp, 1, - bpfTrace.Comm, meta.PodName, meta.ContainerName) + bpfTrace.Comm, meta.PodName, meta.ContainerName, bpfTrace.PID) return } m.bpfTraceCacheMiss++ @@ -155,7 +155,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { log.Debugf("Trace hash remap 0x%x -> 0x%x", bpfTrace.Hash, umTrace.Hash) m.bpfTraceCache.Add(bpfTrace.Hash, umTrace.Hash) m.reporter.ReportCountForTrace(umTrace.Hash, timestamp, 1, - bpfTrace.Comm, meta.PodName, meta.ContainerName) + bpfTrace.Comm, meta.PodName, meta.ContainerName, bpfTrace.PID) // Trace already known to collector by UM hash? if _, known := m.umTraceCache.Get(umTrace.Hash); known { diff --git a/tracehandler/tracehandler_test.go b/tracehandler/tracehandler_test.go index 4e03cb8..c3c50c3 100644 --- a/tracehandler/tracehandler_test.go +++ b/tracehandler/tracehandler_test.go @@ -73,7 +73,7 @@ func (m *mockReporter) ReportFramesForTrace(trace *libpf.Trace) { } func (m *mockReporter) ReportCountForTrace(traceHash libpf.TraceHash, - _ libpf.UnixTime32, count uint16, _, _, _ string) { + _ libpf.UnixTime32, count uint16, _, _, _ string, _ libpf.PID) { m.reportedCounts = append(m.reportedCounts, reportedCount{ traceHash: traceHash, count: count, diff --git a/utils/coredump/coredump.go b/utils/coredump/coredump.go index 829d2ab..2a70254 100644 --- a/utils/coredump/coredump.go +++ b/utils/coredump/coredump.go @@ -105,6 +105,8 @@ func (c *symbolizationCache) FrameMetadata(fileID libpf.FileID, func (c *symbolizationCache) ReportFallbackSymbol(libpf.FrameID, string) {} +func (c *symbolizationCache) ProcessMetadata(_ context.Context, _ libpf.PID, _ string) {} + func generateErrorMap() (map[libpf.AddressOrLineno]string, error) { file, err := os.Open("../errors-codegen/errors.json") if err != nil {