Skip to content

Commit

Permalink
Add election APIs. (#10)
Browse files Browse the repository at this point in the history
Add election APIs.
  • Loading branch information
h00448672 authored Jun 14, 2020
1 parent 692cd8a commit 4b8076e
Show file tree
Hide file tree
Showing 7 changed files with 851 additions and 7 deletions.
55 changes: 55 additions & 0 deletions examples/election.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//! Election example
use etcd_client::*;

#[tokio::main]
async fn main() -> Result<(), Error> {
let mut client = Client::connect(["localhost:2379"], None).await?;
let resp = client.lease_grant(10, None).await?;
let lease_id = resp.id();
println!("grant ttl:{:?}, id:{:?}", resp.ttl(), resp.id());

// campaign
let resp = client.campaign("myElection", "123", lease_id).await?;
let leader = resp.leader().unwrap();
println!(
"election name:{:?}, leaseId:{:?}",
leader.name_str(),
leader.lease()
);

// proclaim
let resp = client
.proclaim(
"123",
Some(ProclaimOptions::new().with_leader(leader.clone())),
)
.await?;
let header = resp.header();
println!("proclaim header {:?}", header.unwrap());

// observe
let mut msg = client.observe(leader.name()).await?;
loop {
if let Some(resp) = msg.message().await? {
println!("observe key {:?}", resp.kv().unwrap().key_str());
if resp.kv().is_some() {
break;
}
}
}

// leader
let resp = client.leader("myElection").await?;
let kv = resp.kv().unwrap();
println!("key is {:?}", kv.key_str());
println!("value is {:?}", kv.value_str());

// resign
let resign_option = ResignOptions::new().with_leader(leader.clone());
let resp = client.resign(Some(resign_option)).await?;
let header = resp.header();
println!("resign header {:?}", header.unwrap());

Ok(())
}
25 changes: 25 additions & 0 deletions examples/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,30 @@ async fn main() -> Result<(), Error> {
}
}

// Mover leader
let resp = client.member_list().await?;
let member_list = resp.members();

let resp = client.status().await?;
let leader_id = resp.leader();
println!("status {:?}, leader_id {:?}", resp, resp.leader());

let mut member_id = leader_id;
for member in member_list {
if member.id() != leader_id {
member_id = member.id();
println!("member_id {:?}, name is {:?}", member.id(), member.name());
break;
}
}

let resp = client.move_leader(member_id).await?;
let header = resp.header();
if member_id == leader_id {
assert!(header.is_none());
} else {
println!("move_leader header {:?}", header);
}

Ok(())
}
139 changes: 136 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use crate::rpc::cluster::{
ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
MemberRemoveResponse, MemberUpdateResponse,
};
use crate::rpc::election::{
CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
ProclaimResponse, ResignOptions, ResignResponse,
};
use crate::rpc::kv::{
CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
KvClient, PutOptions, PutResponse, Txn, TxnResponse,
Expand All @@ -24,7 +28,7 @@ use crate::rpc::lease::{
use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
use crate::rpc::maintenance::{
AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
HashResponse, MaintenanceClient, SnapshotStreaming, StatusResponse,
HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
};
use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream, Watcher};
use tonic::metadata::{Ascii, MetadataValue};
Expand All @@ -42,6 +46,7 @@ pub struct Client {
auth: AuthClient,
maintenance: MaintenanceClient,
cluster: ClusterClient,
election: ElectionClient,
}

impl Client {
Expand Down Expand Up @@ -96,7 +101,8 @@ impl Client {
let lock = LockClient::new(channel.clone(), interceptor.clone());
let auth = AuthClient::new(channel.clone(), interceptor.clone());
let cluster = ClusterClient::new(channel.clone(), interceptor.clone());
let maintenance = MaintenanceClient::new(channel, interceptor);
let maintenance = MaintenanceClient::new(channel.clone(), interceptor.clone());
let election = ElectionClient::new(channel, interceptor);

Ok(Self {
kv,
Expand All @@ -106,6 +112,7 @@ impl Client {
auth,
maintenance,
cluster,
election,
})
}

Expand Down Expand Up @@ -453,6 +460,54 @@ impl Client {
pub async fn member_list(&mut self) -> Result<MemberListResponse> {
self.cluster.member_list().await
}

/// Moves the current leader node to target node.
#[inline]
pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
self.maintenance.move_leader(target_id).await
}

/// Puts a value as eligible for the election on the prefix key.
/// Multiple sessions can participate in the election for the
/// same prefix, but only one can be the leader at a time.
#[inline]
pub async fn campaign(
&mut self,
name: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
lease: i64,
) -> Result<CampaignResponse> {
self.election.campaign(name, value, lease).await
}

/// Lets the leader announce a new value without another election.
#[inline]
pub async fn proclaim(
&mut self,
value: impl Into<Vec<u8>>,
options: Option<ProclaimOptions>,
) -> Result<ProclaimResponse> {
self.election.proclaim(value, options).await
}

/// Returns the leader value for the current election.
#[inline]
pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
self.election.leader(name).await
}

/// Returns a channel that reliably observes ordered leader proposals
/// as GetResponse values on every current elected leader key.
#[inline]
pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
self.election.observe(name).await
}

/// Releases election leadership and then start a new election
#[inline]
pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
self.election.resign(option).await
}
}

