Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Presto Queue Length Based Routing Manager #81

Merged
merged 8 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.editorconfig
.idea
**/target/*
.DS_Store
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package com.lyft.data.gateway.ha;
package com.lyft.data.gateway.ha.clustermonitor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
import com.lyft.data.gateway.ha.notifier.Notifier;
import com.lyft.data.gateway.ha.router.GatewayBackendManager;
import io.dropwizard.lifecycle.Managed;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand All @@ -20,27 +18,28 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.HttpMethod;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;

@Slf4j
public class ActiveClusterMonitor implements Managed {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int BACKEND_CONNECT_TIMEOUT_SECONDS = 15;
private static final int MAX_THRESHOLD_QUEUED_QUERY_COUNT = 100;

private static final int MONITOR_TASK_DELAY_MIN = 1;

@Inject private Notifier emailNotifier;
@Inject private GatewayBackendManager gatewayBackendManager;
@Inject
private List<PrestoClusterStatsObserver> clusterStatsObservers;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing listeners , which can reactive to the stats collected from the Active Cluster monitor

@Inject
private GatewayBackendManager gatewayBackendManager;

private volatile boolean monitorActive = true;

private ExecutorService executorService = Executors.newFixedThreadPool(10);
private ExecutorService singleTaskExecutor = Executors.newSingleThreadExecutor();

/**
* Run an app that queries all active presto clusters for stats.
*/
public void start() {
singleTaskExecutor.submit(
() -> {
Expand All @@ -54,20 +53,18 @@ public void start() {
executorService.submit(() -> getPrestoClusterStats(backend));
futures.add(call);
}

List<ClusterStats> stats = new ArrayList<>();
for (Future<ClusterStats> clusterStatsFuture : futures) {
ClusterStats clusterStats = clusterStatsFuture.get();
if (!clusterStats.isHealthy()) {
notifyUnhealthyCluster(clusterStats);
} else {
if (clusterStats.getQueuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) {
notifyForTooManyQueuedQueries(clusterStats);
}
if (clusterStats.getNumWorkerNodes() < 1) {
notifyForNoWorkers(clusterStats);
}
stats.add(clusterStats);
}

if (clusterStatsObservers != null) {
for (PrestoClusterStatsObserver observer : clusterStatsObservers) {
observer.observe(stats);
}
}

} catch (Exception e) {
log.error("Error performing backend monitor tasks", e);
}
Expand All @@ -80,22 +77,6 @@ public void start() {
});
}

private void notifyUnhealthyCluster(ClusterStats clusterStats) {
emailNotifier.sendNotification(String.format("%s - Cluster unhealthy",
clusterStats.getClusterId()),
clusterStats.toString());
}

private void notifyForTooManyQueuedQueries(ClusterStats clusterStats) {
emailNotifier.sendNotification(String.format("%s - Too many queued queries",
clusterStats.getClusterId()), clusterStats.toString());
}

private void notifyForNoWorkers(ClusterStats clusterStats) {
emailNotifier.sendNotification(String.format("%s - Number of workers",
clusterStats.getClusterId()), clusterStats.toString());
}

private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
ClusterStats clusterStats = new ClusterStats();
clusterStats.setClusterId(backend.getName());
Expand All @@ -122,6 +103,8 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
clusterStats.setProxyTo(backend.getProxyTo());
clusterStats.setRoutingGroup(backend.getRoutingGroup());
conn.disconnect();
}
} catch (Exception e) {
Expand All @@ -130,20 +113,13 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
return clusterStats;
}

/**
* Shut down the app.
*/
public void stop() {
this.monitorActive = false;
this.executorService.shutdown();
this.singleTaskExecutor.shutdown();
}

