Skip to content

Commit

Permalink
Created Procedural Macros For Pub Sub
Browse files Browse the repository at this point in the history
Signed-off-by: zachary <[email protected]>
  • Loading branch information
zedgell committed Mar 18, 2023
1 parent 4cf17de commit b44d390
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 66 deletions.
11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ prost = "0.8"
bytes = "1"
prost-types = "0.8"
async-trait = "0.1"
serde = {version = "1.0.152", features = ["derive"], optional = true}
serde_json = {version = "1.0.91", optional = true }
proc_macros = {path = "./proc-macros", optional = true}

[workspace]
members = ["proc-macros"]

[features]
serde = ["dep:serde", "dep:serde_json"]
proc-macros = ["dep:proc_macros"]
full = ["serde", "proc-macros"]

[build-dependencies]
tonic-build = "0.5"
Expand Down
4 changes: 2 additions & 2 deletions examples/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ To run this example:
1. Start Subscriber (expose gRPC server receiver on port 50051):
```bash
dapr run --app-id rust-subscriber --app-protocol grpc --app-port 50051 cargo run -- --example subscriber
dapr run --app-id rust-subscriber --app-protocol grpc --app-port 50051 cargo run -- --example subscriber --features full
```
2. Start Publisher:
```bash
dapr run --app-id rust-publisher --app-protocol grpc cargo run -- --example publisher
dapr run --app-id rust-publisher --app-protocol grpc cargo run -- --example publisher --features full
```
18 changes: 17 additions & 1 deletion examples/pubsub/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
use std::{collections::HashMap, thread, time::Duration};

use dapr::serde::{Serialize, Deserialize};

use dapr::serde_json;


