use std::{array::from_ref, net::SocketAddr, result, sync::Arc, thread::{self}, time::{Duration, SystemTime, UNIX_EPOCH}}; use battery::Manager; use image::{codecs::jpeg::JpegEncoder, ImageBuffer, Rgb}; use nokhwa::{pixel_format::RgbFormat, utils::{ApiBackend, RequestedFormat, RequestedFormatType, Resolution}, Camera}; use anyhow::{Context, Ok, Result}; use serde::{Deserialize, Serialize}; use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::{TcpListener, TcpStream}, runtime::Runtime, sync::{Notify, RwLock}, task::{JoinHandle, LocalSet}, time::sleep}; fn main() -> Result<()>{ let await_frame = Arc::new(Notify::new()); let latest_frame = Arc::new(RwLock::new(Vec::new())); { let runtime = Runtime::new()?; let await_frame = await_frame.clone(); let latest_frame = latest_frame.clone(); thread::spawn(move || { let local = LocalSet::new(); local.block_on(&runtime, camera_manager(await_frame, latest_frame)).unwrap(); }); } { let runtime = Runtime::new()?; let await_frame = await_frame.clone(); let latest_frame = latest_frame.clone(); thread::spawn(move || { let local = LocalSet::new(); local.block_on(&runtime, server(await_frame, latest_frame)).unwrap(); }); } let runtime = Runtime::new()?; runtime.block_on(telemetry_server()).unwrap(); Ok(()) } #[derive(Serialize)] struct Telemetry { battery_level: f32, timestamp: u64, } #[derive(Deserialize, Debug)] struct Control { #[serde(rename = "Data")] data: f32, } async fn telemetry_server() -> Result<()>{ let port = 2994; let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?; while let result::Result::Ok((connection, addr)) = listener.accept().await { println!("telemetry connection from {:?}", addr); let (receiver, mut sender) = connection.into_split(); tokio::spawn(async move { let mut reader = BufReader::new(receiver).lines(); while let Some(line) = reader.next_line().await? { //println!("recv: {line}"); if let result::Result::Ok(control) = serde_json::from_str::(&line) { //println!("deserialized: data is {}", control.data); } } Ok(()) }); let _: JoinHandle> = tokio::spawn(async move { let battery = Manager::new()?.batteries()?.nth(0).context("no battery")??; let full = battery.energy_full(); loop { let time = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(); let mock = Telemetry { battery_level: (battery.energy()/full).value * 100., timestamp: time as u64 } ; let mut send = serde_json::to_vec(&mock)?; send.push(b'\r'); send.push(b'\n'); sender.write_all(&send).await?; sleep(Duration::from_millis(20)).await; } }); } Ok(()) } async fn camera_manager(await_frame: Arc, latest_frame: Arc>>) -> Result<()>{ let cameras = nokhwa::query(ApiBackend::Auto)?; let camera = cameras.get(2).context("no cameras")?; println!("using: {}",camera.human_name()); let requested = RequestedFormat::new::(RequestedFormatType::HighestResolution(Resolution::new(320, 240))); let mut camera = Camera::new(camera.index().clone(), requested)?; camera.open_stream()?; loop { let frame = camera.frame()?; let frame: ImageBuffer, Vec> = frame.decode_image::()?; let mut output = Vec::new(); let mut encoder = JpegEncoder::new_with_quality(&mut output, 30); encoder.encode_image(&frame)?; *latest_frame.write().await = output; await_frame.notify_waiters(); } } async fn stream_video(await_frame: Arc, latest_frame: Arc>>, mut client: TcpStream) -> Result<()>{ loop { await_frame.notified().await; let data = latest_frame.read().await; println!("len: {}", data.len()); let len = data.len(); // holding data across await, likely nonissue client.write_u32(len as u32).await?; client.write(&data).await?; } } async fn server(await_frame: Arc, latest_frame: Arc>>) -> Result<()> { let port = 2993; let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?; println!("listening on {port}"); while let result::Result::Ok((client, addr)) = listener.accept().await { println!("connected to {:?}", addr); tokio::task::spawn_local(stream_video(await_frame.clone(), latest_frame.clone(), client)); } Ok(()) }