Presto Queue Length Based Routing Manager #81
@Inject private Notifier emailNotifier;
@Inject private GatewayBackendManager gatewayBackendManager;
@Inject
private List<PrestoClusterStatsObserver> clusterStatsObservers;
Introducing listeners, which can react to the stats collected by the Active Cluster Monitor.
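A minimal sketch of how such listeners could be wired, for readers following along. Only the no-worker check (`getNumWorkerNodes() < 1`) comes from the diff; the `observe` method, the `*Sketch` class names, and the stand-in stats type are assumptions, and the real `PrestoClusterStatsObserver` interface may look different.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the project's ClusterStats, reduced to two fields.
final class ClusterStatsSketch {
  final String clusterId;
  final int numWorkerNodes;

  ClusterStatsSketch(String clusterId, int numWorkerNodes) {
    this.clusterId = clusterId;
    this.numWorkerNodes = numWorkerNodes;
  }
}

// Assumed observer shape; the real interface signature is not shown in this thread.
interface StatsObserverSketch {
  void observe(List<ClusterStatsSketch> stats);
}

// Monitor side: after collecting stats, fan them out to every registered observer.
final class MonitorSketch {
  private final List<StatsObserverSketch> observers = new ArrayList<>();

  void register(StatsObserverSketch observer) {
    observers.add(observer);
  }

  void publish(List<ClusterStatsSketch> stats) {
    for (StatsObserverSketch observer : observers) {
      observer.observe(stats);
    }
  }
}

// Example observer: remembers every cluster that reports fewer than one worker.
final class NoWorkerObserverSketch implements StatsObserverSketch {
  final List<String> flagged = new ArrayList<>();

  @Override
  public void observe(List<ClusterStatsSketch> stats) {
    for (ClusterStatsSketch s : stats) {
      if (s.numWorkerNodes < 1) {
        flagged.add(s.clusterId);
      }
    }
  }
}
```

With this shape, a routing manager and a health-check notifier can both subscribe to the same stats feed without the monitor knowing about either.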
This looks good to me!
I have a few nits, and a couple questions about how generating the sampling distribution works, but nothing that should block you from proceeding when you're ready.
}
if (clusterStats.getNumWorkerNodes() < 1) {
  notifyForNoWorkers(clusterStats);
}
I know it's a lot of boilerplate, but do you think we should split up the healthchecks to different observer classes?
We could definitely do that. We also want to handle health checks for different routing groups differently. I'll add that in a follow-up PR.
for (ClusterStats stat : stats) {
  if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) {
    clusterQueueMap.put(stat.getRoutingGroup(), new HashMap<String, Integer>() {
TIL you can do initializers like this in Java!
It might be slightly simpler to use Map.of if we're on Java 9+.
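One caveat worth noting: `Map.of` returns an immutable map, so it only fits if nothing is added to the inner map afterwards. If later iterations put more clusters into the same routing group, `computeIfAbsent` (Java 8+) is another way to drop the anonymous-subclass initializer. The sketch below assumes that usage pattern; `QueueMapSketch` and the row layout are hypothetical, not code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class QueueMapSketch {
  // Builds routingGroup -> (clusterId -> queued query count).
  // Each row is {routingGroup, clusterId, queuedCount}; this layout is illustrative only.
  static Map<String, Map<String, Integer>> build(String[][] rows) {
    Map<String, Map<String, Integer>> clusterQueueMap = new HashMap<>();
    for (String[] row : rows) {
      String routingGroup = row[0];
      String clusterId = row[1];
      int queued = Integer.parseInt(row[2]);
      // Creates a mutable inner map the first time a group is seen, then adds to it.
      clusterQueueMap
          .computeIfAbsent(routingGroup, group -> new HashMap<>())
          .put(clusterId, queued);
    }
    return clusterQueueMap;
  }
}
```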
@@ -89,4 +89,4 @@ public RoutingManager getRoutingManager() {
  public JdbcConnectionManager getConnectionManager() {
    return this.connectionManager;
  }
}
}
nit: Missing newline at EOF
/**
 * Performs routing to an adhoc backend based compute weights base don cluster queue depth.
typo: base don -> based on
/**
 * Performs routing to an adhoc backend based compute weights base don cluster queue depth.
 *
 * <p>d.
typo?
 * provisioned or not.
 */
if (maxQueueLn == 0) {
I'm not sure I understand what each branch of this conditional is doing — it reads to me like this is trying to set a max weight such that all of the weights will sum to a particular value, and establish a discontinuous jump in the weight distribution at the last-but-one-th element?
I think it might be simpler to reduce the weighting to a linear interpolation — pseudocode, but:
given queueSum.
for each cluster
    fractionOfQueueSum = cluster.weight() / queueSum
    alpha = 1 - fractionOfQueueSum
    weight = alpha * MAX_WT
    bucket_count = floor(NUM_BUCKETS * weight)
    create_n_sampling_buckets(cluster.name, bucket_count)
If you're concerned about the bucket count not exactly matching up, we can either make the routing function do its modular math in terms of weightedDistributionRouting.size(), or pad the map with an extra bucket pointing to the least-queued cluster, slightly skewing the distribution.
I believe this is basically what's already happening down on L156-157, but I think perhaps we don't need to do any of this conditional special-casing for small routing groups. If we assign each cluster a simple numeric weight (inversely proportional to its queue depth as a fraction of the whole) it should accomplish the same thing, right?
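For reference, the linear-interpolation idea in the pseudocode above could look roughly like this in Java. This is a sketch under assumptions: `LinearWeightSketch`, `BUCKETS_PER_CLUSTER`, `buildBuckets`, and `route` are hypothetical names, the product `NUM_BUCKETS * MAX_WT` is folded into one constant, and the equal-weight fallback for an empty queue or a single cluster is an addition the pseudocode does not spell out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class LinearWeightSketch {
  // Hypothetical constant: buckets granted to a cluster that holds none of the queue.
  static final int BUCKETS_PER_CLUSTER = 100;

  // Returns a bucket list in which each cluster appears in proportion to
  // alpha = 1 - (its share of the total queue).
  static List<String> buildBuckets(Map<String, Integer> queueLengths) {
    long queueSum = 0;
    for (int q : queueLengths.values()) {
      queueSum += q;
    }
    // Assumption: with nothing queued anywhere, or only one cluster, weight everything equally.
    boolean evenSplit = queueSum == 0 || queueLengths.size() == 1;

    List<String> buckets = new ArrayList<>();
    for (Map.Entry<String, Integer> e : queueLengths.entrySet()) {
      double fractionOfQueueSum = evenSplit ? 0.0 : (double) e.getValue() / queueSum;
      double alpha = 1.0 - fractionOfQueueSum;
      int bucketCount = (int) Math.floor(BUCKETS_PER_CLUSTER * alpha);
      for (int i = 0; i < bucketCount; i++) {
        buckets.add(e.getKey());
      }
    }
    return buckets;
  }

  // Modular lookup in terms of the actual bucket count, so rounding never leaves gaps.
  static String route(List<String> buckets, int n) {
    return buckets.get(Math.floorMod(n, buckets.size()));
  }
}
```

For queue lengths of 0, 30, and 10 this yields 100, 25, and 75 buckets, so the most queued cluster still receives about one query in eight.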
Alex, my initial approach was as you describe: the weight would be the inverse of the queue depth. As I ran my tests I realized that this distribution (especially when query volume is low) did not do a good job of steering queries away from the most queued-up cluster, even when the difference between the least and most queued-up clusters was significant.
Just to clarify, the computed weights potentially change every minute (as often as the Active Cluster Monitor runs), and since the gateway is HA with a fleet of proxy hosts, each proxy host gets only a small number of queries.
The design keeps each proxy host stateless and agnostic of how queries are being distributed at a global level.
To account for this, I wanted to tweak the weight given to the most queued-up cluster. The idea is to assign it a negligible weight when other, less utilized clusters are available, almost removing it from the mix but not entirely.
@bearcage I experimented with the suggestion to use the square of the inverse (computed weight), but that does not work well for cases with a somewhat even distribution. I've tried to simplify the math as much as possible to make the code more readable. However, I still need to handle some edge cases specially.
@ssanthanam185 could you share a sample configuration for using the Queue Length Based Routing Manager?
@puneetjaiswal it would be great if there were a sample guide for this feature!
Hello, I wrote a pull request to make this router function available, after understanding the structure of the codebase. (I enjoyed your DEVIEW presentation about Trino, thank you!)
a. There is a huge difference between the least and most queued-up clusters, in which case the most queued-up cluster is given a negligible weight so that it gets almost no new requests.
b. The difference between the least and most queued-up clusters is not huge, in which case the most queued-up cluster is given a small-ish weight so that it gets only a few requests, in favor of the other, healthier clusters.
c. All clusters have equal load or zero load, in which case the distribution is even.
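The three cases above can be sketched as follows. This is an illustration only, not the PR's implementation: every constant (`MAX_WT`, `NEGLIGIBLE_WT`, `SMALL_WT`, `HUGE_GAP`) is a hypothetical value, and the linear scaling used for the clusters in between is an assumption, since the thread does not show the actual thresholds or formula.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class QueueWeightSketch {
  // All constants are hypothetical; the PR's real thresholds are not shown in this thread.
  static final int MAX_WT = 100;
  static final int NEGLIGIBLE_WT = 1;
  static final int SMALL_WT = 10;
  static final int HUGE_GAP = 50; // queued-query gap treated as "huge"

  static Map<String, Integer> weights(Map<String, Integer> queueLengths) {
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;
    for (int q : queueLengths.values()) {
      min = Math.min(min, q);
      max = Math.max(max, q);
    }
    int gap = max - min;

    Map<String, Integer> result = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> e : queueLengths.entrySet()) {
      int q = e.getValue();
      int wt;
      if (gap == 0) {
        // Case c: equal load (including zero load) gives an even distribution.
        wt = MAX_WT;
      } else if (q == max) {
        // Case a: huge gap, negligible weight. Case b: modest gap, small-ish weight.
        wt = gap >= HUGE_GAP ? NEGLIGIBLE_WT : SMALL_WT;
      } else {
        // Assumed scaling for the rest: linear in headroom below the most queued cluster.
        wt = Math.max(SMALL_WT, MAX_WT * (max - q) / gap);
      }
      result.put(e.getKey(), wt);
    }
    return result;
  }
}
```

Because each proxy host computes these weights independently from the same stats, the hosts can stay stateless while still steering most traffic away from the busiest cluster.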
With this refactor, users who want to use the ActiveClusterMonitor managed app must also add the ClusterStateListenerModule module to gateway-ha-config.yml.