Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Feat/gateway cluster monitoring #98

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package com.lyft.data.gateway.ha.clustermonitor;

import com.lyft.data.gateway.ha.config.DataStoreConfiguration;
import com.lyft.data.gateway.ha.notifier.Notifier;
import com.lyft.data.gateway.ha.persistence.JdbcConnectionManager;
import com.lyft.data.gateway.ha.router.GatewayBackendStateManager.GatewayBackendState;
import com.lyft.data.gateway.ha.router.HaGatewayBackendStateManager;
import java.util.List;


public class HealthChecker implements PrestoClusterStatsObserver {
private static final int MAX_THRESHOLD_QUEUED_QUERY_COUNT = 100;
private Notifier notifier;
private HaGatewayBackendStateManager haGatewayBackendStateManager;

public HealthChecker(Notifier notifier) {
this.notifier = notifier;
Expand All @@ -24,24 +30,26 @@ public void observe(List<ClusterStats> clustersStats) {
notifyForNoWorkers(clusterStats);
}
}
GatewayBackendState backend = new GatewayBackendState();
backend.setName(clusterStats.getClusterId());
backend.setHealth(clusterStats.isHealthy());
backend.setWorkerCount(clusterStats.getNumWorkerNodes());
GatewayBackendState updated = haGatewayBackendStateManager.addBackend(backend);
}
}

private void notifyUnhealthyCluster(ClusterStats clusterStats) {
notifier.sendNotification(String.format("%s - Cluster unhealthy",
clusterStats.getClusterId()),
clusterStats.toString());
clusterStats.getClusterId()),
clusterStats.toString());
}

private void notifyForTooManyQueuedQueries(ClusterStats clusterStats) {
notifier.sendNotification(String.format("%s - Too many queued queries",
clusterStats.getClusterId()), clusterStats.toString());
clusterStats.getClusterId()), clusterStats.toString());
}

private void notifyForNoWorkers(ClusterStats clusterStats) {
notifier.sendNotification(String.format("%s - Number of workers",
clusterStats.getClusterId()), clusterStats.toString());
clusterStats.getClusterId()), clusterStats.toString());
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler;
import com.lyft.data.gateway.ha.persistence.JdbcConnectionManager;
import com.lyft.data.gateway.ha.router.GatewayBackendManager;
import com.lyft.data.gateway.ha.router.GatewayBackendStateManager;
import com.lyft.data.gateway.ha.router.HaGatewayBackendStateManager;
import com.lyft.data.gateway.ha.router.HaGatewayManager;
import com.lyft.data.gateway.ha.router.HaQueryHistoryManager;
import com.lyft.data.gateway.ha.router.HaRoutingManager;
Expand All @@ -25,6 +27,7 @@ public class HaGatewayProviderModule extends AppModule<HaGatewayConfiguration, E
private final QueryHistoryManager queryHistoryManager;
private final RoutingManager routingManager;
private final JdbcConnectionManager connectionManager;
private final GatewayBackendStateManager gatewayBackendStateManager;

