1
Fork 0

don't drop connection on spurious lapse

also reduced logging (systemd is a killer)
This commit is contained in:
Andy Killorin 2025-02-09 12:56:09 -05:00
parent 852b95dcc0
commit bd683d9b74
Signed by: ank
GPG key ID: 23F9463ECB67FE8C
3 changed files with 22 additions and 19 deletions

View file

@ -2,7 +2,6 @@ use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use common::{Command, SensorData}; use common::{Command, SensorData};
use futures::Stream;
use gilrs::{Axis, Event, Gilrs}; use gilrs::{Axis, Event, Gilrs};
use tokio::{io::{AsyncReadExt, AsyncWriteExt, BufWriter}, net::{tcp::OwnedReadHalf, TcpStream}, task::JoinHandle, time::{sleep, Instant}}; use tokio::{io::{AsyncReadExt, AsyncWriteExt, BufWriter}, net::{tcp::OwnedReadHalf, TcpStream}, task::JoinHandle, time::{sleep, Instant}};
@ -39,10 +38,7 @@ async fn main() -> anyhow::Result<()> {
let gamepad = gamepads.gamepad(gamepad); let gamepad = gamepads.gamepad(gamepad);
let values = gamepad.state(); let values = gamepad.state();
dbg!(values);
let mut ly= values.axis_data(ly).map(|d| d.value()).unwrap_or(0.0); let mut ly= values.axis_data(ly).map(|d| d.value()).unwrap_or(0.0);
let mut rx= values.axis_data(rx).map(|d| d.value()).unwrap_or(0.0); let mut rx= values.axis_data(rx).map(|d| d.value()).unwrap_or(0.0);

View file

@ -21,7 +21,6 @@ impl Decoder for FramedCodec {
type Error = anyhow::Error; type Error = anyhow::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
println!("len: {}", src.len());
if !src.contains(&0) { if !src.contains(&0) {
return Ok(None); return Ok(None);
} }

View file

@ -6,7 +6,7 @@ use anyhow::{Context, Result};
use common::{Command, Response, SensorData, BAUDRATE}; use common::{Command, Response, SensorData, BAUDRATE};
use framed_codec::FramedCodec; use framed_codec::FramedCodec;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpSocket}, sync::{broadcast::{self, Receiver}, watch::{self, Sender}}, task::JoinHandle, time::timeout}; use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpSocket}, sync::{broadcast::{self, error::RecvError, Receiver}, watch::{self, Sender}}, task::JoinHandle, time::{error::Elapsed, timeout}};
use tokio_serial::SerialPortBuilderExt; use tokio_serial::SerialPortBuilderExt;
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
@ -24,12 +24,12 @@ async fn main() -> Result<()> {
let (send, commands) = watch::channel(Command::Stop); let (send, commands) = watch::channel(Command::Stop);
let mut print_telem = sensor_data.resubscribe(); //let mut print_telem = sensor_data.resubscribe();
let _: JoinHandle<Result<()>> = tokio::spawn(async move { //let _: JoinHandle<Result<()>> = tokio::spawn(async move {
loop { // loop {
println!("sensor {:?}", print_telem.recv().await?); // println!("sensor {:?}", print_telem.recv().await?);
} // }
}); //});
let control_telem = sensor_data.resubscribe(); let control_telem = sensor_data.resubscribe();
tokio::spawn(async move { tokio::spawn(async move {
@ -44,6 +44,7 @@ async fn main() -> Result<()> {
println!("starting"); println!("starting");
write.send(Command::Enable).await?; write.send(Command::Enable).await?;
write.flush().await?; write.flush().await?;
println!("enable");
loop { loop {
let Some(Ok(data)) = read.next().await else { let Some(Ok(data)) = read.next().await else {
@ -61,13 +62,11 @@ async fn main() -> Result<()> {
write.send(Command::Enable).await?; write.send(Command::Enable).await?;
write.flush().await?; write.flush().await?;
//dbg!(&data);
if let Some(data) = data.sensor_data { if let Some(data) = data.sensor_data {
let _ = sensor_sender.send(data); let _ = sensor_sender.send(data);
} }
write.send(dbg!(commands.borrow().clone())).await?; write.send(commands.borrow().clone()).await?;
write.flush().await?; write.flush().await?;
} }
} }
@ -83,7 +82,11 @@ async fn control(sender: Sender<Command>, mut telem: Receiver<SensorData>) -> Re
let _: JoinHandle<Result<()>> = tokio::spawn(async move { let _: JoinHandle<Result<()>> = tokio::spawn(async move {
loop { loop {
let data = telem.recv().await?; let data = match telem.recv().await {
Ok(it) => it,
Err(RecvError::Lagged(_)) => {continue;},
Err(e) => {Err(e)?},
};
let data = postcard::to_stdvec(&data)?; let data = postcard::to_stdvec(&data)?;
write.write_u32(data.len() as u32).await?; write.write_u32(data.len() as u32).await?;
write.write_all(&data).await?; write.write_all(&data).await?;
@ -91,14 +94,19 @@ async fn control(sender: Sender<Command>, mut telem: Receiver<SensorData>) -> Re
}); });
loop { loop {
let len = timeout(Duration::from_millis(100), read.read_u32()).await??; let len = loop {
match timeout(Duration::from_millis(150), read.read_u32()).await {
Ok(it) => { break it?; },
Err(Elapsed) => {
sender.send(Command::Stop)?;
},
};
};
let mut buf = vec![0; len as usize]; let mut buf = vec![0; len as usize];
timeout(Duration::from_millis(100), read.read_exact(&mut buf)).await??; timeout(Duration::from_millis(100), read.read_exact(&mut buf)).await??;
let cmd: Command = postcard::from_bytes(&buf)?; let cmd: Command = postcard::from_bytes(&buf)?;
println!("recv {cmd:?}");
sender.send(cmd)?; sender.send(cmd)?;
} }
} }