From caef52335750c6ff47e414264d3e8551eece5df6 Mon Sep 17 00:00:00 2001 From: Andy Killorin <37423245+Speedy6451@users.noreply.github.com> Date: Sun, 24 Dec 2023 17:55:26 -0600 Subject: [PATCH] await tasks in shutdown (not proven to work) --- server/src/main.rs | 18 +++++++++++------- server/src/safe_kill.rs | 6 +----- server/src/tasks.rs | 18 ++++++++++++++++++ server/src/turtle_api.rs | 21 +++++++++++++++++++++ 4 files changed, 51 insertions(+), 12 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 1568888..16127a0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -16,7 +16,7 @@ use rstar::RTree; use names::Name; use tasks::Scheduler; use tokio::{sync::{ - RwLock, mpsc, OnceCell, Mutex + RwLock, mpsc, OnceCell, Mutex, watch }, fs, time::Instant}; use turtle::{Turtle, TurtleCommander}; use serde::{Deserialize, Serialize}; @@ -54,7 +54,9 @@ async fn main() -> Result<(), Error> { log4rs::init_file(SAVE.get().unwrap().join("log.yml"), Default::default())?; - let state = read_from_disk().await?; + let (kill_send, kill_recv) = watch::channel(()); + + let state = read_from_disk(kill_send).await?; let state = SharedControl::new(RwLock::new(state)); @@ -67,13 +69,13 @@ async fn main() -> Result<(), Error> { let listener = tokio::net::TcpListener::bind(("0.0.0.0", *PORT.get().unwrap())) .await.unwrap(); - let server = safe_kill::serve(server, listener).await; + safe_kill::serve(server, listener, kill_recv).await; info!("writing"); write_to_disk(&*state.read().await).await?; info!("written"); - server.closed().await; + state.write().await.kill.closed().await; Ok(()) } @@ -101,7 +103,7 @@ async fn write_to_disk(state: &LiveState) -> anyhow::Result<()> { Ok(()) } -async fn read_from_disk() -> anyhow::Result { +async fn read_from_disk(kill: watch::Sender<()>) -> anyhow::Result { let turtles = match tokio::fs::OpenOptions::new() .read(true) .open(SAVE.get().unwrap().join("turtles.json")) @@ -154,7 +156,7 @@ async fn read_from_disk() -> anyhow::Result { depots, }; - let mut live = LiveState::from_save(saved, scheduler); + let mut live = LiveState::from_save(saved, scheduler, kill); for turtle in live.turtles.iter() { live.tasks.add_turtle(&TurtleCommander::new(turtle.read().await.name,&live).await.unwrap()) @@ -177,6 +179,7 @@ struct LiveState { world: blocks::World, depots: Depots, started: Instant, + kill: watch::Sender<()>, } impl LiveState { @@ -189,7 +192,7 @@ impl LiveState { SavedState { turtles, world: self.world.tree().await, depots } } - fn from_save(save: SavedState, scheduler: Scheduler) -> Self { + fn from_save(save: SavedState, scheduler: Scheduler, sender: watch::Sender<()>) -> Self { let mut turtles = Vec::new(); for turtle in save.turtles.into_iter() { let (tx, rx) = mpsc::channel(1); @@ -200,6 +203,7 @@ impl LiveState { Self { turtles: turtles.into_iter().map(|t| Arc::new(RwLock::new(t))).collect(), tasks: scheduler, world: World::from_tree(save.world), depots, started: Instant::now(), + kill:sender, } } diff --git a/server/src/safe_kill.rs b/server/src/safe_kill.rs index 511ae6e..d064f85 100644 --- a/server/src/safe_kill.rs +++ b/server/src/safe_kill.rs @@ -20,9 +20,7 @@ use tokio::sync::watch::Sender; use axum::Router; -pub(crate) async fn serve(server: Router, listener: TcpListener) -> Sender<()> { - let (close_tx, close_rx) = watch::channel(()); - +pub(crate) async fn serve(server: Router, listener: TcpListener, close_rx: watch::Receiver<()>) { loop { let (socket, _) = tokio::select! { result = listener.accept() => { @@ -69,8 +67,6 @@ pub(crate) async fn serve(server: Router, listener: TcpListener) -> Sender<()> { } drop(listener); - - close_tx } pub(crate) async fn shutdown_signal() { diff --git a/server/src/tasks.rs b/server/src/tasks.rs index ee50577..ad43663 100644 --- a/server/src/tasks.rs +++ b/server/src/tasks.rs @@ -1,5 +1,6 @@ use log::{info, trace}; use serde::{Deserialize, Serialize}; +use tokio::sync::oneshot; use tokio::task::{JoinHandle, AbortHandle}; use crate::names::Name; @@ -24,6 +25,8 @@ pub struct Scheduler { #[serde(skip)] turtles: Vec<(TurtleCommander, Option)>, tasks: Vec>, + #[serde(skip)] + shutdown: Option>, } impl Default for Scheduler { @@ -31,6 +34,7 @@ impl Default for Scheduler { Self { turtles: Vec::new(), tasks: Vec::new(), + shutdown:None, } } } @@ -63,6 +67,14 @@ impl Scheduler { } } + if self.shutdown.is_some() { + if !self.turtles.iter().any(|t| t.1.is_some()) { + self.shutdown.take().unwrap().send(()).unwrap(); + } + + return; + } + let mut free_turtles: Vec<&mut (TurtleCommander, Option)> = self.turtles.iter_mut().filter(|t| t.1.is_none()).collect(); @@ -108,4 +120,10 @@ impl Scheduler { } Some(()) } + + pub fn shutdown(&mut self) -> oneshot::Receiver<()>{ + let (send, recv) = oneshot::channel(); + self.shutdown = Some(send); + recv + } } diff --git a/server/src/turtle_api.rs b/server/src/turtle_api.rs index e31a3ff..48ec7df 100644 --- a/server/src/turtle_api.rs +++ b/server/src/turtle_api.rs @@ -50,6 +50,7 @@ pub fn turtle_api() -> Router { .route("/createMine", post(dig)) .route("/registerDepot", post(new_depot)) .route("/pollScheduler", get(poll)) + .route("/shutdown", get(shutdown)) // probably tramples the rfc .route("/updateAll", get(update_turtles)) } @@ -138,6 +139,26 @@ pub(crate) async fn poll( "ACK" } +pub(crate) async fn shutdown( + State(state): State, +) -> &'static str { + let signal = { + let mut state = state.write().await; + let signal = state.tasks.shutdown(); + state.tasks.poll().await; + signal + }; + + info!("waiting for tasks to finish"); + signal.await.unwrap(); + + let state = state.write().await; + info!("waiting for connections to finish"); + state.kill.send(()).unwrap(); + + "ACK" +} + pub(crate) async fn fell( State(state): State, Json(req): Json,