From fef8f7bce3613f8f8ae0d7b297f17468e6e38d12 Mon Sep 17 00:00:00 2001 From: Andy Killorin <37423245+Speedy6451@users.noreply.github.com> Date: Mon, 18 Dec 2023 09:39:28 -0600 Subject: [PATCH] async callbacks --- server/src/main.rs | 24 +++++++++------- server/src/turtle.rs | 65 ++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 73 insertions(+), 16 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 2563555..ac57a79 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -21,7 +21,7 @@ use tokio::sync::{ Mutex, RwLock, mpsc }; use tower::Service; -use turtle::{TurtleTask, Iota, Receiver, Sender, Turtle}; +use turtle::{TurtleTask, Iota, Receiver, Sender, Turtle, TurtleUpdate}; use crate::{blocks::Block, paths::route}; @@ -42,8 +42,6 @@ struct SavedState { struct ControlState { saved: SavedState, - turtle_senders: Vec>, - turtle_receivers: Vec> } type SharedControl = Arc>; @@ -66,10 +64,7 @@ async fn main() -> Result<(), Error> { }, }; - let state = ControlState { saved: state, turtle_senders: - Vec::new() - , turtle_receivers: - Vec::new() + let state = ControlState { saved: state, }; let state = SharedControl::new(RwLock::new(state)); @@ -81,6 +76,7 @@ async fn main() -> Result<(), Error> { .route("/turtle/:id/setGoal", post(set_goal)) .route("/turtle/:id/cancelTask", post(cancel)) .route("/turtle/:id/info", get(turtle_info)) + .route("/turtle/:id/placeUp", get(place_up)) .route("/turtle/updateAll", get(update_turtles)) .route("/flush", get(flush)) .with_state(state.clone()); @@ -115,9 +111,7 @@ async fn create_turtle( let state = &mut state.write().await; let id = state.saved.turtles.len() as u32; let (send, receive) = mpsc::channel(1); - state.turtle_senders.push(Mutex::new(send)); - state.turtle_receivers.push(Mutex::new(receive)); - state.saved.turtles.push(turtle::Turtle::new(id, req.position, req.facing, req.fuel)); + state.saved.turtles.push(turtle::Turtle::with_channel(id, req.position, req.facing, req.fuel, send,receive)); state.saved.tasks.push(VecDeque::new()); @@ -130,6 +124,16 @@ async fn create_turtle( }) } +async fn place_up( + Path(id): Path, + State(state): State, +) -> Json { + let turtle = state.write().await.saved.turtles.get(id as usize).unwrap() + .cmd(); + + Json(turtle.execute(Iota::Execute(turtle::TurtleCommand::PlaceUp)).await) +} + async fn set_goal( Path(id): Path, State(state): State, diff --git a/server/src/turtle.rs b/server/src/turtle.rs index baa8059..5268940 100644 --- a/server/src/turtle.rs +++ b/server/src/turtle.rs @@ -39,6 +39,10 @@ pub(crate) struct Turtle { pub(crate) pending_update: bool, #[serde(skip)] callback: Option>, + #[serde(skip)] + sender: Option>, + #[serde(skip)] + receiver: Option, } pub type Sender = mpsc::Sender<(Iota, oneshot::Sender)>; @@ -51,9 +55,11 @@ impl Default for Turtle { fuel: Default::default(), queued_movement: Default::default(), position: (Vec3::zeros(), Direction::North), - goal: Default::default(), + goal: None, pending_update: Default::default(), callback: None, + sender: None, + receiver: None, } } } @@ -65,11 +71,56 @@ impl Turtle { fuel, queued_movement: Vec3::new(0, 0, 0), position: (position, facing), - goal: None, pending_update: true, - callback: None, + ..Default::default() + } } + + pub fn with_channel(id: u32, position: Vec3, facing: Direction, fuel: usize, sender: Sender, receiver: Receiver) -> Self { + Self { + name: Name::from_num(id), + fuel, + queued_movement: Vec3::new(0, 0, 0), + position: (position, facing), + pending_update: true, + sender: Some(Arc::new(sender)), + receiver: Some(receiver), + ..Default::default() + + } + } + + pub fn cmd(&self) -> TurtleCommander { + TurtleCommander { sender: self.sender.as_ref().unwrap().clone() } + } + +} + +pub struct TurtleCommander { + sender: Arc, +} + +impl TurtleCommander { + pub async fn execute(&self, command: Iota) -> TurtleUpdate { + let (send, recv) = oneshot::channel::(); + + self.sender.to_owned().send((command,send)).await.unwrap(); + + recv.await.unwrap() + } + + pub async fn command(&self, command: TurtleCommand) -> TurtleUpdate { + self.execute(Iota::Execute(command)).await + } + + pub async fn goto(&self, pos: Position) -> TurtleUpdate { + self.execute(Iota::Goto(pos)).await + } + + pub async fn mine(&self, pos: Vec3) -> TurtleUpdate { + self.execute(Iota::Mine(pos)).await + } } pub(crate) async fn process_turtle_update( @@ -130,10 +181,12 @@ pub(crate) async fn process_turtle_update( send.send(update).unwrap(); } - if let Some((cmd, ret)) = state.turtle_receivers.get(id as usize).unwrap().lock().await.try_recv().ok() { - turtle.callback = Some(ret); + if let Some(recv) = turtle.receiver.as_mut() { + if let Some((cmd, ret)) = recv.try_recv().ok() { + turtle.callback = Some(ret); - turtle.goal = Some(cmd); + turtle.goal = Some(cmd); + } } if let Some(goal) = turtle.goal.take().or_else(|| tasks.front_mut().map(|t| t.next(&turtle))) {