-
Notifications
You must be signed in to change notification settings - Fork 20
/
clusterFunctionsSSH.R
144 lines (137 loc) · 5.87 KB
/
clusterFunctionsSSH.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#' Create SSH worker for SSH cluster functions.
#'
#' @param nodename [\code{character(1)}]\cr
#' Host name of node.
#' @param ssh.cmd [\code{character(1)})]\cr
#' CLI command to ssh into remote node.
#' Default is \dQuote{ssh}.
#' @param ssh.args [\code{character}]\cr
#' CLI args for \code{ssh.cmd}.
#' Default is none.
#' @param rhome [\code{character(1)}]\cr
#' Path to R installation on worker.
#' \dQuote{} means R installation on the PATH is used,
#' of course this implies that it must be on the PATH
#' (also for non-interactive shells)!
#' Default is \dQuote{}.
#' @param ncpus [\code{integers(1)}]\cr
#' Number of VPUs of worker.
#' Default means to query the worker via \dQuote{/proc/cpuinfo}.
#' @param max.jobs [\code{integer(1)}]\cr
#' Maximal number of jobs that can run concurrently for the current registry.
#' Default is \code{ncpus}.
#' @param max.load [\code{numeric(1)}]\cr
#' Load average (of the last 5 min) at which the worker is considered occupied,
#' so that no job can be submitted.
#' Default is \code{ncpus-1}.
#' @param nice [\code{integer(1)}]\cr
#' Process priority to run R with set via nice. Integers between -20 and 19 are allowed.
#' If missing, processes are not nice'd and the system default applies (usually 0).
#' Default is no niceing.
#' @param r.options [\code{character}]\cr
#' Options for R and Rscript, one option per element of the vector,
#' a la \dQuote{--vanilla}.
#' Default is \code{c("--no-save", "--no-restore", "--no-init-file", "--no-site-file")}.
#' @param script [\code{character(1)}]\cr
#' Path to helper bash script which interacts with the worker.
#' You really should not have to touch this, as this would imply that we have screwed up and
#' published an incompatible version for your system.
#' This option is only provided as a last resort for very experienced hackers.
#' Note that the path has to be absolute.
#' This is what is done in the package:
#' \url{https://github.com/tudo-r/BatchJobs/blob/master/inst/bin/linux-helper}
#' Default means to take it from package directory.
#' @return [\code{\link{SSHWorker}}].
#' @export
#' @aliases SSHWorker
makeSSHWorker = function(nodename, ssh.cmd = "ssh", ssh.args = character(0L), rhome = "", ncpus, max.jobs, max.load, nice,
r.options = c("--no-save", "--no-restore", "--no-init-file", "--no-site-file"), script) {
worker = makeWorkerRemoteLinux(nodename, ssh.cmd, ssh.args, rhome, r.options, script, ncpus, max.jobs, max.load, nice)
class(worker) = c("SSHWorker", class(worker))
return(worker)
}
#' Create an SSH cluster to execute jobs.
#'
#' Worker nodes must share the same file system and be accessible by ssh
#' without manually entering passwords (e.g. by ssh-agent or passwordless pubkey).
#' Note that you can also use this function to parallelize on multiple cores on your local machine.
#' But you still have to run an ssh server and provide passwordless access to
#' localhost.
#'
#' @param ... [\code{\link{SSHWorker}}]\cr
#' Worker objects, all created with \code{\link{makeSSHWorker}}.
#' @param workers [list of \code{\link{SSHWorker}}]\cr
#' Alternative way to pass workers.
#' @return [\code{ClusterFunctions}].
#'
#' @examples \dontrun{
#'
#' # Assume you have three nodes larry, curley and moe. All have 6
#' # cpu cores. On curley and moe R is installed under
#' # "/opt/R/R-current" and on larry R is installed under
#' # "/usr/local/R/". larry should not be used extensively because
#' # somebody else wants to compute there as well.
#' # Then a call to 'makeClusterFunctionsSSH'
#' # might look like this:
#'
#' cluster.functions = makeClusterFunctionsSSH(
#' makeSSHWorker(nodename = "larry", rhome = "/usr/local/R", max.jobs = 2),
#' makeSSHWorker(nodename = "curley", rhome = "/opt/R/R-current"),
#' makeSSHWorker(nodename = "moe", rhome = "/opt/R/R-current"))
#' }
#' @export
#' @family clusterFunctions
#' @seealso \code{\link{makeSSHWorker}}
makeClusterFunctionsSSH = function(..., workers) {
args = list(...)
if (!xor(length(args) > 0L, !missing(workers)))
stop("You must use exactly only 1 of: '...', 'workers'!")
if (missing(workers))
workers = args
checkListElementClass(workers, "SSHWorker")
if (length(workers) == 0L)
stop("You must pass at least 1 SSH worker!")
nodenames = extractSubList(workers, "nodename")
dup = duplicated(nodenames)
if (any(dup))
stopf("Multiple definitions for worker nodenames: %s!", collapse(nodenames[dup]))
names(workers) = nodenames
rm(nodenames, dup)
submitJob = function(conf, reg, job.name, rscript, log.file, job.dir, resources, arrayjobs) {
worker = findWorker(workers, reg$file.dir, tdiff = 5L)
if (is.null(worker)) {
states = collapse(extractSubList(workers, "available", simplify = TRUE), sep = "")
makeSubmitJobResult(status = 1L, batch.job.id = NULL,
msg = sprintf("Workers busy: %s", states))
} else {
pid = try(startWorkerJob(worker, rscript, log.file))
if (is.error(pid))
makeSubmitJobResult(status = 101L, batch.job.id = NULL, msg = "Submit failed.")
else
makeSubmitJobResult(status = 0L,batch.job.id = paste0(worker$nodename, "#", pid))
}
}
killJob = function(conf, reg, batch.job.id) {
parts = stri_split_fixed(batch.job.id, "#")[[1L]]
nodename = parts[1L]
pid = parts[2L]
worker = workers[[nodename]]
if (is.null(worker))
stopf("Unknown worker node '%s'.", nodename)
killWorkerJob(worker, pid)
}
listJobs = function(conf, reg) {
res = NULL
for (worker in workers) {
nodename = worker[["nodename"]]
pids = listWorkerJobs(worker, reg$file.dir)
if (length(pids) > 0L) {
res = c(res, paste0(nodename, "#", pids))
}
}
res
}
getArrayEnvirName = function() NA_character_
makeClusterFunctions(name = "SSH", submitJob = submitJob, killJob = killJob,
listJobs = listJobs, getArrayEnvirName = getArrayEnvirName)
}