Skip to content

Commit

Permalink
Add additional frame with executable path when available
Browse files Browse the repository at this point in the history
  • Loading branch information
Gandem committed Jun 11, 2024
1 parent 714a34c commit d0a485a
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 7 deletions.
1 change: 1 addition & 0 deletions libpf/libpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ type TraceAndCounts struct {
Comm string
PodName string
ContainerName string
PID PID
}

type FrameMetadata struct {
Expand Down
7 changes: 7 additions & 0 deletions libpf/process/coredump.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ func (cd *CoredumpProcess) GetMappingFile(_ *Mapping) string {
return ""
}

func (cd *CoredumpProcess) GetExecutablePath() (string, error) {
if cd.MainExecutable() == "" {
return "", errors.New("no main executable found")
}
return cd.MainExecutable(), nil
}

// CalculateMappingFileID implements the Process interface
func (cd *CoredumpProcess) CalculateMappingFileID(m *Mapping) (libpf.FileID, error) {
// It is not possible to calculate the real FileID as the section headers
Expand Down
4 changes: 4 additions & 0 deletions libpf/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (sp *systemProcess) GetThreads() ([]ThreadInfo, error) {
return nil, errors.New("not implemented")
}

func (sp *systemProcess) GetExecutablePath() (string, error) {
return os.Readlink(fmt.Sprintf("/proc/%d/exe", sp.pid))
}

func (sp *systemProcess) Close() error {
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions libpf/process/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type Process interface {
// GetMapping reads and parses process memory mappings
GetMappings() ([]Mapping, error)

// GetExecutablePath returns the path to the executable of the process
GetExecutablePath() (string, error)

// GetThread reads the process thread states
GetThreads() ([]ThreadInfo, error)

Expand Down
2 changes: 2 additions & 0 deletions processmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (d *dummyProcess) GetMappingFile(_ *process.Mapping) string {
return ""
}

func (d *dummyProcess) GetExecutablePath() (string, error) { return "", nil }

func (d *dummyProcess) CalculateMappingFileID(m *process.Mapping) (libpf.FileID, error) {
return pfelf.CalculateID(m.Path)
}
Expand Down
3 changes: 3 additions & 0 deletions processmanager/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) {
// Also see: Unified PID Events design doc
pm.ebpf.RemoveReportedPID(pid)
}

execPath, _ := pr.GetExecutablePath()
pm.reporter.ProcessMetadata(context.TODO(), pid, execPath)
}

// CleanupPIDs executes a periodic synchronization of pidToProcessInfo table with system processes.
Expand Down
34 changes: 33 additions & 1 deletion reporter/datadog_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type DatadogReporter struct {

// frames maps frame information to its source location.
frames *lru.SyncedLRU[libpf.FileID, map[libpf.AddressOrLineno]sourceInfo]

// execPathes stores the last known execPath for a PID.
execPathes *lru.SyncedLRU[libpf.PID, string]
}

// ReportFramesForTrace accepts a trace with the corresponding frames
Expand Down Expand Up @@ -94,7 +97,7 @@ func (r *DatadogReporter) ReportFramesForTrace(trace *libpf.Trace) {
// caches this information.
// nolint: dupl
func (r *DatadogReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32,
count uint16, comm, podName, containerName string) {
count uint16, comm, podName, containerName string, pid libpf.PID) {
if v, exists := r.traces.Peek(traceHash); exists {
// As traces is filled from two different API endpoints,
// some information for the trace might be available already.
Expand All @@ -103,13 +106,15 @@ func (r *DatadogReporter) ReportCountForTrace(traceHash libpf.TraceHash, timesta
v.comm = comm
v.podName = podName
v.containerName = containerName
v.pid = pid

r.traces.Add(traceHash, v)
} else {
r.traces.Add(traceHash, traceInfo{
comm: comm,
podName: podName,
containerName: containerName,
pid: pid,
})
}

Expand Down Expand Up @@ -144,6 +149,10 @@ func (r *DatadogReporter) ExecutableMetadata(_ context.Context,
})
}

func (r *DatadogReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) {
r.execPathes.Add(pid, execPath)
}

// FrameMetadata accepts metadata associated with a frame and caches this information.
func (r *DatadogReporter) FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno,
lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string) {
Expand Down Expand Up @@ -241,6 +250,11 @@ func StartDatadog(mainCtx context.Context, c *Config) (Reporter, error) {
return nil, err
}

execPathes, err := lru.NewSynced[libpf.PID, string](cacheSize, libpf.PID.Hash32)
if err != nil {
return nil, err
}

// Next step: Dynamically configure the size of this LRU.
// Currently we use the length of the JSON array in
// hostmetadata/hostmetadata.json.
Expand All @@ -261,6 +275,7 @@ func StartDatadog(mainCtx context.Context, c *Config) (Reporter, error) {
executables: executables,
frames: frames,
hostmetadata: hostmetadata,
execPathes: execPathes,
}

// Create a child context for reporting features
Expand Down Expand Up @@ -511,6 +526,23 @@ func (r *DatadogReporter) getPprofProfile() (profile *pprofile.Profile,
sample.Location = append(sample.Location, loc)
}

execPath, _ := r.execPathes.Get(trace.pid)

// Check if the last frame is a kernel frame.
if trace.frameTypes[len(trace.frameTypes)-1] == libpf.KernelFrame {
// If the last frame is a kernel frame, we need to add a dummy
// location with the kernel as the function name.
execPath = "kernel"
}

if execPath != "" {
lastLocation := sample.Location[len(sample.Location)-1]
loc := createPProfLocation(profile, lastLocation.Address)
m := createPprofFunctionEntry(funcMap, profile, execPath, execPath)
loc.Line = append(loc.Line, pprofile.Line{Function: m})
sample.Location = append(sample.Location, loc)
}

sample.Label = make(map[string][]string)
addTraceLabels(sample.Label, trace)

Expand Down
6 changes: 5 additions & 1 deletion reporter/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type TraceReporter interface {
// ReportCountForTrace accepts a hash of a trace with a corresponding count and
// caches this information before a periodic reporting to the backend.
ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32,
count uint16, comm, podName, containerName string)
count uint16, comm, podName, containerName string, pid libpf.PID)
}

type SymbolReporter interface {
Expand All @@ -63,6 +63,10 @@ type SymbolReporter interface {
// a periodic reporting to the backend.
FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno,
lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string)

// ProcessMetadata accepts metadata associated with a process and caches this information
// before a periodic reporting to the backend.
ProcessMetadata(ctx context.Context, pid libpf.PID, exe string)
}

type HostMetadataReporter interface {
Expand Down
22 changes: 21 additions & 1 deletion reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type traceInfo struct {
podName string
containerName string
apmServiceName string
pid libpf.PID
}

// sample holds dynamic information about traces.
Expand Down Expand Up @@ -101,6 +102,9 @@ type OTLPReporter struct {

// frames maps frame information to its source location.
frames *lru.SyncedLRU[libpf.FileID, map[libpf.AddressOrLineno]sourceInfo]

// execPathes stores the last known execPath for a PID.
execPathes *lru.SyncedLRU[libpf.PID, string]
}

// hashString is a helper function for LRUs that use string as a key.
Expand Down Expand Up @@ -136,7 +140,7 @@ func (r *OTLPReporter) ReportFramesForTrace(trace *libpf.Trace) {
// caches this information.
// nolint: dupl
func (r *OTLPReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32,
count uint16, comm, podName, containerName string) {
count uint16, comm, podName, containerName string, pid libpf.PID) {
if v, exists := r.traces.Peek(traceHash); exists {
// As traces is filled from two different API endpoints,
// some information for the trace might be available already.
Expand All @@ -145,13 +149,15 @@ func (r *OTLPReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp
v.comm = comm
v.podName = podName
v.containerName = containerName
v.pid = pid

r.traces.Add(traceHash, v)
} else {
r.traces.Add(traceHash, traceInfo{
comm: comm,
podName: podName,
containerName: containerName,
pid: pid,
})
}

Expand Down Expand Up @@ -186,6 +192,10 @@ func (r *OTLPReporter) ExecutableMetadata(_ context.Context,
})
}

func (r *OTLPReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) {
r.execPathes.Add(pid, execPath)
}

// FrameMetadata accepts metadata associated with a frame and caches this information.
func (r *OTLPReporter) FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno,
lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string) {
Expand Down Expand Up @@ -732,6 +742,16 @@ func getTraceLabels(stringMap map[string]uint32, i traceInfo) []*pprofextended.L
})
}

