-
Notifications
You must be signed in to change notification settings - Fork 156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Presto Queue Length Based Routing Manager #81
Changes from 4 commits
223dc8c
66b3a2e
0cae4b1
681db56
6b6d6eb
1efaa51
f8d804f
7c7b099
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
.editorconfig | ||
.idea | ||
**/target/* | ||
.DS_Store | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package com.lyft.data.gateway.ha.clustermonitor; | ||
|
||
import lombok.Data; | ||
import lombok.ToString; | ||
|
||
@Data | ||
@ToString | ||
public class ClusterStats { | ||
private int runningQueryCount; | ||
private int queuedQueryCount; | ||
private int blockedQueryCount; | ||
private int numWorkerNodes; | ||
private boolean healthy; | ||
private String clusterId; | ||
private String proxyTo; | ||
private String routingGroup; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package com.lyft.data.gateway.ha.clustermonitor; | ||
|
||
import com.lyft.data.gateway.ha.notifier.Notifier; | ||
import java.util.List; | ||
|
||
public class HealthChecker implements PrestoClusterStatsObserver { | ||
private static final int MAX_THRESHOLD_QUEUED_QUERY_COUNT = 100; | ||
private Notifier notifier; | ||
|
||
public HealthChecker(Notifier notifier) { | ||
this.notifier = notifier; | ||
} | ||
|
||
@Override | ||
public void observe(List<ClusterStats> clustersStats) { | ||
for (ClusterStats clusterStats : clustersStats) { | ||
if (!clusterStats.isHealthy()) { | ||
notifyUnhealthyCluster(clusterStats); | ||
} else { | ||
if (clusterStats.getQueuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) { | ||
notifyForTooManyQueuedQueries(clusterStats); | ||
} | ||
if (clusterStats.getNumWorkerNodes() < 1) { | ||
notifyForNoWorkers(clusterStats); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know it's a lot of boilerplate, but do you think we should split up the healthchecks to different observer classes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could def do that . We also want to handle different routingGroup health checks differently . Ill add that as a follow up PR |
||
} | ||
} | ||
} | ||
|
||
private void notifyUnhealthyCluster(ClusterStats clusterStats) { | ||
notifier.sendNotification(String.format("%s - Cluster unhealthy", | ||
clusterStats.getClusterId()), | ||
clusterStats.toString()); | ||
} | ||
|
||
private void notifyForTooManyQueuedQueries(ClusterStats clusterStats) { | ||
notifier.sendNotification(String.format("%s - Too many queued queries", | ||
clusterStats.getClusterId()), clusterStats.toString()); | ||
} | ||
|
||
private void notifyForNoWorkers(ClusterStats clusterStats) { | ||
notifier.sendNotification(String.format("%s - Number of workers", | ||
clusterStats.getClusterId()), clusterStats.toString()); | ||
} | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package com.lyft.data.gateway.ha.clustermonitor; | ||
|
||
import java.util.List; | ||
|
||
public interface PrestoClusterStatsObserver { | ||
|
||
void observe(List<ClusterStats> stats); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package com.lyft.data.gateway.ha.clustermonitor; | ||
|
||
import com.lyft.data.gateway.ha.router.PrestoQueueLengthRoutingTable; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* Updates the QueueLength Based Routing Manager {@link PrestoQueueLengthRoutingTable} with | ||
* updated queue lengths of active clusters. | ||
*/ | ||
public class PrestoQueueLengthChecker implements PrestoClusterStatsObserver { | ||
|
||
PrestoQueueLengthRoutingTable routingManager; | ||
|
||
public PrestoQueueLengthChecker(PrestoQueueLengthRoutingTable routingManager) { | ||
this.routingManager = routingManager; | ||
} | ||
|
||
@Override | ||
public void observe(List<ClusterStats> stats) { | ||
Map<String, Map<String, Integer>> clusterQueueMap = new HashMap<String, Map<String, Integer>>(); | ||
|
||
for (ClusterStats stat : stats) { | ||
if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) { | ||
clusterQueueMap.put(stat.getRoutingGroup(), new HashMap<String, Integer>() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TIL you can do initializers like this in Java! It might be slightly simpler to use |
||
{ | ||
put(stat.getClusterId(), stat.getQueuedQueryCount()); | ||
} | ||
} | ||
); | ||
} else { | ||
clusterQueueMap.get(stat.getRoutingGroup()).put(stat.getClusterId(), | ||
stat.getQueuedQueryCount()); | ||
} | ||
} | ||
|
||
routingManager.updateRoutingTable(clusterQueueMap); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package com.lyft.data.gateway.ha.module; | ||
|
||
import com.google.inject.Provides; | ||
import com.google.inject.Singleton; | ||
import com.lyft.data.baseapp.AppModule; | ||
import com.lyft.data.gateway.ha.clustermonitor.HealthChecker; | ||
import com.lyft.data.gateway.ha.clustermonitor.PrestoClusterStatsObserver; | ||
import com.lyft.data.gateway.ha.config.HaGatewayConfiguration; | ||
import com.lyft.data.gateway.ha.config.NotifierConfiguration; | ||
import com.lyft.data.gateway.ha.notifier.EmailNotifier; | ||
import io.dropwizard.setup.Environment; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
public class ClusterStateListenerModule extends AppModule<HaGatewayConfiguration, Environment> { | ||
List<PrestoClusterStatsObserver> observers; | ||
|
||
public ClusterStateListenerModule(HaGatewayConfiguration config, Environment env) { | ||
super(config, env); | ||
} | ||
|
||
/** | ||
* Observers to cluster stats updates from | ||
* {@link com.lyft.data.gateway.ha.clustermonitor.ActiveClusterMonitor}. | ||
* | ||
* @return | ||
*/ | ||
@Provides | ||
@Singleton | ||
public List<PrestoClusterStatsObserver> getClusterStatsObservers() { | ||
observers = new ArrayList<>(); | ||
NotifierConfiguration notifierConfiguration = getConfiguration().getNotifier(); | ||
observers.add(new HealthChecker(new EmailNotifier(notifierConfiguration))); | ||
return observers; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,4 +89,4 @@ public RoutingManager getRoutingManager() { | |
public JdbcConnectionManager getConnectionManager() { | ||
return this.connectionManager; | ||
} | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Missing newline at EOF |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Introducing listeners , which can reactive to the stats collected from the Active Cluster monitor