1
Fork 0

disabled telemetry

made pathing a blocking task
This commit is contained in:
Andy Killorin 2023-12-25 20:27:22 -06:00
parent ca221497c5
commit 48d7ea164d
Signed by: ank
GPG key ID: B6241CA3B552BCA4
7 changed files with 55 additions and 25 deletions

View file

@ -10,16 +10,12 @@ anyhow = "1.0.75"
axum = "0.7.2" axum = "0.7.2"
bincode = "1.3.3" bincode = "1.3.3"
bit-struct = "0.3.2" bit-struct = "0.3.2"
const_format = "0.2.32"
erased-serde = "0.4.1" erased-serde = "0.4.1"
feistel_rs = "0.1.0" feistel_rs = "0.1.0"
future-parking_lot = "0.3.3"
hilbert_index = "0.2.0"
hyper = "1.0.1" hyper = "1.0.1"
hyper-util = "0.1.1" hyper-util = "0.1.1"
indoc = "2.0.4" indoc = "2.0.4"
nalgebra = { version = "0.32.3", features = ["serde-serialize"] } nalgebra = { version = "0.32.3", features = ["serde-serialize"] }
parking_lot = { version = "0.11", features = ["serde"] }
pathfinding = "4.6.0" pathfinding = "4.6.0"
rstar = { version = "0.11.0", features = ["serde"] } rstar = { version = "0.11.0", features = ["serde"] }
rustmatica = "0.1.1" rustmatica = "0.1.1"
@ -40,7 +36,7 @@ tracing = "0.1"
typetag = "0.2.14" typetag = "0.2.14"
ucnlnav = { git = "https://github.com/ucnl/UCNLNav.git", version = "0.1.0" } ucnlnav = { git = "https://github.com/ucnl/UCNLNav.git", version = "0.1.0" }
tracing-subscriber = { version = "0.3", features = ["registry"] } tracing-subscriber = { version = "0.3", features = ["registry"] }
console-subscriber = "0.1.5" opentelemetry = "0.21.0"
opentelemetry = "0.17.0" tracing-opentelemetry = "0.22"
tracing-opentelemetry = "0.17.2" opentelemetry-jaeger = { version = "0.20", features = ["rt-tokio"] }
opentelemetry-jaeger = "0.16.0" opentelemetry_sdk = { version = "0.21.1", features = ["trace"] }

View file

