1
Fork 0

factored progress into its own thing (partial)

This commit is contained in:
Andy Killorin 2023-12-26 16:30:02 -06:00
parent a7377c5855
commit 0f1b345aa4
Signed by: ank
GPG key ID: B6241CA3B552BCA4

View file

@ -227,24 +227,21 @@ pub struct Quarry {
size: Vec3, size: Vec3,
#[serde(skip_deserializing)] #[serde(skip_deserializing)]
miners: Arc<AtomicUsize>, miners: Arc<AtomicUsize>,
confirmed: Arc<AtomicI32>, pub progress: ChunkedTask,
#[serde(skip_deserializing)]
head: Arc<AtomicI32>,
#[serde(skip)]
in_flight: Arc<RwLock<Vec<i32>>>,
} }
impl Quarry { impl Quarry {
pub fn new(lower: Vec3, upper: Vec3) -> Self { pub fn new(lower: Vec3, upper: Vec3) -> Self {
let size = upper - lower; let size = upper - lower;
let max_chunk = Vec3::new(4,4,4);
let chunks = size.component_div(&max_chunk);
Self { Self {
pos: lower, pos: lower,
size, size,
miners: Arc::new(AtomicUsize::new(0)), miners: Arc::new(AtomicUsize::new(0)),
confirmed: Arc::new(AtomicI32::new(0)), progress: ChunkedTask::new(chunks.product())
head: Arc::new(AtomicI32::new(0)),
in_flight: Arc::new(RwLock::new(Vec::new())),
} }
} }
@ -252,37 +249,6 @@ impl Quarry {
let base = pos - pos.map(|n| n%16); let base = pos - pos.map(|n| n%16);
Self::new(base, base+Vec3::new(16,16,16)) Self::new(base, base+Vec3::new(16,16,16))
} }
async fn next_chunk(&self) -> i32 {
loop { // this might be unsound, I don't really know
let chunk = self.head.load(Ordering::SeqCst);
let backstop = self.confirmed.load(Ordering::SeqCst);
if let Ok(_) = self.head.compare_exchange(chunk, chunk.max(backstop), Ordering::AcqRel, Ordering::SeqCst) {
break;
}
}
let chunk = self.head.fetch_add(1, Ordering::AcqRel);
let backstop = self.confirmed.load(Ordering::SeqCst);
self.in_flight.write().await.push(chunk.max(backstop));
chunk
}
async fn mark_done(&self, chunk: i32) {
let mut in_flight = self.in_flight.write().await;
let min = in_flight.iter().max() == Some(&chunk);
in_flight.retain(|c| c != &chunk);
if min { // make sure that head is no less than min
loop {
let curr = self.confirmed.load(Ordering::SeqCst);
if let Ok(_) = self.confirmed.compare_exchange(curr, curr.max(chunk), Ordering::AcqRel, Ordering::SeqCst) {
break;
}
}
}
}
} }
#[serde] #[serde]
@ -291,8 +257,14 @@ impl Task for Quarry {
fn run(&mut self,turtle:TurtleCommander) -> AbortHandle { fn run(&mut self,turtle:TurtleCommander) -> AbortHandle {
let owned = self.clone(); let owned = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
let chunk = owned.next_chunk().await; let chunk = owned.progress.next_chunk().await;
// TODO: partial chunks on corners
if let None = chunk {
error!("scheduled quarry out of range");
return;
}
let chunk = chunk.unwrap();
let max_chunk = Vec3::new(4,4,4); let max_chunk = Vec3::new(4,4,4);
let e = owned.size.component_div(&max_chunk); let e = owned.size.component_div(&max_chunk);
@ -301,8 +273,9 @@ impl Task for Quarry {
+ owned.pos; + owned.pos;
if let None = mine_chunk_and_sweep(turtle, abs_pos, max_chunk).await { if let None = mine_chunk_and_sweep(turtle, abs_pos, max_chunk).await {
error!("mining at {abs_pos} failed"); error!("mining at {abs_pos} failed");
owned.progress.cancel(chunk).await;
} else { } else {
owned.mark_done(chunk).await; owned.progress.mark_done(chunk).await;
} }
owned.miners.fetch_sub(1, Ordering::AcqRel); owned.miners.fetch_sub(1, Ordering::AcqRel);
}).abort_handle() }).abort_handle()
@ -312,11 +285,11 @@ impl Task for Quarry {
let max_chunk = Vec3::new(4,4,4); let max_chunk = Vec3::new(4,4,4);
let chunks = self.size.component_div(&max_chunk); let chunks = self.size.component_div(&max_chunk);
if self.confirmed.load(Ordering::SeqCst) >= chunks.product() { if self.progress.done() {
return TaskState::Complete; return TaskState::Complete;
} }
if self.head.load(Ordering::SeqCst) >= chunks.product() { if self.progress.allocated().await {
return TaskState::Waiting; return TaskState::Waiting;
} }
@ -335,3 +308,131 @@ impl Task for Quarry {
TaskState::Waiting TaskState::Waiting
} }
} }
#[derive(Serialize, Deserialize, Clone)]
struct ChunkedTask {
confirmed: Arc<AtomicI32>,
#[serde(skip)]
in_flight: Arc<RwLock<Vec<i32>>>,
max: i32,
}
impl ChunkedTask {
fn new(parts: i32) -> Self {
Self {
confirmed: Default::default(),
in_flight: Default::default(),
max: parts,
}
}
fn done(&self) -> bool {
let backstop = self.confirmed.load(Ordering::SeqCst);
backstop >= self.max
}
async fn allocated(&self) -> bool {
let mut in_flight = self.in_flight.clone().write_owned().await;
in_flight.sort_unstable();
let backstop = self.confirmed.load(Ordering::SeqCst);
for i in backstop..self.max {
if in_flight.get(i as usize).is_none() {
return false;
}
}
return true;
}
async fn next_chunk(&self) -> Option<i32> {
let mut in_flight = self.in_flight.clone().write_owned().await;
in_flight.sort_unstable();
let backstop = self.confirmed.load(Ordering::SeqCst);
for i in backstop..self.max {
if in_flight.get(i as usize).is_none() {
in_flight.push(i);
return Some(i);
}
}
return None;
}
async fn mark_done(&self, chunk: i32) {
let mut in_flight = self.in_flight.write().await;
let min = in_flight.iter().max() == Some(&chunk);
in_flight.retain(|c| c != &chunk);
if min { // make sure that head is no less than min
loop {
let curr = self.confirmed.load(Ordering::SeqCst);
if let Ok(_) = self.confirmed.compare_exchange(curr, curr.max(chunk), Ordering::AcqRel, Ordering::SeqCst) {
break;
}
}
}
}
async fn cancel(&self, chunk: i32) {
let mut in_flight = self.in_flight.write().await;
in_flight.retain(|c| c != &chunk);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn linear() {
let tracker = ChunkedTask::new(5);
assert_eq!(tracker.next_chunk().await, Some(0));
assert_eq!(tracker.next_chunk().await, Some(1));
assert_eq!(tracker.next_chunk().await, Some(2));
assert_eq!(tracker.done(), false);
assert_eq!(tracker.next_chunk().await, Some(3));
assert_eq!(tracker.next_chunk().await, Some(4));
assert_eq!(tracker.next_chunk().await, None);
assert_eq!(tracker.done(), false);
assert_eq!(tracker.allocated().await, true);
tracker.mark_done(0).await;
tracker.mark_done(1).await;
tracker.mark_done(2).await;
tracker.mark_done(3).await;
tracker.mark_done(4).await;
tracker.mark_done(5).await;
assert_eq!(tracker.done(), true);
}
#[tokio::test]
async fn cancel_replay() {
let tracker = ChunkedTask::new(5);
assert_eq!(tracker.next_chunk().await, Some(0));
assert_eq!(tracker.next_chunk().await, Some(1));
assert_eq!(tracker.next_chunk().await, Some(2));
tracker.cancel(2).await;
assert_eq!(tracker.next_chunk().await, Some(2));
}
#[tokio::test]
async fn cancel_unexisting() {
let tracker = ChunkedTask::new(5);
assert_eq!(tracker.next_chunk().await, Some(0));
assert_eq!(tracker.next_chunk().await, Some(1));
assert_eq!(tracker.next_chunk().await, Some(2));
tracker.cancel(2).await;
tracker.cancel(5).await;
tracker.cancel(3).await;
assert_eq!(tracker.next_chunk().await, Some(2));
assert_eq!(tracker.next_chunk().await, Some(3));
assert_eq!(tracker.next_chunk().await, Some(4));
assert_eq!(tracker.next_chunk().await, None);
}
}