diff --git a/server/src/depot.rs b/server/src/depot.rs index 5d3b1ff..74a0f7d 100644 --- a/server/src/depot.rs +++ b/server/src/depot.rs @@ -26,6 +26,7 @@ impl Depots { .map(|d| d) } + #[tracing::instrument(skip(self))] pub async fn dock(&self, turtle: TurtleCommander) -> Option { let depot = self.clone().nearest(turtle.pos().await).await?; trace!("depot at {depot:?}"); diff --git a/server/src/main.rs b/server/src/main.rs index cb6a9da..4077fe0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -68,12 +68,15 @@ async fn main() -> Result<(), Error> { .with_target("server::tasks", Level::TRACE) .with_target("server::turtle", Level::ERROR) .with_target("server::turtle_api", Level::INFO) - .with_target("server::fell", Level::INFO); + .with_target("server::fell", Level::INFO) + .with_target("server::mine", Level::INFO) + .with_target("server::depot", Level::TRACE); let subscriber = tracing_subscriber::fmt::layer() .compact() .with_file(false) .with_target(true) + .with_span_events(FmtSpan::ACTIVE) .with_filter(filter); tracing_subscriber::registry() @@ -83,7 +86,7 @@ async fn main() -> Result<(), Error> { info!("starting"); - let (kill_send, kill_recv) = watch::channel(()); + let (kill_send, kill_recv) = watch::channel(false); let state = read_from_disk(kill_send).await?; @@ -133,7 +136,7 @@ async fn write_to_disk(state: &LiveState) -> anyhow::Result<()> { Ok(()) } -async fn read_from_disk(kill: watch::Sender<()>) -> anyhow::Result { +async fn read_from_disk(kill: watch::Sender) -> anyhow::Result { let turtles = match tokio::fs::OpenOptions::new() .read(true) .open(SAVE.get().unwrap().join("turtles.json")) @@ -205,7 +208,7 @@ struct LiveState { world: blocks::World, depots: Depots, started: Instant, - kill: watch::Sender<()>, + kill: watch::Sender, } impl LiveState { @@ -218,7 +221,7 @@ impl LiveState { SavedState { turtles, world: self.world.tree().await, depots } } - fn from_save(save: SavedState, scheduler: Scheduler, sender: watch::Sender<()>) -> Self { + fn from_save(save: SavedState, scheduler: Scheduler, sender: watch::Sender) -> Self { let mut turtles = Vec::new(); for turtle in save.turtles.into_iter() { let (tx, rx) = mpsc::channel(1); diff --git a/server/src/mine.rs b/server/src/mine.rs index 6b40e6f..223a4b0 100644 --- a/server/src/mine.rs +++ b/server/src/mine.rs @@ -1,6 +1,6 @@ use std::sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}}; -use tracing::{info, warn, error}; +use tracing::{info, warn, error, instrument}; use serde::{Serialize, Deserialize}; use tokio::task::{JoinHandle, AbortHandle}; use typetag::serde; @@ -33,6 +33,7 @@ pub async fn mine(turtle: TurtleCommander, pos: Vec3, chunk: Vec3) -> Option<()> } } +#[instrument] pub async fn mine_chunk_and_sweep(turtle: TurtleCommander, pos: Vec3, chunk: Vec3) -> Option<()> { let volume = chunk.x * chunk.y * chunk.z; let mut valuables = Vec::new(); @@ -77,6 +78,7 @@ async fn near_valuables(turtle: &TurtleCommander, pos: Vec3, chunk: Vec3) -> Vec .map(|b|b.pos).collect() } +#[instrument] pub async fn mine_chunk(turtle: TurtleCommander, pos: Vec3, chunk: Vec3) -> Option<()> { let turtle = turtle.clone(); let volume = chunk.x * chunk.y * chunk.z; diff --git a/server/src/safe_kill.rs b/server/src/safe_kill.rs index fdc21dd..f8f5c97 100644 --- a/server/src/safe_kill.rs +++ b/server/src/safe_kill.rs @@ -20,7 +20,7 @@ use tokio::sync::watch::Sender; use axum::Router; -pub(crate) async fn serve(server: Router, listener: TcpListener, close_rx: watch::Receiver<()>) { +pub(crate) async fn serve(server: Router, listener: TcpListener, mut close_rx: watch::Receiver) { loop { let (socket, _) = tokio::select! { result = listener.accept() => { @@ -30,10 +30,14 @@ pub(crate) async fn serve(server: Router, listener: TcpListener, close_rx: watch info!("cancelled connection"); break; } + _ = close_rx.wait_for(|k| *k) => { + info!("cancelled connection"); + break; + } }; let tower = server.clone(); - let close_rx = close_rx.clone(); + let mut close_rx = close_rx.clone(); tokio::spawn(async move { let socket = TokioIo::new(socket); @@ -59,6 +63,10 @@ pub(crate) async fn serve(server: Router, listener: TcpListener, close_rx: watch info!("starting shutdown"); conn.as_mut().graceful_shutdown(); } + _ = close_rx.wait_for(|k| *k) => { + info!("starting shutdown"); + conn.as_mut().graceful_shutdown(); + } } } diff --git a/server/src/tasks.rs b/server/src/tasks.rs index 030f39e..489b840 100644 --- a/server/src/tasks.rs +++ b/server/src/tasks.rs @@ -1,4 +1,4 @@ -use tracing::{info, trace}; +use tracing::{info, trace, instrument}; use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; use tokio::task::{JoinHandle, AbortHandle}; @@ -58,6 +58,7 @@ impl Scheduler { self.tasks.push(task); } + #[instrument(skip(self))] pub async fn poll(&mut self) { for turtle in &mut self.turtles { if let Some(join) = &turtle.1 { @@ -69,10 +70,16 @@ impl Scheduler { } if self.shutdown.is_some() { + trace!("checking remaining tasks"); if !self.turtles.iter().any(|t| t.1.is_some()) { + trace!("all tasks complete"); self.shutdown.take().unwrap().send(()).unwrap(); } + for turtle in self.turtles.iter().filter(|t| t.1.is_some()) { + trace!("waiting on {}", turtle.0.name().to_str()); + } + return; } diff --git a/server/src/turtle_api.rs b/server/src/turtle_api.rs index db6862b..a146a8e 100644 --- a/server/src/turtle_api.rs +++ b/server/src/turtle_api.rs @@ -152,9 +152,10 @@ pub(crate) async fn shutdown( info!("waiting for tasks to finish"); signal.await.unwrap(); + info!("waiting for lock"); let state = state.write().await; info!("waiting for connections to finish"); - state.kill.send(()).unwrap(); + state.kill.send(true).unwrap(); "ACK" }