-
Notifications
You must be signed in to change notification settings - Fork 753
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HTTP Analytics Module #3299
HTTP Analytics Module #3299
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused as to why the tests are failing in the PR, when I ran validation locally, everything passed.
But here's what I'm seeing on the PR:
2023/11/15 12:17:25 http: superfluous response.WriteHeader call from github.com/prebid/prebid-server/v2/exchange.TestExtraBidWithMultiCurrencies.func1 (bidder_test.go:2958)
fatal error: concurrent map iteration and map write
Validation error message looked unrelated to the chenges in this PR, it now looks better after jobs re-run |
8c8e377
to
bb1e9bf
Compare
Thanks for the quick review - I'll updated the PR. |
76b63f7
to
0193a3e
Compare
Thanks for the review @SyntaxNode - i've rebased master and updated the PR |
Code coverage summaryNote:
appnexusRefer here for heat map coverage report
|
@VeronikaSolovei9 // @AlexBVolcy - Is there anything outstanding on my end, or is there anything I can help to get this merged? |
@@ -459,6 +460,49 @@ func (cfg *CurrencyConverter) validate(errs []error) []error { | |||
return errs | |||
} | |||
|
|||
type AnalyticsHttp struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please specify default values using the viper SetDefault
function for each config option in SetupViper
.
You should then update TestDefaults
and TestFullConfig
in config/config_test.go
.
config/config.go
Outdated
// Enable or disable the module | ||
Enabled bool `mapstructure:"enabled"` | ||
// Endpoint Config | ||
Endpoint AnalyticsHttpEndpoint `mapstructure:"endpoint"` | ||
// Buffer, triggers a flush whan any of the conditions are met | ||
Buffers AnalyticsBuffer `mapstructure:"buffers"` | ||
// Features |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: I think we can remove all of these comments; IMO the names are self-explanatory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I agree I think the comments can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bsardo Do you want me to remove all comments in the config or only the selected lines?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@steffenmllr I think some of the other comments could be removed as well. Your readme does provide clarity on the meaning of some of the files which is good so the comments in the code can be redundant. It looks like you might have copied some of the config patterns from pubstack which makes sense but I will say that in some cases it might make sense to deviate from pubstack and include the units in the config name if the units are fixed and ditch the comment. For example,
// HTTP Timeout in milliseconds
Timeout string `mapstructure:"timeout"`
could be changed to:
Timeout string `mapstructure:"timeout_ms"`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bsardo I've removed all comments and clarified in the README. That comments was also wrong. This is a time.Duration
and not ms
. If you want me to change this to int
and _ms
let me know
analytics/http/filters.go
Outdated
|
||
return true | ||
}, nil | ||
} | ||
|
||
func createNotificationFilter( | ||
feature config.AnalyticsFeature, | ||
randomGenerator randomutil.RandomGenerator, | ||
) (notificationFilter, error) { | ||
var filterProgram *vm.Program | ||
var err error | ||
|
||
if feature.Filter != "" { | ||
filterProgram, err = expr.Compile(feature.Filter, expr.Env(analytics.NotificationEvent{}), expr.AsBool()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return func(event *analytics.NotificationEvent) bool { | ||
if event == nil || feature.SampleRate <= 0 || randomGenerator.GenerateFloat64() > feature.SampleRate { | ||
return false | ||
} | ||
|
||
if filterProgram != nil { | ||
output, err := expr.Run(filterProgram, event) | ||
if err != nil { | ||
glog.Errorf("[HttpAnalytics] Error filter notification: %v", err) | ||
return false | ||
} | ||
return output.(bool) | ||
} | ||
|
||
return true | ||
}, nil | ||
} | ||
|
||
func createSetUIDFilter( | ||
feature config.AnalyticsFeature, | ||
randomGenerator randomutil.RandomGenerator, | ||
) (setUIDFilter, error) { | ||
var filterProgram *vm.Program | ||
var err error | ||
|
||
if feature.Filter != "" { | ||
filterProgram, err = expr.Compile(feature.Filter, expr.Env(analytics.SetUIDObject{}), expr.AsBool()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return func(event *analytics.SetUIDObject) bool { | ||
if event == nil || feature.SampleRate <= 0 || randomGenerator.GenerateFloat64() > feature.SampleRate { | ||
return false | ||
} | ||
|
||
if filterProgram != nil { | ||
output, err := expr.Run(filterProgram, event) | ||
if err != nil { | ||
glog.Errorf("[HttpAnalytics] Error filter setUID: %v", err) | ||
return false | ||
} | ||
return output.(bool) | ||
} | ||
|
||
return true | ||
}, nil | ||
} | ||
|
||
func createVideoFilter( | ||
feature config.AnalyticsFeature, | ||
randomGenerator randomutil.RandomGenerator, | ||
) (videoFilter, error) { | ||
var filterProgram *vm.Program | ||
var err error | ||
|
||
if feature.Filter != "" { | ||
filterProgram, err = expr.Compile(feature.Filter, expr.Env(analytics.VideoObject{}), expr.AsBool()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return func(event *analytics.VideoObject) bool { | ||
if event == nil || feature.SampleRate <= 0 || randomGenerator.GenerateFloat64() > feature.SampleRate { | ||
return false | ||
} | ||
|
||
if filterProgram != nil { | ||
output, err := expr.Run(filterProgram, event) | ||
if err != nil { | ||
glog.Errorf("[HttpAnalytics] Error filter video: %v", err) | ||
return false | ||
} | ||
return output.(bool) | ||
} | ||
|
||
return true | ||
}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we can significantly reduce the amount of code in this file by using generics:
func createFilter [T analytics.AuctionObject | analytics.AmpObject | analytics.VideoObject | analytics.SetUIDObject | analytics.CookieSyncObject | analytics.NotificationEvent](
feature config.AnalyticsFeature,
randomGenerator randomutil.RandomGenerator,
)(func (event *T) bool, error) {
var filterProgram *vm.Program
var err error
if feature.Filter != "" {
var obj T
// precompile the filter expression for performance, make sure we return a boolean from the expression
filterProgram, err = expr.Compile(feature.Filter, expr.Env(obj), expr.AsBool())
if err != nil {
return nil, err
}
}
return func(event *T) bool {
// Disable tracking for nil events or events with a sample rate of 0
if event == nil || feature.SampleRate <= 0 || randomGenerator.GenerateFloat64() > feature.SampleRate {
return false
}
// Use a filter is one is defined
if filterProgram != nil {
output, err := expr.Run(filterProgram, event)
if err != nil {
glog.Errorf("[HttpAnalytics] Error filter: %v", err)
return false
}
return output.(bool)
}
return true
}, nil
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I like the idea of consolidating this code.
For my own understanding because I haven't worked with generics that much, what would it look like to actually call createFilter()
in the place of createAuctionFilter()
for example?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the hint with generics, refactored the code and the tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I like the idea of consolidating this code.
For my own understanding because I haven't worked with generics that much, what would it look like to actually call
createFilter()
in the place ofcreateAuctionFilter()
for example?
@AlexBVolcy see analytics/http/http_module.go#newHttpLogger
analytics/http/http_module.go
Outdated
// parse max event count | ||
pSize, err := units.FromHumanSize(cfg.Buffers.BufferSize) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// parse duration | ||
pDuration, err := time.ParseDuration(cfg.Buffers.Timeout) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Check for filters | ||
randomGenerator := randomutil.RandomNumberGenerator{} | ||
shouldTrackAuction, err := createAuctionFilter(cfg.Auction, randomGenerator) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
shouldTrackAmp, err := createAmpFilter(cfg.Auction, randomGenerator) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
shouldTrackCookieSync, err := createCookieSyncFilter(cfg.CookieSync, randomGenerator) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
shouldTrackNotification, err := createNotificationFilter(cfg.Notification, randomGenerator) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
shouldTrackSetUID, err := createSetUIDFilter(cfg.SetUID, randomGenerator) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
shouldTrackVideo, err := createVideoFilter(cfg.Video, randomGenerator) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// init buffer | ||
buffer := bytes.Buffer{} | ||
buffer.Write([]byte("[")) | ||
|
||
// Check for filters | ||
return &HttpLogger{ | ||
sender: sender, | ||
clock: clock, | ||
|
||
maxBufferByteSize: pSize, | ||
eventCount: 0, | ||
maxEventCount: int64(cfg.Buffers.EventCount), | ||
maxDuration: pDuration, | ||
|
||
// Filters | ||
shouldTrackAuction: shouldTrackAuction, | ||
shouldTrackAmp: shouldTrackAmp, | ||
shouldTrackCookieSync: shouldTrackCookieSync, | ||
shouldTrackNotification: shouldTrackNotification, | ||
shouldTrackSetUID: shouldTrackSetUID, | ||
shouldTrackVideo: shouldTrackVideo, | ||
|
||
buffer: buffer, | ||
bufferCh: make(chan []byte), | ||
sigTermCh: make(chan os.Signal), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: let's make this a little more compact by removing all of the comments and blank lines in this function except for the two blank lines around
// init buffer
buffer := bytes.Buffer{}
buffer.Write([]byte("["))
analytics/http/http_module.go
Outdated
sender httpSender | ||
config *config.AnalyticsHttp | ||
clock clock.Clock | ||
|
||
eventCount int64 | ||
maxEventCount int64 | ||
maxBufferByteSize int64 | ||
maxDuration time.Duration | ||
|
||
shouldTrackAuction auctionFilter | ||
shouldTrackAmp ampFilter | ||
shouldTrackCookieSync cookieSyncFilter | ||
shouldTrackNotification notificationFilter | ||
shouldTrackSetUID setUIDFilter | ||
shouldTrackVideo videoFilter | ||
|
||
mux sync.RWMutex | ||
sigTermCh chan os.Signal | ||
buffer bytes.Buffer | ||
bufferCh chan []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: remove blank lines
w := gzip.NewWriter(&b) | ||
_, err := w.Write([]byte(requestBody)) | ||
if err != nil { | ||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be calling w.Close()
when this error occurs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, good catch!
analytics/http/filters.go
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: please rename to filter.go
and filter_test.go
to follow go conventions.
analytics/http/http_module.go
Outdated
if l.eventCount == 0 || l.buffer.Len() == 0 { | ||
return | ||
} | ||
l.mux.Lock() | ||
defer l.reset() | ||
defer l.mux.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does a race condition exist in this function? Will there ever be a case where two threads are running flush
at the same time? If so, I can see one thread being blocked on line 174 while the other thread flushes the buffer and then releases the lock prior to the reset since defers are processed as LIFO. This would make it possible for the previously blocked thread to grab the lock and start attempting to flush before buffer has been reset, which could introduce problems. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, Thanks! I've removed the defers to avoid the LIFO mistake and called Unlock
and reset
in order
@steffenmllr for any changes going forward, if you could avoid force pushing and instead just push up any new commits that would be great. That will ensure a quicker re-review. We will squash everything when it gets merged. Thanks! |
Code coverage summaryNote:
appnexusRefer here for heat map coverage report
|
@bsardo Thanks for the review. I'll update the PR with your comments. Sorry for the force-push on the rebase (habit) - I'll only push from now on |
Code coverage summaryNote:
appnexusRefer here for heat map coverage report
|
Code coverage summaryNote:
appnexusRefer here for heat map coverage report
|
@bsardo I've renamed the comment, let me know if I need to change anything else |
Please summarize the JSON format for this adapter so we can put it on the docs.prebid.org site. We'll add this detail to https://docs.prebid.org/prebid-server/developers/pbs-build-an-analytics-adapter.html |
@bretg Sure, I'll add a PR to the documentation repository to address this. If you have any specific format or level of detail in mind, please feel free to share your preferences! |
Thanks. It would be helpful for analytics endpoints to have a reference for what fields could be sent, data types, etc. It would be ok to link out to other resources. I didn't read this code to see what is being proposed, but here's a general example:
If there are special fields or missing fields, that would all need to be listed in the reference. |
Code coverage summaryNote:
appnexusRefer here for heat map coverage report
|
Code coverage summaryNote:
appnexusRefer here for heat map coverage report
|
@bretg I've added a first draft here: prebid/prebid.github.io#5021 - let me know if this is the kind of detail you are looking for. |
This is fine for now @steffenmllr - thanks. If you allow me to write to your fork, I'll fix the linting errors and make a couple of wordsmithing tweaks. |
@bretg great thanks! I've added you the the forked docs repo |
Hmm. Didn't seem to work. Still don't have edit access. |
@bretg I think you need to accept the invite to push into the PR https://github.com/mllrsohn/prebid.github.io/invitations |
@steffenmllr sorry for the delay. Overall this looks good. The Go and Java teams are working to ensure that we're aligned before merging this in. I imagine it will be merged just after the new year. |
Working on trying to document an external interface, I realized that there are privacy concerns here. It might be that PBS-Core already takes care of all of this, but just to make sure, here are the requirements:
|
Sorry to be a stickler here, but I think trying to define a generic JSON structure for analytics adapters is harder than it seems. Devil in the details. I went through the account structure field-by-field and identified several items that don't belong in JSON going to an analytics endpoint. Specifically:
All the rows marked in red on the accounts tab of https://docs.google.com/spreadsheets/d/1Rvhd6T7NKZdzqpYQNmi2Xzqxos6-HLKTZX1aXbZrVNk/edit?usp=sharing Also, a couple of items are datastructures in Go, but I think ought to be arrays of strings in a JSON representation. I'll work on the module stuff tomorrow. |
The subcommittee met and discussed. We're behind this effort, but it's larger than it looks. I would expect this to take a couple of months.
Several action items were assigned to members and we meet again in 2 weeks. |
@bretg - Happy new year and thanks for the update! Our use case, as outlined in #3281, is actually more specific. We at agma (https://www.agma-mmc.de/) collect only minimal bid data, limited to certain prebid server accounts (hence the filter), as well as the openrtb site and user object in the bid request, similar to our Prebid.js analytics adapter. This means we gather a limited amount of data and don't cover all endpoints. I'd love to support this effort for a generic adapter, but I also want to provide our members with the option to use our adapter as soon as possible. Given that we don't want to send every field and support log-level data on a account level I was wondering if we can think of another kind of analytics interface I wanted to run by you. I've create a sample config for this: analytics:
- name: 'some-analytics' # can be multiple collectors, so it would work for usecase 2: one url per account
transport: # transport can be something like `gRPC`, `http` with buffer is already implemented
type: http
options: # same as in this PR
url: "https://my-rest-endpoint.com"
timeout: "2s"
gzip: false
additional_headers:
X-My-header: "some-thing"
buffer:
size: "2MB"
count : 100
timeout: "15m"
auction:
sample_rate: 0.25 # same as in this PR
processor: "generic" # this would be the official processor with all fields that need a json schema
video:
sample_rate: 1
processor: "agma" # the processor only returns the data needed, implemented as a golang interface
filter: "Request.AccountID in [123, 5432]" # filter would be the same as this PR, usecase 2
... The This solution would allow the subcommittee time to finalize the official analytics protobuf / JSON schema, while providing a solution for our usecase now. What do you think about this ? |
@steffenmllr - appreciate your thoughts here.
I'd like to ask... why do you need this custom analytics code open-sourced? Magnite has a private analytics module, and I imagine many other do as well. We have a fork of Prebid Server where the private analytics module lives into which we frequently merge the open source repo. If you distribute Prebid Server to your members, this model could also work for you. If, on the otherhand, each of your members is responsible for maintaining their own versions of Prebid Server, then I get why you'd want it open-sourced.
If you need this open-sourced and custom and soon, then just go ahead and create an "agma analytics module" - and we can just drop the idea of trying to make it generic for now. That would let the sub-committee take their time to work through Go/Java differences and make a proposal that can be reviewed and prioritized. If you decide to just convert this into an agma-specific thing, please consider the GDPR/TCF angle -- the Prebid Server rule is that analytics modules are supposed to have a GVLID and PBS-core checks that the vendor has user Purpose 7 consent before passing the request to it. |
Hi @bretg - each member is responsible for hosting their own server, and some of them use a hosting provider. This is why we want to open source it, as this approach makes it easier for everyone involved to update and access the code. I thought making this more generic would benefit others as well, but I do see the effort involved in creating a schema that works for everybody in both systems Java and Golang across all events. I have moved this complexity to the analytics receiver in my head. For now, I will also make another PR with an agma-only adapter until the sub-committee has time to work through the Java/Golang differences. I'll make sure to consider the GDPR/TCF angle and add an extra check for the TC string. This does reduce complexity quite a lot. However, I'm still interested in joining the effort to build a generic analytics adapter and would like to keep this PR open as a discussion if possible. Is there a working group I can join since this covers multiple code bases? How do you organize this work? |
Great, that will make your partners happy - go for it.
I've opened #3388 to track generic analytics. You can update this PR as desired. Thanks! |
import ( | ||
"testing" | ||
|
||
"github.com/prebid/openrtb/v19/openrtb2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We updated the OpenRTB library to version 20. Please merge with the master branch and change these imports to "github.com/prebid/openrtb/v20/openrtb2"
Just a friendly ping, are you still working on this? |
I think it's better to track this in #3388 - I would love to implement this as soon as you documented the requirements |
Following up on #3281, I have rewritten our analytics implementation into a more generally usable module, which may also benefit others.
If enabled, it buffers all events from the analytics.Module interface in memory and triggers an HTTP request when either a specified buffer is full, a timer is triggered, or an event count is reached (similar to the Pubstack implementation).
The generated payload is always a valid JSON array, and it can be gzipped before sending to reduce network bandwidth.
To further reduce the number of sent events, there is a sample rate and the possibility to filter for specific events. It uses https://github.com/antonmedv/expr for this purpose.
Code coverage is around 82.0% and should cover all relevant use cases.