/// Options for `Connect` operation.
Expand Down Expand Up @@ -1163,8 +1218,86 @@ mod tests {
assert!(members.contains(&id1));
assert!(members.contains(&id2));
assert!(members.contains(&id3));
Ok(())
}

#[tokio::test]
async fn test_move_leader() -> Result<()> {
let mut client = get_client().await?;
let resp = client.member_list().await?;
let member_list = resp.members();

let resp = client.status().await?;
let leader_id = resp.leader();
println!("status {:?}, leader_id {:?}", resp, resp.leader());

let mut member_id = leader_id;
for member in member_list {
println!("member_id {:?}, name is {:?}", member.id(), member.name());
if member.id() != leader_id {
member_id = member.id();
break;
}
}

let resp = client.move_leader(member_id).await?;
let header = resp.header();
if member_id == leader_id {
assert!(header.is_none());
} else {
assert!(header.is_some());
}

Ok(())
}

#[tokio::test]
async fn test_election() -> Result<()> {
let mut client = get_client().await?;
let resp = client.lease_grant(10, None).await?;
let lease_id = resp.id();
assert_eq!(resp.ttl(), 10);

let resp = client.campaign("myElection", "123", lease_id).await?;
let leader = resp.leader().unwrap();
assert_eq!(leader.name(), b"myElection");
assert_eq!(leader.lease(), lease_id);

let resp = client
.proclaim(
"123",
Some(ProclaimOptions::new().with_leader(leader.clone())),
)
.await?;
let header = resp.header();
println!("proclaim header {:?}", header.unwrap());
assert!(header.is_some());

let mut msg = client.observe(leader.name()).await?;
loop {
if let Some(resp) = msg.message().await? {
assert!(resp.kv().is_some());
println!("observe key {:?}", resp.kv().unwrap().key_str());
if resp.kv().is_some() {
break;
}
}
}

let resp = client.leader("myElection").await?;
let kv = resp.kv().unwrap();
assert_eq!(kv.value(), b"123");
assert_eq!(kv.key(), leader.key());
println!("key is {:?}", kv.key_str());
println!("value is {:?}", kv.value_str());

let resign_option = ResignOptions::new().with_leader(leader.clone());

let resp = client.resign(Some(resign_option)).await?;
let header = resp.header();
println!("resign header {:?}", header.unwrap());
assert!(header.is_some());

client.member_remove(id1).await?;
Ok(())
}
}
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum Error {
/// Lease error
LeaseKeepAliveError(String),

/// Election error
ElectError(String),

/// Invalid metadata value
InvalidMetadataValue(tonic::metadata::errors::InvalidMetadataValue),
}
Expand All @@ -48,6 +51,7 @@ impl Display for Error {
Error::WatchError(e) => write!(f, "watch error: {}", e),
Error::Utf8Error(e) => write!(f, "utf8 error: {}", e),
Error::LeaseKeepAliveError(e) => write!(f, "lease keep alive error: {}", e),
Error::ElectError(e) => write!(f, "election error: {}", e),
Error::InvalidMetadataValue(e) => write!(f, "invalid metadata value: {}", e),
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ pub use crate::rpc::cluster::{
Member, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
MemberRemoveResponse, MemberUpdateResponse,
};
pub use crate::rpc::election::{
CampaignResponse, LeaderKey, LeaderResponse, ObserveStream, ProclaimOptions, ProclaimResponse,
ResignOptions, ResignResponse,
};
pub use crate::rpc::kv::{
CompactionOptions, CompactionResponse, Compare, CompareOp, DeleteOptions, DeleteResponse,
GetOptions, GetResponse, PutOptions, PutResponse, SortOrder, SortTarget, Txn, TxnOp,
Expand All @@ -30,7 +34,8 @@ pub use crate::rpc::lease::{
pub use crate::rpc::lock::{LockOptions, LockResponse, UnlockResponse};
pub use crate::rpc::maintenance::{
AlarmAction, AlarmMember, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse,
HashKvResponse, HashResponse, SnapshotResponse, SnapshotStreaming, StatusResponse,
HashKvResponse, HashResponse, MoveLeaderResponse, SnapshotResponse, SnapshotStreaming,
StatusResponse,
};
pub use crate::rpc::watch::{
Event, EventType, WatchFilterType, WatchOptions, WatchResponse, WatchStream, Watcher,
Expand Down
Loading

0 comments on commit 4b8076e

Please sign in to comment.