1
Fork 0

async callbacks

This commit is contained in:
Andy Killorin 2023-12-18 09:39:28 -06:00
parent b7e33ae057
commit fef8f7bce3
Signed by: ank
GPG key ID: B6241CA3B552BCA4
2 changed files with 73 additions and 16 deletions

View file

@ -21,7 +21,7 @@ use tokio::sync::{
Mutex, RwLock, mpsc
};
use tower::Service;
use turtle::{TurtleTask, Iota, Receiver, Sender, Turtle};
use turtle::{TurtleTask, Iota, Receiver, Sender, Turtle, TurtleUpdate};
use crate::{blocks::Block, paths::route};
@ -42,8 +42,6 @@ struct SavedState {
struct ControlState {
saved: SavedState,
turtle_senders: Vec<Mutex<Sender>>,
turtle_receivers: Vec<Mutex<Receiver>>
}
type SharedControl = Arc<RwLock<ControlState>>;
@ -66,10 +64,7 @@ async fn main() -> Result<(), Error> {
},
};
let state = ControlState { saved: state, turtle_senders:
Vec::new()
, turtle_receivers:
Vec::new()
let state = ControlState { saved: state,
};
let state = SharedControl::new(RwLock::new(state));
@ -81,6 +76,7 @@ async fn main() -> Result<(), Error> {
.route("/turtle/:id/setGoal", post(set_goal))
.route("/turtle/:id/cancelTask", post(cancel))
.route("/turtle/:id/info", get(turtle_info))
.route("/turtle/:id/placeUp", get(place_up))
.route("/turtle/updateAll", get(update_turtles))
.route("/flush", get(flush))
.with_state(state.clone());
@ -115,9 +111,7 @@ async fn create_turtle(
let state = &mut state.write().await;
let id = state.saved.turtles.len() as u32;
let (send, receive) = mpsc::channel(1);
state.turtle_senders.push(Mutex::new(send));
state.turtle_receivers.push(Mutex::new(receive));
state.saved.turtles.push(turtle::Turtle::new(id, req.position, req.facing, req.fuel));
state.saved.turtles.push(turtle::Turtle::with_channel(id, req.position, req.facing, req.fuel, send,receive));
state.saved.tasks.push(VecDeque::new());
@ -130,6 +124,16 @@ async fn create_turtle(
})
}
async fn place_up(
Path(id): Path<u32>,
State(state): State<SharedControl>,
) -> Json<TurtleUpdate> {
let turtle = state.write().await.saved.turtles.get(id as usize).unwrap()
.cmd();
Json(turtle.execute(Iota::Execute(turtle::TurtleCommand::PlaceUp)).await)
}
async fn set_goal(
Path(id): Path<u32>,
State(state): State<SharedControl>,

View file

@ -39,6 +39,10 @@ pub(crate) struct Turtle {
pub(crate) pending_update: bool,
#[serde(skip)]
callback: Option<oneshot::Sender<TurtleUpdate>>,
#[serde(skip)]
sender: Option<Arc<Sender>>,
#[serde(skip)]
receiver: Option<Receiver>,
}
pub type Sender = mpsc::Sender<(Iota, oneshot::Sender<TurtleUpdate>)>;
@ -51,9 +55,11 @@ impl Default for Turtle {
fuel: Default::default(),
queued_movement: Default::default(),
position: (Vec3::zeros(), Direction::North),
goal: Default::default(),
goal: None,
pending_update: Default::default(),
callback: None,
sender: None,
receiver: None,
}
}
}
@ -65,11 +71,56 @@ impl Turtle {
fuel,
queued_movement: Vec3::new(0, 0, 0),
position: (position, facing),
goal: None,
pending_update: true,
callback: None,
..Default::default()
}
}
pub fn with_channel(id: u32, position: Vec3, facing: Direction, fuel: usize, sender: Sender, receiver: Receiver) -> Self {
Self {
name: Name::from_num(id),
fuel,
queued_movement: Vec3::new(0, 0, 0),
position: (position, facing),
pending_update: true,
sender: Some(Arc::new(sender)),
receiver: Some(receiver),
..Default::default()
}
}
pub fn cmd(&self) -> TurtleCommander {
TurtleCommander { sender: self.sender.as_ref().unwrap().clone() }
}
}
pub struct TurtleCommander {
sender: Arc<Sender>,
}
impl TurtleCommander {
pub async fn execute(&self, command: Iota) -> TurtleUpdate {
let (send, recv) = oneshot::channel::<TurtleUpdate>();
self.sender.to_owned().send((command,send)).await.unwrap();
recv.await.unwrap()
}
pub async fn command(&self, command: TurtleCommand) -> TurtleUpdate {
self.execute(Iota::Execute(command)).await
}
pub async fn goto(&self, pos: Position) -> TurtleUpdate {
self.execute(Iota::Goto(pos)).await
}
pub async fn mine(&self, pos: Vec3) -> TurtleUpdate {
self.execute(Iota::Mine(pos)).await
}
}
pub(crate) async fn process_turtle_update(
@ -130,10 +181,12 @@ pub(crate) async fn process_turtle_update(
send.send(update).unwrap();
}
if let Some((cmd, ret)) = state.turtle_receivers.get(id as usize).unwrap().lock().await.try_recv().ok() {
turtle.callback = Some(ret);
if let Some(recv) = turtle.receiver.as_mut() {
if let Some((cmd, ret)) = recv.try_recv().ok() {
turtle.callback = Some(ret);
turtle.goal = Some(cmd);
turtle.goal = Some(cmd);
}
}
if let Some(goal) = turtle.goal.take().or_else(|| tasks.front_mut().map(|t| t.next(&turtle))) {