if i.pid != 0 {
pidIdx := getStringMapIndex(stringMap, "pid")
pidValueIdx := getStringMapIndex(stringMap, string(i.pid))

labels = append(labels, &pprofextended.Label{
Key: int64(pidIdx),
Str: int64(pidValueIdx),
})
}

return labels
}

Expand Down
17 changes: 16 additions & 1 deletion reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type GRPCReporter struct {
hostMetadataQueue fifoRingBuffer[*HostMetadata]
// fallbackSymbolsQueue is a ring buffer based FIFO for *fallbackSymbol
fallbackSymbolsQueue fifoRingBuffer[*fallbackSymbol]
// execPathesQueue stores the last known execPath for a PID.
execPathesQueue fifoRingBuffer[*processMetadata]
}

// Assert that we implement the full Reporter interface.
Expand All @@ -109,6 +111,11 @@ type executableMetadata struct {
buildID string
}

type processMetadata struct {
pid libpf.PID
execPath string
}

// ExecutableMetadata implements the SymbolReporter interface.
func (r *GRPCReporter) ExecutableMetadata(ctx context.Context, fileID libpf.FileID,
fileName, buildID string) {
Expand All @@ -124,6 +131,13 @@ func (r *GRPCReporter) ExecutableMetadata(ctx context.Context, fileID libpf.File
}
}

