From 9d7f3f45d5ead523eaf0d16f872b417ea046507f Mon Sep 17 00:00:00 2001 From: Andy Killorin <37423245+Speedy6451@users.noreply.github.com> Date: Mon, 25 Dec 2023 15:04:46 -0600 Subject: [PATCH] subchunk-level parallelism --- server/src/main.rs | 5 ++-- server/src/mine.rs | 74 ++++++++++++++++++++++++++++++---------------- 2 files changed, 52 insertions(+), 27 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 4077fe0..58e22fe 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -67,9 +67,10 @@ async fn main() -> Result<(), Error> { .with_default(Level::INFO) .with_target("server::tasks", Level::TRACE) .with_target("server::turtle", Level::ERROR) + .with_target("server::paths", Level::ERROR) .with_target("server::turtle_api", Level::INFO) - .with_target("server::fell", Level::INFO) - .with_target("server::mine", Level::INFO) + .with_target("server::fell", Level::WARN) + .with_target("server::mine", Level::WARN) .with_target("server::depot", Level::TRACE); let subscriber = tracing_subscriber::fmt::layer() diff --git a/server/src/mine.rs b/server/src/mine.rs index 223a4b0..d75b46c 100644 --- a/server/src/mine.rs +++ b/server/src/mine.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}}; use tracing::{info, warn, error, instrument}; use serde::{Serialize, Deserialize}; -use tokio::task::{JoinHandle, AbortHandle}; +use tokio::{task::{JoinHandle, AbortHandle}, sync::RwLock}; use typetag::serde; use crate::{blocks::{Position, Vec3, Direction}, turtle::{TurtleCommand, TurtleCommander, TurtleCommandResponse, InventorySlot}, paths::TRANSPARENT, tasks::{Task, TaskState}}; @@ -216,13 +216,19 @@ impl Task for Mine { } } +const MAX_MINERS: usize = 4; + #[derive(Serialize, Deserialize,Clone)] pub struct Quarry { pos: Vec3, size: Vec3, #[serde(skip_deserializing)] miners: Arc, - chunk: Arc + confirmed: Arc, + #[serde(skip_deserializing)] + head: Arc, + #[serde(skip)] + in_flight: Arc>>, } impl Quarry { @@ -233,42 +239,60 @@ impl Quarry { pos: lower, size, miners: Arc::new(AtomicUsize::new(0)), - chunk: Arc::new(AtomicI32::new(0)), + confirmed: Arc::new(AtomicI32::new(0)), + head: Arc::new(AtomicI32::new(0)), + in_flight: Arc::new(RwLock::new(Vec::new())), } } pub fn chunk(pos: Vec3) -> Self { - let base = pos.map(|n| n%16); - Self { - pos: base, - size: Vec3::new(16,16,16), - miners: Arc::new(AtomicUsize::new(0)), - chunk: Arc::new(AtomicI32::new(0)), + let base = pos - pos.map(|n| n%16); + Self::new(base, base+Vec3::new(16,16,16)) + } + + async fn next_chunk(&self) -> i32 { + let chunk = self.head.fetch_add(1, Ordering::AcqRel); + self.in_flight.write().await.push(chunk); + chunk + } + + async fn mark_done(&self, chunk: i32) { + let mut in_flight = self.in_flight.write().await; + let min = in_flight.iter().min() == Some(&chunk); + + in_flight.retain(|c| c != &chunk); + + if min { // make sure that head is no less than min + loop { + let curr = self.head.load(Ordering::SeqCst); + if let Ok(_) = self.head.compare_exchange(curr, curr.min(chunk), Ordering::AcqRel, Ordering::SeqCst) { + break; + } + } } } } #[serde] impl Task for Quarry { + #[instrument(skip(self))] fn run(&mut self,turtle:TurtleCommander) -> AbortHandle { - let chunk = self.chunk.fetch_add(1, Ordering::AcqRel); - // TODO: partial chunks on corners - let max_chunk = Vec3::new(4,4,4); - let e = self.size.component_div(&max_chunk); - - let rel_pos = fill(e, chunk).component_mul(&max_chunk); - let abs_pos = rel_pos - + self.pos; - - let owned = self.miners.clone(); + let owned = self.clone(); tokio::spawn(async move { - // TODO: handle failure - // another turtle should get the next chunk while this is in flight, but program - // termination or a None return should reschedule the chunk + let chunk = owned.next_chunk().await; + // TODO: partial chunks on corners + let max_chunk = Vec3::new(4,4,4); + let e = owned.size.component_div(&max_chunk); + + let rel_pos = fill(e, chunk).component_mul(&max_chunk); + let abs_pos = rel_pos + + owned.pos; if let None = mine_chunk_and_sweep(turtle, abs_pos, max_chunk).await { error!("mining at {abs_pos} failed"); + } else { + owned.mark_done(chunk).await; } - owned.fetch_sub(1, Ordering::AcqRel); + owned.miners.fetch_sub(1, Ordering::AcqRel); }).abort_handle() } @@ -276,12 +300,12 @@ impl Task for Quarry { let max_chunk = Vec3::new(4,4,4); let chunks = self.size.component_div(&max_chunk); - if self.chunk.load(Ordering::SeqCst) >= chunks.product() { + if self.confirmed.load(Ordering::SeqCst) >= chunks.product() { return TaskState::Complete; } let only = self.miners.fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| { - if n < 1 { + if n < MAX_MINERS { Some(n+1) }else { None