1
Fork 0

subchunk-level parallelism

This commit is contained in:
Andy Killorin 2023-12-25 15:04:46 -06:00
parent 77f1547428
commit 9d7f3f45d5
Signed by: ank
GPG key ID: B6241CA3B552BCA4
2 changed files with 52 additions and 27 deletions

View file

@ -67,9 +67,10 @@ async fn main() -> Result<(), Error> {
.with_default(Level::INFO) .with_default(Level::INFO)
.with_target("server::tasks", Level::TRACE) .with_target("server::tasks", Level::TRACE)
.with_target("server::turtle", Level::ERROR) .with_target("server::turtle", Level::ERROR)
.with_target("server::paths", Level::ERROR)
.with_target("server::turtle_api", Level::INFO) .with_target("server::turtle_api", Level::INFO)
.with_target("server::fell", Level::INFO) .with_target("server::fell", Level::WARN)
.with_target("server::mine", Level::INFO) .with_target("server::mine", Level::WARN)
.with_target("server::depot", Level::TRACE); .with_target("server::depot", Level::TRACE);
let subscriber = tracing_subscriber::fmt::layer() let subscriber = tracing_subscriber::fmt::layer()

View file

@ -2,7 +2,7 @@ use std::sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}};
use tracing::{info, warn, error, instrument}; use tracing::{info, warn, error, instrument};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use tokio::task::{JoinHandle, AbortHandle}; use tokio::{task::{JoinHandle, AbortHandle}, sync::RwLock};
use typetag::serde; use typetag::serde;
use crate::{blocks::{Position, Vec3, Direction}, turtle::{TurtleCommand, TurtleCommander, TurtleCommandResponse, InventorySlot}, paths::TRANSPARENT, tasks::{Task, TaskState}}; use crate::{blocks::{Position, Vec3, Direction}, turtle::{TurtleCommand, TurtleCommander, TurtleCommandResponse, InventorySlot}, paths::TRANSPARENT, tasks::{Task, TaskState}};
@ -216,13 +216,19 @@ impl Task for Mine {
} }
} }
const MAX_MINERS: usize = 4;
#[derive(Serialize, Deserialize,Clone)] #[derive(Serialize, Deserialize,Clone)]
pub struct Quarry { pub struct Quarry {
pos: Vec3, pos: Vec3,
size: Vec3, size: Vec3,
#[serde(skip_deserializing)] #[serde(skip_deserializing)]
miners: Arc<AtomicUsize>, miners: Arc<AtomicUsize>,
chunk: Arc<AtomicI32> confirmed: Arc<AtomicI32>,
#[serde(skip_deserializing)]
head: Arc<AtomicI32>,
#[serde(skip)]
in_flight: Arc<RwLock<Vec<i32>>>,
} }
impl Quarry { impl Quarry {
@ -233,42 +239,60 @@ impl Quarry {
pos: lower, pos: lower,
size, size,
miners: Arc::new(AtomicUsize::new(0)), miners: Arc::new(AtomicUsize::new(0)),
chunk: Arc::new(AtomicI32::new(0)), confirmed: Arc::new(AtomicI32::new(0)),
head: Arc::new(AtomicI32::new(0)),
in_flight: Arc::new(RwLock::new(Vec::new())),
} }
} }
pub fn chunk(pos: Vec3) -> Self { pub fn chunk(pos: Vec3) -> Self {
let base = pos.map(|n| n%16); let base = pos - pos.map(|n| n%16);
Self { Self::new(base, base+Vec3::new(16,16,16))
pos: base, }
size: Vec3::new(16,16,16),
miners: Arc::new(AtomicUsize::new(0)), async fn next_chunk(&self) -> i32 {
chunk: Arc::new(AtomicI32::new(0)), let chunk = self.head.fetch_add(1, Ordering::AcqRel);
self.in_flight.write().await.push(chunk);
chunk
}
async fn mark_done(&self, chunk: i32) {
let mut in_flight = self.in_flight.write().await;
let min = in_flight.iter().min() == Some(&chunk);
in_flight.retain(|c| c != &chunk);
if min { // make sure that head is no less than min
loop {
let curr = self.head.load(Ordering::SeqCst);
if let Ok(_) = self.head.compare_exchange(curr, curr.min(chunk), Ordering::AcqRel, Ordering::SeqCst) {
break;
}
}
} }
} }
} }
#[serde] #[serde]
impl Task for Quarry { impl Task for Quarry {
#[instrument(skip(self))]
fn run(&mut self,turtle:TurtleCommander) -> AbortHandle { fn run(&mut self,turtle:TurtleCommander) -> AbortHandle {
let chunk = self.chunk.fetch_add(1, Ordering::AcqRel); let owned = self.clone();
// TODO: partial chunks on corners
let max_chunk = Vec3::new(4,4,4);
let e = self.size.component_div(&max_chunk);
let rel_pos = fill(e, chunk).component_mul(&max_chunk);
let abs_pos = rel_pos
+ self.pos;
let owned = self.miners.clone();
tokio::spawn(async move { tokio::spawn(async move {
// TODO: handle failure let chunk = owned.next_chunk().await;
// another turtle should get the next chunk while this is in flight, but program // TODO: partial chunks on corners
// termination or a None return should reschedule the chunk let max_chunk = Vec3::new(4,4,4);
let e = owned.size.component_div(&max_chunk);
let rel_pos = fill(e, chunk).component_mul(&max_chunk);
let abs_pos = rel_pos
+ owned.pos;
if let None = mine_chunk_and_sweep(turtle, abs_pos, max_chunk).await { if let None = mine_chunk_and_sweep(turtle, abs_pos, max_chunk).await {
error!("mining at {abs_pos} failed"); error!("mining at {abs_pos} failed");
} else {
owned.mark_done(chunk).await;
} }
owned.fetch_sub(1, Ordering::AcqRel); owned.miners.fetch_sub(1, Ordering::AcqRel);
}).abort_handle() }).abort_handle()
} }
@ -276,12 +300,12 @@ impl Task for Quarry {
let max_chunk = Vec3::new(4,4,4); let max_chunk = Vec3::new(4,4,4);
let chunks = self.size.component_div(&max_chunk); let chunks = self.size.component_div(&max_chunk);
if self.chunk.load(Ordering::SeqCst) >= chunks.product() { if self.confirmed.load(Ordering::SeqCst) >= chunks.product() {
return TaskState::Complete; return TaskState::Complete;
} }
let only = self.miners.fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| { let only = self.miners.fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
if n < 1 { if n < MAX_MINERS {
Some(n+1) Some(n+1)
}else { }else {
None None