@ -26,7 +26,6 @@ impl Depots {
.map(|d| d) .map(|d| d)
} }
#[tracing::instrument(skip(self))]
pub async fn dock(&self, turtle: TurtleCommander) -> Option<usize> { pub async fn dock(&self, turtle: TurtleCommander) -> Option<usize> {
let depot = self.clone().nearest(turtle.pos().await).await?; let depot = self.clone().nearest(turtle.pos().await).await?;
trace!("depot at {depot:?}"); trace!("depot at {depot:?}");

View file

@ -1,6 +1,6 @@
#![feature(iter_map_windows, iter_collect_into)] #![feature(iter_map_windows, iter_collect_into)]
use std::{collections::VecDeque, io::ErrorKind, sync::Arc, env::args, path, borrow::BorrowMut}; use std::{collections::VecDeque, io::ErrorKind, sync::Arc, env::args, path, borrow::BorrowMut, time::Duration};
use anyhow::{Error, Ok}; use anyhow::{Error, Ok};
use axum::{ use axum::{
@ -11,6 +11,7 @@ use axum::{
use blocks::{World, Position, }; use blocks::{World, Position, };
use depot::Depots; use depot::Depots;
use opentelemetry::global; use opentelemetry::global;
use opentelemetry_sdk::{runtime::Tokio, trace::BatchConfig};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use tracing::{info, span, Level}; use tracing::{info, span, Level};
use rstar::RTree; use rstar::RTree;
@ -57,11 +58,6 @@ async fn main() -> Result<(), Error> {
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("avarus")
.install_simple()?;
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let filter = filter::Targets::new() let filter = filter::Targets::new()
.with_default(Level::INFO) .with_default(Level::INFO)
@ -80,13 +76,31 @@ async fn main() -> Result<(), Error> {
.with_span_events(FmtSpan::ACTIVE) .with_span_events(FmtSpan::ACTIVE)
.with_filter(filter); .with_filter(filter);
tracing_subscriber::registry() let reg = tracing_subscriber::registry()
.with(opentelemetry) .with(subscriber);
.with(subscriber)
.try_init()?; let otel = false;
if otel {
let batch = BatchConfig::default()
.with_max_queue_size(65536)
.with_scheduled_delay(Duration::from_millis(800));
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_service_name(format!("avarus-{}", SAVE.get().unwrap().display()))
.with_auto_split_batch(true)
.with_batch_processor_config(batch)
.install_batch(Tokio)?;
reg.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()?;
} else {
reg.try_init()?;
}
info!("starting"); info!("starting");
let (kill_send, kill_recv) = watch::channel(false); let (kill_send, kill_recv) = watch::channel(false);
let state = read_from_disk(kill_send).await?; let state = read_from_disk(kill_send).await?;

View file

@ -216,7 +216,7 @@ impl Task for Mine {
} }
} }
const MAX_MINERS: usize = 4; const MAX_MINERS: usize = 42;
#[derive(Serialize, Deserialize,Clone)] #[derive(Serialize, Deserialize,Clone)]
pub struct Quarry { pub struct Quarry {
@ -313,6 +313,10 @@ impl Task for Quarry {
return TaskState::Complete; return TaskState::Complete;
} }
if self.head.load(Ordering::SeqCst) >= chunks.product() {
return TaskState::Waiting;
}
let only = self.miners.fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| { let only = self.miners.fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
if n < MAX_MINERS { if n < MAX_MINERS {
Some(n+1) Some(n+1)

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
blocks::{World, Position, Direction, Vec3, WorldReadLock}, blocks::{World, Position, Direction, Vec3, WorldReadLock},
}; };
use tokio::task::spawn_blocking;
use tracing::{trace, error}; use tracing::{trace, error};
use pathfinding::prelude::astar; use pathfinding::prelude::astar;
@ -8,7 +9,7 @@ const LOOKUP_LIMIT: usize = 10_000_000;
#[tracing::instrument(skip(world))] #[tracing::instrument(skip(world))]
pub async fn route_facing(from: Position, to: Vec3, world: &World) -> Option<Vec<Position>> { pub async fn route_facing(from: Position, to: Vec3, world: &World) -> Option<Vec<Position>> {
let facing = |p: &Position| { let facing = move |p: &Position| {
let ahead = p.dir.unit() + p.pos; let ahead = p.dir.unit() + p.pos;
let above = Vec3::y() + p.pos; let above = Vec3::y() + p.pos;
let below = -Vec3::y() + p.pos; let below = -Vec3::y() + p.pos;
@ -26,17 +27,19 @@ pub async fn route(from: Position, to: Position, world: &World) -> Option<Vec<Po
{ {
return None; return None;
} }
route_to(from, to.pos, |p| p == &to, world).await route_to(from, to.pos, move |p| p == &to, world).await
} }
async fn route_to<D>(from: Position, to: Vec3, mut done: D, world: &World) -> Option<Vec<Position>> async fn route_to<D>(from: Position, to: Vec3, mut done: D, world: &World) -> Option<Vec<Position>>
where D: FnMut(&Position) -> bool { where D: FnMut(&Position) -> bool + Send + 'static {
// lock once, we'll be doing a lot of lookups // lock once, we'll be doing a lot of lookups
let world = world.clone().lock().await; let world = world.clone().lock().await;
let mut limit = LOOKUP_LIMIT; let mut limit = LOOKUP_LIMIT;
let route = astar( let route =
spawn_blocking( move ||
astar(
&from, &from,
move |p| next(p, &world), move |p| next(p, &world),
|p1| (p1.pos - &to).abs().sum() as u32, |p1| (p1.pos - &to).abs().sum() as u32,
@ -48,7 +51,7 @@ where D: FnMut(&Position) -> bool {
done(p) done(p)
} }
}, },
)?; )).await.unwrap()?;
trace!("scanned {} states", LOOKUP_LIMIT-limit); trace!("scanned {} states", LOOKUP_LIMIT-limit);
if limit != 0 { if limit != 0 {

View file

@ -233,6 +233,7 @@ impl TurtleCommander {
if let Some(fuel) = res { if let Some(fuel) = res {
return fuel; return fuel;
} }
error!("depot lock failed");
// this is a poor way to do this, but I feel like select! ing on 30 different things // this is a poor way to do this, but I feel like select! ing on 30 different things
// would be harder // would be harder
tokio::time::sleep(Duration::from_millis(wait)).await; tokio::time::sleep(Duration::from_millis(wait)).await;

View file

@ -46,6 +46,7 @@ pub fn turtle_api() -> Router<SharedControl> {
.route("/:id/manual", post(run_command)) .route("/:id/manual", post(run_command))
.route("/:id/dock", post(dock)) .route("/:id/dock", post(dock))
.route("/:id/info", get(turtle_info)) .route("/:id/info", get(turtle_info))
.route("/:id/register", get(register_turtle))
.route("/createTreeFarm", post(fell)) .route("/createTreeFarm", post(fell))
.route("/createMine", post(dig)) .route("/createMine", post(dig))
.route("/registerDepot", post(new_depot)) .route("/registerDepot", post(new_depot))
@ -54,6 +55,18 @@ pub fn turtle_api() -> Router<SharedControl> {
.route("/updateAll", get(update_turtles)) .route("/updateAll", get(update_turtles))
} }
pub(crate) async fn register_turtle(
Path(id): Path<u32>,
State(state): State<SharedControl>,
) -> &'static str {
let state = &mut state.write().await;
let commander = state.get_turtle(id).await.unwrap().clone();
state.tasks.add_turtle(&commander);
info!("registered turtle: {id}");
"ACK"
}
pub(crate) async fn create_turtle( pub(crate) async fn create_turtle(
State(state): State<SharedControl>, State(state): State<SharedControl>,
Json(req): Json<turtle::TurtleRegister>, Json(req): Json<turtle::TurtleRegister>,