Skip to content

Commit

Permalink
Merge pull request #38 from sjudd/sam/10-31-Add_support_for_deregiste…
Browse files Browse the repository at this point in the history
…r_service

Add support for deregister entity
  • Loading branch information
kushudai authored Nov 18, 2023
2 parents 65f7625 + 12bb8f2 commit 9003898
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 38 deletions.
124 changes: 86 additions & 38 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ const CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME: &str = "create_or_update_key_sync";
const DELETE_KEY_METHOD_NAME: &str = "delete_key";
const GET_LOCK_METHOD_NAME: &str = "get_lock";
const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity";
const DEREGISTER_ENTITY_METHOD_NAME: &str = "deregister_entity";
const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names";
const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes";
const GET_SESSION_METHOD_NAME: &str = "get_session";
Expand Down Expand Up @@ -491,6 +492,26 @@ impl Consul {
Ok(())
}

/// Removes entries from consul's global catalog.
/// See https://www.consul.io/api-docs/catalog#deregister-entity for more information.
/// # Arguments:
/// - payload: The [`DeregisterEntityPayload`](DeregisterEntityPayload) to provide the register entity API.
/// # Errors:
/// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
pub async fn deregister_entity(&self, payload: &DeregisterEntityPayload) -> Result<()> {
let uri = format!("{}/v1/catalog/deregister", self.config.address);
let request = hyper::Request::builder().method(Method::PUT).uri(uri);
let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?;
self.execute_request(
request,
payload.into(),
Some(Duration::from_secs(5)),
DEREGISTER_ENTITY_METHOD_NAME,
)
.await?;
Ok(())
}

/// Returns all services currently registered with consul.
/// See https://www.consul.io/api-docs/catalog#list-services for more information.
/// # Arguments:
Expand Down Expand Up @@ -900,51 +921,32 @@ mod tests {
let consul = get_client();

let new_service_name = "test-service-44".to_string();
register_entity(&consul, &new_service_name, "local").await;

// verify a service by this name is currently not registered
let ResponseMeta {
response: service_names_before_register,
..
} = consul
.get_all_registered_service_names(None)
.await
.expect("expected get_registered_service_names request to succeed");
assert!(!service_names_before_register.contains(&new_service_name));
assert!(is_registered(&consul, &new_service_name).await);
}

// register a new service
let payload = RegisterEntityPayload {
ID: None,
Node: "local".to_string(),
Address: "127.0.0.1".to_string(),
#[tokio::test(flavor = "multi_thread")]
async fn test_deregister_and_retrieve_services() {
let consul = get_client();

let new_service_name = "test-service-45".to_string();
let node_id = "local";
register_entity(&consul, &new_service_name, node_id).await;

let payload = DeregisterEntityPayload {
Node: Some(node_id.to_string()),
Datacenter: None,
TaggedAddresses: Default::default(),
NodeMeta: Default::default(),
Service: Some(RegisterEntityService {
ID: None,
Service: new_service_name.clone(),
Tags: vec![],
TaggedAddresses: Default::default(),
Meta: Default::default(),
Port: Some(42424),
Namespace: None,
}),
Check: None,
SkipNodeUpdate: None,
CheckID: None,
ServiceID: None,
Namespace: None,
};
consul
.register_entity(&payload)
.deregister_entity(&payload)
.await
.expect("expected register_entity request to succeed");
.expect("expected deregister_entity request to succeed");

// verify the newly registered service is retrieved
let ResponseMeta {
response: service_names_after_register,
..
} = consul
.get_all_registered_service_names(None)
.await
.expect("expected get_registered_service_names request to succeed");
assert!(service_names_after_register.contains(&new_service_name));
assert!(!is_registered(&consul, &new_service_name).await);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand Down Expand Up @@ -1210,6 +1212,52 @@ mod tests {
assert_ne!(mod_idx3, mod_idx4);
}

async fn register_entity(consul: &Consul, service_name: &String, node_id: &str) {
let ResponseMeta {
response: service_names_before_register,
..
} = consul
.get_all_registered_service_names(None)
.await
.expect("expected get_registered_service_names request to succeed");
assert!(!service_names_before_register.contains(service_name));

let payload = RegisterEntityPayload {
ID: None,
Node: node_id.to_string(),
Address: "127.0.0.1".to_string(),
Datacenter: None,
TaggedAddresses: Default::default(),
NodeMeta: Default::default(),
Service: Some(RegisterEntityService {
ID: None,
Service: service_name.clone(),
Tags: vec![],
TaggedAddresses: Default::default(),
Meta: Default::default(),
Port: Some(42424),
Namespace: None,
}),
Check: None,
SkipNodeUpdate: None,
};
consul
.register_entity(&payload)
.await
.expect("expected register_entity request to succeed");
}

async fn is_registered(consul: &Consul, service_name: &String) -> bool {
let ResponseMeta {
response: service_names_after_register,
..
} = consul
.get_all_registered_service_names(None)
.await
.expect("expected get_registered_service_names request to succeed");
service_names_after_register.contains(service_name)
}

fn get_client() -> Consul {
let conf: Config = Config::from_env();
Consul::new(conf)
Expand Down
22 changes: 22 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,28 @@ pub struct RegisterEntityPayload {
pub SkipNodeUpdate: Option<bool>,
}

/// The service to deregister with consul's global catalog.
/// See https://www.consul.io/api/agent/service for more information.
#[allow(non_snake_case)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DeregisterEntityPayload {
/// The node to execute the check on.
#[serde(skip_serializing_if = "Option::is_none")]
pub Node: Option<String>,
/// The datacenter to register in, defaults to the agent's datacenter.
#[serde(skip_serializing_if = "Option::is_none")]
pub Datacenter: Option<String>,
/// Specifies the ID of the check to remove.
#[serde(skip_serializing_if = "Option::is_none")]
pub CheckID: Option<String>,
/// Specifies the ID of the service to remove. The service and all associated checks will be removed.
#[serde(skip_serializing_if = "Option::is_none")]
pub ServiceID: Option<String>,
/// Specifies the namespace of the service and checks you deregister.
#[serde(skip_serializing_if = "Option::is_none")]
pub Namespace: Option<String>,
}

/// The service to register with consul's global catalog.
/// See https://www.consul.io/api/agent/service for more information.
#[allow(non_snake_case)]
Expand Down

0 comments on commit 9003898

Please sign in to comment.