Skip to content

Commit

Permalink
add mqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
youngday committed Jul 19, 2024
1 parent f46b429 commit 32d19be
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 76 deletions.
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,11 @@ COM_TYPE:u32=0;//0=zeromq 1=ice_oryx2,2=?

### speed
15 fps
### queue
### queue (inspired from plotjuggler)
☑️ zeromq
☑️ ice_oryx2
mqtt
socket
tcp
http
☑️ mqtt
websocket

## goal

Expand Down
2 changes: 1 addition & 1 deletion examples/mqtt/mqtt_async_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ async fn requests(client: AsyncClient) {
info!("pub vec:{:?}",i);
i+=1;

time::sleep(Duration::from_secs(1)).await;
time::sleep(Duration::from_secs_f64(0.02)).await;
}
}
18 changes: 8 additions & 10 deletions examples/mqtt/mqtt_async_sub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use log::{debug, error, info, trace, warn};
use log4rs;

use rumqttc::tokio_rustls::rustls::internal::msgs::base::Payload;
use tmq::publish;
use tokio::{task, time};

Expand Down Expand Up @@ -37,6 +36,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
let event = eventloop.poll().await;
//payload unpack ref https://github.com/bytebeamio/rumqtt/issues/617
match &event {
Ok(v) => {
// info!("Event = {v:?}");
Expand All @@ -52,8 +52,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
let _payload=p.payload.clone();
info!("\ntopic = {0:?},payload = {1:?}",_topic,_payload.as_ref());



}
Packet::PubAck(_) => {}
Packet::PingReq(_) => {}
Expand All @@ -69,18 +67,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
Packet::UnsubAck(_) => {}
Packet::Disconnect(_) => {}
}
}
Outgoing(o) => {
}//pack
Outgoing(_o) => {
// info!("Outgoing = {o:?}");
}
}
}
}//event
}//ok
Err(e) => {
error!("Error = {e:?}");
return Ok(());
}
}
}
}//err
}//result
}//loop
}

async fn requests(client: AsyncClient) {
Expand Down
59 changes: 0 additions & 59 deletions examples/mqtt/mqtt_asyncpubsub.rs

This file was deleted.

95 changes: 94 additions & 1 deletion examples/plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio::{task, time};

use realtime_plot::draw_piston_window;
use realtime_plot::Settings;
Expand All @@ -24,12 +25,22 @@ use tmq::{subscribe, Context, Result};
use iceoryx2::{port::subscriber, prelude::*};
use realtime_plot::transmission_data::TransmissionData;

use rumqttc::v5::mqttbytes::v5::Packet;
use rumqttc::v5::mqttbytes::v5::Packet::Publish;
use rumqttc::v5::mqttbytes::v5::Packet::SubAck;
use rumqttc::v5::mqttbytes::QoS;
use rumqttc::v5::{
AsyncClient,
Event::{Incoming, Outgoing},
MqttOptions,
};

const CYCLE_TIME: Duration = Duration::from_millis(10);

const FPS: u32 = 15;
const LENGTH: u32 = 20;
const N_DATA_POINTS: usize = (FPS * LENGTH) as usize;
const COM_TYPE:u32=1;//0=zeromq 1=ice_oryx2,2=?
const COM_TYPE:u32=2;//0=zeromq 1=ice_oryx2,2=mqtt?
#[tokio::main]
async fn main() {
let mut window: PistonWindow = WindowSettings::new("Real Time CPU Usage", [450, 300])
Expand Down Expand Up @@ -100,6 +111,77 @@ async fn main() {
info!("exit ...");
});
// let result = computation.join().unwrap();//TODO: block and nonblock
} else if COM_TYPE==2 {
let mut mqttoptions = MqttOptions::new("test-2", "localhost", 1884);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
requests(client).await;
time::sleep(Duration::from_secs(3)).await;
});

task::spawn(async move {
loop {
let now = Instant::now(); // 程序起始时间
info!("mqtt start: {:?}", now);
let event = eventloop.poll().await;
match &event {
Ok(v) => {
// info!("Event = {v:?}");
match &v {
Incoming(i) => {
// info!("incoming = {i:?}");
match &i {
Packet::Connect(_, _, _) => {}
Packet::ConnAck(_) => {}
Publish(p) => {
// info!("publish = {p:?}");
let _topic=p.topic.clone();
let _payload=p.payload.clone();
let val=_payload.as_ref()[2].as_f64()*0.01;//[0,1,val]
info!("\ntopic = {0:?},payload = {1:?}",_topic,_payload.as_ref());


let end = now.elapsed().as_millis();
info!("mqtt end,dur: {:?} ms.", end);
let ret_send = sender.send(val);
info!("ret_send: {:?}", ret_send);
info!("🟢 send val: {:?}", val);


}
Packet::PubAck(_) => {}
Packet::PingReq(_) => {}
Packet::PingResp(_) => {}
Packet::Subscribe(_) => {}
SubAck(ack) => {
info!("ack = {ack:?}");
}
Packet::PubRec(_) => {}
Packet::PubRel(_) => {}
Packet::PubComp(_) => {}
Packet::Unsubscribe(_) => {}
Packet::UnsubAck(_) => {}
Packet::Disconnect(_) => {}
}
}//pack
Outgoing(_o) => {
// info!("Outgoing = {o:?}");
}
}//event
}//ok
Err(e) => {
error!("Error = {e:?}");
// return Ok(());
}//err
}//result
}//loop
});




}else {

}
Expand Down Expand Up @@ -219,3 +301,14 @@ async fn zmq_sub(socket: &mut subscribe::Subscribe) -> Result<f64> {
}
Ok(value)
}


async fn requests(client: AsyncClient) {
loop {
client.subscribe("hello", QoS::AtMostOnce).await.unwrap();
time::sleep(Duration::from_secs(1)).await;
// info!("subscribe:");
}

// time::sleep(Duration::from_secs(120)).await;
}

0 comments on commit 32d19be

Please sign in to comment.