diff --git a/internal/engines/cache/cache.go b/internal/engines/cache/cache.go index f8f8ad3f9..b49733f7e 100644 --- a/internal/engines/cache/cache.go +++ b/internal/engines/cache/cache.go @@ -3,6 +3,7 @@ package cache import ( "context" "encoding/hex" + "time" "go.opentelemetry.io/otel" api "go.opentelemetry.io/otel/metric" @@ -88,10 +89,14 @@ func (c *CheckEngineWithCache) Check(ctx context.Context, request *base.Permissi if found { ctx, span := tracer.Start(ctx, "hit") defer span.End() + start := time.Now() // Increase the check count in the metrics. c.cacheCounter.Add(ctx, 1) + duration := time.Now().Sub(start) + c.cacheHitDurationHistogram.Record(ctx, duration.Microseconds()) + // If the request doesn't have the exclusion flag set, return the cached result. return &base.PermissionCheckResponse{ Can: res.GetCan(), diff --git a/internal/servers/dataServer.go b/internal/servers/dataServer.go index ed6da3017..d99af4570 100644 --- a/internal/servers/dataServer.go +++ b/internal/servers/dataServer.go @@ -2,8 +2,10 @@ package servers import ( "log/slog" + "time" otelCodes "go.opentelemetry.io/otel/codes" + api "go.opentelemetry.io/otel/metric" "golang.org/x/net/context" "google.golang.org/grpc/status" @@ -19,10 +21,17 @@ import ( type DataServer struct { v1.UnimplementedDataServer - sr storage.SchemaReader - dr storage.DataReader - br storage.BundleReader - dw storage.DataWriter + sr storage.SchemaReader + dr storage.DataReader + br storage.BundleReader + dw storage.DataWriter + writeDataHistogram api.Int64Histogram + deleteDataHistogram api.Int64Histogram + readAttributesHistogram api.Int64Histogram + readRelationshipsHistogram api.Int64Histogram + writeRelationshipsHistogram api.Int64Histogram + deleteRelationshipsHistogram api.Int64Histogram + runBundleHistogram api.Int64Histogram } // NewDataServer - Creates new Data Server @@ -32,11 +41,89 @@ func NewDataServer( br storage.BundleReader, sr storage.SchemaReader, ) *DataServer { + + writeDataHistogram, err := meter.Int64Histogram( + "write_data", + api.WithUnit("microseconds"), + api.WithDescription("Duration of writing data in microseconds"), + ) + + if err != nil { + panic(err) + } + + deleteDataHistogram, err := meter.Int64Histogram( + "delete_data", + api.WithUnit("microseconds"), + api.WithDescription("Duration of deleting data in microseconds"), + ) + + if err != nil { + panic(err) + } + + readAttributesHistogram, err := meter.Int64Histogram( + "read_attributes", + api.WithUnit("microseconds"), + api.WithDescription("Duration of reading attributes in microseconds"), + ) + + if err != nil { + panic(err) + } + + readRelationshipsHistogram, err := meter.Int64Histogram( + "read_relationships", + api.WithUnit("microseconds"), + api.WithDescription("Duration of reading relationships in microseconds"), + ) + + if err != nil { + panic(err) + } + + writeRelationshipsHistogram, err := meter.Int64Histogram( + "write_relationships", + api.WithUnit("microseconds"), + api.WithDescription("Duration of writing relationships in microseconds"), + ) + + if err != nil { + panic(err) + } + + deleteRelationshipsHistogram, err := meter.Int64Histogram( + "delete_relationships", + api.WithUnit("microseconds"), + api.WithDescription("Duration of deleting relationships in microseconds"), + ) + + if err != nil { + panic(err) + } + + runBundleHistogram, err := meter.Int64Histogram( + "run_bundle", + api.WithUnit("microseconds"), + api.WithDescription("Duration of running bunble in microseconds"), + ) + + if err != nil { + panic(err) + } + return &DataServer{ - dr: dr, - dw: dw, - br: br, - sr: sr, + dr: dr, + dw: dw, + br: br, + sr: sr, + writeDataHistogram: writeDataHistogram, + deleteDataHistogram: deleteDataHistogram, + readAttributesHistogram: readAttributesHistogram, + readRelationshipsHistogram: readRelationshipsHistogram, + writeRelationshipsHistogram: writeRelationshipsHistogram, + deleteRelationshipsHistogram: deleteRelationshipsHistogram, + runBundleHistogram: runBundleHistogram, } } @@ -44,6 +131,7 @@ func NewDataServer( func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.RelationshipReadRequest) (*v1.RelationshipReadResponse, error) { ctx, span := tracer.Start(ctx, "data.read.relationships") defer span.End() + start := time.Now() v := request.Validate() if v != nil { @@ -76,6 +164,9 @@ func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.Relation return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.readRelationshipsHistogram.Record(ctx, duration.Microseconds()) + return &v1.RelationshipReadResponse{ Tuples: collection.GetTuples(), ContinuousToken: ct.String(), @@ -86,6 +177,7 @@ func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.Relation func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeReadRequest) (*v1.AttributeReadResponse, error) { ctx, span := tracer.Start(ctx, "data.read.attributes") defer span.End() + start := time.Now() v := request.Validate() if v != nil { @@ -118,6 +210,9 @@ func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeRe return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.readAttributesHistogram.Record(ctx, duration.Microseconds()) + return &v1.AttributeReadResponse{ Attributes: collection.GetAttributes(), ContinuousToken: ct.String(), @@ -128,6 +223,7 @@ func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeRe func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*v1.DataWriteResponse, error) { ctx, span := tracer.Start(ctx, "data.write") defer span.End() + start := time.Now() v := request.Validate() if v != nil { @@ -215,6 +311,9 @@ func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (* return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.writeDataHistogram.Record(ctx, duration.Microseconds()) + return &v1.DataWriteResponse{ SnapToken: snap.String(), }, nil @@ -224,6 +323,7 @@ func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (* func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.RelationshipWriteRequest) (*v1.RelationshipWriteResponse, error) { ctx, span := tracer.Start(ctx, "relationships.write") defer span.End() + start := time.Now() v := request.Validate() if v != nil { @@ -280,6 +380,9 @@ func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.Relatio return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.writeRelationshipsHistogram.Record(ctx, duration.Microseconds()) + return &v1.RelationshipWriteResponse{ SnapToken: snap.String(), }, nil @@ -289,6 +392,7 @@ func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.Relatio func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) (*v1.DataDeleteResponse, error) { ctx, span := tracer.Start(ctx, "data.delete") defer span.End() + start := time.Now() v := request.Validate() if v != nil { @@ -308,6 +412,9 @@ func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.deleteDataHistogram.Record(ctx, duration.Microseconds()) + return &v1.DataDeleteResponse{ SnapToken: snap.String(), }, nil @@ -317,6 +424,7 @@ func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.RelationshipDeleteRequest) (*v1.RelationshipDeleteResponse, error) { ctx, span := tracer.Start(ctx, "relationships.delete") defer span.End() + start := time.Now() v := request.Validate() if v != nil { @@ -336,6 +444,9 @@ func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.Relati return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.deleteRelationshipsHistogram.Record(ctx, duration.Microseconds()) + return &v1.RelationshipDeleteResponse{ SnapToken: snap.String(), }, nil @@ -345,6 +456,7 @@ func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.Relati func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest) (*v1.BundleRunResponse, error) { ctx, span := tracer.Start(ctx, "bundle.run") defer span.End() + start := time.Now() v := request.Validate() if v != nil { @@ -372,6 +484,9 @@ func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.runBundleHistogram.Record(ctx, duration.Microseconds()) + return &v1.BundleRunResponse{ SnapToken: snap.String(), }, nil diff --git a/internal/servers/schemaServer.go b/internal/servers/schemaServer.go index d67a32bc5..ac2fa481e 100644 --- a/internal/servers/schemaServer.go +++ b/internal/servers/schemaServer.go @@ -3,8 +3,10 @@ package servers import ( "log/slog" "strings" + "time" "github.com/rs/xid" + api "go.opentelemetry.io/otel/metric" "google.golang.org/grpc/status" otelCodes "go.opentelemetry.io/otel/codes" @@ -21,15 +23,52 @@ import ( type SchemaServer struct { v1.UnimplementedSchemaServer - sw storage.SchemaWriter - sr storage.SchemaReader + sw storage.SchemaWriter + sr storage.SchemaReader + writeSchemaHistogram api.Int64Histogram + readSchemaHistogram api.Int64Histogram + listSchemaHistogram api.Int64Histogram } // NewSchemaServer - Creates new Schema Server func NewSchemaServer(sw storage.SchemaWriter, sr storage.SchemaReader) *SchemaServer { + + writeSchemaHistogram, err := meter.Int64Histogram( + "write_schema", + api.WithUnit("microseconds"), + api.WithDescription("Duration of writing schema in microseconds"), + ) + + if err != nil { + panic(err) + } + + readSchemaHistogram, err := meter.Int64Histogram( + "read_schema", + api.WithUnit("microseconds"), + api.WithDescription("Duration of reading schema in microseconds"), + ) + + if err != nil { + panic(err) + } + + listSchemaHistogram, err := meter.Int64Histogram( + "list_schema", + api.WithUnit("microseconds"), + api.WithDescription("Duration of listing schema in microseconds"), + ) + + if err != nil { + panic(err) + } + return &SchemaServer{ - sw: sw, - sr: sr, + sw: sw, + sr: sr, + writeSchemaHistogram: writeSchemaHistogram, + readSchemaHistogram: readSchemaHistogram, + listSchemaHistogram: listSchemaHistogram, } } @@ -37,6 +76,7 @@ func NewSchemaServer(sw storage.SchemaWriter, sr storage.SchemaReader) *SchemaSe func (r *SchemaServer) Write(ctx context.Context, request *v1.SchemaWriteRequest) (*v1.SchemaWriteResponse, error) { ctx, span := tracer.Start(ctx, "schemas.write") defer span.End() + start := time.Now() sch, err := parser.NewParser(request.GetSchema()).Parse() if err != nil { @@ -72,6 +112,9 @@ func (r *SchemaServer) Write(ctx context.Context, request *v1.SchemaWriteRequest return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.writeSchemaHistogram.Record(ctx, duration.Microseconds()) + return &v1.SchemaWriteResponse{ SchemaVersion: version, }, nil @@ -192,6 +235,7 @@ func (r *SchemaServer) PartialWrite(ctx context.Context, request *v1.SchemaParti func (r *SchemaServer) Read(ctx context.Context, request *v1.SchemaReadRequest) (*v1.SchemaReadResponse, error) { ctx, span := tracer.Start(ctx, "schemas.read") defer span.End() + start := time.Now() version := request.GetMetadata().GetSchemaVersion() if version == "" { @@ -210,6 +254,9 @@ func (r *SchemaServer) Read(ctx context.Context, request *v1.SchemaReadRequest) return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.readSchemaHistogram.Record(ctx, duration.Microseconds()) + return &v1.SchemaReadResponse{ Schema: response, }, nil @@ -219,6 +266,7 @@ func (r *SchemaServer) Read(ctx context.Context, request *v1.SchemaReadRequest) func (r *SchemaServer) List(ctx context.Context, request *v1.SchemaListRequest) (*v1.SchemaListResponse, error) { ctx, span := tracer.Start(ctx, "schemas.list") defer span.End() + start := time.Now() schemas, ct, err := r.sr.ListSchemas(ctx, request.GetTenantId(), database.NewPagination(database.Size(request.GetPageSize()), database.Token(request.GetContinuousToken()))) if err != nil { @@ -233,6 +281,9 @@ func (r *SchemaServer) List(ctx context.Context, request *v1.SchemaListRequest) return nil, status.Error(GetStatus(err), err.Error()) } + duration := time.Now().Sub(start) + r.listSchemaHistogram.Record(ctx, duration.Microseconds()) + return &v1.SchemaListResponse{ Head: head, Schemas: schemas, diff --git a/internal/servers/server.go b/internal/servers/server.go index e9dbd6867..7cae6d51f 100644 --- a/internal/servers/server.go +++ b/internal/servers/server.go @@ -40,6 +40,7 @@ import ( ) var tracer = otel.Tracer("servers") +var meter = otel.Meter("servers") // Container is a struct that holds the invoker and various storage // for permission-related operations. It serves as a central point of access