From 951dafc7d57a7e5fa093a851c65ea03b27a61db6 Mon Sep 17 00:00:00 2001 From: Andy Killorin <37423245+Speedy6451@users.noreply.github.com> Date: Tue, 4 Feb 2025 17:49:41 -0500 Subject: [PATCH] switched southbridge->northbridge to postcard cobs --- northbridge/src/framed_codec.rs | 28 +++++++++++++++------------- northbridge/src/main.rs | 24 ++++++++++++++++-------- southbridge/src/main.rs | 8 ++------ 3 files changed, 33 insertions(+), 27 deletions(-) diff --git a/northbridge/src/framed_codec.rs b/northbridge/src/framed_codec.rs index 505b8a6..9a7b54b 100644 --- a/northbridge/src/framed_codec.rs +++ b/northbridge/src/framed_codec.rs @@ -1,5 +1,7 @@ +use std::result; + use anyhow::{anyhow, Ok}; -use common::Command; +use common::{Command, Response}; use framed::{BoxPayload, FRAME_END_SYMBOL}; use tokio_util::{bytes::BytesMut, codec::{Decoder, Encoder}}; @@ -15,26 +17,26 @@ impl FramedCodec { } impl Decoder for FramedCodec { - type Item = Vec; + type Item = Response; type Error = anyhow::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - if src.len() < 4 { + println!("len: {}", src.len()); + if !src.contains(&0) { return Ok(None); } - let len = u32::from_be_bytes(src[..4].try_into()?) as usize; + let (message, remainder) = match postcard::take_from_bytes_cobs(src) { + result::Result::Ok(v) => v, + result::Result::Err(e) => { + src.clear(); + Err(e)? + } + }; - if src.len() < len + 4 { - return Ok(None); - } + src.clear(); - let data = src.split_to(len + 4); - - let mut out = Vec::new(); - out.extend_from_slice(&data[4..]); - - Ok(Some(out)) + Ok(Some(message)) } } diff --git a/northbridge/src/main.rs b/northbridge/src/main.rs index 4ff4464..2c6ec78 100644 --- a/northbridge/src/main.rs +++ b/northbridge/src/main.rs @@ -4,7 +4,7 @@ use anyhow::{Context, Result}; use common::{Command, Response, BAUDRATE}; use framed_codec::FramedCodec; use futures::{SinkExt, StreamExt}; -use tokio::{io::AsyncReadExt, net::{TcpListener, TcpSocket}, sync::{broadcast, watch::{self, Sender}}}; +use tokio::{io::AsyncReadExt, net::{TcpListener, TcpSocket}, sync::{broadcast, watch::{self, Sender}}, task::JoinHandle}; use tokio_serial::SerialPortBuilderExt; use tokio_util::codec::Framed; @@ -14,19 +14,27 @@ mod framed_codec; async fn main() -> Result<()> { let mut serial = tokio_serial::new("/dev/ttyAMA0", BAUDRATE).open_native_async()?; - let (sensor_sender, sensor_data) = broadcast::channel(5); + let (sensor_sender, mut sensor_data) = broadcast::channel(5); serial.set_exclusive(false)?; - let (mut write, read) = Framed::new(serial, FramedCodec::new()).split(); - - let mut read = read.map(|data| -> Result { - Ok(postcard::from_bytes::(&data.ok().context("decode err")?)?) - }); + let (mut write, mut read) = Framed::new(serial, FramedCodec::new()).split(); let (send, commands) = watch::channel(Command::Stop); - tokio::spawn(control(send)); + let _: JoinHandle> = tokio::spawn(async move { + loop { + println!("sensor {:?}", sensor_data.recv().await?); + } + }); + + tokio::spawn(async move { + loop { + if let Err(e) = control(send.clone()).await { + println!("controller exited: {e}"); + } + } + }); println!("starting"); write.send(Command::Enable).await?; diff --git a/southbridge/src/main.rs b/southbridge/src/main.rs index 3ccef24..2599bb2 100644 --- a/southbridge/src/main.rs +++ b/southbridge/src/main.rs @@ -94,7 +94,7 @@ async fn main(spawner: Spawner) { command } else { // stop all motors - //drive_pwm.set_config(&stopped); + drive_pwm.set_config(&stopped); info!("waiting for command"); COMMANDS.receive().await }; @@ -167,15 +167,11 @@ async fn telemetry(mut tx: BufferedUartTx<'static, UART1>, enabled: &'static Ato uptime_micros: Instant::now().as_micros(), }; - let Ok(serialized) = postcard::to_vec::<_, 1024>(&packet) else { + let Ok(serialized) = postcard::to_vec_cobs::<_, 1024>(&packet) else { error!("serialization error"); continue; }; - let len = u32::to_be_bytes(serialized.len() as u32); - if let Err(e) = tx.write_all(&len).await { - error!("transport error {e:?}"); - } if let Err(e) = tx.write_all(&serialized).await { error!("transport error {e:?}"); }