#[derive(Serialize, Deserialize, Debug)]
struct Order {
pub order_number: i32,
pub order_details: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
Expand All @@ -23,12 +34,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let topic = "A".to_string();

for count in 0..100 {

let order = Order {
order_number: count,
order_details: format!("Count is {}", count),
};
// message metadata
let mut metadata = HashMap::<String, String>::new();
metadata.insert("count".to_string(), count.to_string());

// message
let message = format!("{} => hello from rust!", &count).into_bytes();
let message = serde_json::to_string(&order).unwrap().into_bytes();

client
.publish_event(
Expand Down
76 changes: 13 additions & 63 deletions examples/pubsub/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,28 @@
use tonic::{transport::Server, Request, Response, Status};

use dapr::proc_macros::topic;
use dapr::{
appcallback::*,
dapr::dapr::proto::runtime::v1::app_callback_server::{AppCallback, AppCallbackServer},
};
use tonic::{transport::Server, Request, Response, Status};

#[derive(Default)]
pub struct AppCallbackService {}

#[tonic::async_trait]
impl AppCallback for AppCallbackService {
/// Invokes service method with InvokeRequest.
async fn on_invoke(
&self,
_request: Request<InvokeRequest>,
) -> Result<Response<InvokeResponse>, Status> {
Ok(Response::new(InvokeResponse::default()))
}

/// Lists all topics subscribed by this app.
///
/// NOTE: Dapr runtime will call this method to get
/// the list of topics the app wants to subscribe to.
/// In this example, the app is subscribing to topic `A`.
async fn list_topic_subscriptions(
&self,
_request: Request<()>,
) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
let topic = "A".to_string();
let pubsub_name = "pubsub".to_string();

let list_subscriptions = ListTopicSubscriptionsResponse::topic(pubsub_name, topic);

Ok(Response::new(list_subscriptions))
}

/// Subscribes events from Pubsub.
async fn on_topic_event(
&self,
request: Request<TopicEventRequest>,
) -> Result<Response<TopicEventResponse>, Status> {
let r = request.into_inner();
let data = &r.data;
let data_content_type = &r.data_content_type;

let message = String::from_utf8_lossy(&data);
println!("Message: {}", &message);
println!("Content-Type: {}", &data_content_type);

Ok(Response::new(TopicEventResponse::default()))
}
use dapr::serde::{Deserialize, Serialize};

/// Lists all input bindings subscribed by this app.
async fn list_input_bindings(
&self,
_request: Request<()>,
) -> Result<Response<ListInputBindingsResponse>, Status> {
Ok(Response::new(ListInputBindingsResponse::default()))
}
#[derive(Serialize, Deserialize, Debug)]
struct Order {
pub order_number: i32,
pub order_details: String,
}

/// Listens events from the input bindings.
async fn on_binding_event(
&self,
_request: Request<BindingEventRequest>,
) -> Result<Response<BindingEventResponse>, Status> {
Ok(Response::new(BindingEventResponse::default()))
}
#[topic(pub_sub_name = "pubsub", topic = "A")]
async fn handle_event(order: Order) {
println!("{:#?}", order)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::]:50051".parse().unwrap();
let addr = "127.0.0.1:50051".parse().unwrap();

let callback_service = AppCallbackService::default();
let callback_service = HandleEvent::default();

println!("AppCallback server listening on: {}", addr);

Expand Down
3 changes: 3 additions & 0 deletions proc-macros/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/target
**/*.rs.bk
Cargo.lock
16 changes: 16 additions & 0 deletions proc-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "proc_macros"
version = "0.0.0"
authors = ["Zachary Edgell <[email protected]>"]
description = "A `cargo generate` template for quick-starting a procedural macro crate"
keywords = ["template", "proc_macro", "procmacro"]
edition = "2018"

[lib]
proc-macro = true

[dependencies]
quote = "1"
proc-macro2 = "1.0"
syn = "1.0"

211 changes: 211 additions & 0 deletions proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
extern crate proc_macro;
use proc_macro::TokenStream;
use proc_macro2::{Ident, TokenTree};

use quote::{format_ident, quote};
use syn::parse::{Parse, ParseStream};
use syn::{parse_macro_input, LitStr};

macro_rules! derive_parse {(
@derive_only
$( #[$attr:meta] )*
$pub:vis
struct $StructName:ident {
$(
$( #[$field_attr:meta] )*
$field_pub:vis
$field_name:ident : $FieldTy:ty
),* $(,)?
}
) => (
impl Parse for $StructName {
fn parse (input: ParseStream)
-> ::syn::Result<Self>
{
mod kw {
$(
::syn::custom_keyword!( $field_name );
)*
}
use ::core::ops::Not as _;

$(
let mut $field_name = ::core::option::Option::None::< $FieldTy >;
)*
while input.is_empty().not() {
let lookahead = input.lookahead1();
match () {
$(
_case if lookahead.peek(kw::$field_name) => {
let span = input.parse::<kw::$field_name>().unwrap().span;
let _: ::syn::Token![ = ] = input.parse()?;
let prev = $field_name.replace(input.parse()?);
if prev.is_some() {
return ::syn::Result::Err(::syn::Error::new(span, "Duplicate key"));
}
},
)*
_default => return ::syn::Result::Err(lookahead.error()),
}
let _: ::core::option::Option<::syn::Token![ , ]> = input.parse()?;
}
Ok(Self {
$(
$field_name: $field_name.ok_or_else(|| ::syn::Error::new(
::proc_macro2::Span::call_site(),
::core::concat!("Missing key `", ::core::stringify!($field_name), "`"),
))?,
)*
})
}
}
); (
$( #[$attr:meta] )* $pub:vis struct $($rest:tt)*
) => (
$( #[$attr] )* $pub struct $($rest)*

derive_parse! { @derive_only $( #[$attr] )* $pub struct $($rest)* }
)}

derive_parse! {
struct TopicArgs {
pub_sub_name: LitStr,
topic: LitStr
}
}

/// Example of user-defined [procedural macro attribute][1].
///
/// [1]: https://doc.rust-lang.org/reference/procedural-macros.html#attribute-macros
#[proc_macro_attribute]
pub fn topic(args: TokenStream, input: TokenStream) -> TokenStream {
let new_input = proc_macro2::TokenStream::from(input);
let mut iter = new_input.clone().into_iter().filter(|i| match i {
TokenTree::Group(_) => true,
TokenTree::Ident(_) => true,
TokenTree::Punct(_) => false,
TokenTree::Literal(_) => false,
});

let mut current = iter.next().unwrap();

while current.to_string() != "fn" {
current = iter.next().unwrap()
}

let name = iter.next().unwrap();

let struct_name = name
.to_string()
.split('_')
.into_iter()
.map(|i| {
let mut chars: Vec<char> = i.chars().collect();
chars[0] = chars[0].to_ascii_uppercase();
let new_string: String = chars.into_iter().collect();
new_string
})
.collect::<Vec<String>>()
.join("");

let name_ident = Ident::new(name.to_string().as_str(), name.span());

let struct_name_ident = Ident::new(struct_name.as_str(), name.span());

let vars: Vec<String> = iter
.next()
.unwrap()
.to_string()
.replace(['(', ')'], "")
.split(':')
.into_iter()
.enumerate()
.filter(|&(i, _)| i % 2 != 0)
.map(|(_, i)| i.trim().to_string())
.collect();

assert_eq!(vars.len(), 1, "Expected to only have one input variable");

let parse_statement = match vars[0] == *"String" {
true => {
quote! {
let message = message.to_string();
}
}
false => {
let type_ident = format_ident!("{}", vars[0]);
println!("{}", type_ident);
quote! {
let message: #type_ident = dapr::serde_json::from_str(message.to_string().as_str()).unwrap();
}
}
};

let args = parse_macro_input!(args as TopicArgs);

let topic = args.topic.value();

let pub_sub_name = args.pub_sub_name.value();

let tokens = quote! {
#new_input

#[derive(Default)]
struct #struct_name_ident;

#[tonic::async_trait]
impl AppCallback for #struct_name_ident {
async fn on_invoke(
&self,
_request: Request<InvokeRequest>,
) -> Result<Response<InvokeResponse>, Status> {
Ok(Response::new(InvokeResponse::default()))
}

async fn list_topic_subscriptions(
&self,
_request: Request<()>,
) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
let topic = #topic.to_string();
let pubsub_name = #pub_sub_name.to_string();

let list_subscriptions = ListTopicSubscriptionsResponse::topic(pubsub_name, topic);

Ok(Response::new(list_subscriptions))
}

async fn on_topic_event(
&self,
request: Request<TopicEventRequest>,
) -> Result<Response<TopicEventResponse>, Status> {
let r = request.into_inner();
let data = &r.data;
let data_content_type = &r.data_content_type;

let message = String::from_utf8_lossy(&data);

#parse_statement

#name_ident(message).await;

Ok(Response::new(TopicEventResponse::default()))
}

async fn list_input_bindings(
&self,
_request: Request<()>,
) -> Result<Response<ListInputBindingsResponse>, Status> {
Ok(Response::new(ListInputBindingsResponse::default()))
}

async fn on_binding_event(
&self,
_request: Request<BindingEventRequest>,
) -> Result<Response<BindingEventResponse>, Status> {
Ok(Response::new(BindingEventResponse::default()))
}
}
};

tokens.into()
}
Loading

0 comments on commit b44d390

Please sign in to comment.