From 0f1b345aa42e720edab1f84a7ed5efbcc7dbe2df Mon Sep 17 00:00:00 2001 From: Andy Killorin <37423245+Speedy6451@users.noreply.github.com> Date: Tue, 26 Dec 2023 16:30:02 -0600 Subject: [PATCH] factored progress into its own thing (partial) --- server/src/mine.rs | 189 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 145 insertions(+), 44 deletions(-) diff --git a/server/src/mine.rs b/server/src/mine.rs index 589c62d..69f0aa7 100644 --- a/server/src/mine.rs +++ b/server/src/mine.rs @@ -227,24 +227,21 @@ pub struct Quarry { size: Vec3, #[serde(skip_deserializing)] miners: Arc, - confirmed: Arc, - #[serde(skip_deserializing)] - head: Arc, - #[serde(skip)] - in_flight: Arc>>, + pub progress: ChunkedTask, } impl Quarry { pub fn new(lower: Vec3, upper: Vec3) -> Self { let size = upper - lower; + let max_chunk = Vec3::new(4,4,4); + let chunks = size.component_div(&max_chunk); + Self { pos: lower, size, miners: Arc::new(AtomicUsize::new(0)), - confirmed: Arc::new(AtomicI32::new(0)), - head: Arc::new(AtomicI32::new(0)), - in_flight: Arc::new(RwLock::new(Vec::new())), + progress: ChunkedTask::new(chunks.product()) } } @@ -252,37 +249,6 @@ impl Quarry { let base = pos - pos.map(|n| n%16); Self::new(base, base+Vec3::new(16,16,16)) } - - async fn next_chunk(&self) -> i32 { - loop { // this might be unsound, I don't really know - let chunk = self.head.load(Ordering::SeqCst); - let backstop = self.confirmed.load(Ordering::SeqCst); - if let Ok(_) = self.head.compare_exchange(chunk, chunk.max(backstop), Ordering::AcqRel, Ordering::SeqCst) { - break; - } - } - - let chunk = self.head.fetch_add(1, Ordering::AcqRel); - let backstop = self.confirmed.load(Ordering::SeqCst); - self.in_flight.write().await.push(chunk.max(backstop)); - chunk - } - - async fn mark_done(&self, chunk: i32) { - let mut in_flight = self.in_flight.write().await; - let min = in_flight.iter().max() == Some(&chunk); - - in_flight.retain(|c| c != &chunk); - - if min { // make sure that head is no less than min - loop { - let curr = self.confirmed.load(Ordering::SeqCst); - if let Ok(_) = self.confirmed.compare_exchange(curr, curr.max(chunk), Ordering::AcqRel, Ordering::SeqCst) { - break; - } - } - } - } } #[serde] @@ -291,8 +257,14 @@ impl Task for Quarry { fn run(&mut self,turtle:TurtleCommander) -> AbortHandle { let owned = self.clone(); tokio::spawn(async move { - let chunk = owned.next_chunk().await; - // TODO: partial chunks on corners + let chunk = owned.progress.next_chunk().await; + + if let None = chunk { + error!("scheduled quarry out of range"); + return; + } + let chunk = chunk.unwrap(); + let max_chunk = Vec3::new(4,4,4); let e = owned.size.component_div(&max_chunk); @@ -301,8 +273,9 @@ impl Task for Quarry { + owned.pos; if let None = mine_chunk_and_sweep(turtle, abs_pos, max_chunk).await { error!("mining at {abs_pos} failed"); + owned.progress.cancel(chunk).await; } else { - owned.mark_done(chunk).await; + owned.progress.mark_done(chunk).await; } owned.miners.fetch_sub(1, Ordering::AcqRel); }).abort_handle() @@ -312,11 +285,11 @@ impl Task for Quarry { let max_chunk = Vec3::new(4,4,4); let chunks = self.size.component_div(&max_chunk); - if self.confirmed.load(Ordering::SeqCst) >= chunks.product() { + if self.progress.done() { return TaskState::Complete; } - if self.head.load(Ordering::SeqCst) >= chunks.product() { + if self.progress.allocated().await { return TaskState::Waiting; } @@ -335,3 +308,131 @@ impl Task for Quarry { TaskState::Waiting } } + +#[derive(Serialize, Deserialize, Clone)] +struct ChunkedTask { + confirmed: Arc, + #[serde(skip)] + in_flight: Arc>>, + max: i32, +} + +impl ChunkedTask { + fn new(parts: i32) -> Self { + Self { + confirmed: Default::default(), + in_flight: Default::default(), + max: parts, + } + } + + fn done(&self) -> bool { + let backstop = self.confirmed.load(Ordering::SeqCst); + backstop >= self.max + } + + async fn allocated(&self) -> bool { + let mut in_flight = self.in_flight.clone().write_owned().await; + in_flight.sort_unstable(); + + let backstop = self.confirmed.load(Ordering::SeqCst); + + for i in backstop..self.max { + if in_flight.get(i as usize).is_none() { + return false; + } + } + + return true; + } + async fn next_chunk(&self) -> Option { + let mut in_flight = self.in_flight.clone().write_owned().await; + in_flight.sort_unstable(); + + let backstop = self.confirmed.load(Ordering::SeqCst); + + for i in backstop..self.max { + if in_flight.get(i as usize).is_none() { + in_flight.push(i); + return Some(i); + } + } + + return None; + } + + async fn mark_done(&self, chunk: i32) { + let mut in_flight = self.in_flight.write().await; + + let min = in_flight.iter().max() == Some(&chunk); + + in_flight.retain(|c| c != &chunk); + + if min { // make sure that head is no less than min + loop { + let curr = self.confirmed.load(Ordering::SeqCst); + if let Ok(_) = self.confirmed.compare_exchange(curr, curr.max(chunk), Ordering::AcqRel, Ordering::SeqCst) { + break; + } + } + } + } + + async fn cancel(&self, chunk: i32) { + let mut in_flight = self.in_flight.write().await; + in_flight.retain(|c| c != &chunk); + } +} + + + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn linear() { + let tracker = ChunkedTask::new(5); + assert_eq!(tracker.next_chunk().await, Some(0)); + assert_eq!(tracker.next_chunk().await, Some(1)); + assert_eq!(tracker.next_chunk().await, Some(2)); + assert_eq!(tracker.done(), false); + assert_eq!(tracker.next_chunk().await, Some(3)); + assert_eq!(tracker.next_chunk().await, Some(4)); + assert_eq!(tracker.next_chunk().await, None); + assert_eq!(tracker.done(), false); + assert_eq!(tracker.allocated().await, true); + tracker.mark_done(0).await; + tracker.mark_done(1).await; + tracker.mark_done(2).await; + tracker.mark_done(3).await; + tracker.mark_done(4).await; + tracker.mark_done(5).await; + assert_eq!(tracker.done(), true); + } + + #[tokio::test] + async fn cancel_replay() { + let tracker = ChunkedTask::new(5); + assert_eq!(tracker.next_chunk().await, Some(0)); + assert_eq!(tracker.next_chunk().await, Some(1)); + assert_eq!(tracker.next_chunk().await, Some(2)); + tracker.cancel(2).await; + assert_eq!(tracker.next_chunk().await, Some(2)); + } + + #[tokio::test] + async fn cancel_unexisting() { + let tracker = ChunkedTask::new(5); + assert_eq!(tracker.next_chunk().await, Some(0)); + assert_eq!(tracker.next_chunk().await, Some(1)); + assert_eq!(tracker.next_chunk().await, Some(2)); + tracker.cancel(2).await; + tracker.cancel(5).await; + tracker.cancel(3).await; + assert_eq!(tracker.next_chunk().await, Some(2)); + assert_eq!(tracker.next_chunk().await, Some(3)); + assert_eq!(tracker.next_chunk().await, Some(4)); + assert_eq!(tracker.next_chunk().await, None); + } +}