Skip to content

Commit

Permalink
Merge pull request #1344 from Permify/ufuk/servermetrics
Browse files Browse the repository at this point in the history
feat: metric exporters for data and schema servers
  • Loading branch information
tolgaOzen authored Jul 10, 2024
2 parents ec79d37 + 49dc929 commit ff73ef2
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 12 deletions.
5 changes: 5 additions & 0 deletions internal/engines/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"context"
"encoding/hex"
"time"

"go.opentelemetry.io/otel"
api "go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -88,10 +89,14 @@ func (c *CheckEngineWithCache) Check(ctx context.Context, request *base.Permissi
if found {
ctx, span := tracer.Start(ctx, "hit")
defer span.End()
start := time.Now()

// Increase the check count in the metrics.
c.cacheCounter.Add(ctx, 1)

duration := time.Now().Sub(start)
c.cacheHitDurationHistogram.Record(ctx, duration.Microseconds())

// If the request doesn't have the exclusion flag set, return the cached result.
return &base.PermissionCheckResponse{
Can: res.GetCan(),
Expand Down
131 changes: 123 additions & 8 deletions internal/servers/dataServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package servers

import (
"log/slog"
"time"

otelCodes "go.opentelemetry.io/otel/codes"
api "go.opentelemetry.io/otel/metric"
"golang.org/x/net/context"
"google.golang.org/grpc/status"

Expand All @@ -19,10 +21,17 @@ import (
type DataServer struct {
v1.UnimplementedDataServer

sr storage.SchemaReader
dr storage.DataReader
br storage.BundleReader
dw storage.DataWriter
sr storage.SchemaReader
dr storage.DataReader
br storage.BundleReader
dw storage.DataWriter
writeDataHistogram api.Int64Histogram
deleteDataHistogram api.Int64Histogram
readAttributesHistogram api.Int64Histogram
readRelationshipsHistogram api.Int64Histogram
writeRelationshipsHistogram api.Int64Histogram
deleteRelationshipsHistogram api.Int64Histogram
runBundleHistogram api.Int64Histogram
}

// NewDataServer - Creates new Data Server
Expand All @@ -32,18 +41,97 @@ func NewDataServer(
br storage.BundleReader,
sr storage.SchemaReader,
) *DataServer {

writeDataHistogram, err := meter.Int64Histogram(
"write_data",
api.WithUnit("microseconds"),
api.WithDescription("Duration of writing data in microseconds"),
)

if err != nil {
panic(err)
}

deleteDataHistogram, err := meter.Int64Histogram(
"delete_data",
api.WithUnit("microseconds"),
api.WithDescription("Duration of deleting data in microseconds"),
)

if err != nil {
panic(err)
}

readAttributesHistogram, err := meter.Int64Histogram(
"read_attributes",
api.WithUnit("microseconds"),
api.WithDescription("Duration of reading attributes in microseconds"),
)

if err != nil {
panic(err)
}

readRelationshipsHistogram, err := meter.Int64Histogram(
"read_relationships",
api.WithUnit("microseconds"),
api.WithDescription("Duration of reading relationships in microseconds"),
)

if err != nil {
panic(err)
}

writeRelationshipsHistogram, err := meter.Int64Histogram(
"write_relationships",
api.WithUnit("microseconds"),
api.WithDescription("Duration of writing relationships in microseconds"),
)

if err != nil {
panic(err)
}

deleteRelationshipsHistogram, err := meter.Int64Histogram(
"delete_relationships",
api.WithUnit("microseconds"),
api.WithDescription("Duration of deleting relationships in microseconds"),
)

if err != nil {
panic(err)
}

runBundleHistogram, err := meter.Int64Histogram(
"run_bundle",
api.WithUnit("microseconds"),
api.WithDescription("Duration of running bunble in microseconds"),
)

if err != nil {
panic(err)
}

return &DataServer{
dr: dr,
dw: dw,
br: br,
sr: sr,
dr: dr,
dw: dw,
br: br,
sr: sr,
writeDataHistogram: writeDataHistogram,
deleteDataHistogram: deleteDataHistogram,
readAttributesHistogram: readAttributesHistogram,
readRelationshipsHistogram: readRelationshipsHistogram,
writeRelationshipsHistogram: writeRelationshipsHistogram,
deleteRelationshipsHistogram: deleteRelationshipsHistogram,
runBundleHistogram: runBundleHistogram,
}
}

// ReadRelationships - Allows directly querying the stored engines data to display and filter stored relational tuples
func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.RelationshipReadRequest) (*v1.RelationshipReadResponse, error) {
ctx, span := tracer.Start(ctx, "data.read.relationships")
defer span.End()
start := time.Now()

v := request.Validate()
if v != nil {
Expand Down Expand Up @@ -76,6 +164,9 @@ func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.Relation
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.readRelationshipsHistogram.Record(ctx, duration.Microseconds())

return &v1.RelationshipReadResponse{
Tuples: collection.GetTuples(),
ContinuousToken: ct.String(),
Expand All @@ -86,6 +177,7 @@ func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.Relation
func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeReadRequest) (*v1.AttributeReadResponse, error) {
ctx, span := tracer.Start(ctx, "data.read.attributes")
defer span.End()
start := time.Now()

v := request.Validate()
if v != nil {
Expand Down Expand Up @@ -118,6 +210,9 @@ func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeRe
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.readAttributesHistogram.Record(ctx, duration.Microseconds())

return &v1.AttributeReadResponse{
Attributes: collection.GetAttributes(),
ContinuousToken: ct.String(),
Expand All @@ -128,6 +223,7 @@ func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeRe
func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*v1.DataWriteResponse, error) {
ctx, span := tracer.Start(ctx, "data.write")
defer span.End()
start := time.Now()

v := request.Validate()
if v != nil {
Expand Down Expand Up @@ -215,6 +311,9 @@ func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.writeDataHistogram.Record(ctx, duration.Microseconds())

return &v1.DataWriteResponse{
SnapToken: snap.String(),
}, nil
Expand All @@ -224,6 +323,7 @@ func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*
func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.RelationshipWriteRequest) (*v1.RelationshipWriteResponse, error) {
ctx, span := tracer.Start(ctx, "relationships.write")
defer span.End()
start := time.Now()

v := request.Validate()
if v != nil {
Expand Down Expand Up @@ -280,6 +380,9 @@ func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.Relatio
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.writeRelationshipsHistogram.Record(ctx, duration.Microseconds())

return &v1.RelationshipWriteResponse{
SnapToken: snap.String(),
}, nil
Expand All @@ -289,6 +392,7 @@ func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.Relatio
func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) (*v1.DataDeleteResponse, error) {
ctx, span := tracer.Start(ctx, "data.delete")
defer span.End()
start := time.Now()

v := request.Validate()
if v != nil {
Expand All @@ -308,6 +412,9 @@ func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest)
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.deleteDataHistogram.Record(ctx, duration.Microseconds())

return &v1.DataDeleteResponse{
SnapToken: snap.String(),
}, nil
Expand All @@ -317,6 +424,7 @@ func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest)
func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.RelationshipDeleteRequest) (*v1.RelationshipDeleteResponse, error) {
ctx, span := tracer.Start(ctx, "relationships.delete")
defer span.End()
start := time.Now()

v := request.Validate()
if v != nil {
Expand All @@ -336,6 +444,9 @@ func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.Relati
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.deleteRelationshipsHistogram.Record(ctx, duration.Microseconds())

return &v1.RelationshipDeleteResponse{
SnapToken: snap.String(),
}, nil
Expand All @@ -345,6 +456,7 @@ func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.Relati
func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest) (*v1.BundleRunResponse, error) {
ctx, span := tracer.Start(ctx, "bundle.run")
defer span.End()
start := time.Now()

v := request.Validate()
if v != nil {
Expand Down Expand Up @@ -372,6 +484,9 @@ func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.runBundleHistogram.Record(ctx, duration.Microseconds())

return &v1.BundleRunResponse{
SnapToken: snap.String(),
}, nil
Expand Down
59 changes: 55 additions & 4 deletions internal/servers/schemaServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package servers
import (
"log/slog"
"strings"
"time"

"github.com/rs/xid"
api "go.opentelemetry.io/otel/metric"
"google.golang.org/grpc/status"

otelCodes "go.opentelemetry.io/otel/codes"
Expand All @@ -21,22 +23,60 @@ import (
type SchemaServer struct {
v1.UnimplementedSchemaServer

sw storage.SchemaWriter
sr storage.SchemaReader
sw storage.SchemaWriter
sr storage.SchemaReader
writeSchemaHistogram api.Int64Histogram
readSchemaHistogram api.Int64Histogram
listSchemaHistogram api.Int64Histogram
}

// NewSchemaServer - Creates new Schema Server
func NewSchemaServer(sw storage.SchemaWriter, sr storage.SchemaReader) *SchemaServer {

writeSchemaHistogram, err := meter.Int64Histogram(
"write_schema",
api.WithUnit("microseconds"),
api.WithDescription("Duration of writing schema in microseconds"),
)

if err != nil {
panic(err)
}

readSchemaHistogram, err := meter.Int64Histogram(
"read_schema",
api.WithUnit("microseconds"),
api.WithDescription("Duration of reading schema in microseconds"),
)

if err != nil {
panic(err)
}

listSchemaHistogram, err := meter.Int64Histogram(
"list_schema",
api.WithUnit("microseconds"),
api.WithDescription("Duration of listing schema in microseconds"),
)

if err != nil {
panic(err)
}

return &SchemaServer{
sw: sw,
sr: sr,
sw: sw,
sr: sr,
writeSchemaHistogram: writeSchemaHistogram,
readSchemaHistogram: readSchemaHistogram,
listSchemaHistogram: listSchemaHistogram,
}
}

// Write - Configure new Permify Schema to Permify
func (r *SchemaServer) Write(ctx context.Context, request *v1.SchemaWriteRequest) (*v1.SchemaWriteResponse, error) {
ctx, span := tracer.Start(ctx, "schemas.write")
defer span.End()
start := time.Now()

sch, err := parser.NewParser(request.GetSchema()).Parse()
if err != nil {
Expand Down Expand Up @@ -72,6 +112,9 @@ func (r *SchemaServer) Write(ctx context.Context, request *v1.SchemaWriteRequest
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.writeSchemaHistogram.Record(ctx, duration.Microseconds())

return &v1.SchemaWriteResponse{
SchemaVersion: version,
}, nil
Expand Down Expand Up @@ -192,6 +235,7 @@ func (r *SchemaServer) PartialWrite(ctx context.Context, request *v1.SchemaParti
func (r *SchemaServer) Read(ctx context.Context, request *v1.SchemaReadRequest) (*v1.SchemaReadResponse, error) {
ctx, span := tracer.Start(ctx, "schemas.read")
defer span.End()
start := time.Now()

version := request.GetMetadata().GetSchemaVersion()
if version == "" {
Expand All @@ -210,6 +254,9 @@ func (r *SchemaServer) Read(ctx context.Context, request *v1.SchemaReadRequest)
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.readSchemaHistogram.Record(ctx, duration.Microseconds())

return &v1.SchemaReadResponse{
Schema: response,
}, nil
Expand All @@ -219,6 +266,7 @@ func (r *SchemaServer) Read(ctx context.Context, request *v1.SchemaReadRequest)
func (r *SchemaServer) List(ctx context.Context, request *v1.SchemaListRequest) (*v1.SchemaListResponse, error) {
ctx, span := tracer.Start(ctx, "schemas.list")
defer span.End()
start := time.Now()

schemas, ct, err := r.sr.ListSchemas(ctx, request.GetTenantId(), database.NewPagination(database.Size(request.GetPageSize()), database.Token(request.GetContinuousToken())))
if err != nil {
Expand All @@ -233,6 +281,9 @@ func (r *SchemaServer) List(ctx context.Context, request *v1.SchemaListRequest)
return nil, status.Error(GetStatus(err), err.Error())
}

duration := time.Now().Sub(start)
r.listSchemaHistogram.Record(ctx, duration.Microseconds())

return &v1.SchemaListResponse{
Head: head,
Schemas: schemas,
Expand Down
Loading

0 comments on commit ff73ef2

Please sign in to comment.