1
Fork 0

Compare commits

...

2 commits

3 changed files with 39 additions and 29 deletions

View file

@ -1,5 +1,7 @@
use std::result;
use anyhow::{anyhow, Ok};
use common::Command;
use common::{Command, Response};
use framed::{BoxPayload, FRAME_END_SYMBOL};
use tokio_util::{bytes::BytesMut, codec::{Decoder, Encoder}};
@ -15,26 +17,26 @@ impl FramedCodec {
}
impl Decoder for FramedCodec {
type Item = Vec<u8>;
type Item = Response;
type Error = anyhow::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 {
println!("len: {}", src.len());
if !src.contains(&0) {
return Ok(None);
}
let len = u32::from_be_bytes(src[..4].try_into()?) as usize;
if src.len() < len + 4 {
return Ok(None);
let (message, remainder) = match postcard::take_from_bytes_cobs(src) {
result::Result::Ok(v) => v,
result::Result::Err(e) => {
src.clear();
Err(e)?
}
};
let data = src.split_to(len + 4);
src.clear();
let mut out = Vec::new();
out.extend_from_slice(&data[4..]);
Ok(Some(out))
Ok(Some(message))
}
}

View file

@ -1,10 +1,12 @@
#![feature(async_closure)]
use std::time::Duration;
use anyhow::{Context, Result};
use common::{Command, Response, BAUDRATE};
use framed_codec::FramedCodec;
use futures::{SinkExt, StreamExt};
use tokio::{io::AsyncReadExt, net::{TcpListener, TcpSocket}, sync::{broadcast, watch::{self, Sender}}};
use tokio::{io::AsyncReadExt, net::{TcpListener, TcpSocket}, sync::{broadcast, watch::{self, Sender}}, task::JoinHandle, time::timeout};
use tokio_serial::SerialPortBuilderExt;
use tokio_util::codec::Framed;
@ -14,19 +16,28 @@ mod framed_codec;
async fn main() -> Result<()> {
let mut serial = tokio_serial::new("/dev/ttyAMA0", BAUDRATE).open_native_async()?;
let (sensor_sender, sensor_data) = broadcast::channel(5);
let (sensor_sender, mut sensor_data) = broadcast::channel(5);
serial.set_exclusive(false)?;
let (mut write, read) = Framed::new(serial, FramedCodec::new()).split();
let mut read = read.map(|data| -> Result<Response> {
Ok(postcard::from_bytes::<Response>(&data.ok().context("decode err")?)?)
});
let (mut write, mut read) = Framed::new(serial, FramedCodec::new()).split();
let (send, commands) = watch::channel(Command::Stop);
tokio::spawn(control(send));
let _: JoinHandle<Result<()>> = tokio::spawn(async move {
loop {
println!("sensor {:?}", sensor_data.recv().await?);
}
});
tokio::spawn(async move {
loop {
let _ = send.send(Command::Stop);
if let Err(e) = control(send.clone()).await {
println!("controller exited: {e}");
}
}
});
println!("starting");
write.send(Command::Enable).await?;
@ -67,9 +78,10 @@ async fn control(sender: Sender<Command>) -> Result<()> {
println!("connected to {addr:?}");
loop {
let len = stream.read_u32().await?;
let len = timeout(Duration::from_millis(30), stream.read_u32()).await??;
let mut buf = vec![0; len as usize];
stream.read_exact(&mut buf).await?;
timeout(Duration::from_millis(30), stream.read_exact(&mut buf)).await??;
let cmd: Command = postcard::from_bytes(&buf)?;

View file

@ -94,7 +94,7 @@ async fn main(spawner: Spawner) {
command
} else {
// stop all motors
//drive_pwm.set_config(&stopped);
drive_pwm.set_config(&stopped);
info!("waiting for command");
COMMANDS.receive().await
};
@ -167,15 +167,11 @@ async fn telemetry(mut tx: BufferedUartTx<'static, UART1>, enabled: &'static Ato
uptime_micros: Instant::now().as_micros(),
};
let Ok(serialized) = postcard::to_vec::<_, 1024>(&packet) else {
let Ok(serialized) = postcard::to_vec_cobs::<_, 1024>(&packet) else {
error!("serialization error");
continue;
};
let len = u32::to_be_bytes(serialized.len() as u32);
if let Err(e) = tx.write_all(&len).await {
error!("transport error {e:?}");
}
if let Err(e) = tx.write_all(&serialized).await {
error!("transport error {e:?}");
}