1
Fork 0

message passing

This commit is contained in:
Andy Killorin 2023-12-18 09:10:25 -06:00
parent 5f00bb9526
commit b7e33ae057
Signed by: ank
GPG key ID: B6241CA3B552BCA4
4 changed files with 78 additions and 32 deletions

View file

@ -146,6 +146,8 @@ repeat
end
end
command = nil
local ret_table = nil
if type(ret) == "boolean" then
if ret then

View file

@ -18,10 +18,10 @@ use names::Name;
use serde::{Deserialize, Serialize};
use tokio::sync::{
watch::{self},
Mutex, RwLock,
Mutex, RwLock, mpsc
};
use tower::Service;
use turtle::TurtleTask;
use turtle::{TurtleTask, Iota, Receiver, Sender, Turtle};
use crate::{blocks::Block, paths::route};
@ -33,13 +33,19 @@ mod safe_kill;
mod turtle;
#[derive(Serialize, Deserialize)]
struct ControlState {
struct SavedState {
turtles: Vec<turtle::Turtle>,
tasks: Vec<VecDeque<TurtleMineJob>>,
world: blocks::World,
//chunkloaders: unimplemented!(),
}
struct ControlState {
saved: SavedState,
turtle_senders: Vec<Mutex<Sender>>,
turtle_receivers: Vec<Mutex<Receiver>>
}
type SharedControl = Arc<RwLock<ControlState>>;
#[tokio::main]
@ -51,7 +57,7 @@ async fn main() -> Result<(), Error> {
{
tokio::io::Result::Ok(file) => serde_json::from_reader(file.into_std().await)?,
tokio::io::Result::Err(e) => match e.kind() {
ErrorKind::NotFound => ControlState {
ErrorKind::NotFound => SavedState {
turtles: Vec::new(),
world: World::new(),
tasks: Vec::new(),
@ -60,6 +66,12 @@ async fn main() -> Result<(), Error> {
},
};
let state = ControlState { saved: state, turtle_senders:
Vec::new()
, turtle_receivers:
Vec::new()
};
let state = SharedControl::new(RwLock::new(state));
let server = Router::new()
@ -67,6 +79,7 @@ async fn main() -> Result<(), Error> {
.route("/turtle/:id/update", post(command))
.route("/turtle/client.lua", get(client))
.route("/turtle/:id/setGoal", post(set_goal))
.route("/turtle/:id/cancelTask", post(cancel))
.route("/turtle/:id/info", get(turtle_info))
.route("/turtle/updateAll", get(update_turtles))
.route("/flush", get(flush))
@ -84,7 +97,7 @@ async fn main() -> Result<(), Error> {
async fn write_to_disk(state: SharedControl) -> anyhow::Result<()> {
let json = serde_json::to_string_pretty(&(*state.read().await))?;
let json = serde_json::to_string_pretty(&state.read().await.saved)?;
tokio::fs::write("state.json", json).await?;
Ok(())
}
@ -100,13 +113,15 @@ async fn create_turtle(
Json(req): Json<turtle::TurtleRegister>,
) -> Json<turtle::TurtleResponse> {
let state = &mut state.write().await;
let turtles = &mut state.turtles;
let id = turtles.len() as u32;
turtles.push(turtle::Turtle::new(id, req.position, req.facing, req.fuel));
state.tasks.push(VecDeque::new());
let id = state.saved.turtles.len() as u32;
let (send, receive) = mpsc::channel(1);
state.turtle_senders.push(Mutex::new(send));
state.turtle_receivers.push(Mutex::new(receive));
state.saved.turtles.push(turtle::Turtle::new(id, req.position, req.facing, req.fuel));
state.saved.tasks.push(VecDeque::new());
println!("turt {id}");
println!("new turtle: {id}");
Json(turtle::TurtleResponse {
name: Name::from_num(id).to_str(),
@ -120,18 +135,27 @@ async fn set_goal(
State(state): State<SharedControl>,
Json(req): Json<Position>,
) -> &'static str {
state.write().await.tasks[id as usize].push_back(
state.write().await.saved.tasks[id as usize].push_back(
TurtleMineJob::chunk(req.0)
);
"ACK"
}
async fn cancel(
Path(id): Path<u32>,
State(state): State<SharedControl>,
) -> &'static str {
state.write().await.saved.tasks[id as usize].pop_front();
"ACK"
}
async fn update_turtles(State(state): State<SharedControl>) -> &'static str {
state
.write()
.await
.turtles
.saved.turtles
.iter_mut()
.for_each(|t| t.pending_update = true);
"ACK"
@ -142,16 +166,14 @@ async fn turtle_info(
State(state): State<SharedControl>,
) -> Json<turtle::Turtle> {
let state = &mut state.read().await;
let turtle = &state.turtles[id as usize];
let turtle = &state.saved.turtles[id as usize];
let cloned = turtle::Turtle {
name: turtle.name.clone(),
fuel: turtle.fuel,
queued_movement: turtle.queued_movement.clone(),
position: turtle.position.clone(),
goal: turtle.goal.clone(),
pending_update: turtle.pending_update,
};
let cloned = Turtle::new(
turtle.name.to_num(),
turtle.position.to_owned().0,
turtle.position.to_owned().1,
turtle.fuel
);
Json(cloned)
}
@ -165,12 +187,12 @@ async fn command(
println!("{:?}", &req);
if id as usize > state.turtles.len() {
if id as usize > state.saved.turtles.len() {
return Json(turtle::TurtleCommand::Update);
}
Json(
turtle::process_turtle_update(id, &mut state, req).unwrap_or(turtle::TurtleCommand::Update),
turtle::process_turtle_update(id, &mut state, req).await.unwrap_or(turtle::TurtleCommand::Update),
)
}

View file

@ -58,12 +58,13 @@ fn next(from: &Position, world: &World) -> Vec<(Position, u32)> {
}
/// Blocks that are fine to tunnel through
const GARBAGE: [&str; 5] = [
const GARBAGE: [&str; 6] = [
"minecraft:stone",
"minecraft:dirt",
"minecraft:andesite",
"minecraft:sand",
"minecraft:gravel",
"minecraft:sandstone",
];
/// time taken to go through uncharted territory (in turtle. calls)

View file

@ -11,10 +11,13 @@ use anyhow::Ok;
use anyhow;
use anyhow::Context;
use tokio::sync::RwLock;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use super::ControlState;
use std::collections::VecDeque;
use std::future::Ready;
use std::sync::Arc;
@ -34,8 +37,13 @@ pub(crate) struct Turtle {
pub(crate) position: Position,
pub(crate) goal: Option<Iota>,
pub(crate) pending_update: bool,
#[serde(skip)]
callback: Option<oneshot::Sender<TurtleUpdate>>,
}
pub type Sender = mpsc::Sender<(Iota, oneshot::Sender<TurtleUpdate>)>;
pub type Receiver = mpsc::Receiver<(Iota, oneshot::Sender<TurtleUpdate>)>;
impl Default for Turtle {
fn default() -> Self {
Self {
@ -45,6 +53,7 @@ impl Default for Turtle {
position: (Vec3::zeros(), Direction::North),
goal: Default::default(),
pending_update: Default::default(),
callback: None,
}
}
}
@ -58,24 +67,25 @@ impl Turtle {
position: (position, facing),
goal: None,
pending_update: true,
callback: None,
}
}
}
pub(crate) fn process_turtle_update(
pub(crate) async fn process_turtle_update(
id: u32,
state: &mut ControlState,
update: TurtleUpdate,
) -> anyhow::Result<TurtleCommand> {
let turtle = state
.turtles
.saved.turtles
.get_mut(id as usize)
.context("nonexisting turtle")?;
let tasks = state
.tasks
.saved.tasks
.get_mut(id as usize)
.context("state gone?????").unwrap();
let world = &mut state.world;
let world = &mut state.saved.world;
if turtle.pending_update {
turtle.pending_update = false;
@ -90,21 +100,21 @@ pub(crate) fn process_turtle_update(
}
let above = Block {
name: update.above,
name: update.above.clone(),
pos: turtle.position.0 + Vec3::y(),
};
world.remove_at_point(&above.pos.into());
world.insert(above.clone());
let ahead = Block {
name: update.ahead,
name: update.ahead.clone(),
pos: turtle.position.0 + turtle.position.1.clone().unit(),
};
world.remove_at_point(&ahead.pos.into());
world.insert(ahead.clone());
let below = Block {
name: update.below,
name: update.below.clone(),
pos: turtle.position.0 - Vec3::y(),
};
world.remove_at_point(&below.pos.into());
@ -116,13 +126,24 @@ pub(crate) fn process_turtle_update(
task.handle_block(ahead);
}
if let Some(goal) = tasks.front_mut().map(|t| t.next(&turtle)) {
if let Some(send) = turtle.callback.take() {
send.send(update).unwrap();
}
if let Some((cmd, ret)) = state.turtle_receivers.get(id as usize).unwrap().lock().await.try_recv().ok() {
turtle.callback = Some(ret);
turtle.goal = Some(cmd);
}
if let Some(goal) = turtle.goal.take().or_else(|| tasks.front_mut().map(|t| t.next(&turtle))) {
let command = match goal {
Iota::End => {
tasks.pop_front();
TurtleCommand::Wait(0) // TODO: fix
},
Iota::Goto(pos) => {
println!("gogto: {:?}", pos);
pathstep(turtle, world, pos).unwrap()
},
Iota::Mine(pos) => {