From 572ab4719e7eed606cd8529322dfe3c9207afbac Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Fri, 13 Nov 2020 09:49:44 +0000 Subject: [PATCH] Add metric for queued jobs by client --- scheduler/cluster_scheduler.ml | 27 ++++++++++++++++++++++++--- scheduler/pool.ml | 3 +++ scheduler/pool.mli | 3 +++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/scheduler/cluster_scheduler.ml b/scheduler/cluster_scheduler.ml index 34ce5e64..32e76ccf 100644 --- a/scheduler/cluster_scheduler.ml +++ b/scheduler/cluster_scheduler.ml @@ -7,6 +7,21 @@ let () = Sqlite_loader.Ty.register "ocluster-client" Client let restart_timeout = 600.0 (* Maximum time to wait for a worker to reconnect after it disconnects. *) +module Metrics = struct + open Prometheus + + let namespace = "ocluster" + let subsystem = "scheduler" + + let priority ~urgent = + if urgent then "high" else "low" + + let client_queued_jobs = + let help = "Items currently queued" in + let f = Gauge.v_labels ~label_names:["client"; "pool"; "priority"] ~help ~namespace ~subsystem "client_submitted_jobs" in + fun ~client_id ~pool ~urgent -> Gauge.labels f [client_id; pool; priority ~urgent] +end + module Item = struct type t = { descr : Cluster_api.Queue.job_desc; @@ -46,11 +61,17 @@ module Pool_api = struct let cond = Lwt_condition.create () in { pool; workers; cond } - let submit client ~urgent ~client_id (descr : Cluster_api.Queue.job_desc) : Cluster_api.Ticket.t = + let submit client ~urgent (descr : Cluster_api.Queue.job_desc) : Cluster_api.Ticket.t = let job, set_job = Capability.promise () in - Log.info (fun f -> f "Received new job request from %S (urgent=%b)" client_id urgent); + Log.info (fun f -> f "Received new job request from %S (urgent=%b)" (Pool.Client.client_id client) urgent); let item = { Item.descr; set_job } in let ticket = Pool.Client.submit ~urgent client item in + let queued_jobs = Metrics.client_queued_jobs ~client_id:(Pool.Client.client_id client) ~pool:(Pool.Client.pool_id client) ~urgent in + Prometheus.Gauge.inc_one queued_jobs; + Lwt.async (fun () -> + Capability.wait_until_settled job >|= fun () -> + Prometheus.Gauge.dec_one queued_jobs + ); let cancel () = match Pool.Client.cancel client ticket with | Ok () -> @@ -184,7 +205,7 @@ let submission_service ~validate ~sturdy_ref t client_id = | false -> Capability.broken (Capnp_rpc.Exception.v "Access has been revoked") | true -> match get_client pool with - | Ok client -> Pool_api.submit ~urgent ~client_id client descr + | Ok client -> Pool_api.submit ~urgent client descr | Error ex -> Capability.broken ex in Cluster_api.Submission.local ~submit ~sturdy_ref diff --git a/scheduler/pool.ml b/scheduler/pool.ml index 303b7a62..e13d74f6 100644 --- a/scheduler/pool.ml +++ b/scheduler/pool.ml @@ -618,6 +618,9 @@ module Make (Item : S.ITEM)(Time : S.TIME) = struct fn (); Ok () + let client_id t = t.info.id + let pool_id t = t.parent.pool + let v parent info = { parent; info } end diff --git a/scheduler/pool.mli b/scheduler/pool.mli index 666590a0..517b7d7f 100644 --- a/scheduler/pool.mli +++ b/scheduler/pool.mli @@ -34,6 +34,9 @@ module Make (Item : S.ITEM) (Time : S.TIME) : sig val get_rate : t -> float (** [get_rate t] is the rate previously set by [set_rate] (or [1.0] if never set). *) + + val client_id : t -> string + val pool_id : t -> string end val create : name:string -> db:Dao.t -> t