1
Fork 0

fixed shutdown

This commit is contained in:
Andy Killorin 2023-12-25 14:09:43 -06:00
parent 1b60bd74d7
commit 77f1547428
Signed by: ank
GPG key ID: B6241CA3B552BCA4
6 changed files with 32 additions and 10 deletions

View file

@ -26,6 +26,7 @@ impl Depots {
.map(|d| d) .map(|d| d)
} }
#[tracing::instrument(skip(self))]
pub async fn dock(&self, turtle: TurtleCommander) -> Option<usize> { pub async fn dock(&self, turtle: TurtleCommander) -> Option<usize> {
let depot = self.clone().nearest(turtle.pos().await).await?; let depot = self.clone().nearest(turtle.pos().await).await?;
trace!("depot at {depot:?}"); trace!("depot at {depot:?}");

View file

@ -68,12 +68,15 @@ async fn main() -> Result<(), Error> {
.with_target("server::tasks", Level::TRACE) .with_target("server::tasks", Level::TRACE)
.with_target("server::turtle", Level::ERROR) .with_target("server::turtle", Level::ERROR)
.with_target("server::turtle_api", Level::INFO) .with_target("server::turtle_api", Level::INFO)
.with_target("server::fell", Level::INFO); .with_target("server::fell", Level::INFO)
.with_target("server::mine", Level::INFO)
.with_target("server::depot", Level::TRACE);
let subscriber = tracing_subscriber::fmt::layer() let subscriber = tracing_subscriber::fmt::layer()
.compact() .compact()
.with_file(false) .with_file(false)
.with_target(true) .with_target(true)
.with_span_events(FmtSpan::ACTIVE)
.with_filter(filter); .with_filter(filter);
tracing_subscriber::registry() tracing_subscriber::registry()
@ -83,7 +86,7 @@ async fn main() -> Result<(), Error> {
info!("starting"); info!("starting");
let (kill_send, kill_recv) = watch::channel(()); let (kill_send, kill_recv) = watch::channel(false);
let state = read_from_disk(kill_send).await?; let state = read_from_disk(kill_send).await?;
@ -133,7 +136,7 @@ async fn write_to_disk(state: &LiveState) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
async fn read_from_disk(kill: watch::Sender<()>) -> anyhow::Result<LiveState> { async fn read_from_disk(kill: watch::Sender<bool>) -> anyhow::Result<LiveState> {
let turtles = match tokio::fs::OpenOptions::new() let turtles = match tokio::fs::OpenOptions::new()
.read(true) .read(true)
.open(SAVE.get().unwrap().join("turtles.json")) .open(SAVE.get().unwrap().join("turtles.json"))
@ -205,7 +208,7 @@ struct LiveState {
world: blocks::World, world: blocks::World,
depots: Depots, depots: Depots,
started: Instant, started: Instant,
kill: watch::Sender<()>, kill: watch::Sender<bool>,
} }
impl LiveState { impl LiveState {
@ -218,7 +221,7 @@ impl LiveState {
SavedState { turtles, world: self.world.tree().await, depots } SavedState { turtles, world: self.world.tree().await, depots }
} }
fn from_save(save: SavedState, scheduler: Scheduler, sender: watch::Sender<()>) -> Self { fn from_save(save: SavedState, scheduler: Scheduler, sender: watch::Sender<bool>) -> Self {
let mut turtles = Vec::new(); let mut turtles = Vec::new();
for turtle in save.turtles.into_iter() { for turtle in save.turtles.into_iter() {
let (tx, rx) = mpsc::channel(1); let (tx, rx) = mpsc::channel(1);

View file

@ -1,6 +1,6 @@
use std::sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}}; use std::sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}};
use tracing::{info, warn, error}; use tracing::{info, warn, error, instrument};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use tokio::task::{JoinHandle, AbortHandle}; use tokio::task::{JoinHandle, AbortHandle};
use typetag::serde; use typetag::serde;
@ -33,6 +33,7 @@ pub async fn mine(turtle: TurtleCommander, pos: Vec3, chunk: Vec3) -> Option<()>
} }
} }
#[instrument]
pub async fn mine_chunk_and_sweep(turtle: TurtleCommander, pos: Vec3, chunk: Vec3) -> Option<()> { pub async fn mine_chunk_and_sweep(turtle: TurtleCommander, pos: Vec3, chunk: Vec3) -> Option<()> {
let volume = chunk.x * chunk.y * chunk.z; let volume = chunk.x * chunk.y * chunk.z;
let mut valuables = Vec::new(); let mut valuables = Vec::new();
@ -77,6 +78,7 @@ async fn near_valuables(turtle: &TurtleCommander, pos: Vec3, chunk: Vec3) -> Vec
.map(|b|b.pos).collect() .map(|b|b.pos).collect()
} }
#[instrument]
pub async fn mine_chunk(turtle: TurtleCommander, pos: Vec3, chunk: Vec3) -> Option<()> { pub async fn mine_chunk(turtle: TurtleCommander, pos: Vec3, chunk: Vec3) -> Option<()> {
let turtle = turtle.clone(); let turtle = turtle.clone();
let volume = chunk.x * chunk.y * chunk.z; let volume = chunk.x * chunk.y * chunk.z;

View file

@ -20,7 +20,7 @@ use tokio::sync::watch::Sender;
use axum::Router; use axum::Router;
pub(crate) async fn serve(server: Router, listener: TcpListener, close_rx: watch::Receiver<()>) { pub(crate) async fn serve(server: Router, listener: TcpListener, mut close_rx: watch::Receiver<bool>) {
loop { loop {
let (socket, _) = tokio::select! { let (socket, _) = tokio::select! {
result = listener.accept() => { result = listener.accept() => {
@ -30,10 +30,14 @@ pub(crate) async fn serve(server: Router, listener: TcpListener, close_rx: watch
info!("cancelled connection"); info!("cancelled connection");
break; break;
} }
_ = close_rx.wait_for(|k| *k) => {
info!("cancelled connection");
break;
}
}; };
let tower = server.clone(); let tower = server.clone();
let close_rx = close_rx.clone(); let mut close_rx = close_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let socket = TokioIo::new(socket); let socket = TokioIo::new(socket);
@ -59,6 +63,10 @@ pub(crate) async fn serve(server: Router, listener: TcpListener, close_rx: watch
info!("starting shutdown"); info!("starting shutdown");
conn.as_mut().graceful_shutdown(); conn.as_mut().graceful_shutdown();
} }
_ = close_rx.wait_for(|k| *k) => {
info!("starting shutdown");
conn.as_mut().graceful_shutdown();
}
} }
} }

View file

@ -1,4 +1,4 @@
use tracing::{info, trace}; use tracing::{info, trace, instrument};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::task::{JoinHandle, AbortHandle}; use tokio::task::{JoinHandle, AbortHandle};
@ -58,6 +58,7 @@ impl Scheduler {
self.tasks.push(task); self.tasks.push(task);
} }
#[instrument(skip(self))]
pub async fn poll(&mut self) { pub async fn poll(&mut self) {
for turtle in &mut self.turtles { for turtle in &mut self.turtles {
if let Some(join) = &turtle.1 { if let Some(join) = &turtle.1 {
@ -69,10 +70,16 @@ impl Scheduler {
} }
if self.shutdown.is_some() { if self.shutdown.is_some() {
trace!("checking remaining tasks");
if !self.turtles.iter().any(|t| t.1.is_some()) { if !self.turtles.iter().any(|t| t.1.is_some()) {
trace!("all tasks complete");
self.shutdown.take().unwrap().send(()).unwrap(); self.shutdown.take().unwrap().send(()).unwrap();
} }
for turtle in self.turtles.iter().filter(|t| t.1.is_some()) {
trace!("waiting on {}", turtle.0.name().to_str());
}
return; return;
} }

View file

@ -152,9 +152,10 @@ pub(crate) async fn shutdown(
info!("waiting for tasks to finish"); info!("waiting for tasks to finish");
signal.await.unwrap(); signal.await.unwrap();
info!("waiting for lock");
let state = state.write().await; let state = state.write().await;
info!("waiting for connections to finish"); info!("waiting for connections to finish");
state.kill.send(()).unwrap(); state.kill.send(true).unwrap();
"ACK" "ACK"
} }