1
Fork 0

fixed double-scheduling

This commit is contained in:
Andy Killorin 2023-12-23 19:42:05 -06:00
parent 4c9fab0ed7
commit 50d24c8ea8
Signed by: ank
GPG key ID: B6241CA3B552BCA4
3 changed files with 21 additions and 14 deletions

View file

@ -3,7 +3,7 @@ use std::ops::Mul;
use log::{trace, warn, info}; use log::{trace, warn, info};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::task::JoinHandle; use tokio::task::{JoinHandle, AbortHandle};
use typetag::serde; use typetag::serde;
use crate::{blocks::{Vec3, Position, Direction}, turtle::{TurtleCommander, TurtleCommand, TurtleCommandResponse, InventorySlot}, tasks::{Task, TaskState}, depot::Depots, mine::fill, paths::TRANSPARENT}; use crate::{blocks::{Vec3, Position, Direction}, turtle::{TurtleCommander, TurtleCommand, TurtleCommandResponse, InventorySlot}, tasks::{Task, TaskState}, depot::Depots, mine::fill, paths::TRANSPARENT};
@ -135,11 +135,11 @@ impl TreeFarm {
#[serde] #[serde]
impl Task for TreeFarm { impl Task for TreeFarm {
fn run(&mut self,turtle:TurtleCommander) -> JoinHandle<()> { fn run(&mut self,turtle:TurtleCommander) -> AbortHandle {
let frozen = self.clone(); let frozen = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
frozen.sweep(turtle).await.unwrap(); frozen.sweep(turtle).await.unwrap();
}) }).abort_handle()
} }
fn poll(&mut self) -> TaskState { fn poll(&mut self) -> TaskState {

View file

@ -1,6 +1,6 @@
use log::{info, warn}; use log::{info, warn};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use tokio::task::JoinHandle; use tokio::task::{JoinHandle, AbortHandle};
use typetag::serde; use typetag::serde;
use crate::{blocks::{Position, Vec3, Direction}, turtle::{TurtleCommand, TurtleCommander, TurtleCommandResponse, InventorySlot}, paths::TRANSPARENT, tasks::{Task, TaskState}}; use crate::{blocks::{Position, Vec3, Direction}, turtle::{TurtleCommand, TurtleCommander, TurtleCommandResponse, InventorySlot}, paths::TRANSPARENT, tasks::{Task, TaskState}};
@ -188,12 +188,12 @@ impl Mine {
#[serde] #[serde]
impl Task for Mine { impl Task for Mine {
fn run(&mut self,turtle:TurtleCommander) -> JoinHandle<()> { fn run(&mut self,turtle:TurtleCommander) -> AbortHandle {
self.miners += 1; self.miners += 1;
let frozen = self.clone(); let frozen = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
mine(turtle,frozen.pos, frozen.chunk).await.unwrap(); mine(turtle,frozen.pos, frozen.chunk).await.unwrap();
}) }).abort_handle()
// TODO: mutability after spawn // TODO: mutability after spawn
} }

View file

@ -1,6 +1,6 @@
use log::info; use log::{info, trace};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle; use tokio::task::{JoinHandle, AbortHandle};
use crate::names::Name; use crate::names::Name;
use crate::{turtle::TurtleCommander, blocks::Position}; use crate::{turtle::TurtleCommander, blocks::Position};
@ -14,7 +14,7 @@ pub enum TaskState {
#[typetag::serde(tag = "task")] #[typetag::serde(tag = "task")]
pub trait Task: Send + Sync { pub trait Task: Send + Sync {
/// Execute the task /// Execute the task
fn run(&mut self, turtle: TurtleCommander) -> JoinHandle<()>; fn run(&mut self, turtle: TurtleCommander) -> AbortHandle;
/// Return Some if the task should be scheduled /// Return Some if the task should be scheduled
fn poll(&mut self) -> TaskState; fn poll(&mut self) -> TaskState;
} }
@ -22,7 +22,7 @@ pub trait Task: Send + Sync {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct Scheduler { pub struct Scheduler {
#[serde(skip)] #[serde(skip)]
turtles: Vec<(TurtleCommander, Option<JoinHandle<()>>)>, turtles: Vec<(TurtleCommander, Option<AbortHandle>)>,
tasks: Vec<Box<dyn Task>>, tasks: Vec<Box<dyn Task>>,
} }
@ -49,7 +49,7 @@ impl Scheduler {
} }
pub fn add_task(&mut self, task: Box<dyn Task>) { pub fn add_task(&mut self, task: Box<dyn Task>) {
info!("new task"); trace!("new task");
self.tasks.push(task); self.tasks.push(task);
} }
@ -57,12 +57,13 @@ impl Scheduler {
for turtle in &mut self.turtles { for turtle in &mut self.turtles {
if let Some(join) = &turtle.1 { if let Some(join) = &turtle.1 {
if join.is_finished() { if join.is_finished() {
trace!("#{} completed task", turtle.0.name().to_num());
turtle.1 = None; turtle.1 = None;
} }
} }
} }
let mut free_turtles: Vec<&mut (TurtleCommander, Option<JoinHandle<()>>)> = let mut free_turtles: Vec<&mut (TurtleCommander, Option<AbortHandle>)> =
self.turtles.iter_mut().filter(|t| t.1.is_none()).collect(); self.turtles.iter_mut().filter(|t| t.1.is_none()).collect();
let mut turtle_positions = Vec::new(); let mut turtle_positions = Vec::new();
@ -74,13 +75,16 @@ impl Scheduler {
for (i, task) in self.tasks.iter_mut().enumerate() { for (i, task) in self.tasks.iter_mut().enumerate() {
let poll = task.poll(); let poll = task.poll();
if let TaskState::Ready(position) = poll { if let TaskState::Ready(position) = poll {
let closest_turtle = match free_turtles.iter_mut().zip(turtle_positions.iter()).min_by_key( |(_,p)| { let closest_turtle = match free_turtles.iter_mut().zip(turtle_positions.iter())
.filter(|t|t.0.1.is_none()) // Don't double-schedule
.min_by_key( |(_,p)| {
p.manhattan(position) p.manhattan(position)
}) { }) {
Some(turtle) => turtle.0, Some(turtle) => turtle.0,
None => break, None => break,
}; };
trace!("scheduling task on #{}", closest_turtle.0.name().to_num());
closest_turtle.1 = Some(task.run(closest_turtle.0.clone())); closest_turtle.1 = Some(task.run(closest_turtle.0.clone()));
} }
if let TaskState::Complete = poll { if let TaskState::Complete = poll {
@ -98,7 +102,10 @@ impl Scheduler {
} }
pub async fn cancel(&mut self, turtle: Name) -> Option<()> { pub async fn cancel(&mut self, turtle: Name) -> Option<()> {
self.turtles.iter_mut().find(|t| t.0.name() == turtle)?.1.take()?.abort(); if let Some(task) = self.turtles.iter_mut().find(|t| t.0.name() == turtle)?.1.as_ref() {
task.abort();
info!("aborted task for #{}", turtle.to_num());
}
Some(()) Some(())
} }
} }