-
Notifications
You must be signed in to change notification settings - Fork 84
/
multiprocess.py
114 lines (109 loc) · 4.22 KB
/
multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Linear Partitions [20.4.1]
import pandas as pd
import numpy as np
import time
import sys
def linParts(numAtoms,numThreads):
# partition of atoms with a single loop
parts=np.linspace(0,numAtoms,min(numThreads,numAtoms)+1)
parts=np.ceil(parts).astype(int)
return parts
def nestedParts(numAtoms,numThreads,upperTriang=False):
# partition of atoms with an inner loop
parts,numThreads_=[0],min(numThreads,numAtoms)
for num in range(numThreads_):
part=1+4*(parts[-1]**2+parts[-1]+numAtoms*(numAtoms+1.)/numThreads_)
part=(-1+part**.5)/2.
parts.append(part)
parts=np.round(parts).astype(int)
if upperTriang: # the first rows are heaviest
parts=np.cumsum(np.diff(parts)[::-1])
parts=np.append(np.array([0]),parts)
return parts
def mpPandasObj(func,pdObj,numThreads=24,mpBatches=1,linMols=True,**kargs):
'''
Parallelize jobs, return a dataframe or series
+ func: function to be parallelized. Returns a DataFrame
+ pdObj[0]: Name of argument used to pass the molecule
+ pdObj[1]: List of atoms that will be grouped into molecules
+ kwds: any other argument needed by func
Example: df1=mpPandasObj(func,('molecule',df0.index),24,**kwds)
'''
import pandas as pd
#if linMols:parts=linParts(len(argList[1]),numThreads*mpBatches)
#else:parts=nestedParts(len(argList[1]),numThreads*mpBatches)
if linMols:parts=linParts(len(pdObj[1]),numThreads*mpBatches)
else:parts=nestedParts(len(pdObj[1]),numThreads*mpBatches)
jobs=[]
for i in range(1,len(parts)):
job={pdObj[0]:pdObj[1][parts[i-1]:parts[i]],'func':func}
job.update(kargs)
jobs.append(job)
if numThreads==1:out=processJobs_(jobs)
else: out=processJobs(jobs,numThreads=numThreads)
if isinstance(out[0],pd.DataFrame):df0=pd.DataFrame()
elif isinstance(out[0],pd.Series):df0=pd.Series()
else:return out
for i in out:df0=df0.append(i)
df0=df0.sort_index()
return df0
# =======================================================
# single-thread execution for debugging [20.8]
def processJobs_(jobs):
# Run jobs sequentially, for debugging
out=[]
for job in jobs:
out_=expandCall(job)
out.append(out_)
return out
# =======================================================
# Example of async call to multiprocessing lib [20.9]
import multiprocessing as mp
import datetime as dt
#________________________________
def reportProgress(jobNum,numJobs,time0,task):
# Report progress as asynch jobs are completed
msg=[float(jobNum)/numJobs, (time.time()-time0)/60.]
msg.append(msg[1]*(1/msg[0]-1))
timeStamp=str(dt.datetime.fromtimestamp(time.time()))
msg=timeStamp+' '+str(round(msg[0]*100,2))+'% '+task+' done after '+ \
str(round(msg[1],2))+' minutes. Remaining '+str(round(msg[2],2))+' minutes.'
if jobNum<numJobs:sys.stderr.write(msg+'\r')
else:sys.stderr.write(msg+'\n')
return
#________________________________
def processJobs(jobs,task=None,numThreads=24):
# Run in parallel.
# jobs must contain a 'func' callback, for expandCall
if task is None:task=jobs[0]['func'].__name__
pool=mp.Pool(processes=numThreads)
outputs,out,time0=pool.imap_unordered(expandCall,jobs),[],time.time()
# Process asyn output, report progress
for i,out_ in enumerate(outputs,1):
out.append(out_)
reportProgress(i,len(jobs),time0,task)
pool.close();pool.join() # this is needed to prevent memory leaks
return out
# =======================================================
# Unwrapping the Callback [20.10]
def expandCall(kargs):
# Expand the arguments of a callback function, kargs['func']
func=kargs['func']
del kargs['func']
out=func(**kargs)
return out
# =======================================================
# Pickle Unpickling Objects [20.11]
def _pickle_method(method):
func_name=method.im_func.__name__
obj=method.im_self
cls=method.im_class
return _unpickle_method, (func_name,obj,cls)
#________________________________
def _unpickle_method(func_name,obj,cls):
for cls in cls.mro():
try:func=cls.__dict__[func_name]
except KeyError:pass
else:break
return func.__get__(obj,cls)
#________________________________