From 1d98390c8dd15e23b1bd896258fd51152007cd45 Mon Sep 17 00:00:00 2001 From: Andy Killorin <37423245+Speedy6451@users.noreply.github.com> Date: Thu, 28 Dec 2023 00:28:24 -0600 Subject: [PATCH] synchronous chunked task --- server/Cargo.toml | 1 + server/src/mine.rs | 204 +++++++++++++++++++++++++-------------------- 2 files changed, 114 insertions(+), 91 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 1f517ae..76b24a6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -43,3 +43,4 @@ opentelemetry_sdk = { version = "0.21.1", features = ["trace"] } memoize = "0.4.2" tracing-appender = "0.2.3" ron = "0.8.1" +crossbeam = "0.8.3" diff --git a/server/src/mine.rs b/server/src/mine.rs index 771819a..1f26c97 100644 --- a/server/src/mine.rs +++ b/server/src/mine.rs @@ -1,5 +1,6 @@ -use std::sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}}; +use std::{sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}}, ops::Deref}; +use crossbeam::channel::{Sender, Receiver}; use tracing::{info, warn, error, instrument}; use serde::{Serialize, Deserialize}; use tokio::{task::{JoinHandle, AbortHandle}, sync::RwLock}; @@ -296,27 +297,27 @@ impl Task for Quarry { fn run(&mut self,turtle:TurtleCommander) -> AbortHandle { let owned = self.clone(); tokio::spawn(async move { - let chunk = owned.progress.next_chunk().await; + let chunk = owned.progress.next_chunk(); if let None = chunk { error!("scheduled quarry out of range"); return; } - let chunk = chunk.unwrap(); + let mut chunk = chunk.unwrap(); - info!("#{} doing chunk {chunk}", turtle.name().to_str()); + info!("#{} doing chunk {}", turtle.name().to_str(), *chunk); let max_chunk = Vec3::new(4,4,4); let e = owned.size.component_div(&max_chunk); - let rel_pos = fill(e, chunk).component_mul(&max_chunk); + let rel_pos = fill(e, *chunk).component_mul(&max_chunk); let abs_pos = rel_pos + owned.pos; if let None = mine_chunk_and_sweep(turtle, abs_pos, max_chunk).await { error!("mining at {abs_pos} failed"); - owned.progress.cancel(chunk).await; + chunk.cancel(); } else { - owned.progress.mark_done(chunk).await; + chunk.finish(); } owned.miners.fetch_sub(1, Ordering::AcqRel); }).abort_handle() @@ -350,38 +351,48 @@ impl Task for Quarry { #[derive(Serialize, Deserialize, Clone)] struct ChunkedTask { confirmed: Arc, + max: i32, #[serde(skip_deserializing)] head: Arc, // highest active chunk #[serde(skip)] - canceled: Arc>>, // must remain sorted - max: i32, + canceled: Option<(Sender, Receiver)>, } -impl ChunkedTask { - fn new(parts: i32) -> Self { +impl Default for ChunkedTask { + fn default() -> Self { Self { confirmed: Default::default(), head: Default::default(), - canceled: Default::default(), + canceled: None, + max: 0, + } + } +} + +impl ChunkedTask { + pub fn new(parts: i32) -> Self { + Self { max: parts, + canceled: Some(crossbeam::channel::unbounded()), + ..Default::default() } } - fn done(&self) -> bool { + pub fn done(&self) -> bool { let backstop = self.confirmed.load(Ordering::SeqCst); backstop + 1 >= self.max } - fn allocated(&self) -> bool { + pub fn allocated(&self) -> bool { let front = self.head.load(Ordering::SeqCst); - front + 1 >= self.max + front + 1 >= self.max && self.canceled.clone().unwrap().0.is_empty() } - async fn next_chunk(&self) -> Option { - let mut cancelled = self.canceled.clone().write_owned().await; + pub fn next_chunk(&self) -> Option { + let cancelled = self.canceled.clone().unwrap().1; - if let Some(chunk) = cancelled.pop() { - return Some(chunk); + if let Some(chunk) = cancelled.try_recv().ok() { + return Some(ChunkedTaskGuard::with_task(self.clone(), chunk)); } loop { // update head (from a save) @@ -395,21 +406,14 @@ impl ChunkedTask { let head = self.head.fetch_add(1, Ordering::AcqRel); if head < self.max { - Some(head) - } else { + Some(ChunkedTaskGuard::with_task(self.clone(), head)) + } else { None } } - async fn mark_done(&self, chunk: i32) { - let canceled = self.canceled.read().await; - - let min = match canceled.iter().min() { - None => true, - Some(minima) => chunk < *minima, - }; - - if min { + fn mark_done(&self, chunk: i32) { + if self.clone().canceled.unwrap().1.len() == 0 { loop { let curr = self.confirmed.load(Ordering::SeqCst); if let Ok(_) = self.confirmed.compare_exchange(curr, curr.max(chunk), Ordering::AcqRel, Ordering::SeqCst) { @@ -419,12 +423,10 @@ impl ChunkedTask { } } - async fn cancel(&self, chunk: i32) { - let mut in_flight = self.canceled.write().await; + fn cancel(&self, chunk: i32) { let max = self.head.load(Ordering::SeqCst); if chunk < max { - in_flight.push(chunk); - in_flight.sort_unstable(); + self.clone().canceled.unwrap().0.send(chunk).unwrap() } else { error!("attempted to cancel a job that hasn't happened yet"); @@ -432,6 +434,51 @@ impl ChunkedTask { } } +#[derive(Clone)] +struct ChunkedTaskGuard { + parent: ChunkedTask, + chunk: i32, + complete: bool, +} + +impl ChunkedTaskGuard { + fn with_task(task: ChunkedTask, chunk: i32) -> Self { + Self { + parent: task, + chunk, + complete: false, + } + } + + fn id(&self) -> i32 { + self.chunk + } + + fn finish(mut self) { + self.parent.mark_done(self.chunk); + self.complete = true; + } + + fn cancel(self) { + drop(self) // nop + } +} + +impl Drop for ChunkedTaskGuard { + fn drop(&mut self) { + if !self.complete { // if only "don't drop" was a feature of rustc + self.parent.cancel(self.chunk); + } + } +} + +impl Deref for ChunkedTaskGuard { + type Target = i32; + + fn deref(&self) -> &Self::Target { + &self.chunk + } +} #[cfg(test)] @@ -439,63 +486,38 @@ mod tests { use super::*; #[tokio::test] - async fn linear() { + async fn guard() { let tracker = ChunkedTask::new(5); - assert_eq!(tracker.next_chunk().await, Some(0)); - tracker.mark_done(0).await; - assert_eq!(tracker.next_chunk().await, Some(1)); - tracker.mark_done(1).await; - assert_eq!(tracker.next_chunk().await, Some(2)); - assert_eq!(tracker.done(), false); - tracker.mark_done(2).await; - assert_eq!(tracker.next_chunk().await, Some(3)); - tracker.mark_done(3).await; - assert_eq!(tracker.next_chunk().await, Some(4)); - assert_eq!(tracker.next_chunk().await, None); - assert_eq!(tracker.done(), false); - assert_eq!(tracker.next_chunk().await, None); - assert_eq!(tracker.allocated(), true); - tracker.mark_done(4).await; - assert_eq!(tracker.done(), true); - } - - #[tokio::test] - async fn cancel_replay() { - let tracker = ChunkedTask::new(5); - assert_eq!(tracker.next_chunk().await, Some(0)); - tracker.mark_done(0).await; - assert_eq!(tracker.next_chunk().await, Some(1)); - tracker.mark_done(1).await; - tracker.cancel(2).await; - assert_eq!(tracker.next_chunk().await, Some(2)); - assert_eq!(tracker.done(), false); - tracker.mark_done(2).await; - assert_eq!(tracker.next_chunk().await, Some(3)); - tracker.mark_done(3).await; - tracker.cancel(2).await; - assert_eq!(tracker.next_chunk().await, Some(2)); - assert_eq!(tracker.next_chunk().await, Some(4)); - tracker.cancel(1).await; - assert_eq!(tracker.next_chunk().await, Some(1)); - assert_eq!(tracker.done(), false); - assert_eq!(tracker.next_chunk().await, None); - assert_eq!(tracker.allocated(), true); - tracker.mark_done(4).await; - assert_eq!(tracker.done(), true); - } - - #[tokio::test] - async fn cancel_unexisting() { - let tracker = ChunkedTask::new(5); - assert_eq!(tracker.next_chunk().await, Some(0)); - assert_eq!(tracker.next_chunk().await, Some(1)); - assert_eq!(tracker.next_chunk().await, Some(2)); - tracker.cancel(2).await; - tracker.cancel(5).await; - tracker.cancel(3).await; - assert_eq!(tracker.next_chunk().await, Some(2)); - assert_eq!(tracker.next_chunk().await, Some(3)); - assert_eq!(tracker.next_chunk().await, Some(4)); - assert_eq!(tracker.next_chunk().await, None); + assert_eq!(tracker.next_chunk().unwrap().id(), 0); + let one = tracker.next_chunk(); + let two = tracker.next_chunk(); + { + assert_eq!(one.as_deref(), Some(&0)); + assert_eq!(two.as_deref(), Some(&1)); + one.unwrap().cancel(); + two.unwrap().finish(); + } + let a = tracker.next_chunk().unwrap(); + assert_eq!(*a, 0); + let b = tracker.next_chunk().unwrap(); + assert_eq!(*b, 2); + let c = tracker.next_chunk().unwrap(); + assert_eq!(*c, 3); + let d = tracker.next_chunk().unwrap(); + assert_eq!(*d, 4); + assert!(!tracker.done()); + assert!(tracker.allocated()); + drop(c); + assert!(!tracker.allocated()); + let e = tracker.next_chunk().unwrap(); + assert_eq!(*e, 3); + let f = tracker.next_chunk().is_none(); + assert_eq!(f, true); + a.finish(); + b.finish(); + d.finish(); + e.finish(); + assert!(tracker.done()); + assert!(tracker.allocated()); } }