Skip to content

Commit

Permalink
Simplifying routing Math, Bumping up Artifact version
Browse files Browse the repository at this point in the history
  • Loading branch information
ssanthanam185 committed Dec 11, 2019
1 parent 681db56 commit 6b6d6eb
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 91 deletions.
2 changes: 1 addition & 1 deletion gateway-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.lyft.data</groupId>
<artifactId>prestogateway-parent</artifactId>
<version>1.6.6</version>
<version>1.6.7</version>
<relativePath>../</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,5 @@ public RoutingManager getRoutingManager() {
public JdbcConnectionManager getConnectionManager() {
return this.connectionManager;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,16 @@
@Slf4j
public class PrestoQueueLengthRoutingTable extends HaRoutingManager {
private static final Random RANDOM = new Random();
private final Object lockObject = new Object();
private static final int MIN_WT = 1;
private static final int MAX_WT = 100;
private final Object lockObject = new Object();
private ConcurrentHashMap<String, Integer> routingGroupWeightSum;
private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> clusterQueueLengthMap;
private Map<String, TreeMap<Integer, String>> weightedDistributionRouting;

/**
* A Routing Manager that distributes queries according to assigned weights based on
* Presto cluster queue length.
*
* @param gatewayBackendManager
* @param queryHistoryManager
*/
public PrestoQueueLengthRoutingTable(GatewayBackendManager gatewayBackendManager,
QueryHistoryManager queryHistoryManager) {
Expand All @@ -49,13 +46,55 @@ public PrestoQueueLengthRoutingTable(GatewayBackendManager gatewayBackendManager
weightedDistributionRouting = new HashMap<String, TreeMap<Integer, String>>();
}

/**
 * Computes the routing weight for the cluster with the largest queue length.
 *
 * <p>Every weight is derived as a fraction of the maximum queue length, so the most
 * heavily queued cluster receives the least weight. The map is sorted ascending by
 * queue length, so the smallest and second-largest queue lengths bound the range.
 * The mean queue length (an even split of all queued queries) is used as the
 * threshold for deciding whether the busiest cluster is over-provisioned, in which
 * case its weight is damped further.
 *
 * @param sortedByQueueLength cluster name to queue length, sorted ascending by value
 * @return the weight to assign to the cluster with the maximum queue length
 */
private int getWeightForMaxQueueCluster(LinkedHashMap<String, Integer> sortedByQueueLength) {
  int bucketCount = sortedByQueueLength.size();
  int totalQueueLength = sortedByQueueLength.values().stream().mapToInt(Integer::intValue).sum();
  // Queue length each cluster would carry if load were spread evenly (integer mean).
  int evenShare = totalQueueLength / bucketCount;

  Object[] queueLengths = sortedByQueueLength.values().toArray();
  int minQueueLen = (Integer) queueLengths[0];
  int maxQueueLen = (Integer) queueLengths[queueLengths.length - 1];
  // Second-largest queue length; with fewer than three clusters fall back to the smallest.
  int secondMaxQueueLen = minQueueLen;
  if (queueLengths.length > 2) {
    secondMaxQueueLen = (Integer) queueLengths[queueLengths.length - 2];
  }

  if (maxQueueLen == 0) {
    // Nothing is queued anywhere; the busiest cluster is as good as any other.
    return MAX_WT;
  }
  if (secondMaxQueueLen == 0) {
    // Only the busiest cluster has queued work; route to it as little as possible.
    return MIN_WT;
  }

  // Weight the second-busiest cluster would get (integer arithmetic kept intentionally).
  int secondMaxWeight = (MAX_WT - (secondMaxQueueLen * MAX_WT / maxQueueLen));
  // Fraction of the max queue represented by the least-loaded cluster (float division).
  double minFraction = (minQueueLen / (float) maxQueueLen);

  boolean overProvisioned = secondMaxQueueLen < evenShare
      || (secondMaxQueueLen > evenShare && minQueueLen <= evenShare);
  if (overProvisioned) {
    // Damp the weight quadratically; an idle min-queue cluster forces the floor weight.
    return (minQueueLen == 0) ? MIN_WT
        : (int) Math.ceil(minFraction * minFraction * secondMaxWeight);
  }
  return (int) Math.ceil(minFraction * secondMaxWeight);
}

/**
* Uses the queue length of a cluster to assign weights to all active clusters in a routing group.
* The weights assigned ensure a fair distribution of routing for queries such that clusters with
* the least queue length get assigned more queries.
*
* @param queueLengthMap
*/
private void computeWeightsBasedOnQueueLength(ConcurrentHashMap<String,
ConcurrentHashMap<String, Integer>> queueLengthMap) {
Expand Down Expand Up @@ -95,61 +134,20 @@ private void computeWeightsBasedOnQueueLength(ConcurrentHashMap<String,
continue;
}

Map<String, Integer> sortedByQueueLength = queueLengthMap.get(routingGroup).entrySet()
LinkedHashMap<String, Integer> sortedByQueueLength = queueLengthMap.get(routingGroup)
.entrySet()
.stream().sorted(Comparator.comparing(Map.Entry::getValue))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
(e1, e2) -> e1, LinkedHashMap::new));

numBuckets = sortedByQueueLength.size();
queueSum = sortedByQueueLength.values().stream().mapToInt(Integer::intValue).sum();
equalDistribution = queueSum / numBuckets;

Object[] queueLengths = sortedByQueueLength.values().toArray();
Object[] clusterNames = sortedByQueueLength.keySet().toArray();

smallestQueueLn = (Integer) queueLengths[0];
maxQueueLn = (Integer) queueLengths[queueLengths.length - 1];
lastButOneQueueLn = smallestQueueLn;
if (queueLengths.length > 2) {
lastButOneQueueLn = (Integer) queueLengths[queueLengths.length - 2];
}

/**
* All wts are assigned as a fraction of maxQueueLn. Cluster with maxQueueLn should be
* given least weightage. What this value should be depends on the queueing on the rest of
* the clusters. since the list is sorted, the smallestQueueLn & lastButOneQueueLn give us
* the range. In an ideal situation all clusters should have equals distribution of
* queries, hence that is used as a threshold to check is a cluster queue is over
* provisioned or not.
*/

if (maxQueueLn == 0) {
calculatedWtMaxQueue = MAX_WT;
} else if (lastButOneQueueLn == 0) {
calculatedWtMaxQueue = MIN_WT;
} else {
float fractionOfLastWt =
(lastButOneQueueLn / (float) maxQueueLn)
* (MAX_WT - (lastButOneQueueLn * MAX_WT / maxQueueLn));

if (smallestQueueLn == 0) {
if (lastButOneQueueLn < equalDistribution) {
calculatedWtMaxQueue = MIN_WT;
} else {
calculatedWtMaxQueue = (int) fractionOfLastWt;
}
} else if (smallestQueueLn < equalDistribution) {
if (lastButOneQueueLn <= equalDistribution) {
calculatedWtMaxQueue = (fractionOfLastWt / 2 > 0)
? (int) fractionOfLastWt / 2 : MIN_WT;
} else {
calculatedWtMaxQueue = (fractionOfLastWt / 1.5 > 0)
? (int) (fractionOfLastWt / 1.5) : MIN_WT;
}
} else {
calculatedWtMaxQueue = (int) fractionOfLastWt;
}
}
calculatedWtMaxQueue = getWeightForMaxQueueCluster(sortedByQueueLength);

for (int i = 0; i < numBuckets - 1; i++) {
// If all clusters have same queue length, assign same wt
Expand Down Expand Up @@ -180,9 +178,6 @@ private void computeWeightsBasedOnQueueLength(ConcurrentHashMap<String,
* Newly added backends are handled through
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map)}
* updateRoutingTable}
*
* @param routingGroup
* @param backends
*/
public void updateRoutingTable(String routingGroup, Set<String> backends) {

Expand All @@ -208,8 +203,6 @@ public void updateRoutingTable(String routingGroup, Set<String> backends) {

/**
* Update routing Table with new Queue Lengths.
*
* @param updatedQueueLengthMap
*/
public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLengthMap) {
synchronized (lockObject) {
Expand All @@ -228,9 +221,6 @@ public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLen

/**
* A convenience method to peek into the weights used by the routing Manager.
*
* @param routingGroup
* @return
*/
public Map<String, Integer> getInternalWeightedRoutingTable(String routingGroup) {
if (!weightedDistributionRouting.containsKey(routingGroup)) {
Expand All @@ -246,9 +236,6 @@ public Map<String, Integer> getInternalWeightedRoutingTable(String routingGroup)

/**
* A convenience method to peek into the state of the routing manager.
*
* @param routingGroup
* @return
*/
public Map<String, Integer> getInternalClusterQueueLength(String routingGroup) {
if (!clusterQueueLengthMap.containsKey(routingGroup)) {
Expand All @@ -260,9 +247,6 @@ public Map<String, Integer> getInternalClusterQueueLength(String routingGroup) {

/**
* Looks up the closest weight to a random number generated for a given routing group.
*
* @param routingGroup
* @return
*/
public String getEligibleBackEnd(String routingGroup) {
if (routingGroupWeightSum.containsKey(routingGroup)
Expand All @@ -277,8 +261,6 @@ public String getEligibleBackEnd(String routingGroup) {
/**
* Performs routing to a given cluster group. This falls back to an adhoc backend, if no scheduled
* backend is found.
*
* @return
*/
@Override
public String provideBackendForRoutingGroup(String routingGroup) {
Expand All @@ -305,11 +287,9 @@ public String provideBackendForRoutingGroup(String routingGroup) {


/**
* Performs routing to an adhoc backend based compute weights base don cluster queue depth.
* Performs routing to an adhoc backend based on computed weights.
*
*
*
* @return
*/
public String provideAdhocBackend() {
Map<String, String> proxyMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,24 @@ private void registerBackEndsWithRandomQueueLength(String groupName, int numBack
routingTable.updateRoutingTable(clusterQueueMap);
}

/**
 * Activates {@code numBackends} backends named {@code groupName + index} and assigns each a
 * non-decreasing random mock queue length, then pushes the group into the shared
 * {@code clusterQueueMap} and refreshes the routing table.
 */
private void registerBackEndsWithRandomQueueLengths(String groupName, int numBackends) {
  Random random = new Random();
  Map<String, Integer> queueLengths = new HashMap<>();
  int cumulativeQueueLength = 0;

  for (int i = 0; i < numBackends; i++) {
    String backend = groupName + i;
    backendManager.activateBackend(backend);
    // Each successive backend queues at least as much as the previous one.
    cumulativeQueueLength += random.nextInt(100);
    queueLengths.put(backend, cumulativeQueueLength);
  }

  clusterQueueMap.put(groupName, queueLengths);
  routingTable.updateRoutingTable(clusterQueueMap);
}

private void registerBackEnds(String groupName, int numBackends,
int queueLengthDistributiveFactor) {
int queueLengthDistributiveFactor) {
int mockQueueLength = 0;
String backend;

Expand Down Expand Up @@ -166,19 +182,20 @@ public void testRoutingWithEvenWeightDistribution() {

@Test
public void testRoutingWithSkewedWeightDistribution() {
int queueDistribution = 30;
for (int numRequests : QUERY_VOLUMES) {
for (int numBk = 1; numBk <= NUM_BACKENDS; numBk++) {

resetBackends(mockRoutingGroup, numBk, queueDistribution);
deactiveAllBackends();
registerBackEndsWithRandomQueueLengths(mockRoutingGroup, numBk);

Map<String, Integer> routingDistribution = routeQueries(mockRoutingGroup, numRequests);

// Useful Debugging Info
//System.out.println("Input :" + clusterQueueMap.toString() + " Num of Requests:"
// + numRequests
// + " Internal Routing table: "
// + routingTable.getInternalWeightedRoutingTable(mockRoutingGroup).toString()
// + " Distribution: " + routingDistribution.toString());
System.out.println("Input :" + clusterQueueMap.toString() + " Num of Requests:"
+ numRequests
+ " Internal Routing table: "
+ routingTable.getInternalWeightedRoutingTable(mockRoutingGroup).toString()
+ " Distribution: " + routingDistribution.toString());
if (numBk > 1) {
if (routingDistribution.containsKey(mockRoutingGroup + (numBk - 1))) {
assert routingDistribution.get(mockRoutingGroup + (numBk - 1))
Expand All @@ -205,15 +222,16 @@ public void testRoutingWithEqualWeightDistribution() {
Map<String, Integer> routingDistribution = routeQueries(mockRoutingGroup, numRequests);

//Useful Debugging Info
//System.out.println("Input :" + clusterQueueMap.toString() + " Num of Requests:" +
//numRequests
//+ " Internal Routing table: " + routingTable.getInternalWeightedRoutingTable
//(mockRoutingGroup).toString()
//+ " Distribution: " + routingDistribution.toString());
System.out.println("Input :" + clusterQueueMap.toString() + " Num of Requests:" +
numRequests
+ " Internal Routing table: " + routingTable.getInternalWeightedRoutingTable
(mockRoutingGroup).toString()
+ " Distribution: " + routingDistribution.toString());

if (numBk > 1) {
// With equal weights, the algorithm randomly chooses from the list.
assert routingDistribution.size() == clusterQueueMap.get(mockRoutingGroup).size();
// With equal weights, the algorithm randomly chooses from the list. Check that the
// distribution spans at least half of the routing group.
assert routingDistribution.size() >= clusterQueueMap.get(mockRoutingGroup).size()/2;
} else {
assert routingDistribution.get(mockRoutingGroup + '0') == numRequests;
}
Expand All @@ -224,19 +242,19 @@ public void testRoutingWithEqualWeightDistribution() {
@Test
public void testRoutingWithMultipleGroups() {
int queueDistribution = 10;
int numRequests = 50;
int numRequests = 15;
int numBk = 3;

for (String grp : mockRoutingGroups) {
resetBackends(grp, numBk, queueDistribution);
Map<String, Integer> routingDistribution = routeQueries(grp, numRequests);

// Useful for debugging
//System.out.println("Input :" + clusterQueueMap.toString() + " Num of Requests:" +
//numRequests
//+ " Internal Routing table: " + routingTable.getInternalWeightedRoutingTable
//(mockRoutingGroup).toString()
//+ " Distribution: " + routingDistribution.toString());
System.out.println("Input :" + clusterQueueMap.toString() + " Num of Requests:" +
numRequests
+ " Internal Routing table: " + routingTable.getInternalWeightedRoutingTable
(grp).toString()
+ " Distribution: " + routingDistribution.toString());
if (numBk > 1) {
if (routingDistribution.containsKey(grp + (numBk - 1))) {
assert routingDistribution.get(grp + (numBk - 1))
Expand Down

0 comments on commit 6b6d6eb

Please sign in to comment.