-
Notifications
You must be signed in to change notification settings - Fork 1
/
do_batch.m
164 lines (127 loc) · 5.13 KB
/
do_batch.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
function do_batch(pars, func, No_xpts)
% DO_BATCH  Run a set of experiments as tasks on a generic cluster scheduler,
% keeping at most Max_No_concurrent_tasks of them submitted at a time.
%
%   do_batch(PARS, FUNC, No_xpts)
%   do_batch(PARS, FUNC)            % No_xpts defaults to numel(PARS)
%
% PARS     cell array; PARS{k} is passed to createTask as the input-argument
%          list of experiment k (presumably each PARS{k} is itself a cell
%          array of arguments -- TODO confirm against callers).
%
% FUNC     name (string) of the function each task runs. A function handle
%          is also accepted.
%
% No_xpts  number of experiments to run (experiments 1..No_xpts).
%
% The first sub-batch fills every slot. After that, the function polls task
% states every check_interval seconds and, as soon as some tasks have
% stopped, submits a new sub-batch (as a new job) sized to the number of
% slots freed. Progress and any task error messages are printed to file_id.
% When all tasks have stopped, the jobs created by this call are destroyed.
%
% Tasks are created with zero output arguments, so results must be saved
% by FUNC itself. Requires the scheduler submit function submitFunc to be
% resolvable from here.

if nargin < 3
    No_xpts = numel(pars);
end

% Build the task function once instead of eval-ing a string per task.
if isa(func, 'function_handle')
    task_fcn = func;
else
    task_fcn = str2func(func);
end

file_id = 1;     % 1 = standard output; could be a logfile fid to keep a
                 % progress record, e.g. if the cluster session drops.
No_outputs = 0;  % number of outputs requested from each task
fprintf(file_id, '\n\n\n starting batch \n\n\n');

sched = findResource('scheduler','type','generic');
set(sched, 'SubmitFcn', @submitFunc);

Max_No_concurrent_tasks = 25;
check_interval = 90;                  % seconds between polls of task state
xpt_No_start = 1;
FreedSpace = Max_No_concurrent_tasks; % first sub-batch fills every slot
Job_no = 1;                           % index of the next job to create
Running_tasks = [];
All_tasks = 1:No_xpts;
Pending_tasks = All_tasks;
Finished_tasks = [];

while ~isempty(Pending_tasks)
    if ~isempty(setdiff(All_tasks, Pending_tasks)) % not the first sub-batch
        % Block until at least one running task has stopped. Only sleep
        % when nothing has stopped yet, so freed slots are refilled at once.
        fprintf(file_id, '\n\nWaiting for space to become available \n\n');
        finishedtasks = find_done_tasks(task_handle, Running_tasks);
        while isempty(finishedtasks)
            pause(check_interval);
            finishedtasks = find_done_tasks(task_handle, Running_tasks);
        end
        % update sets
        Running_tasks = setdiff(Running_tasks, finishedtasks);
        FreedSpace = length(finishedtasks);
        Finished_tasks = union(Finished_tasks, finishedtasks);
        % report
        log_string = [datestr(now) ' xpts {' num2str(finishedtasks) '} completed '];
        fprintf(file_id, '%s\n', log_string);
        report_task_errors(file_id, task_handle, finishedtasks);
    end

    % Next sub-batch: as many pending experiments as there are free slots.
    xpt_No_end = min(xpt_No_start + FreedSpace - 1, No_xpts);
    SubBatch = xpt_No_start:xpt_No_end;
    xpt_No_start = xpt_No_end + 1;

    % Run each sub-batch as a separate job.
    jobs(Job_no) = createJob(sched);
    fprintf(file_id, 'creating tasks for job %d \n \n', Job_no);
    for xpt_no = SubBatch
        task_handle(xpt_no) = createTask(jobs(Job_no), task_fcn, No_outputs, pars{xpt_no});
    end
    submit(jobs(Job_no));
    log_string = [datestr(now) ' Submitting xpts{ ' num2str(SubBatch) '} as job ' num2str(Job_no)];
    fprintf(file_id, '%s\n', log_string);

    Running_tasks = union(Running_tasks, SubBatch);
    Pending_tasks = setdiff(Pending_tasks, Running_tasks);
    log_string = ['Summary - Pending tasks {' num2str(Pending_tasks) '}'];
    fprintf(file_id, '%s\n', log_string);
    log_string = ['Summary - Running tasks {' num2str(Running_tasks) '}'];
    fprintf(file_id, '%s\n', log_string);
    log_string = ['Summary - Completed tasks {' num2str(Finished_tasks) '}'];
    fprintf(file_id, '%s\n', log_string);
    Job_no = Job_no + 1;
end

% Wait for the final running tasks to stop before cleaning up.
while ~isempty(Running_tasks)
    finishedtasks = find_done_tasks(task_handle, Running_tasks);
    if ~isempty(finishedtasks)
        Finished_tasks = union(Finished_tasks, finishedtasks);
        Running_tasks = setdiff(Running_tasks, finishedtasks);
        log_string = [datestr(now) ' xpts ' num2str(finishedtasks) ' completed '];
        fprintf(file_id, '%s\n', log_string);
        log_string = ['Summary - Completed tasks {' num2str(Finished_tasks) '}'];
        fprintf(file_id, '%s\n', log_string);
        report_task_errors(file_id, task_handle, finishedtasks);
    end
    if ~isempty(Running_tasks)   % no point sleeping once everything is done
        pause(check_interval);
    end
end

% final report
log_string = ['Summary - Pending tasks {' num2str(Pending_tasks) '}'];
fprintf(file_id, '%s\n', log_string);
log_string = ['Summary - Running tasks {' num2str(Running_tasks) '}'];
fprintf(file_id, '%s\n', log_string);
log_string = ['Summary - Completed tasks {' num2str(Finished_tasks) '}'];
fprintf(file_id, '%s\n', log_string);
pause(10); % just to be sure .....

% Destroy only the jobs this call created (not every job the scheduler
% knows about, which could belong to another batch). jobs exists only if
% at least one job was created, i.e. Job_no was incremented.
if Job_no > 1
    destroy(jobs);
end


function done = find_done_tasks(task_handle, task_ids)
% Return the subset of TASK_IDS (indices into TASK_HANDLE), in order, whose
% task State is 'finished' or 'failed' (the latter only on releases that
% report it), i.e. tasks that will not run any further.
done = [];
for k = 1:length(task_ids)
    id = task_ids(k);
    if any(strcmp(task_handle(id).State, {'finished', 'failed'}))
        done = [done id]; %#ok<AGROW>
    end
end


function report_task_errors(file_id, task_handle, task_ids)
% Print to FILE_ID the ErrorMessage of every task in TASK_IDS that has one.
for k = 1:length(task_ids)
    id = task_ids(k);
    err = get(task_handle(id), 'ErrorMessage');
    if ~isempty(err)
        fprintf(file_id, 'Error in Xpt %d\n\n', id);
        fprintf(file_id, '%s\n', err);
    end
end