
synchronous chunked task

Andy Killorin 2023-12-28 00:28:24 -06:00
parent 93e08d1159
commit 1d98390c8d
Signed by: ank
GPG key ID: B6241CA3B552BCA4
2 changed files with 114 additions and 91 deletions

View file

@@ -43,3 +43,4 @@ opentelemetry_sdk = { version = "0.21.1", features = ["trace"] }
 memoize = "0.4.2"
 tracing-appender = "0.2.3"
 ron = "0.8.1"
+crossbeam = "0.8.3"

View file

@@ -1,5 +1,6 @@
-use std::sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}};
+use std::{sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicI32}}, ops::Deref};
+use crossbeam::channel::{Sender, Receiver};
 use tracing::{info, warn, error, instrument};
 use serde::{Serialize, Deserialize};
 use tokio::{task::{JoinHandle, AbortHandle}, sync::RwLock};
@@ -296,27 +297,27 @@ impl Task for Quarry {
     fn run(&mut self,turtle:TurtleCommander) -> AbortHandle {
         let owned = self.clone();
         tokio::spawn(async move {
-            let chunk = owned.progress.next_chunk().await;
+            let chunk = owned.progress.next_chunk();
             if let None = chunk {
                 error!("scheduled quarry out of range");
                 return;
             }
-            let chunk = chunk.unwrap();
-            info!("#{} doing chunk {chunk}", turtle.name().to_str());
+            let mut chunk = chunk.unwrap();
+            info!("#{} doing chunk {}", turtle.name().to_str(), *chunk);
             let max_chunk = Vec3::new(4,4,4);
             let e = owned.size.component_div(&max_chunk);
-            let rel_pos = fill(e, chunk).component_mul(&max_chunk);
+            let rel_pos = fill(e, *chunk).component_mul(&max_chunk);
             let abs_pos = rel_pos
                 + owned.pos;
             if let None = mine_chunk_and_sweep(turtle, abs_pos, max_chunk).await {
                 error!("mining at {abs_pos} failed");
-                owned.progress.cancel(chunk).await;
+                chunk.cancel();
             } else {
-                owned.progress.mark_done(chunk).await;
+                chunk.finish();
             }
             owned.miners.fetch_sub(1, Ordering::AcqRel);
         }).abort_handle()
@@ -350,38 +351,48 @@ impl Task for Quarry {
 #[derive(Serialize, Deserialize, Clone)]
 struct ChunkedTask {
     confirmed: Arc<AtomicI32>,
+    max: i32,
     #[serde(skip_deserializing)]
     head: Arc<AtomicI32>, // highest active chunk
     #[serde(skip)]
-    canceled: Arc<RwLock<Vec<i32>>>, // must remain sorted
-    max: i32,
+    canceled: Option<(Sender<i32>, Receiver<i32>)>,
 }
 
-impl ChunkedTask {
-    fn new(parts: i32) -> Self {
+impl Default for ChunkedTask {
+    fn default() -> Self {
         Self {
             confirmed: Default::default(),
             head: Default::default(),
-            canceled: Default::default(),
+            canceled: None,
+            max: 0,
+        }
+    }
+}
+
+impl ChunkedTask {
+    pub fn new(parts: i32) -> Self {
+        Self {
             max: parts,
+            canceled: Some(crossbeam::channel::unbounded()),
+            ..Default::default()
         }
     }
 
-    fn done(&self) -> bool {
+    pub fn done(&self) -> bool {
         let backstop = self.confirmed.load(Ordering::SeqCst);
         backstop + 1 >= self.max
     }
 
-    fn allocated(&self) -> bool {
+    pub fn allocated(&self) -> bool {
         let front = self.head.load(Ordering::SeqCst);
-        front + 1 >= self.max
+        front + 1 >= self.max && self.canceled.clone().unwrap().0.is_empty()
     }
 
-    async fn next_chunk(&self) -> Option<i32> {
-        let mut cancelled = self.canceled.clone().write_owned().await;
-        if let Some(chunk) = cancelled.pop() {
-            return Some(chunk);
+    pub fn next_chunk(&self) -> Option<ChunkedTaskGuard> {
+        let cancelled = self.canceled.clone().unwrap().1;
+        if let Some(chunk) = cancelled.try_recv().ok() {
+            return Some(ChunkedTaskGuard::with_task(self.clone(), chunk));
         }
 
         loop { // update head (from a save)
@@ -395,21 +406,14 @@ impl ChunkedTask {
         let head = self.head.fetch_add(1, Ordering::AcqRel);
         if head < self.max {
-            Some(head)
+            Some(ChunkedTaskGuard::with_task(self.clone(), head))
         } else {
             None
         }
     }
 
-    async fn mark_done(&self, chunk: i32) {
-        let canceled = self.canceled.read().await;
-        let min = match canceled.iter().min() {
-            None => true,
-            Some(minima) => chunk < *minima,
-        };
-        if min {
+    fn mark_done(&self, chunk: i32) {
+        if self.clone().canceled.unwrap().1.len() == 0 {
             loop {
                 let curr = self.confirmed.load(Ordering::SeqCst);
                 if let Ok(_) = self.confirmed.compare_exchange(curr, curr.max(chunk), Ordering::AcqRel, Ordering::SeqCst) {
@@ -419,12 +423,10 @@ impl ChunkedTask {
         }
     }
 
-    async fn cancel(&self, chunk: i32) {
-        let mut in_flight = self.canceled.write().await;
+    fn cancel(&self, chunk: i32) {
         let max = self.head.load(Ordering::SeqCst);
         if chunk < max {
-            in_flight.push(chunk);
-            in_flight.sort_unstable();
+            self.clone().canceled.unwrap().0.send(chunk).unwrap()
         }
         else {
             error!("attempted to cancel a job that hasn't happened yet");
@@ -432,6 +434,51 @@ impl ChunkedTask {
     }
 }
 
+#[derive(Clone)]
+struct ChunkedTaskGuard {
+    parent: ChunkedTask,
+    chunk: i32,
+    complete: bool,
+}
+
+impl ChunkedTaskGuard {
+    fn with_task(task: ChunkedTask, chunk: i32) -> Self {
+        Self {
+            parent: task,
+            chunk,
+            complete: false,
+        }
+    }
+
+    fn id(&self) -> i32 {
+        self.chunk
+    }
+
+    fn finish(mut self) {
+        self.parent.mark_done(self.chunk);
+        self.complete = true;
+    }
+
+    fn cancel(self) {
+        drop(self) // nop
+    }
+}
+
+impl Drop for ChunkedTaskGuard {
+    fn drop(&mut self) {
+        if !self.complete { // if only "don't drop" was a feature of rustc
+            self.parent.cancel(self.chunk);
+        }
+    }
+}
+
+impl Deref for ChunkedTaskGuard {
+    type Target = i32;
+
+    fn deref(&self) -> &Self::Target {
+        &self.chunk
+    }
+}
+
 #[cfg(test)]
@@ -439,63 +486,38 @@ mod tests {
     use super::*;
 
     #[tokio::test]
-    async fn linear() {
+    async fn guard() {
         let tracker = ChunkedTask::new(5);
-        assert_eq!(tracker.next_chunk().await, Some(0));
-        tracker.mark_done(0).await;
-        assert_eq!(tracker.next_chunk().await, Some(1));
-        tracker.mark_done(1).await;
-        assert_eq!(tracker.next_chunk().await, Some(2));
-        assert_eq!(tracker.done(), false);
-        tracker.mark_done(2).await;
-        assert_eq!(tracker.next_chunk().await, Some(3));
-        tracker.mark_done(3).await;
-        assert_eq!(tracker.next_chunk().await, Some(4));
-        assert_eq!(tracker.next_chunk().await, None);
-        assert_eq!(tracker.done(), false);
-        assert_eq!(tracker.next_chunk().await, None);
-        assert_eq!(tracker.allocated(), true);
-        tracker.mark_done(4).await;
-        assert_eq!(tracker.done(), true);
-    }
-
-    #[tokio::test]
-    async fn cancel_replay() {
-        let tracker = ChunkedTask::new(5);
-        assert_eq!(tracker.next_chunk().await, Some(0));
-        tracker.mark_done(0).await;
-        assert_eq!(tracker.next_chunk().await, Some(1));
-        tracker.mark_done(1).await;
-        tracker.cancel(2).await;
-        assert_eq!(tracker.next_chunk().await, Some(2));
-        assert_eq!(tracker.done(), false);
-        tracker.mark_done(2).await;
-        assert_eq!(tracker.next_chunk().await, Some(3));
-        tracker.mark_done(3).await;
-        tracker.cancel(2).await;
-        assert_eq!(tracker.next_chunk().await, Some(2));
-        assert_eq!(tracker.next_chunk().await, Some(4));
-        tracker.cancel(1).await;
-        assert_eq!(tracker.next_chunk().await, Some(1));
-        assert_eq!(tracker.done(), false);
-        assert_eq!(tracker.next_chunk().await, None);
-        assert_eq!(tracker.allocated(), true);
-        tracker.mark_done(4).await;
-        assert_eq!(tracker.done(), true);
-    }
-
-    #[tokio::test]
-    async fn cancel_unexisting() {
-        let tracker = ChunkedTask::new(5);
-        assert_eq!(tracker.next_chunk().await, Some(0));
-        assert_eq!(tracker.next_chunk().await, Some(1));
-        assert_eq!(tracker.next_chunk().await, Some(2));
-        tracker.cancel(2).await;
-        tracker.cancel(5).await;
-        tracker.cancel(3).await;
-        assert_eq!(tracker.next_chunk().await, Some(2));
-        assert_eq!(tracker.next_chunk().await, Some(3));
-        assert_eq!(tracker.next_chunk().await, Some(4));
-        assert_eq!(tracker.next_chunk().await, None);
+        assert_eq!(tracker.next_chunk().unwrap().id(), 0);
+        let one = tracker.next_chunk();
+        let two = tracker.next_chunk();
+        {
+            assert_eq!(one.as_deref(), Some(&0));
+            assert_eq!(two.as_deref(), Some(&1));
+            one.unwrap().cancel();
+            two.unwrap().finish();
+        }
+        let a = tracker.next_chunk().unwrap();
+        assert_eq!(*a, 0);
+        let b = tracker.next_chunk().unwrap();
+        assert_eq!(*b, 2);
+        let c = tracker.next_chunk().unwrap();
+        assert_eq!(*c, 3);
+        let d = tracker.next_chunk().unwrap();
+        assert_eq!(*d, 4);
+        assert!(!tracker.done());
+        assert!(tracker.allocated());
+        drop(c);
+        assert!(!tracker.allocated());
+        let e = tracker.next_chunk().unwrap();
+        assert_eq!(*e, 3);
+        let f = tracker.next_chunk().is_none();
+        assert_eq!(f, true);
+        a.finish();
+        b.finish();
+        d.finish();
+        e.finish();
+        assert!(tracker.done());
+        assert!(tracker.allocated());
     }
 }
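The core idea of this change is the drop-guard: taking a chunk hands back a guard, and dropping that guard without calling finish() feeds the chunk id back through a crossbeam channel so another worker can replay it. The sketch below is a minimal, self-contained approximation of that pattern, not the repo's API; the LeaseQueue/Lease names and the main() driver are hypothetical, and the real ChunkedTask additionally tracks confirmed progress with atomics for serialization.

// Hypothetical, simplified illustration of the guard pattern used above.
use crossbeam::channel::{unbounded, Receiver, Sender};

struct LeaseQueue {
    retry_tx: Sender<i32>,
    retry_rx: Receiver<i32>,
    next: i32,
    max: i32,
}

impl LeaseQueue {
    fn new(max: i32) -> Self {
        let (retry_tx, retry_rx) = unbounded();
        Self { retry_tx, retry_rx, next: 0, max }
    }

    // Prefer replaying abandoned chunks, then hand out fresh ids.
    fn lease(&mut self) -> Option<Lease> {
        let id = match self.retry_rx.try_recv() {
            Ok(id) => id,
            Err(_) => {
                if self.next >= self.max {
                    return None;
                }
                self.next += 1;
                self.next - 1
            }
        };
        Some(Lease { retry_tx: self.retry_tx.clone(), id, complete: false })
    }
}

struct Lease {
    retry_tx: Sender<i32>,
    id: i32,
    complete: bool,
}

impl Lease {
    fn finish(mut self) {
        self.complete = true; // Drop sees this and does not re-queue
    }
}

impl Drop for Lease {
    fn drop(&mut self) {
        if !self.complete {
            // Work was abandoned (error, early return, panic): put it back.
            let _ = self.retry_tx.send(self.id);
        }
    }
}

fn main() {
    let mut queue = LeaseQueue::new(3);
    let a = queue.lease().unwrap(); // id 0
    drop(a);                        // abandoned -> re-queued
    let b = queue.lease().unwrap(); // id 0 again, replayed
    b.finish();
    assert_eq!(queue.lease().unwrap().id, 1);
}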