1
Fork 0

Compare commits

...

2 commits

3 changed files with 39 additions and 29 deletions

View file

@ -1,5 +1,7 @@
use std::result;
use anyhow::{anyhow, Ok}; use anyhow::{anyhow, Ok};
use common::Command; use common::{Command, Response};
use framed::{BoxPayload, FRAME_END_SYMBOL}; use framed::{BoxPayload, FRAME_END_SYMBOL};
use tokio_util::{bytes::BytesMut, codec::{Decoder, Encoder}}; use tokio_util::{bytes::BytesMut, codec::{Decoder, Encoder}};
@ -15,26 +17,26 @@ impl FramedCodec {
} }
impl Decoder for FramedCodec { impl Decoder for FramedCodec {
type Item = Vec<u8>; type Item = Response;
type Error = anyhow::Error; type Error = anyhow::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 { println!("len: {}", src.len());
if !src.contains(&0) {
return Ok(None); return Ok(None);
} }
let len = u32::from_be_bytes(src[..4].try_into()?) as usize; let (message, remainder) = match postcard::take_from_bytes_cobs(src) {
result::Result::Ok(v) => v,
result::Result::Err(e) => {
src.clear();
Err(e)?
}
};
if src.len() < len + 4 { src.clear();
return Ok(None);
}
let data = src.split_to(len + 4); Ok(Some(message))
let mut out = Vec::new();
out.extend_from_slice(&data[4..]);
Ok(Some(out))
} }
} }

View file

@ -1,10 +1,12 @@
#![feature(async_closure)] #![feature(async_closure)]
use std::time::Duration;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use common::{Command, Response, BAUDRATE}; use common::{Command, Response, BAUDRATE};
use framed_codec::FramedCodec; use framed_codec::FramedCodec;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio::{io::AsyncReadExt, net::{TcpListener, TcpSocket}, sync::{broadcast, watch::{self, Sender}}}; use tokio::{io::AsyncReadExt, net::{TcpListener, TcpSocket}, sync::{broadcast, watch::{self, Sender}}, task::JoinHandle, time::timeout};
use tokio_serial::SerialPortBuilderExt; use tokio_serial::SerialPortBuilderExt;
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
@ -14,19 +16,28 @@ mod framed_codec;
async fn main() -> Result<()> { async fn main() -> Result<()> {
let mut serial = tokio_serial::new("/dev/ttyAMA0", BAUDRATE).open_native_async()?; let mut serial = tokio_serial::new("/dev/ttyAMA0", BAUDRATE).open_native_async()?;
let (sensor_sender, sensor_data) = broadcast::channel(5); let (sensor_sender, mut sensor_data) = broadcast::channel(5);
serial.set_exclusive(false)?; serial.set_exclusive(false)?;
let (mut write, read) = Framed::new(serial, FramedCodec::new()).split(); let (mut write, mut read) = Framed::new(serial, FramedCodec::new()).split();
let mut read = read.map(|data| -> Result<Response> {
Ok(postcard::from_bytes::<Response>(&data.ok().context("decode err")?)?)
});
let (send, commands) = watch::channel(Command::Stop); let (send, commands) = watch::channel(Command::Stop);
tokio::spawn(control(send)); let _: JoinHandle<Result<()>> = tokio::spawn(async move {
loop {
println!("sensor {:?}", sensor_data.recv().await?);
}
});
tokio::spawn(async move {
loop {
let _ = send.send(Command::Stop);
if let Err(e) = control(send.clone()).await {
println!("controller exited: {e}");
}
}
});
println!("starting"); println!("starting");
write.send(Command::Enable).await?; write.send(Command::Enable).await?;
@ -67,9 +78,10 @@ async fn control(sender: Sender<Command>) -> Result<()> {
println!("connected to {addr:?}"); println!("connected to {addr:?}");
loop { loop {
let len = stream.read_u32().await?;
let len = timeout(Duration::from_millis(30), stream.read_u32()).await??;
let mut buf = vec![0; len as usize]; let mut buf = vec![0; len as usize];
stream.read_exact(&mut buf).await?; timeout(Duration::from_millis(30), stream.read_exact(&mut buf)).await??;
let cmd: Command = postcard::from_bytes(&buf)?; let cmd: Command = postcard::from_bytes(&buf)?;

View file

@ -94,7 +94,7 @@ async fn main(spawner: Spawner) {
command command
} else { } else {
// stop all motors // stop all motors
//drive_pwm.set_config(&stopped); drive_pwm.set_config(&stopped);
info!("waiting for command"); info!("waiting for command");
COMMANDS.receive().await COMMANDS.receive().await
}; };
@ -167,15 +167,11 @@ async fn telemetry(mut tx: BufferedUartTx<'static, UART1>, enabled: &'static Ato
uptime_micros: Instant::now().as_micros(), uptime_micros: Instant::now().as_micros(),
}; };
let Ok(serialized) = postcard::to_vec::<_, 1024>(&packet) else { let Ok(serialized) = postcard::to_vec_cobs::<_, 1024>(&packet) else {
error!("serialization error"); error!("serialization error");
continue; continue;
}; };
let len = u32::to_be_bytes(serialized.len() as u32);
if let Err(e) = tx.write_all(&len).await {
error!("transport error {e:?}");
}
if let Err(e) = tx.write_all(&serialized).await { if let Err(e) = tx.write_all(&serialized).await {
error!("transport error {e:?}"); error!("transport error {e:?}");
} }