1
Fork 0

de-async (work ahead of time)

This commit is contained in:
Andy Killorin 2023-12-26 16:45:08 -06:00
parent 0f1b345aa4
commit 97f383cb78
Signed by: ank
GPG key ID: B6241CA3B552BCA4

View file

@ -289,7 +289,7 @@ impl Task for Quarry {
return TaskState::Complete; return TaskState::Complete;
} }
if self.progress.allocated().await { if self.progress.allocated() {
return TaskState::Waiting; return TaskState::Waiting;
} }
@ -312,8 +312,9 @@ impl Task for Quarry {
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
struct ChunkedTask { struct ChunkedTask {
confirmed: Arc<AtomicI32>, confirmed: Arc<AtomicI32>,
head: Arc<AtomicI32>, // highest active chunk
#[serde(skip)] #[serde(skip)]
in_flight: Arc<RwLock<Vec<i32>>>, in_flight: Arc<RwLock<Vec<i32>>>, // must remain sorted
max: i32, max: i32,
} }
@ -321,6 +322,7 @@ impl ChunkedTask {
fn new(parts: i32) -> Self { fn new(parts: i32) -> Self {
Self { Self {
confirmed: Default::default(), confirmed: Default::default(),
head: Default::default(),
in_flight: Default::default(), in_flight: Default::default(),
max: parts, max: parts,
} }
@ -328,32 +330,27 @@ impl ChunkedTask {
fn done(&self) -> bool { fn done(&self) -> bool {
let backstop = self.confirmed.load(Ordering::SeqCst); let backstop = self.confirmed.load(Ordering::SeqCst);
backstop >= self.max backstop + 1 >= self.max
} }
async fn allocated(&self) -> bool { fn allocated(&self) -> bool {
let mut in_flight = self.in_flight.clone().write_owned().await; let front = self.head.load(Ordering::SeqCst);
in_flight.sort_unstable(); front + 1 >= self.max
let backstop = self.confirmed.load(Ordering::SeqCst);
for i in backstop..self.max {
if in_flight.get(i as usize).is_none() {
return false;
}
}
return true;
} }
async fn next_chunk(&self) -> Option<i32> { async fn next_chunk(&self) -> Option<i32> {
let mut in_flight = self.in_flight.clone().write_owned().await; let mut in_flight = self.in_flight.clone().write_owned().await;
in_flight.sort_unstable();
let backstop = self.confirmed.load(Ordering::SeqCst); let backstop = self.confirmed.load(Ordering::SeqCst);
// we have a mutex anyway
if let Some(highest) = in_flight.last() {
self.head.store(*highest, Ordering::SeqCst);
}
for i in backstop..self.max { for i in backstop..self.max {
if in_flight.get(i as usize).is_none() { if in_flight.get(i as usize).is_none() {
in_flight.push(i); in_flight.push(i);
in_flight.sort_unstable();
return Some(i); return Some(i);
} }
} }
@ -401,7 +398,7 @@ mod tests {
assert_eq!(tracker.next_chunk().await, Some(4)); assert_eq!(tracker.next_chunk().await, Some(4));
assert_eq!(tracker.next_chunk().await, None); assert_eq!(tracker.next_chunk().await, None);
assert_eq!(tracker.done(), false); assert_eq!(tracker.done(), false);
assert_eq!(tracker.allocated().await, true); assert_eq!(tracker.allocated(), true);
tracker.mark_done(0).await; tracker.mark_done(0).await;
tracker.mark_done(1).await; tracker.mark_done(1).await;
tracker.mark_done(2).await; tracker.mark_done(2).await;