public HaGatewayProviderModule(HaGatewayConfiguration configuration, Environment environment) {
super(configuration, environment);
Expand All @@ -33,6 +36,7 @@ public HaGatewayProviderModule(HaGatewayConfiguration configuration, Environment
queryHistoryManager = new HaQueryHistoryManager(connectionManager);
routingManager =
new HaRoutingManager(gatewayBackendManager, (HaQueryHistoryManager) queryHistoryManager);
gatewayBackendStateManager = new HaGatewayBackendStateManager(connectionManager);
}

protected ProxyHandler getProxyHandler() {
Expand Down Expand Up @@ -84,10 +88,16 @@ public RoutingManager getRoutingManager() {
return this.routingManager;
}

@Provides
@Singleton
public GatewayBackendStateManager getGatewayBackendStateManager() {
return this.gatewayBackendStateManager;
}

@Provides
@Singleton
public JdbcConnectionManager getConnectionManager() {
return this.connectionManager;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.lyft.data.gateway.ha.persistence.dao;

import com.lyft.data.gateway.ha.router.GatewayBackendStateManager.GatewayBackendState;
import java.util.ArrayList;
import java.util.List;
import org.javalite.activejdbc.Model;
import org.javalite.activejdbc.annotations.Cached;
import org.javalite.activejdbc.annotations.IdName;
import org.javalite.activejdbc.annotations.Table;

@Table("gateway_backend_state")
@IdName("name")
@Cached
public class GatewayBackendStateHistory extends Model {
//TODO: Create an active jdbc model along with a function to create and update entries
private static final String name = "name";
private static final String health = "health";
private static final String workerCount = "worker_count";

public static List<GatewayBackendState> upcast(List<GatewayBackendStateHistory>
gatewayBackendStateHistoryList) {
List<GatewayBackendState> gatewayBackendStates = new ArrayList<>();
for (GatewayBackendStateHistory dao : gatewayBackendStateHistoryList) {
GatewayBackendState gatewayBackendState = new GatewayBackendState();
gatewayBackendState.setName(dao.getString(name));
gatewayBackendState.setHealth(dao.getBoolean(health));
gatewayBackendState.setWorkerCount(dao.getInteger(workerCount));
}
return gatewayBackendStates;
}

public static void create(GatewayBackendStateHistory model,
GatewayBackendState gatewayBackendState) {
model.set(name, gatewayBackendState.getName());
model.set(health, gatewayBackendState.getHealth());
model.set(workerCount, gatewayBackendState.getWorkerCount());
model.insert();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.inject.Inject;
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
import com.lyft.data.gateway.ha.router.GatewayBackendManager;
import com.lyft.data.gateway.ha.router.GatewayBackendStateManager;
import io.dropwizard.views.View;

import java.io.IOException;
Expand All @@ -31,6 +32,7 @@
public class EntityEditorResource {

@Inject private GatewayBackendManager gatewayBackendManager;
@Inject private GatewayBackendStateManager gatewayBackendStateManager;
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@GET
Expand All @@ -46,7 +48,8 @@ protected EntityView(String templateName) {
}

private enum EntityType {
GATEWAY_BACKEND
GATEWAY_BACKEND,
GATEWAY_BACKEND_STATE
}

@GET
Expand Down Expand Up @@ -87,6 +90,8 @@ public Response getAllEntitiesForType(@PathParam("entityType") String entityType
switch (entityType) {
case GATEWAY_BACKEND:
return Response.ok(gatewayBackendManager.getAllBackends()).build();
case GATEWAY_BACKEND_STATE:
return Response.ok(gatewayBackendStateManager.getAllBackendStates()).build();
default:
}
return Response.ok(ImmutableList.of()).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.lyft.data.gateway.ha.router;

//import com.lyft.data.gateway.ha.persistence.dao.GatewayBackendStateHistory;
import java.util.List;
import lombok.Data;
import lombok.ToString;

public interface GatewayBackendStateManager {

List<GatewayBackendState> getAllBackendStates();

GatewayBackendState addBackend(GatewayBackendState backend);

@Data
@ToString
class GatewayBackendState {

private String name;
private Boolean health;
private int workerCount;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.lyft.data.gateway.ha.router;

import com.lyft.data.gateway.ha.persistence.JdbcConnectionManager;
import com.lyft.data.gateway.ha.persistence.dao.GatewayBackendStateHistory;

import java.util.List;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HaGatewayBackendStateManager implements GatewayBackendStateManager {

private JdbcConnectionManager connectionManager;

public HaGatewayBackendStateManager(JdbcConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

@Override
public List<GatewayBackendState> getAllBackendStates() {
//TODO implement method to obtain all cluster states
try {
connectionManager.open();
List<GatewayBackendStateHistory> proxyBackendList = GatewayBackendStateHistory.findAll();
return GatewayBackendStateHistory.upcast(proxyBackendList);
} finally {
connectionManager.close();
}
}

@Override
public GatewayBackendState addBackend(GatewayBackendState backend) {
try {
connectionManager.open();
GatewayBackendStateHistory.create(new GatewayBackendStateHistory(), backend);
} finally {
connectionManager.close();
}

return backend;
}
}