From b7e33ae057c2d88a614b8f3aeedeacd121eb066b Mon Sep 17 00:00:00 2001 From: Andy Killorin <37423245+Speedy6451@users.noreply.github.com> Date: Mon, 18 Dec 2023 09:10:25 -0600 Subject: [PATCH] message passing --- client/client.lua | 2 ++ server/src/main.rs | 68 +++++++++++++++++++++++++++++--------------- server/src/paths.rs | 3 +- server/src/turtle.rs | 37 ++++++++++++++++++------ 4 files changed, 78 insertions(+), 32 deletions(-) diff --git a/client/client.lua b/client/client.lua index 5a99677..e001fff 100644 --- a/client/client.lua +++ b/client/client.lua @@ -146,6 +146,8 @@ repeat end end + command = nil + local ret_table = nil if type(ret) == "boolean" then if ret then diff --git a/server/src/main.rs b/server/src/main.rs index 0433702..2563555 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -18,10 +18,10 @@ use names::Name; use serde::{Deserialize, Serialize}; use tokio::sync::{ watch::{self}, - Mutex, RwLock, + Mutex, RwLock, mpsc }; use tower::Service; -use turtle::TurtleTask; +use turtle::{TurtleTask, Iota, Receiver, Sender, Turtle}; use crate::{blocks::Block, paths::route}; @@ -33,13 +33,19 @@ mod safe_kill; mod turtle; #[derive(Serialize, Deserialize)] -struct ControlState { +struct SavedState { turtles: Vec, tasks: Vec>, world: blocks::World, //chunkloaders: unimplemented!(), } +struct ControlState { + saved: SavedState, + turtle_senders: Vec>, + turtle_receivers: Vec> +} + type SharedControl = Arc>; #[tokio::main] @@ -51,7 +57,7 @@ async fn main() -> Result<(), Error> { { tokio::io::Result::Ok(file) => serde_json::from_reader(file.into_std().await)?, tokio::io::Result::Err(e) => match e.kind() { - ErrorKind::NotFound => ControlState { + ErrorKind::NotFound => SavedState { turtles: Vec::new(), world: World::new(), tasks: Vec::new(), @@ -60,6 +66,12 @@ async fn main() -> Result<(), Error> { }, }; + let state = ControlState { saved: state, turtle_senders: + Vec::new() + , turtle_receivers: + Vec::new() + }; + let state = SharedControl::new(RwLock::new(state)); let server = Router::new() @@ -67,6 +79,7 @@ async fn main() -> Result<(), Error> { .route("/turtle/:id/update", post(command)) .route("/turtle/client.lua", get(client)) .route("/turtle/:id/setGoal", post(set_goal)) + .route("/turtle/:id/cancelTask", post(cancel)) .route("/turtle/:id/info", get(turtle_info)) .route("/turtle/updateAll", get(update_turtles)) .route("/flush", get(flush)) @@ -84,7 +97,7 @@ async fn main() -> Result<(), Error> { async fn write_to_disk(state: SharedControl) -> anyhow::Result<()> { - let json = serde_json::to_string_pretty(&(*state.read().await))?; + let json = serde_json::to_string_pretty(&state.read().await.saved)?; tokio::fs::write("state.json", json).await?; Ok(()) } @@ -100,13 +113,15 @@ async fn create_turtle( Json(req): Json, ) -> Json { let state = &mut state.write().await; - let turtles = &mut state.turtles; - let id = turtles.len() as u32; - turtles.push(turtle::Turtle::new(id, req.position, req.facing, req.fuel)); - state.tasks.push(VecDeque::new()); + let id = state.saved.turtles.len() as u32; + let (send, receive) = mpsc::channel(1); + state.turtle_senders.push(Mutex::new(send)); + state.turtle_receivers.push(Mutex::new(receive)); + state.saved.turtles.push(turtle::Turtle::new(id, req.position, req.facing, req.fuel)); + state.saved.tasks.push(VecDeque::new()); - println!("turt {id}"); + println!("new turtle: {id}"); Json(turtle::TurtleResponse { name: Name::from_num(id).to_str(), @@ -120,18 +135,27 @@ async fn set_goal( State(state): State, Json(req): Json, ) -> &'static str { - state.write().await.tasks[id as usize].push_back( + state.write().await.saved.tasks[id as usize].push_back( TurtleMineJob::chunk(req.0) ); "ACK" } +async fn cancel( + Path(id): Path, + State(state): State, +) -> &'static str { + state.write().await.saved.tasks[id as usize].pop_front(); + + "ACK" +} + async fn update_turtles(State(state): State) -> &'static str { state .write() .await - .turtles + .saved.turtles .iter_mut() .for_each(|t| t.pending_update = true); "ACK" @@ -142,16 +166,14 @@ async fn turtle_info( State(state): State, ) -> Json { let state = &mut state.read().await; - let turtle = &state.turtles[id as usize]; + let turtle = &state.saved.turtles[id as usize]; - let cloned = turtle::Turtle { - name: turtle.name.clone(), - fuel: turtle.fuel, - queued_movement: turtle.queued_movement.clone(), - position: turtle.position.clone(), - goal: turtle.goal.clone(), - pending_update: turtle.pending_update, - }; + let cloned = Turtle::new( + turtle.name.to_num(), + turtle.position.to_owned().0, + turtle.position.to_owned().1, + turtle.fuel + ); Json(cloned) } @@ -165,12 +187,12 @@ async fn command( println!("{:?}", &req); - if id as usize > state.turtles.len() { + if id as usize > state.saved.turtles.len() { return Json(turtle::TurtleCommand::Update); } Json( - turtle::process_turtle_update(id, &mut state, req).unwrap_or(turtle::TurtleCommand::Update), + turtle::process_turtle_update(id, &mut state, req).await.unwrap_or(turtle::TurtleCommand::Update), ) } diff --git a/server/src/paths.rs b/server/src/paths.rs index 494205b..8fed9ff 100644 --- a/server/src/paths.rs +++ b/server/src/paths.rs @@ -58,12 +58,13 @@ fn next(from: &Position, world: &World) -> Vec<(Position, u32)> { } /// Blocks that are fine to tunnel through -const GARBAGE: [&str; 5] = [ +const GARBAGE: [&str; 6] = [ "minecraft:stone", "minecraft:dirt", "minecraft:andesite", "minecraft:sand", "minecraft:gravel", + "minecraft:sandstone", ]; /// time taken to go through uncharted territory (in turtle. calls) diff --git a/server/src/turtle.rs b/server/src/turtle.rs index b2806ad..baa8059 100644 --- a/server/src/turtle.rs +++ b/server/src/turtle.rs @@ -11,10 +11,13 @@ use anyhow::Ok; use anyhow; use anyhow::Context; use tokio::sync::RwLock; +use tokio::sync::mpsc; +use tokio::sync::oneshot; use super::ControlState; use std::collections::VecDeque; +use std::future::Ready; use std::sync::Arc; @@ -34,8 +37,13 @@ pub(crate) struct Turtle { pub(crate) position: Position, pub(crate) goal: Option, pub(crate) pending_update: bool, + #[serde(skip)] + callback: Option>, } +pub type Sender = mpsc::Sender<(Iota, oneshot::Sender)>; +pub type Receiver = mpsc::Receiver<(Iota, oneshot::Sender)>; + impl Default for Turtle { fn default() -> Self { Self { @@ -45,6 +53,7 @@ impl Default for Turtle { position: (Vec3::zeros(), Direction::North), goal: Default::default(), pending_update: Default::default(), + callback: None, } } } @@ -58,24 +67,25 @@ impl Turtle { position: (position, facing), goal: None, pending_update: true, + callback: None, } } } -pub(crate) fn process_turtle_update( +pub(crate) async fn process_turtle_update( id: u32, state: &mut ControlState, update: TurtleUpdate, ) -> anyhow::Result { let turtle = state - .turtles + .saved.turtles .get_mut(id as usize) .context("nonexisting turtle")?; let tasks = state - .tasks + .saved.tasks .get_mut(id as usize) .context("state gone?????").unwrap(); - let world = &mut state.world; + let world = &mut state.saved.world; if turtle.pending_update { turtle.pending_update = false; @@ -90,21 +100,21 @@ pub(crate) fn process_turtle_update( } let above = Block { - name: update.above, + name: update.above.clone(), pos: turtle.position.0 + Vec3::y(), }; world.remove_at_point(&above.pos.into()); world.insert(above.clone()); let ahead = Block { - name: update.ahead, + name: update.ahead.clone(), pos: turtle.position.0 + turtle.position.1.clone().unit(), }; world.remove_at_point(&ahead.pos.into()); world.insert(ahead.clone()); let below = Block { - name: update.below, + name: update.below.clone(), pos: turtle.position.0 - Vec3::y(), }; world.remove_at_point(&below.pos.into()); @@ -116,13 +126,24 @@ pub(crate) fn process_turtle_update( task.handle_block(ahead); } - if let Some(goal) = tasks.front_mut().map(|t| t.next(&turtle)) { + if let Some(send) = turtle.callback.take() { + send.send(update).unwrap(); + } + + if let Some((cmd, ret)) = state.turtle_receivers.get(id as usize).unwrap().lock().await.try_recv().ok() { + turtle.callback = Some(ret); + + turtle.goal = Some(cmd); + } + + if let Some(goal) = turtle.goal.take().or_else(|| tasks.front_mut().map(|t| t.next(&turtle))) { let command = match goal { Iota::End => { tasks.pop_front(); TurtleCommand::Wait(0) // TODO: fix }, Iota::Goto(pos) => { + println!("gogto: {:?}", pos); pathstep(turtle, world, pos).unwrap() }, Iota::Mine(pos) => {