@Data
@ToString
public static class ClusterStats {
private int runningQueryCount;
private int queuedQueryCount;
private int blockedQueryCount;
private int numWorkerNodes;
private boolean healthy;
private String clusterId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.lyft.data.gateway.ha.clustermonitor;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class ClusterStats {
private int runningQueryCount;
private int queuedQueryCount;
private int blockedQueryCount;
private int numWorkerNodes;
private boolean healthy;
private String clusterId;
private String proxyTo;
private String routingGroup;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.lyft.data.gateway.ha.clustermonitor;

import com.lyft.data.gateway.ha.notifier.Notifier;
import java.util.List;

public class HealthChecker implements PrestoClusterStatsObserver {
private static final int MAX_THRESHOLD_QUEUED_QUERY_COUNT = 100;
private Notifier notifier;

public HealthChecker(Notifier notifier) {
this.notifier = notifier;
}

@Override
public void observe(List<ClusterStats> clustersStats) {
for (ClusterStats clusterStats : clustersStats) {
if (!clusterStats.isHealthy()) {
notifyUnhealthyCluster(clusterStats);
} else {
if (clusterStats.getQueuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) {
notifyForTooManyQueuedQueries(clusterStats);
}
if (clusterStats.getNumWorkerNodes() < 1) {
notifyForNoWorkers(clusterStats);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's a lot of boilerplate, but do you think we should split up the healthchecks to different observer classes?

Copy link
Contributor Author

@ssanthanam185 ssanthanam185 Dec 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could def do that . We also want to handle different routingGroup health checks differently . Ill add that as a follow up PR

}
}
}

private void notifyUnhealthyCluster(ClusterStats clusterStats) {
notifier.sendNotification(String.format("%s - Cluster unhealthy",
clusterStats.getClusterId()),
clusterStats.toString());
}

private void notifyForTooManyQueuedQueries(ClusterStats clusterStats) {
notifier.sendNotification(String.format("%s - Too many queued queries",
clusterStats.getClusterId()), clusterStats.toString());
}

private void notifyForNoWorkers(ClusterStats clusterStats) {
notifier.sendNotification(String.format("%s - Number of workers",
clusterStats.getClusterId()), clusterStats.toString());
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.lyft.data.gateway.ha.clustermonitor;

import java.util.List;

public interface PrestoClusterStatsObserver {

void observe(List<ClusterStats> stats);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.lyft.data.gateway.ha.clustermonitor;

import com.lyft.data.gateway.ha.router.PrestoQueueLengthRoutingTable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Updates the QueueLength Based Routing Manager {@link PrestoQueueLengthRoutingTable} with
* updated queue lengths of active clusters.
*/
public class PrestoQueueLengthChecker implements PrestoClusterStatsObserver {

PrestoQueueLengthRoutingTable routingManager;

public PrestoQueueLengthChecker(PrestoQueueLengthRoutingTable routingManager) {
this.routingManager = routingManager;
}

@Override
public void observe(List<ClusterStats> stats) {
Map<String, Map<String, Integer>> clusterQueueMap = new HashMap<String, Map<String, Integer>>();

for (ClusterStats stat : stats) {
if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) {
clusterQueueMap.put(stat.getRoutingGroup(), new HashMap<String, Integer>() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL you can do initializers like this in Java!

It might be slightly simpler to use Map.of if we're on java9+.

{
put(stat.getClusterId(), stat.getQueuedQueryCount());
}
}
);
} else {
clusterQueueMap.get(stat.getRoutingGroup()).put(stat.getClusterId(),
stat.getQueuedQueryCount());
}
}

routingManager.updateRoutingTable(clusterQueueMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.lyft.data.gateway.ha.module;

import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.lyft.data.baseapp.AppModule;
import com.lyft.data.gateway.ha.clustermonitor.HealthChecker;
import com.lyft.data.gateway.ha.clustermonitor.PrestoClusterStatsObserver;
import com.lyft.data.gateway.ha.config.HaGatewayConfiguration;
import com.lyft.data.gateway.ha.config.NotifierConfiguration;
import com.lyft.data.gateway.ha.notifier.EmailNotifier;
import io.dropwizard.setup.Environment;
import java.util.ArrayList;
import java.util.List;

public class ClusterStateListenerModule extends AppModule<HaGatewayConfiguration, Environment> {
List<PrestoClusterStatsObserver> observers;

public ClusterStateListenerModule(HaGatewayConfiguration config, Environment env) {
super(config, env);
}

/**
* Observers to cluster stats updates from
* {@link com.lyft.data.gateway.ha.clustermonitor.ActiveClusterMonitor}.
*
* @return
*/
@Provides
@Singleton
public List<PrestoClusterStatsObserver> getClusterStatsObservers() {
observers = new ArrayList<>();
NotifierConfiguration notifierConfiguration = getConfiguration().getNotifier();
observers.add(new HealthChecker(new EmailNotifier(notifierConfiguration)));
return observers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ public RoutingManager getRoutingManager() {
public JdbcConnectionManager getConnectionManager() {
return this.connectionManager;
}
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Missing newline at EOF

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public List<QueryDetail> fetchQueryHistory() {
}
}

@Override
public String getBackendForQueryId(String queryId) {
String backend = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,11 @@

@Slf4j
public class HaRoutingManager extends RoutingManager {
HaQueryHistoryManager queryHistoryManager;
QueryHistoryManager queryHistoryManager;

public HaRoutingManager(
GatewayBackendManager gatewayBackendManager, HaQueryHistoryManager queryHistoryManager) {
GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager) {
super(gatewayBackendManager);
this.queryHistoryManager = queryHistoryManager;
}

protected String findBackendForUnknownQueryId(String queryId) {
String backend;
backend = queryHistoryManager.getBackendForQueryId(queryId);
if (Strings.isNullOrEmpty(backend)) {
backend = super.findBackendForUnknownQueryId(queryId);
}
return backend;
}
}
Loading