Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of jump consistent hashing for selecting servers #110

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions mtop-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,75 @@ use tokio::sync::RwLock;
// - https://stackoverflow.com/questions/69841546/consistent-hashing-why-are-vnodes-a-thing
// - https://medium.com/@panchr/dynamic-replication-in-memcached-8939c6f81e7f

/// Logic for picking a server to "own" a particular cache key that uses
/// jump consistent hashing.
///
/// See https://arxiv.org/pdf/1406.2294.pdf
#[derive(Debug)]
pub struct SelectorJump {
servers: RwLock<Vec<Server>>,
}

impl SelectorJump {
/// Create a new instance with the provided initial server list
pub fn new(mut servers: Vec<Server>) -> Self {
// Jump hash requires that the list of servers is always in the same order
servers.sort();
Self {
servers: RwLock::new(servers),
}
}

/// Implementation of the jump consistent hash.
///
/// Adapted from https://arxiv.org/pdf/1406.2294.pdf
fn jump_hash(mut key: u64, buckets: usize) -> usize {
assert!(buckets > 0);
let mut b: u64 = 0;
let mut j: u64 = 0;

while j < buckets as u64 {
b = j;
key = key.wrapping_mul(2862933555777941757) + 1;
j = ((b + 1) as f64 * ((1_u64 << 31) as f64 / ((key >> 33) + 1) as f64)) as u64;
}

b as usize
}

/// Get a copy of all current servers.
pub async fn servers(&self) -> Vec<Server> {
let servers = self.servers.read().await;
servers.clone()
}

/// Get the `Server` that owns the given key, or none if there are no servers.
pub async fn server(&self, key: &Key) -> Option<Server> {
let servers = self.servers.read().await;
if servers.is_empty() {
None
} else if servers.len() == 1 {
servers.first().cloned()
} else {
let mut hasher = DefaultHasher::new();
hasher.write(key.as_ref().as_bytes());
let hash = hasher.finish();

let idx = Self::jump_hash(hash, servers.len());
servers.get(idx).cloned()
}
}

/// Update the list of potential servers to pick from.
pub async fn set_servers(&self, mut servers: Vec<Server>) {
// Jump hash requires that the list of servers is always in the same order
servers.sort();

let mut current = self.servers.write().await;
*current = servers
}
}

/// Logic for picking a server to "own" a particular cache key that uses
/// rendezvous hashing.
///
Expand Down
Loading