func (r *GRPCReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) {
r.execPathesQueue.append(&processMetadata{
pid: pid,
execPath: execPath,
})
}

// FrameMetadata implements the SymbolReporter interface.
func (r *GRPCReporter) FrameMetadata(fileID libpf.FileID,
addressOrLine libpf.AddressOrLineno, lineNumber libpf.SourceLineno, functionOffset uint32,
Expand All @@ -140,14 +154,15 @@ func (r *GRPCReporter) FrameMetadata(fileID libpf.FileID,

// ReportCountForTrace implements the TraceReporter interface.
func (r *GRPCReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32,
count uint16, comm, podName, containerName string) {
count uint16, comm, podName, containerName string, pid libpf.PID) {
r.countsForTracesQueue.append(&libpf.TraceAndCounts{
Hash: traceHash,
Timestamp: timestamp,
Count: count,
Comm: comm,
PodName: podName,
ContainerName: containerName,
PID: pid,
})
}

Expand Down
4 changes: 2 additions & 2 deletions tracehandler/tracehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
if traceKnown {
m.bpfTraceCacheHit++
m.reporter.ReportCountForTrace(postConvHash, timestamp, 1,
bpfTrace.Comm, meta.PodName, meta.ContainerName)
bpfTrace.Comm, meta.PodName, meta.ContainerName, bpfTrace.PID)
return
}
m.bpfTraceCacheMiss++
Expand All @@ -155,7 +155,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
log.Debugf("Trace hash remap 0x%x -> 0x%x", bpfTrace.Hash, umTrace.Hash)
m.bpfTraceCache.Add(bpfTrace.Hash, umTrace.Hash)
m.reporter.ReportCountForTrace(umTrace.Hash, timestamp, 1,
bpfTrace.Comm, meta.PodName, meta.ContainerName)
bpfTrace.Comm, meta.PodName, meta.ContainerName, bpfTrace.PID)

// Trace already known to collector by UM hash?
if _, known := m.umTraceCache.Get(umTrace.Hash); known {
Expand Down
2 changes: 1 addition & 1 deletion tracehandler/tracehandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *mockReporter) ReportFramesForTrace(trace *libpf.Trace) {
}

func (m *mockReporter) ReportCountForTrace(traceHash libpf.TraceHash,
_ libpf.UnixTime32, count uint16, _, _, _ string) {
_ libpf.UnixTime32, count uint16, _, _, _ string, _ libpf.PID) {
m.reportedCounts = append(m.reportedCounts, reportedCount{
traceHash: traceHash,
count: count,
Expand Down
2 changes: 2 additions & 0 deletions utils/coredump/coredump.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func (c *symbolizationCache) FrameMetadata(fileID libpf.FileID,

func (c *symbolizationCache) ReportFallbackSymbol(libpf.FrameID, string) {}

func (c *symbolizationCache) ProcessMetadata(_ context.Context, _ libpf.PID, _ string) {}

func generateErrorMap() (map[libpf.AddressOrLineno]string, error) {
file, err := os.Open("../errors-codegen/errors.json")
if err != nil {
Expand Down

0 comments on commit d0a485a

Please sign in to comment.