diff --git a/server/Cargo.toml b/server/Cargo.toml index c76a6d6..7baa0fd 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,16 +10,12 @@ anyhow = "1.0.75" axum = "0.7.2" bincode = "1.3.3" bit-struct = "0.3.2" -const_format = "0.2.32" erased-serde = "0.4.1" feistel_rs = "0.1.0" -future-parking_lot = "0.3.3" -hilbert_index = "0.2.0" hyper = "1.0.1" hyper-util = "0.1.1" indoc = "2.0.4" nalgebra = { version = "0.32.3", features = ["serde-serialize"] } -parking_lot = { version = "0.11", features = ["serde"] } pathfinding = "4.6.0" rstar = { version = "0.11.0", features = ["serde"] } rustmatica = "0.1.1" @@ -40,7 +36,7 @@ tracing = "0.1" typetag = "0.2.14" ucnlnav = { git = "https://github.com/ucnl/UCNLNav.git", version = "0.1.0" } tracing-subscriber = { version = "0.3", features = ["registry"] } -console-subscriber = "0.1.5" -opentelemetry = "0.17.0" -tracing-opentelemetry = "0.17.2" -opentelemetry-jaeger = "0.16.0" +opentelemetry = "0.21.0" +tracing-opentelemetry = "0.22" +opentelemetry-jaeger = { version = "0.20", features = ["rt-tokio"] } +opentelemetry_sdk = { version = "0.21.1", features = ["trace"] } diff --git a/server/src/depot.rs b/server/src/depot.rs index 74a0f7d..5d3b1ff 100644 --- a/server/src/depot.rs +++ b/server/src/depot.rs @@ -26,7 +26,6 @@ impl Depots { .map(|d| d) } - #[tracing::instrument(skip(self))] pub async fn dock(&self, turtle: TurtleCommander) -> Option { let depot = self.clone().nearest(turtle.pos().await).await?; trace!("depot at {depot:?}"); diff --git a/server/src/main.rs b/server/src/main.rs index 58e22fe..add04f7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,6 @@ #![feature(iter_map_windows, iter_collect_into)] -use std::{collections::VecDeque, io::ErrorKind, sync::Arc, env::args, path, borrow::BorrowMut}; +use std::{collections::VecDeque, io::ErrorKind, sync::Arc, env::args, path, borrow::BorrowMut, time::Duration}; use anyhow::{Error, Ok}; use axum::{ @@ -11,6 +11,7 @@ use axum::{ use blocks::{World, Position, }; use depot::Depots; use opentelemetry::global; +use opentelemetry_sdk::{runtime::Tokio, trace::BatchConfig}; use tower_http::trace::TraceLayer; use tracing::{info, span, Level}; use rstar::RTree; @@ -57,11 +58,6 @@ async fn main() -> Result<(), Error> { global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - let tracer = opentelemetry_jaeger::new_pipeline() - .with_service_name("avarus") - .install_simple()?; - - let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); let filter = filter::Targets::new() .with_default(Level::INFO) @@ -80,13 +76,31 @@ async fn main() -> Result<(), Error> { .with_span_events(FmtSpan::ACTIVE) .with_filter(filter); - tracing_subscriber::registry() - .with(opentelemetry) - .with(subscriber) - .try_init()?; + let reg = tracing_subscriber::registry() + .with(subscriber); + + let otel = false; + if otel { + let batch = BatchConfig::default() + .with_max_queue_size(65536) + .with_scheduled_delay(Duration::from_millis(800)); + + let tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_service_name(format!("avarus-{}", SAVE.get().unwrap().display())) + .with_auto_split_batch(true) + .with_batch_processor_config(batch) + .install_batch(Tokio)?; + + reg.with(tracing_opentelemetry::layer().with_tracer(tracer)) + .try_init()?; + } else { + reg.try_init()?; + } + info!("starting"); + let (kill_send, kill_recv) = watch::channel(false); let state = read_from_disk(kill_send).await?; diff --git a/server/src/mine.rs b/server/src/mine.rs index 2aeaf96..de27456 100644 --- a/server/src/mine.rs +++ b/server/src/mine.rs @@ -216,7 +216,7 @@ impl Task for Mine { } } -const MAX_MINERS: usize = 4; +const MAX_MINERS: usize = 42; #[derive(Serialize, Deserialize,Clone)] pub struct Quarry { @@ -313,6 +313,10 @@ impl Task for Quarry { return TaskState::Complete; } + if self.head.load(Ordering::SeqCst) >= chunks.product() { + return TaskState::Waiting; + } + let only = self.miners.fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| { if n < MAX_MINERS { Some(n+1) diff --git a/server/src/paths.rs b/server/src/paths.rs index 0c136d0..c5276df 100644 --- a/server/src/paths.rs +++ b/server/src/paths.rs @@ -1,6 +1,7 @@ use crate::{ blocks::{World, Position, Direction, Vec3, WorldReadLock}, }; +use tokio::task::spawn_blocking; use tracing::{trace, error}; use pathfinding::prelude::astar; @@ -8,7 +9,7 @@ const LOOKUP_LIMIT: usize = 10_000_000; #[tracing::instrument(skip(world))] pub async fn route_facing(from: Position, to: Vec3, world: &World) -> Option> { - let facing = |p: &Position| { + let facing = move |p: &Position| { let ahead = p.dir.unit() + p.pos; let above = Vec3::y() + p.pos; let below = -Vec3::y() + p.pos; @@ -26,17 +27,19 @@ pub async fn route(from: Position, to: Position, world: &World) -> Option(from: Position, to: Vec3, mut done: D, world: &World) -> Option> -where D: FnMut(&Position) -> bool { +where D: FnMut(&Position) -> bool + Send + 'static { // lock once, we'll be doing a lot of lookups let world = world.clone().lock().await; let mut limit = LOOKUP_LIMIT; - let route = astar( + let route = + spawn_blocking( move || + astar( &from, move |p| next(p, &world), |p1| (p1.pos - &to).abs().sum() as u32, @@ -48,7 +51,7 @@ where D: FnMut(&Position) -> bool { done(p) } }, - )?; + )).await.unwrap()?; trace!("scanned {} states", LOOKUP_LIMIT-limit); if limit != 0 { diff --git a/server/src/turtle.rs b/server/src/turtle.rs index 5715b39..743599e 100644 --- a/server/src/turtle.rs +++ b/server/src/turtle.rs @@ -233,6 +233,7 @@ impl TurtleCommander { if let Some(fuel) = res { return fuel; } + error!("depot lock failed"); // this is a poor way to do this, but I feel like select! ing on 30 different things // would be harder tokio::time::sleep(Duration::from_millis(wait)).await; diff --git a/server/src/turtle_api.rs b/server/src/turtle_api.rs index a146a8e..e7dace0 100644 --- a/server/src/turtle_api.rs +++ b/server/src/turtle_api.rs @@ -46,6 +46,7 @@ pub fn turtle_api() -> Router { .route("/:id/manual", post(run_command)) .route("/:id/dock", post(dock)) .route("/:id/info", get(turtle_info)) + .route("/:id/register", get(register_turtle)) .route("/createTreeFarm", post(fell)) .route("/createMine", post(dig)) .route("/registerDepot", post(new_depot)) @@ -54,6 +55,18 @@ pub fn turtle_api() -> Router { .route("/updateAll", get(update_turtles)) } +pub(crate) async fn register_turtle( + Path(id): Path, + State(state): State, +) -> &'static str { + let state = &mut state.write().await; + let commander = state.get_turtle(id).await.unwrap().clone(); + state.tasks.add_turtle(&commander); + info!("registered turtle: {id}"); + + "ACK" +} + pub(crate) async fn create_turtle( State(state): State, Json(req): Json,