Merge pull request #88 from talex5/fair-queuing
Fair queuing
talex5 authored Nov 16, 2020
2 parents fc053f5 + 572ab47 commit 6e6e2b2
Showing 13 changed files with 625 additions and 206 deletions.
39 changes: 39 additions & 0 deletions README.md
@@ -20,6 +20,7 @@ The scheduler tries to schedule similar builds on the same machine, to benefit f
* [Docker jobs](#docker-jobs)
* [OBuilder jobs](#obuilder-jobs)
* [Admin](#admin)
* [Fair queuing](#fair-queuing)
* [API](#api)
* [Security model](#security-model)
* [Prometheus metrics](#prometheus-metrics)
@@ -236,6 +237,44 @@ may take a while. The `restart` command waits until the worker has reconnected b

You can also give the name of a worker as an extra argument to update just that worker.

### Fair queuing

Some clients may submit many jobs in batches.
In this case, we probably want other clients' jobs to jump ahead of the batch in the queue, even if they are submitted later.

To handle this, each client can be configured with a "rate" (separately for each pool):
the rate at which it can expect to use the cluster, i.e. the number of jobs it can expect to have running at once.
If the cluster has free capacity then this has no effect; all jobs will be run.
When there is a backlog, however, the scheduler uses the rate to assign each job a "fair start time":
the time at which the job would have started if the client had been using the cluster at exactly its expected rate.
Jobs in the backlog are then started in order of fair start time (a small code sketch of this bookkeeping follows the worked example below).

For example:

1. The admin sets Alice's rate to 2 and Bob's rate to 1 (the default), using `ocluster-admin set-rate`.
2. Bob submits 10 jobs, each estimated to run for 1 minute.
3. Because Bob's rate is one, the cluster assigns these jobs "fair start times" of now, +1m, +2m, +3m, etc.
4. The cluster will start as many of these jobs as it has capacity for. If its capacity is 3, Bob's first three jobs will start.
5. Alice submits two jobs, also taking one minute each.
   Because Alice's rate is 2, the cluster assigns them fair start times of now and now+30s (each one-minute job advances her fair-start clock by only 30 seconds).
   These will be the next two jobs to run, because their fair start times are earlier than those of Bob's remaining queued jobs.
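
The numbers in the example above can be reproduced with a minimal model of this bookkeeping.
This is only a sketch: the `client` type, the `assign` function and the `cost` parameter are illustrative and do not mirror the scheduler's internal data structures.

```ocaml
(* Minimal sketch of fair-start-time bookkeeping (illustrative names only). *)
type client = {
  rate : float;                    (* expected number of parallel jobs *)
  mutable next_fair_start : float; (* fair-start clock, in seconds from now *)
}

(* Assign a fair start time to a new job and charge the client for it:
   at rate [r], a job estimated to take [cost] seconds advances the
   client's fair-start clock by [cost /. r]. *)
let assign ~now client ~cost =
  let fair_start = max now client.next_fair_start in
  client.next_fair_start <- fair_start +. cost /. client.rate;
  fair_start

let () =
  let now = 0.0 in
  let bob = { rate = 1.0; next_fair_start = now } in
  (* Bob's 10 one-minute jobs get fair start times of +0s, +60s, +120s, ... *)
  for i = 1 to 10 do
    Printf.printf "bob job%d: +%.0fs\n" i (assign ~now bob ~cost:60.0)
  done;
  let alice = { rate = 2.0; next_fair_start = now } in
  (* Alice's two one-minute jobs get fair start times of +0s and +30s,
     so they are ordered ahead of Bob's still-queued jobs. *)
  for i = 1 to 2 do
    Printf.printf "alice job%d: +%.0fs\n" i (assign ~now alice ~cost:60.0)
  done
```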

In the `ocluster-admin show` output, you will see these values.
For example:

```
...
queue: (backlog) [bob:job8@10m alice:job1@27s]
clients: alice(5)+2m bob(3)
```

This means that:

- There are two jobs on the backlog: Alice's `job1` (fair start time 27s from now) and Bob's `job8`
  (fair start time 10m from now). Alice's job will go first, because it has the earlier fair start time.
- Alice has a rate of 5 jobs (5 job-seconds per second), and her next job will get a fair start time
  of 2 minutes from now (because she has already submitted more jobs than her rate; the sketch below continues the arithmetic).
- Bob has a rate of 3 and no penalty. His next job will get a fair start time of now.
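
Continuing the sketch above (same illustrative types), the `alice(5)+2m` entry corresponds to a client with rate 5 whose fair-start clock is already 2 minutes ahead; one more one-minute job would get a fair start time of +2m and advance the clock by a further 60 / 5 = 12 seconds:

```ocaml
(* Continuing the sketch: the state shown as "alice(5)+2m" above. *)
let () =
  let now = 0.0 in
  let alice = { rate = 5.0; next_fair_start = now +. 120.0 } in
  let fair_start = assign ~now alice ~cost:60.0 in
  (* Prints: fair start +120s, clock now at +132s *)
  Printf.printf "fair start +%.0fs, clock now at +%.0fs\n"
    fair_start (alice.next_fair_start -. now)
```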


## API

18 changes: 17 additions & 1 deletion api/pool_admin.ml
@@ -6,7 +6,7 @@ type worker_info = {
active : bool;
}

let local ~show ~workers ~worker ~set_active ~update =
let local ~show ~workers ~worker ~set_active ~update ~set_rate =
let module X = Raw.Service.PoolAdmin in
X.local @@ object
inherit X.service
@@ -58,6 +58,15 @@ let local ~show ~workers ~worker ~set_active ~update =
let name = Params.worker_get params in
release_param_caps ();
update name

method set_rate_impl params release_param_caps =
let open X.SetRate in
let client_id = Params.id_get params in
let rate = Params.rate_get params in
release_param_caps ();
match set_rate ~client_id rate with
| Ok () -> Service.return_empty ()
| Error `No_such_user -> Service.fail "No such user"
end

module X = Raw.Client.PoolAdmin
@@ -97,3 +106,10 @@ let update t worker =
let request, params = Capability.Request.create Params.init_pointer in
Params.worker_set params worker;
Capability.call_for_unit t method_id request

let set_rate t ~client_id rate =
let open X.SetRate in
let request, params = Capability.Request.create Params.init_pointer in
Params.id_set params client_id;
Params.rate_set params rate;
Capability.call_for_unit_exn t method_id request
6 changes: 1 addition & 5 deletions api/registration.ml
@@ -8,11 +8,7 @@ let local ~register =
method register_impl params release_param_caps =
let open X.Register in
let name = Params.name_get params in
let capacity =
let x = Params.capacity_get_int_exn params in
if x > 0 then x
else 32 (* Old workers don't report their capacity. *)
in
let capacity = Params.capacity_get_int_exn params in
let worker = Params.worker_get params in
release_param_caps ();
match worker with
3 changes: 3 additions & 0 deletions api/schema.capnp
@@ -139,6 +139,9 @@ interface PoolAdmin {

update @4 (worker :Text) -> ();
# Drain worker, ask it to restart with the latest version, and return when it comes back.

setRate @5 (id :Text, rate :Float64) -> ();
# Set the expected share of the pool for this client.
}

interface Admin {
27 changes: 22 additions & 5 deletions bin/admin.ml
@@ -36,6 +36,11 @@ let list_clients cap_path =
| [] -> Fmt.epr "No clients.@."
| clients -> List.iter print_endline clients

let set_rate cap_path pool_id client_id rate =
run cap_path @@ fun admin_service ->
let pool = Cluster_api.Admin.pool admin_service pool_id in
Cluster_api.Pool_admin.set_rate pool ~client_id rate

let show cap_path pool =
run cap_path @@ fun admin_service ->
match pool with
@@ -140,8 +145,8 @@ let connect_addr =
~docv:"ADDR"
[]

let client_id =
Arg.pos 1 Arg.(some string) None @@
let client_id ~pos =
Arg.pos pos Arg.(some string) None @@
Arg.info
~doc:"Unique name or ID for the client"
~docv:"ID"
@@ -154,6 +159,13 @@ let pool_pos =
~docv:"POOL"
[]

let rate ~pos =
Arg.pos pos Arg.(some float) None @@
Arg.info
~doc:"Number of parallel jobs"
~docv:"RATE"
[]

let worker =
Arg.value @@
Arg.pos 2 Arg.(some string) None @@
@@ -171,19 +183,24 @@ let all =

let add_client =
let doc = "Create a new client endpoint for submitting jobs" in
Term.(const add_client $ connect_addr $ Arg.required client_id),
Term.(const add_client $ connect_addr $ Arg.required (client_id ~pos:1)),
Term.info "add-client" ~doc

let remove_client =
let doc = "Unregister a client." in
Term.(const remove_client $ connect_addr $ Arg.required client_id),
Term.(const remove_client $ connect_addr $ Arg.required (client_id ~pos:1)),
Term.info "remove-client" ~doc

let list_clients =
let doc = "List registered clients" in
Term.(const list_clients $ connect_addr),
Term.info "list-clients" ~doc

let set_rate =
let doc = "Set expected number of parallel jobs for a pool/client combination" in
Term.(const set_rate $ connect_addr $ Arg.required pool_pos $ Arg.required (client_id ~pos:2) $ Arg.required (rate ~pos:3)),
Term.info "set-rate" ~doc

let show =
let doc = "Show information about a service, pool or worker" in
Term.(const show $ connect_addr $ Arg.value pool_pos),
@@ -204,7 +221,7 @@ let update =
Term.(const update $ connect_addr $ Arg.required pool_pos $ worker),
Term.info "update" ~doc

let cmds = [add_client; remove_client; list_clients; show; pause; unpause; update]
let cmds = [add_client; remove_client; list_clients; set_rate; show; pause; unpause; update]

let default_cmd =
let doc = "a command-line admin client for the build-scheduler" in
1 change: 1 addition & 0 deletions dune-project
@@ -37,6 +37,7 @@
cohttp-lwt-unix
sqlite3
obuilder
psq
(mirage-crypto (>= 0.8.5))
(ocaml (>= 4.10.0))
(current_ocluster (and (= :version) :with-test))
1 change: 1 addition & 0 deletions ocluster.opam
@@ -21,6 +21,7 @@ depends: [
"cohttp-lwt-unix"
"sqlite3"
"obuilder"
"psq"
"mirage-crypto" {>= "0.8.5"}
"ocaml" {>= "4.10.0"}
"current_ocluster" {= version & with-test}
79 changes: 64 additions & 15 deletions scheduler/cluster_scheduler.ml
@@ -7,6 +7,21 @@ let () = Sqlite_loader.Ty.register "ocluster-client" Client

let restart_timeout = 600.0 (* Maximum time to wait for a worker to reconnect after it disconnects. *)

module Metrics = struct
open Prometheus

let namespace = "ocluster"
let subsystem = "scheduler"

let priority ~urgent =
if urgent then "high" else "low"

let client_queued_jobs =
let help = "Items currently queued" in
let f = Gauge.v_labels ~label_names:["client"; "pool"; "priority"] ~help ~namespace ~subsystem "client_submitted_jobs" in
fun ~client_id ~pool ~urgent -> Gauge.labels f [client_id; pool; priority ~urgent]
end

module Item = struct
type t = {
descr : Cluster_api.Queue.job_desc;
@@ -32,7 +47,7 @@ module Item = struct
end

module Pool_api = struct
module Pool = Pool.Make(Item)
module Pool = Pool.Make(Item)(Unix)

type t = {
pool : Pool.t;
@@ -46,21 +61,27 @@ module Pool_api = struct
let cond = Lwt_condition.create () in
{ pool; workers; cond }

let submit t ~urgent ~client_id (descr : Cluster_api.Queue.job_desc) : Cluster_api.Ticket.t =
let submit client ~urgent (descr : Cluster_api.Queue.job_desc) : Cluster_api.Ticket.t =
let job, set_job = Capability.promise () in
Log.info (fun f -> f "Received new job request from %S (urgent=%b)" client_id urgent);
Log.info (fun f -> f "Received new job request from %S (urgent=%b)" (Pool.Client.client_id client) urgent);
let item = { Item.descr; set_job } in
let ticket = Pool.submit ~urgent t.pool item in
let ticket = Pool.Client.submit ~urgent client item in
let queued_jobs = Metrics.client_queued_jobs ~client_id:(Pool.Client.client_id client) ~pool:(Pool.Client.pool_id client) ~urgent in
Prometheus.Gauge.inc_one queued_jobs;
Lwt.async (fun () ->
Capability.wait_until_settled job >|= fun () ->
Prometheus.Gauge.dec_one queued_jobs
);
let cancel () =
match Pool.cancel ticket with
match Pool.Client.cancel client ticket with
| Ok () ->
Capability.resolve_exn set_job (Capnp_rpc.Exception.v "Ticket cancelled");
Lwt_result.return ()
| Error `Not_queued ->
Cluster_api.Job.cancel job
in
let release () =
match Pool.cancel ticket with
match Pool.Client.cancel client ticket with
| Ok () -> Capability.resolve_exn set_job (Capnp_rpc.Exception.v "Ticket released (cancelled)")
| Error `Not_queued -> ()
in
@@ -103,7 +124,7 @@ module Pool_api = struct
Capability.inc_ref w;
Some w

let admin_service t =
let admin_service ~validate_client t =
let show () = Fmt.to_to_string Pool.show t.pool in
let workers () =
Pool.connected_workers t.pool
@@ -142,7 +163,17 @@ module Pool_api = struct
in
Lwt.pick [ aux (); timeout ]
in
Cluster_api.Pool_admin.local ~show ~workers ~worker:(worker t) ~set_active ~update
let set_rate ~client_id rate =
if validate_client client_id then (
let client = Pool.client t.pool ~client_id in
Pool.Client.set_rate client rate;
Ok ()
) else Error `No_such_user
in
Cluster_api.Pool_admin.local ~show ~workers ~worker:(worker t) ~set_active ~update ~set_rate

let remove_client t ~client_id =
Pool.remove_client t.pool ~client_id
end

type t = {
@@ -155,16 +186,27 @@ let registration_services t =
let pp_pool_name f (name, _) = Fmt.string f name

let submission_service ~validate ~sturdy_ref t client_id =
let pools = Hashtbl.create 3 in
let get_client pool_id =
match Hashtbl.find_opt pools pool_id with
| Some c -> Ok c
| None ->
match String.Map.find_opt pool_id t.pools with
| None ->
let msg = Fmt.strf "Pool ID %S not one of @[<h>{%a}@]" pool_id (String.Map.pp ~sep:Fmt.comma pp_pool_name) t.pools in
Error (Capnp_rpc.Exception.v msg)
| Some pool ->
let client = Pool_api.Pool.client pool.pool ~client_id in
Hashtbl.add pools pool_id client;
Ok client
in
let submit ~pool ~urgent descr =
match validate () with
| false -> Capability.broken (Capnp_rpc.Exception.v "Access has been revoked")
| true ->
match String.Map.find_opt pool t.pools with
| None ->
let msg = Fmt.strf "Pool ID %S not one of @[<h>{%a}@]" pool (String.Map.pp ~sep:Fmt.comma pp_pool_name) t.pools in
Capability.broken (Capnp_rpc.Exception.v msg)
| Some pool ->
Pool_api.submit ~urgent ~client_id pool descr
match get_client pool with
| Ok client -> Pool_api.submit ~urgent client descr
| Error ex -> Capability.broken ex
in
Cluster_api.Submission.local ~submit ~sturdy_ref

@@ -176,7 +218,13 @@ let admin_service ~loader ~restore t =
let pool name =
match String.Map.find_opt name t.pools with
| None -> Capability.broken (Capnp_rpc.Exception.v "No such pool")
| Some pool_api -> Pool_api.admin_service pool_api
| Some pool_api ->
let validate_client id =
match Sqlite_loader.lookup_by_descr loader (Client, id) with
| [] -> false
| _ -> true
in
Pool_api.admin_service ~validate_client pool_api
in
let add_client name =
let descr = (Client, name) in
@@ -193,6 +241,7 @@ let admin_service ~loader ~restore t =
match Sqlite_loader.lookup_by_descr loader descr with
| [digest] ->
Sqlite_loader.remove loader digest;
t.pools |> String.Map.iter (fun _ -> Pool_api.remove_client ~client_id:name);
Log.info (fun f -> f "Removed endpoint for client %S" name);
Lwt_result.return ()
| [] -> Lwt_result.fail (`Capnp (Capnp_rpc.Error.exn "Unknown client %S" name))
2 changes: 1 addition & 1 deletion scheduler/dune
@@ -1,3 +1,3 @@
(library
(name cluster_scheduler)
(libraries ocluster-api logs capnp-rpc-lwt capnp-rpc-net lwt-dllist prometheus db lwt.unix))
(libraries ocluster-api logs capnp-rpc-lwt capnp-rpc-net lwt-dllist prometheus db lwt.unix psq))