#![feature(async_closure)] use std::{sync::Arc, time::Duration}; use anyhow::{Context, Result}; use common::{Response, RobotState, SensorData, BAUDRATE}; use crossbeam::queue::ArrayQueue; use framed_codec::FramedCodec; use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt}; use nalgebra::{SimdPartialOrd, SimdValue, Vector3}; use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpSocket}, sync::{broadcast::{self, error::RecvError, Receiver, Sender}, watch::{self, channel }, RwLock}, task::JoinHandle, time::{error::Elapsed, sleep, timeout}}; use tokio_serial::SerialPortBuilderExt; use tokio_util::codec::Framed; mod framed_codec; #[derive(Default)] struct RobotInfo { bus_voltage: f32, } #[tokio::main] async fn main() -> Result<()> { let mut serial = tokio_serial::new("/dev/ttyAMA0", BAUDRATE).open_native_async()?; let (sensor_sender, sensor_data) = broadcast::channel(5); serial.set_exclusive(false)?; let (mut write, mut read) = Framed::new(serial, FramedCodec::new()).split(); let (sender, mut command_receiver) = channel(RobotState::stopped()); let state = Arc::new(RwLock::new(RobotInfo { bus_voltage: f32::MAX, ..Default::default() })); tokio::spawn(update_telem(state.clone(), sensor_data.resubscribe())); let control_telem = sensor_data.resubscribe(); tokio::spawn(async move { loop { if let Err(e) = control(sender.clone(), control_telem.resubscribe()).await { println!("controller exited: {e}"); } } }); tokio::spawn(async move { let telem_sender = sensor_sender.clone(); read.for_each( async |telem| { let Ok(telem) = telem else {return;}; let Some(data) = telem.sensor_data else {return;}; telem_sender.send(data).unwrap(); }).await; }); loop { let _ = timeout(Duration::from_millis(20), command_receiver.changed()); let command = command_receiver.borrow(); println!("sending {command:?}"); write.send(command.clone()).await?; } } async fn control(sender: watch::Sender, mut telem: Receiver) -> Result<()> { let listener = TcpListener::bind("0.0.0.0:3322").await?; let (stream, addr) = listener.accept().await?; println!("connected to {addr:?}"); let (mut read, mut write) = stream.into_split(); let _: JoinHandle> = tokio::spawn(async move { loop { let data = match telem.recv().await { Ok(it) => it, Err(RecvError::Lagged(_)) => {continue;}, Err(e) => {Err(e)?}, }; let data = postcard::to_stdvec(&data)?; write.write_u32(data.len() as u32).await?; write.write_all(&data).await?; } }); loop { let len = loop { match timeout(Duration::from_millis(150), read.read_u32()).await { Ok(it) => { break it?; }, Err(Elapsed) => { sender.send(RobotState::stopped())?; continue; }, }; }; let mut buf = vec![0; len as usize]; timeout(Duration::from_millis(100), read.read_exact(&mut buf)).await??; let cmd: RobotState = postcard::from_bytes(&buf)?; sender.send(cmd)?; } } async fn update_telem(state: Arc>, mut telem: Receiver) -> Result<()> { loop { let telem = telem.recv().await?; println!("sensor {telem:?}"); match telem { SensorData::BusVoltage(voltage) => { state.write().await.bus_voltage = voltage; } _ => {} } } }