diff --git a/agent/src/plugin_manager/v1.rs b/agent/src/plugin_manager/v1.rs index 7f3183f3f..f016fb20b 100644 --- a/agent/src/plugin_manager/v1.rs +++ b/agent/src/plugin_manager/v1.rs @@ -87,8 +87,8 @@ pub struct NumaNode { /// Generated client implementations. pub mod pod_resources_lister_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; /// PodResourcesLister is a service provided by the kubelet that provides information about the /// node resources consumed by pods and containers on the node #[derive(Debug, Clone)] @@ -134,8 +134,9 @@ pub mod pod_resources_lister_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { PodResourcesListerClient::new(InterceptedService::new(inner, interceptor)) } @@ -173,16 +174,23 @@ pub mod pod_resources_lister_client { pub async fn list( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v1.PodResourcesLister/List"); + let path = http::uri::PathAndQuery::from_static( + "/v1.PodResourcesLister/List", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1.PodResourcesLister", "List")); @@ -191,23 +199,28 @@ pub mod pod_resources_lister_client { pub async fn get_allocatable_resources( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1.PodResourcesLister/GetAllocatableResources", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "v1.PodResourcesLister", - "GetAllocatableResources", - )); + req.extensions_mut() + .insert( + GrpcMethod::new("v1.PodResourcesLister", "GetAllocatableResources"), + ); self.inner.unary(req, path, codec).await } } @@ -222,11 +235,17 @@ pub mod pod_resources_lister_server { async fn list( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn get_allocatable_resources( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// PodResourcesLister is a service provided by the kubelet that provides information about the /// node resources consumed by pods and containers on the node @@ -253,7 +272,10 @@ pub mod pod_resources_lister_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -309,11 +331,15 @@ pub mod pod_resources_lister_server { "/v1.PodResourcesLister/List" => { #[allow(non_camel_case_types)] struct ListSvc(pub Arc); - impl - tonic::server::UnaryService for ListSvc - { + impl< + T: PodResourcesLister, + > tonic::server::UnaryService + for ListSvc { type Response = super::ListPodResourcesResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -351,12 +377,15 @@ pub mod pod_resources_lister_server { "/v1.PodResourcesLister/GetAllocatableResources" => { #[allow(non_camel_case_types)] struct GetAllocatableResourcesSvc(pub Arc); - impl - tonic::server::UnaryService - for GetAllocatableResourcesSvc - { + impl< + T: PodResourcesLister, + > tonic::server::UnaryService + for GetAllocatableResourcesSvc { type Response = super::AllocatableResourcesResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -364,9 +393,10 @@ pub mod pod_resources_lister_server { let inner = Arc::clone(&self.0); let fut = async move { ::get_allocatable_resources( - &inner, request, - ) - .await + &inner, + request, + ) + .await }; Box::pin(fut) } @@ -394,14 +424,18 @@ pub mod pod_resources_lister_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } @@ -427,7 +461,8 @@ pub mod pod_resources_lister_server { write!(f, "{:?}", self.0) } } - impl tonic::server::NamedService for PodResourcesListerServer { + impl tonic::server::NamedService + for PodResourcesListerServer { const NAME: &'static str = "v1.PodResourcesLister"; } } diff --git a/agent/src/plugin_manager/v1beta1.rs b/agent/src/plugin_manager/v1beta1.rs index a88570f4d..7b068ecab 100644 --- a/agent/src/plugin_manager/v1beta1.rs +++ b/agent/src/plugin_manager/v1beta1.rs @@ -96,7 +96,9 @@ pub struct PreStartContainerResponse {} #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreferredAllocationRequest { #[prost(message, repeated, tag = "1")] - pub container_requests: ::prost::alloc::vec::Vec, + pub container_requests: ::prost::alloc::vec::Vec< + ContainerPreferredAllocationRequest, + >, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -106,7 +108,9 @@ pub struct ContainerPreferredAllocationRequest { pub available_device_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// List of deviceIDs that must be included in the preferred allocation #[prost(string, repeated, tag = "2")] - pub must_include_device_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + pub must_include_device_i_ds: ::prost::alloc::vec::Vec< + ::prost::alloc::string::String, + >, /// Number of devices to include in the preferred allocation #[prost(int32, tag = "3")] pub allocation_size: i32, @@ -117,7 +121,9 @@ pub struct ContainerPreferredAllocationRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreferredAllocationResponse { #[prost(message, repeated, tag = "1")] - pub container_responses: ::prost::alloc::vec::Vec, + pub container_responses: ::prost::alloc::vec::Vec< + ContainerPreferredAllocationResponse, + >, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -162,8 +168,10 @@ pub struct AllocateResponse { pub struct ContainerAllocateResponse { /// List of environment variable to be set in the container to access one of more devices. #[prost(map = "string, string", tag = "1")] - pub envs: - ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + pub envs: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, /// Mounts for the container. #[prost(message, repeated, tag = "2")] pub mounts: ::prost::alloc::vec::Vec, @@ -172,8 +180,10 @@ pub struct ContainerAllocateResponse { pub devices: ::prost::alloc::vec::Vec, /// Container annotations to pass to the container runtime #[prost(map = "string, string", tag = "4")] - pub annotations: - ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + pub annotations: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, } /// Mount specifies a host volume to mount into a container. /// where device library or tools are installed on host and container @@ -210,8 +220,8 @@ pub struct DeviceSpec { /// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; /// Registration is the service advertised by the Kubelet /// Only when Kubelet answers with a success code to a Register Request /// may Device Plugins start their service @@ -261,8 +271,9 @@ pub mod registration_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } @@ -301,14 +312,19 @@ pub mod registration_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v1beta1.Registration/Register"); + let path = http::uri::PathAndQuery::from_static( + "/v1beta1.Registration/Register", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.Registration", "Register")); @@ -319,8 +335,8 @@ pub mod registration_client { /// Generated client implementations. pub mod device_plugin_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug, Clone)] pub struct DevicePluginClient { @@ -365,8 +381,9 @@ pub mod device_plugin_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { DevicePluginClient::new(InterceptedService::new(inner, interceptor)) } @@ -406,23 +423,28 @@ pub mod device_plugin_client { pub async fn get_device_plugin_options( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1beta1.DevicePlugin/GetDevicePluginOptions", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "v1beta1.DevicePlugin", - "GetDevicePluginOptions", - )); + req.extensions_mut() + .insert( + GrpcMethod::new("v1beta1.DevicePlugin", "GetDevicePluginOptions"), + ); self.inner.unary(req, path, codec).await } /// ListAndWatch returns a stream of List of Devices @@ -435,14 +457,19 @@ pub mod device_plugin_client { tonic::Response>, tonic::Status, > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/ListAndWatch"); + let path = http::uri::PathAndQuery::from_static( + "/v1beta1.DevicePlugin/ListAndWatch", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "ListAndWatch")); @@ -456,23 +483,28 @@ pub mod device_plugin_client { pub async fn get_preferred_allocation( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1beta1.DevicePlugin/GetPreferredAllocation", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "v1beta1.DevicePlugin", - "GetPreferredAllocation", - )); + req.extensions_mut() + .insert( + GrpcMethod::new("v1beta1.DevicePlugin", "GetPreferredAllocation"), + ); self.inner.unary(req, path, codec).await } /// Allocate is called during container creation so that the Device @@ -481,15 +513,23 @@ pub mod device_plugin_client { pub async fn allocate( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/Allocate"); + let path = http::uri::PathAndQuery::from_static( + "/v1beta1.DevicePlugin/Allocate", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "Allocate")); @@ -501,17 +541,23 @@ pub mod device_plugin_client { pub async fn pre_start_container( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = - http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/PreStartContainer"); + let path = http::uri::PathAndQuery::from_static( + "/v1beta1.DevicePlugin/PreStartContainer", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "PreStartContainer")); @@ -560,7 +606,10 @@ pub mod registration_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -616,16 +665,23 @@ pub mod registration_server { "/v1beta1.Registration/Register" => { #[allow(non_camel_case_types)] struct RegisterSvc(pub Arc); - impl tonic::server::UnaryService for RegisterSvc { + impl< + T: Registration, + > tonic::server::UnaryService + for RegisterSvc { type Response = super::Empty; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::register(&inner, request).await }; + let fut = async move { + ::register(&inner, request).await + }; Box::pin(fut) } } @@ -652,14 +708,18 @@ pub mod registration_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } @@ -701,11 +761,15 @@ pub mod device_plugin_server { async fn get_device_plugin_options( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Server streaming response type for the ListAndWatch method. type ListAndWatchStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > + Send + > + + Send + 'static; /// ListAndWatch returns a stream of List of Devices /// Whenever a Device state change or a Device disappears, ListAndWatch @@ -713,7 +777,10 @@ pub mod device_plugin_server { async fn list_and_watch( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// GetPreferredAllocation returns a preferred set of devices to allocate /// from a list of available ones. The resulting preferred allocation is not /// guaranteed to be the allocation ultimately performed by the @@ -722,21 +789,30 @@ pub mod device_plugin_server { async fn get_preferred_allocation( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Allocate is called during container creation so that the Device /// Plugin can run device specific operations and instruct Kubelet /// of the steps to make the Device available in the container async fn allocate( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// PreStartContainer is called, if indicated by Device Plugin during registeration phase, /// before each container start. Device plugin can run device specific operations /// such as resetting the device before making devices available to the container async fn pre_start_container( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug)] @@ -762,7 +838,10 @@ pub mod device_plugin_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -818,13 +897,23 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/GetDevicePluginOptions" => { #[allow(non_camel_case_types)] struct GetDevicePluginOptionsSvc(pub Arc); - impl tonic::server::UnaryService for GetDevicePluginOptionsSvc { + impl tonic::server::UnaryService + for GetDevicePluginOptionsSvc { type Response = super::DevicePluginOptions; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_device_plugin_options(&inner, request) + ::get_device_plugin_options( + &inner, + request, + ) .await }; Box::pin(fut) @@ -856,12 +945,20 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/ListAndWatch" => { #[allow(non_camel_case_types)] struct ListAndWatchSvc(pub Arc); - impl tonic::server::ServerStreamingService for ListAndWatchSvc { + impl< + T: DevicePlugin, + > tonic::server::ServerStreamingService + for ListAndWatchSvc { type Response = super::ListAndWatchResponse; type ResponseStream = T::ListAndWatchStream; - type Future = - BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { ::list_and_watch(&inner, request).await @@ -895,19 +992,26 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/GetPreferredAllocation" => { #[allow(non_camel_case_types)] struct GetPreferredAllocationSvc(pub Arc); - impl - tonic::server::UnaryService - for GetPreferredAllocationSvc - { + impl< + T: DevicePlugin, + > tonic::server::UnaryService + for GetPreferredAllocationSvc { type Response = super::PreferredAllocationResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_preferred_allocation(&inner, request).await + ::get_preferred_allocation( + &inner, + request, + ) + .await }; Box::pin(fut) } @@ -938,16 +1042,23 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/Allocate" => { #[allow(non_camel_case_types)] struct AllocateSvc(pub Arc); - impl tonic::server::UnaryService for AllocateSvc { + impl< + T: DevicePlugin, + > tonic::server::UnaryService + for AllocateSvc { type Response = super::AllocateResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::allocate(&inner, request).await }; + let fut = async move { + ::allocate(&inner, request).await + }; Box::pin(fut) } } @@ -977,19 +1088,23 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/PreStartContainer" => { #[allow(non_camel_case_types)] struct PreStartContainerSvc(pub Arc); - impl - tonic::server::UnaryService - for PreStartContainerSvc - { + impl< + T: DevicePlugin, + > tonic::server::UnaryService + for PreStartContainerSvc { type Response = super::PreStartContainerResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::pre_start_container(&inner, request).await + ::pre_start_container(&inner, request) + .await }; Box::pin(fut) } @@ -1017,14 +1132,18 @@ pub mod device_plugin_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/controller/src/main.rs b/controller/src/main.rs index 31aab21be..63181d8b7 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -10,7 +10,10 @@ use futures::StreamExt; use kube::runtime::{watcher::Config, Controller}; use prometheus::IntGaugeVec; use std::sync::Arc; -use util::{controller_ctx::ControllerContext, instance_action, node_watcher, pod_watcher}; +use util::{ + context::{InstanceControllerContext, NodeWatcherContext, PodWatcherContext}, + instance_action, node_watcher, pod_watcher, +}; /// Length of time to sleep between controller system validation checks pub const SYSTEM_CHECK_DELAY_SECS: u64 = 30; @@ -40,9 +43,8 @@ async fn main() -> Result<(), Box // Start server for prometheus metrics tokio::spawn(run_metrics_server()); let client = Arc::new(kube::Client::try_default().await?); - let controller_ctx = Arc::new(ControllerContext::new(client.clone())); - let node_watcher_ctx = controller_ctx.clone(); - let pod_watcher_ctx = controller_ctx.clone(); + let node_watcher_ctx = Arc::new(NodeWatcherContext::new(client.clone())); + let pod_watcher_ctx = Arc::new(PodWatcherContext::new(client.clone())); node_watcher::check(client.clone()).await?; let node_controller = Controller::new( @@ -74,7 +76,7 @@ async fn main() -> Result<(), Box tokio::select! { _ = futures::future::join(node_controller, pod_controller) => {}, - _ = instance_action::run(client) => {} + _ = instance_action::run(Arc::new(InstanceControllerContext::new(client))) => {} } log::info!("{} Controller end", API_NAMESPACE); diff --git a/controller/src/util/controller_ctx.rs b/controller/src/util/context.rs similarity index 82% rename from controller/src/util/controller_ctx.rs rename to controller/src/util/context.rs index 8534688c1..b28b7a8e4 100644 --- a/controller/src/util/controller_ctx.rs +++ b/controller/src/util/context.rs @@ -83,19 +83,42 @@ impl< { } -pub struct ControllerContext { +pub struct NodeWatcherContext { /// Kubernetes client pub client: Arc, - pub known_pods: Arc>>, pub known_nodes: Arc>>, } -impl ControllerContext { +impl NodeWatcherContext { pub fn new(client: Arc) -> Self { - ControllerContext { + NodeWatcherContext { client, - known_pods: Arc::new(RwLock::new(HashMap::new())), known_nodes: Arc::new(RwLock::new(HashMap::new())), } } } + +pub struct PodWatcherContext { + /// Kubernetes client + pub client: Arc, + pub known_pods: Arc>>, +} + +impl PodWatcherContext { + pub fn new(client: Arc) -> Self { + PodWatcherContext { + client, + known_pods: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +pub struct InstanceControllerContext(Arc); +impl InstanceControllerContext { + pub fn new(client: Arc) -> Self { + InstanceControllerContext(client) + } + pub fn client(&self) -> Arc { + Arc::clone(&self.0) + } +} diff --git a/controller/src/util/instance_action.rs b/controller/src/util/instance_action.rs index 29d38b735..7c6477b6c 100644 --- a/controller/src/util/instance_action.rs +++ b/controller/src/util/instance_action.rs @@ -1,6 +1,7 @@ -use crate::util::controller_ctx::ControllerKubeClient; -use crate::util::{pod_action::PodAction, pod_action::PodActionInfo}; -use crate::BROKER_POD_COUNT_METRIC; +use super::super::BROKER_POD_COUNT_METRIC; +use super::{pod_action::PodAction, pod_action::PodActionInfo}; +use crate::util::context::{ControllerKubeClient, InstanceControllerContext}; +use crate::util::{ControllerError, Result}; use akri_shared::akri::configuration::Configuration; use akri_shared::k8s::api::Api; use akri_shared::{ @@ -11,13 +12,17 @@ use akri_shared::{ }, }; use anyhow::Context; -use futures::TryStreamExt; +use futures::StreamExt; use k8s_openapi::api::batch::v1::{Job, JobSpec}; use k8s_openapi::api::core::v1::{Pod, PodSpec}; use kube::{ api::{ListParams, ResourceExt}, - runtime::{controller::Action, watcher::watcher, watcher::Config, WatchStreamExt}, + runtime::{ + controller::{Action, Controller}, + finalizer::{finalizer, Event}, + watcher::Config, + }, }; use log::{error, trace}; use std::collections::HashMap; @@ -27,37 +32,76 @@ use std::sync::Arc; pub const PENDING_POD_GRACE_PERIOD_MINUTES: i64 = 5; /// Length of time a Pod can be in an error state before we retry pub const FAILED_POD_GRACE_PERIOD_MINUTES: i64 = 0; -// Identifier for the controller to be set as the field manager for server-side apply -pub const CONTROLLER_FIELD_MANAGER_ID: &str = "akri.sh/controller"; -/// This function is the main Reconcile function for Instance resources -/// This will get called every time an Instance gets added or is changed. -pub async fn run(client: Arc) -> anyhow::Result<()> { - let api: Box> = client.all(); +pub static INSTANCE_FINALIZER: &str = "instances.kube.rs"; + +/// Initialize the instance controller +/// TODO: consider passing state that is shared among controllers such as a metrics exporter +pub async fn run(ctx: Arc) { + let api = ctx.client().all().as_inner(); if let Err(e) = api.list(&ListParams::default().limit(1)).await { error!("Instance CRD is not queryable; {e:?}. Is the CRD installed?"); std::process::exit(1); } + Controller::new(api, Config::default().any_semantic()) + .shutdown_on_signal() + .run(reconcile, error_policy, ctx.clone()) + .filter_map(|x| async move { std::result::Result::ok(x) }) + .for_each(|_| futures::future::ready(())) + .await; +} - // First handle existing instances - let instances = api.list(&ListParams::default()).await?; - for instance in instances { - handle_instance_change(instance, client.clone()).await?; - } +fn error_policy( + _instance: Arc, + error: &ControllerError, + _ctx: Arc, +) -> Action { + log::warn!("reconcile failed: {:?}", error); + Action::requeue(std::time::Duration::from_secs(5 * 60)) +} - watcher(api.as_inner(), Config::default()) - .applied_objects() - .try_for_each(move |instance| { - let client = client.clone(); - async move { - handle_instance_change(instance, client) - .await - .map_err(kube::runtime::watcher::Error::WatchFailed)?; - Ok(()) - } - }) - .await?; - Ok(()) +/// Instance event types +/// +/// Instance actions describe the types of actions the Controller can +/// react to for Instances. +/// +/// This will determine what broker management actions to take (if any) +/// +/// | --> Instance Applied +/// | --> No broker => Do nothing +/// | --> => Deploy a Job if one does not exist +/// | --> => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod. +/// Deploy Pods as necessary + +/// This function is the main Reconcile function for Instance resources +/// This will get called every time an Instance gets added or is changed, it will also be called for every existing instance on startup. +pub async fn reconcile( + instance: Arc, + ctx: Arc, +) -> Result { + let ns = instance.namespace().unwrap(); // instance has namespace scope + trace!("Reconciling {} in {}", instance.name_any(), ns); + finalizer( + &ctx.client().all().as_inner(), + INSTANCE_FINALIZER, + instance, + |event| reconcile_inner(event, ctx.client()), + ) + .await + .map_err(|e| ControllerError::FinalizerError(Box::new(e))) +} + +async fn reconcile_inner( + event: Event, + client: Arc, +) -> Result { + match event { + Event::Apply(instance) => handle_instance_change(&instance, client).await, + Event::Cleanup(_) => { + // Do nothing. OwnerReferences are attached to Jobs and Pods to automate cleanup + Ok(default_requeue_action()) + } + } } /// PodContext stores a set of details required to track/create/delete broker @@ -171,7 +215,7 @@ async fn handle_deletion_work( api.delete(&pod_app_name).await?; trace!("handle_deletion_work - pod::remove_pod succeeded"); BROKER_POD_COUNT_METRIC - .with_label_values(&[configuration_name, &context_node_name]) + .with_label_values(&[configuration_name, context_node_name]) .dec(); Ok(()) } @@ -190,7 +234,7 @@ async fn handle_addition_work( ); trace!("handle_addition_work - New pod spec={:?}", pod); - api.apply(pod, CONTROLLER_FIELD_MANAGER_ID).await?; + api.apply(pod, INSTANCE_FINALIZER).await?; trace!("handle_addition_work - pod::create_pod succeeded",); BROKER_POD_COUNT_METRIC .with_label_values(&[configuration_name, new_node]) @@ -201,15 +245,11 @@ async fn handle_addition_work( /// Handle Instance change by /// 1) checking to make sure the Instance's Configuration exists -/// 2) taking the appropriate action depending on the broker type (Pod or Job) if any: -/// | --> No broker => Do nothing -/// | --> => Deploy a Job if one does not exist -/// | --> => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod. -/// Deploy Pods as necessary +/// 2) calling the appropriate handler depending on the broker type (Pod or Job) if any pub async fn handle_instance_change( - instance: Instance, + instance: &Instance, client: Arc, -) -> kube::Result { +) -> Result { trace!("handle_instance_change - enter"); let instance_namespace = instance.namespace().unwrap(); let api: Box> = client.namespaced(&instance_namespace); @@ -227,10 +267,10 @@ pub async fn handle_instance_change( return Ok(default_requeue_action()); }; let res = match broker_spec { - BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(&instance, p, client).await, + BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(instance, p, client).await, BrokerSpec::BrokerJobSpec(j) => { handle_instance_change_job( - &instance, + instance, *configuration.metadata.generation.as_ref().unwrap(), j, client.clone(), @@ -577,9 +617,11 @@ mod handle_instance_tests { instance_file: &'static str, ) { trace!("run_handle_instance_change_test enter"); - let instance_json: String = file::read_file_to_string(instance_file); + let instance_json = file::read_file_to_string(instance_file); let instance: Instance = serde_json::from_str(&instance_json).unwrap(); - handle_instance_change(instance, client).await.unwrap(); + reconcile_inner(Event::Apply(Arc::new(instance)), client) + .await + .unwrap(); trace!("run_handle_instance_change_test exit"); } @@ -724,7 +766,7 @@ mod handle_instance_tests { } /// Checks that the BROKER_POD_COUNT_METRIC is appropriately incremented - /// when an instance is added. Cannot be run in parallel with other tests + /// instance is added and pods are created. Cannot be run in parallel with other tests /// due to the metric being a global variable and modified unpredictably by /// other tests. /// Run with: cargo test -- test_broker_pod_count_metric --ignored @@ -751,5 +793,13 @@ mod handle_instance_tests { }, ); run_handle_instance_change_test(Arc::new(mock), "../test/json/local-instance.json").await; + + // Check that broker pod count metric has been incremented to include new pod for this instance + assert_eq!( + BROKER_POD_COUNT_METRIC + .with_label_values(&["config-a", "node-a"]) + .get(), + 1 + ); } } diff --git a/controller/src/util/mod.rs b/controller/src/util/mod.rs index 95f322b63..6fb37f63d 100644 --- a/controller/src/util/mod.rs +++ b/controller/src/util/mod.rs @@ -1,4 +1,4 @@ -pub(crate) mod controller_ctx; +pub(crate) mod context; pub mod instance_action; pub mod node_watcher; mod pod_action; diff --git a/controller/src/util/node_watcher.rs b/controller/src/util/node_watcher.rs index 474e863b2..af082eaa7 100644 --- a/controller/src/util/node_watcher.rs +++ b/controller/src/util/node_watcher.rs @@ -6,7 +6,7 @@ //! that the Instance.deviceUsage property no longer contains //! slots that are occupied by the node. use crate::util::{ - controller_ctx::{ControllerContext, NodeState}, + context::{NodeState, NodeWatcherContext}, ControllerError, Result, }; use akri_shared::akri::instance::{device_usage::NodeUsage, Instance}; @@ -28,7 +28,7 @@ use log::{info, trace}; use std::str::FromStr; use std::{collections::HashMap, sync::Arc}; -use super::controller_ctx::ControllerKubeClient; +use super::context::ControllerKubeClient; pub static NODE_FINALIZER: &str = "akri-node-watcher.kube.rs"; @@ -43,7 +43,7 @@ pub async fn check(client: Arc) -> anyhow::Result<()> pub fn error_policy( _node: Arc, error: &ControllerError, - _ctx: Arc, + _ctx: Arc, ) -> Action { log::warn!("reconcile failed: {:?}", error); Action::requeue(std::time::Duration::from_secs(5 * 60)) @@ -67,7 +67,7 @@ pub fn error_policy( /// Once a Node moves through the Running state into a non Running /// state, it becomes important to clean Instances referencing the /// non-Running Node. -pub async fn reconcile(node: Arc, ctx: Arc) -> Result { +pub async fn reconcile(node: Arc, ctx: Arc) -> Result { trace!("Reconciling node {}", node.name_any()); finalizer( &ctx.client.clone().all().as_inner(), @@ -80,7 +80,7 @@ pub async fn reconcile(node: Arc, ctx: Arc) -> Result, ctx: Arc) -> Result { +async fn reconcile_inner(event: Event, ctx: Arc) -> Result { match event { Event::Apply(node) => { let node_name = node.name_unchecked(); @@ -114,7 +114,10 @@ async fn reconcile_inner(event: Event, ctx: Arc) -> Res /// This should be called for Nodes that are either !Ready or Deleted. /// This function will clean up any Instances that reference a Node that /// was previously Running. -async fn handle_node_disappearance(node: &Node, ctx: Arc) -> anyhow::Result<()> { +async fn handle_node_disappearance( + node: &Node, + ctx: Arc, +) -> anyhow::Result<()> { let node_name = node.name_unchecked(); trace!( "handle_node_disappearance - enter: {:?}", @@ -284,7 +287,7 @@ mod tests { mock.node .expect_all() .return_once(|| Box::new(MockApi::new())); - let ctx = Arc::new(ControllerContext::new(Arc::new(mock))); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); reconcile_inner(Event::Apply(Arc::new(node)), ctx.clone()) .await .unwrap(); @@ -305,7 +308,7 @@ mod tests { mock.node .expect_all() .return_once(|| Box::new(MockApi::new())); - let ctx = Arc::new(ControllerContext::new(Arc::new(mock))); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); reconcile_inner(Event::Apply(Arc::new(node)), ctx.clone()) .await .unwrap(); @@ -326,7 +329,7 @@ mod tests { mock.node .expect_all() .return_once(|| Box::new(MockApi::new())); - let ctx = Arc::new(ControllerContext::new(Arc::new(mock))); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); ctx.known_nodes .write() .await @@ -373,7 +376,7 @@ mod tests { mock.instance .expect_all() .return_once(move || Box::new(instance_api_mock)); - let ctx = Arc::new(ControllerContext::new(Arc::new(mock))); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); ctx.known_nodes .write() .await @@ -420,7 +423,7 @@ mod tests { mock.instance .expect_all() .return_once(move || Box::new(instance_api_mock)); - let ctx = Arc::new(ControllerContext::new(Arc::new(mock))); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); ctx.known_nodes .write() .await @@ -463,7 +466,7 @@ mod tests { mock.instance .expect_all() .return_once(move || Box::new(instance_api_mock)); - let ctx = Arc::new(ControllerContext::new(Arc::new(mock))); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); reconcile_inner(Event::Cleanup(Arc::new(node)), ctx.clone()) .await .unwrap(); diff --git a/controller/src/util/pod_watcher.rs b/controller/src/util/pod_watcher.rs index 15a21c2a0..3b61018ed 100644 --- a/controller/src/util/pod_watcher.rs +++ b/controller/src/util/pod_watcher.rs @@ -1,5 +1,7 @@ -use crate::util::controller_ctx::{ControllerContext, PodState}; +use crate::util::context::{PodState, PodWatcherContext}; use crate::util::{ControllerError, Result}; +use crate::BROKER_POD_COUNT_METRIC; +use akri_shared::k8s::AKRI_TARGET_NODE_LABEL_NAME; use akri_shared::{ akri::{configuration::Configuration, instance::Instance, API_NAMESPACE}, k8s::{ @@ -65,9 +67,7 @@ fn get_broker_pod_owner_kind(pod: Arc) -> BrokerPodOwnerKind { } } -pub async fn check( - client: Arc, -) -> anyhow::Result<()> { +pub async fn check(client: Arc) -> anyhow::Result<()> { let api: Box> = client.all(); if let Err(e) = api.list(&ListParams::default().limit(1)).await { anyhow::bail!("Pods are not queryable; {e:?}") @@ -78,7 +78,7 @@ pub async fn check( pub fn error_policy( _pod: Arc, error: &ControllerError, - _ctx: Arc, + _ctx: Arc, ) -> Action { log::warn!("reconcile failed: {:?}", error); Action::requeue(std::time::Duration::from_secs(5 * 60)) @@ -96,7 +96,7 @@ pub fn error_policy( /// still have other broker Pods supporting them. If there /// are no other supporting broker Pods, delete one or both /// of the services. -pub async fn reconcile(pod: Arc, ctx: Arc) -> Result { +pub async fn reconcile(pod: Arc, ctx: Arc) -> Result { trace!("Reconciling broker pod {}", pod.name_any()); finalizer( &ctx.client.clone().all().as_inner(), @@ -108,7 +108,7 @@ pub async fn reconcile(pod: Arc, ctx: Arc) -> Result, ctx: Arc) -> Result { +async fn reconcile_inner(event: Event, ctx: Arc) -> Result { match event { Event::Apply(pod) => { let phase = get_pod_phase(&pod); @@ -158,12 +158,12 @@ fn get_pod_phase(pod: &Pod) -> String { async fn handle_pod( pod: Arc, - ctx: Arc, + ctx: Arc, desired_state: PodState, handler: F, ) -> anyhow::Result<()> where - F: FnOnce(Arc, Arc) -> Fut, + F: FnOnce(Arc, Arc) -> Fut, Fut: Future>, { trace!("handle_pod_if_needed - enter"); @@ -207,7 +207,7 @@ fn get_instance_and_configuration_from_pod(pod: Arc) -> anyhow::Result<(Str /// This is called when a broker Pod exits the Running phase and ensures /// that instance and configuration services are only running when /// supported by Running broker Pods. -async fn handle_non_running_pod(pod: Arc, ctx: Arc) -> anyhow::Result<()> { +async fn handle_non_running_pod(pod: Arc, ctx: Arc) -> anyhow::Result<()> { trace!("handle_non_running_pod - enter"); let namespace = pod.namespace().unwrap(); let (instance_id, config_name) = get_instance_and_configuration_from_pod(pod.clone())?; @@ -244,11 +244,21 @@ async fn handle_non_running_pod(pod: Arc, ctx: Arc) -> a svc_api.as_ref(), ) .await?; + let fallback_node = "unknown".to_string(); + let node = pod + .labels() + .get(AKRI_TARGET_NODE_LABEL_NAME) + .unwrap_or(&fallback_node); + + BROKER_POD_COUNT_METRIC + .with_label_values(&[&config_name, node]) + .dec(); // Only redeploy Pods that are managed by the Akri Controller (controlled by an Instance OwnerReference) if get_broker_pod_owner_kind(pod) == BrokerPodOwnerKind::Instance { - if let Ok(Some(instance)) = ctx.client.namespaced(&namespace).get(&instance_id).await { - super::instance_action::handle_instance_change(instance, ctx.client.clone()).await?; + let client: Box> = ctx.client.namespaced(&namespace); + if let Ok(Some(instance)) = client.get(&instance_id).await { + super::instance_action::handle_instance_change(&instance, ctx.client.clone()).await?; } } Ok(()) @@ -290,7 +300,7 @@ async fn cleanup_svc_if_unsupported( /// This is called when a Pod enters the Running phase and ensures /// that instance and configuration services are running as specified /// by the configuration. -async fn handle_running_pod(pod: Arc, ctx: Arc) -> anyhow::Result<()> { +async fn handle_running_pod(pod: Arc, ctx: Arc) -> anyhow::Result<()> { trace!("handle_running_pod - enter"); let namespace = pod.namespace().unwrap(); let (instance_name, configuration_name) = get_instance_and_configuration_from_pod(pod)?; @@ -341,7 +351,7 @@ async fn add_instance_and_configuration_services( namespace: &str, configuration_name: &str, configuration: &Configuration, - ctx: Arc, + ctx: Arc, ) -> anyhow::Result<()> { trace!( "add_instance_and_configuration_services - instance={:?}", @@ -706,7 +716,7 @@ mod tests { let pod = make_pod_with_owners_and_phase("instance_name", "copnfig_name", "Unknown", "Instance"); let pod_name = pod.name_unchecked(); - let ctx = Arc::new(ControllerContext::new(Arc::new( + let ctx = Arc::new(PodWatcherContext::new(Arc::new( MockControllerKubeClient::default(), ))); reconcile_inner(Event::Apply(Arc::new(pod)), ctx.clone()) @@ -724,7 +734,7 @@ mod tests { let pod = make_pod_with_owners_and_phase("instance_name", "config_name", "Pending", "Instance"); let pod_name = pod.name_unchecked(); - let ctx = Arc::new(ControllerContext::new(Arc::new( + let ctx = Arc::new(PodWatcherContext::new(Arc::new( MockControllerKubeClient::default(), ))); reconcile_inner(Event::Apply(Arc::new(pod)), ctx.clone()) @@ -743,7 +753,7 @@ mod tests { let pod = make_pod_with_owners_and_phase("instance_name", "config_name", "Running", "Instance"); let pod_name = pod.name_unchecked(); - let ctx = Arc::new(ControllerContext::new(Arc::new( + let ctx = Arc::new(PodWatcherContext::new(Arc::new( MockControllerKubeClient::default(), ))); ctx.known_pods @@ -813,7 +823,7 @@ mod tests { .return_once(|_| Box::new(mock_svc_api)) .with(mockall::predicate::eq("test-ns")); - let ctx = Arc::new(ControllerContext::new(Arc::new(mock))); + let ctx = Arc::new(PodWatcherContext::new(Arc::new(mock))); reconcile_inner(Event::Apply(Arc::new(pod)), ctx.clone()) .await @@ -827,7 +837,7 @@ mod tests { fn controller_ctx_for_handle_ended_pod_if_needed( pod_list: ObjectList, delete_config_svc: bool, - ) -> ControllerContext { + ) -> PodWatcherContext { let mut mock = MockControllerKubeClient::default(); let mut mock_config_api: MockApi = MockApi::new(); mock_config_api.expect_get().return_once(|_| { @@ -878,7 +888,7 @@ mod tests { .expect_namespaced() .return_once(|_| Box::new(mock_svc_api)) .with(mockall::predicate::eq("test-ns")); - ControllerContext::new(Arc::new(mock)) + PodWatcherContext::new(Arc::new(mock)) } async fn test_reconcile_applied_terminated_phases(phase: &str) { diff --git a/discovery-utils/src/discovery/v0.rs b/discovery-utils/src/discovery/v0.rs index 08781a68e..a20490c38 100644 --- a/discovery-utils/src/discovery/v0.rs +++ b/discovery-utils/src/discovery/v0.rs @@ -9,10 +9,7 @@ pub struct RegisterDiscoveryHandlerRequest { /// Endpoint for the registering `DiscoveryHandler` #[prost(string, tag = "2")] pub endpoint: ::prost::alloc::string::String, - #[prost( - enumeration = "register_discovery_handler_request::EndpointType", - tag = "3" - )] + #[prost(enumeration = "register_discovery_handler_request::EndpointType", tag = "3")] pub endpoint_type: i32, /// Specifies whether this device could be used by multiple nodes (e.g. an IP camera) /// or can only be ever be discovered by a single node (e.g. a local USB device) @@ -22,7 +19,17 @@ pub struct RegisterDiscoveryHandlerRequest { /// Nested message and enum types in `RegisterDiscoveryHandlerRequest`. pub mod register_discovery_handler_request { /// Specifies the type of endpoint. - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] #[repr(i32)] pub enum EndpointType { Uds = 0, @@ -68,7 +75,10 @@ pub struct DiscoverRequest { /// list of Key-value pairs containing additional information /// for the 'DiscoveryHandler' to discover devices #[prost(map = "string, message", tag = "2")] - pub discovery_properties: ::std::collections::HashMap<::prost::alloc::string::String, ByteData>, + pub discovery_properties: ::std::collections::HashMap< + ::prost::alloc::string::String, + ByteData, + >, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -87,8 +97,10 @@ pub struct Device { /// and set as environment variables in the device's broker Pods. May be information /// about where to find the device such as an RTSP URL or a device node (e.g. `/dev/video1`) #[prost(map = "string, string", tag = "2")] - pub properties: - ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + pub properties: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, /// Optionally specify mounts for Pods that request this device as a resource #[prost(message, repeated, tag = "3")] pub mounts: ::prost::alloc::vec::Vec, @@ -133,8 +145,8 @@ pub struct DeviceSpec { /// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; /// Registration is the service advertised by the Akri Agent. /// Any `DiscoveryHandler` can register with the Akri Agent. #[derive(Debug, Clone)] @@ -180,8 +192,9 @@ pub mod registration_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } @@ -220,20 +233,22 @@ pub mod registration_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = - http::uri::PathAndQuery::from_static("/v0.Registration/RegisterDiscoveryHandler"); + let path = http::uri::PathAndQuery::from_static( + "/v0.Registration/RegisterDiscoveryHandler", + ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "v0.Registration", - "RegisterDiscoveryHandler", - )); + req.extensions_mut() + .insert(GrpcMethod::new("v0.Registration", "RegisterDiscoveryHandler")); self.inner.unary(req, path, codec).await } } @@ -241,8 +256,8 @@ pub mod registration_client { /// Generated client implementations. pub mod discovery_handler_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct DiscoveryHandlerClient { inner: tonic::client::Grpc, @@ -286,8 +301,9 @@ pub mod discovery_handler_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { DiscoveryHandlerClient::new(InterceptedService::new(inner, interceptor)) } @@ -329,14 +345,19 @@ pub mod discovery_handler_client { tonic::Response>, tonic::Status, > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v0.DiscoveryHandler/Discover"); + let path = http::uri::PathAndQuery::from_static( + "/v0.DiscoveryHandler/Discover", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v0.DiscoveryHandler", "Discover")); @@ -381,7 +402,10 @@ pub mod registration_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -437,19 +461,27 @@ pub mod registration_server { "/v0.Registration/RegisterDiscoveryHandler" => { #[allow(non_camel_case_types)] struct RegisterDiscoveryHandlerSvc(pub Arc); - impl - tonic::server::UnaryService - for RegisterDiscoveryHandlerSvc - { + impl< + T: Registration, + > tonic::server::UnaryService + for RegisterDiscoveryHandlerSvc { type Response = super::Empty; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request< + super::RegisterDiscoveryHandlerRequest, + >, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::register_discovery_handler(&inner, request) + ::register_discovery_handler( + &inner, + request, + ) .await }; Box::pin(fut) @@ -478,14 +510,18 @@ pub mod registration_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } @@ -525,7 +561,8 @@ pub mod discovery_handler_server { /// Server streaming response type for the Discover method. type DiscoverStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > + Send + > + + Send + 'static; async fn discover( &self, @@ -555,7 +592,10 @@ pub mod discovery_handler_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -611,14 +651,16 @@ pub mod discovery_handler_server { "/v0.DiscoveryHandler/Discover" => { #[allow(non_camel_case_types)] struct DiscoverSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for DiscoverSvc - { + impl< + T: DiscoveryHandler, + > tonic::server::ServerStreamingService + for DiscoverSvc { type Response = super::DiscoverResponse; type ResponseStream = T::DiscoverStream; - type Future = - BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -653,14 +695,18 @@ pub mod discovery_handler_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/samples/brokers/udev-video-broker/src/util/camera.rs b/samples/brokers/udev-video-broker/src/util/camera.rs index 437b09cd8..4bd208251 100644 --- a/samples/brokers/udev-video-broker/src/util/camera.rs +++ b/samples/brokers/udev-video-broker/src/util/camera.rs @@ -13,8 +13,8 @@ pub struct NotifyResponse { /// Generated client implementations. pub mod camera_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct CameraClient { inner: tonic::client::Grpc, @@ -58,8 +58,9 @@ pub mod camera_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { CameraClient::new(InterceptedService::new(inner, interceptor)) } @@ -98,17 +99,19 @@ pub mod camera_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/camera.Camera/GetFrame"); let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("camera.Camera", "GetFrame")); + req.extensions_mut().insert(GrpcMethod::new("camera.Camera", "GetFrame")); self.inner.unary(req, path, codec).await } } @@ -148,7 +151,10 @@ pub mod camera_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -204,16 +210,21 @@ pub mod camera_server { "/camera.Camera/GetFrame" => { #[allow(non_camel_case_types)] struct GetFrameSvc(pub Arc); - impl tonic::server::UnaryService for GetFrameSvc { + impl tonic::server::UnaryService + for GetFrameSvc { type Response = super::NotifyResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::get_frame(&inner, request).await }; + let fut = async move { + ::get_frame(&inner, request).await + }; Box::pin(fut) } } @@ -240,14 +251,18 @@ pub mod camera_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } }