Skip to content

Commit

Permalink
Add metric for queued jobs by client
Browse files Browse the repository at this point in the history
  • Loading branch information
talex5 committed Nov 16, 2020
1 parent ab53578 commit 572ab47
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
27 changes: 24 additions & 3 deletions scheduler/cluster_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ let () = Sqlite_loader.Ty.register "ocluster-client" Client

let restart_timeout = 600.0 (* Maximum time to wait for a worker to reconnect after it disconnects. *)

(* Prometheus metrics exported by the scheduler. *)
module Metrics = struct
  let namespace = "ocluster"
  let subsystem = "scheduler"

  (* Value used for the "priority" label: "high" for urgent jobs, "low" otherwise. *)
  let priority ~urgent =
    match urgent with
    | true -> "high"
    | false -> "low"

  (* [client_queued_jobs ~client_id ~pool ~urgent] is the gauge for the given
     (client, pool, priority) label combination. The gauge family is created
     once, when this module is initialised; each call only selects the
     labelled child from it. *)
  let client_queued_jobs =
    let family =
      Prometheus.Gauge.v_labels
        ~label_names:["client"; "pool"; "priority"]
        ~help:"Items currently queued"
        ~namespace
        ~subsystem
        "client_submitted_jobs"
    in
    fun ~client_id ~pool ~urgent ->
      let label_values = [client_id; pool; priority ~urgent] in
      Prometheus.Gauge.labels family label_values
end

module Item = struct
type t = {
descr : Cluster_api.Queue.job_desc;
Expand Down Expand Up @@ -46,11 +61,17 @@ module Pool_api = struct
let cond = Lwt_condition.create () in
{ pool; workers; cond }

let submit client ~urgent ~client_id (descr : Cluster_api.Queue.job_desc) : Cluster_api.Ticket.t =
let submit client ~urgent (descr : Cluster_api.Queue.job_desc) : Cluster_api.Ticket.t =
let job, set_job = Capability.promise () in
Log.info (fun f -> f "Received new job request from %S (urgent=%b)" client_id urgent);
Log.info (fun f -> f "Received new job request from %S (urgent=%b)" (Pool.Client.client_id client) urgent);
let item = { Item.descr; set_job } in
let ticket = Pool.Client.submit ~urgent client item in
let queued_jobs = Metrics.client_queued_jobs ~client_id:(Pool.Client.client_id client) ~pool:(Pool.Client.pool_id client) ~urgent in
Prometheus.Gauge.inc_one queued_jobs;
Lwt.async (fun () ->
Capability.wait_until_settled job >|= fun () ->
Prometheus.Gauge.dec_one queued_jobs
);
let cancel () =
match Pool.Client.cancel client ticket with
| Ok () ->
Expand Down Expand Up @@ -184,7 +205,7 @@ let submission_service ~validate ~sturdy_ref t client_id =
| false -> Capability.broken (Capnp_rpc.Exception.v "Access has been revoked")
| true ->
match get_client pool with
| Ok client -> Pool_api.submit ~urgent ~client_id client descr
| Ok client -> Pool_api.submit ~urgent client descr
| Error ex -> Capability.broken ex
in
Cluster_api.Submission.local ~submit ~sturdy_ref
Expand Down
3 changes: 3 additions & 0 deletions scheduler/pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,9 @@ module Make (Item : S.ITEM)(Time : S.TIME) = struct
fn ();
Ok ()

(* [client_id t] is the [id] stored in [t]'s [info]. *)
let client_id { info; _ } = info.id
(* [pool_id t] is the [pool] field of [t]'s [parent]. *)
let pool_id { parent; _ } = parent.pool

(* [v parent info] builds the record that pairs [parent] with [info]. *)
let v parent info = { info; parent }
end

Expand Down
3 changes: 3 additions & 0 deletions scheduler/pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ module Make (Item : S.ITEM) (Time : S.TIME) : sig

val get_rate : t -> float
(** [get_rate t] is the rate previously set by [set_rate] (or [1.0] if never set). *)

val client_id : t -> string
(** [client_id t] is the identifier of the client [t]. *)

val pool_id : t -> string
(** [pool_id t] is the identifier of the pool that [t] belongs to
    (presumably the pool's name; confirm against the pool's constructor). *)
end

val create : name:string -> db:Dao.t -> t
Expand Down

0 comments on commit 572ab47

Please sign in to comment.