-
Notifications
You must be signed in to change notification settings - Fork 0
/
redis_test.py
86 lines (63 loc) · 2.44 KB
/
redis_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from mpi4py import MPI
import numpy as np
import argparse
import platform
import pickle
from redis_q import RedisQueue
import redis
import sys
import time
from funcx.serialize import FuncXSerializer
fxs = FuncXSerializer()
def avg(x):
return sum(x)/len(x)
def process_numpy(serialized):
data = deserialize(serialized_data)
return serialize(data.sum())
def serialize(data):
return fxs.serialize(data)
def deserialize(data):
return fxs.deserialize(data)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--size", default='100',
help="Size of nd array")
parser.add_argument("-n", "--ntimes", default='10',
help="Number of times to repeat")
parser.add_argument("-r", "--redis", default='127.0.0.1',
help="redis host address")
args = parser.parse_args()
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print("Current process rank: {}, total ranks: {}".format(rank, size))
# redis client
rc = redis.StrictRedis(host=args.redis, port=6379, decode_responses=True)
if rank == 0:
# Raw numpy in process
raw = np.random.rand(int(args.size), 1)
start = time.time()
serialized_data = serialize(raw)
rc.hset('1', 'task', serialized_data)
rc.rpush('task_list', '1')
delta = time.time() - start
print(f"[Rank:0, len={len(raw)}] Cum time to put: {delta*1000:8.3f}ms")
x = rc.blpop('result_list', timeout=0)
result_list, result_id = x
result = rc.hget(result_id, 'result')
delta = time.time() - start
print(f"[Rank:0, len={len(raw)}] Time to send and receive results: {delta*1000:8.3f}ms")
print(f"Got result: {result}, deserialized: {deserialize(result)}")
else:
x = rc.blpop('task_list', timeout=0)
task_list, task_id = x
serialized_data = rc.hget(task_id, 'task')
print("[TASKS] Got task_id {}".format(task_id))
start = time.time()
serialized_result = process_numpy(serialized_data)
delta = time.time() - start
print(f"[Rank:0, len={args.size}] Time on worker process: {delta*1000:8.3f}ms")
rc.rpush('result_list', '1')
rc.hset('1', 'result', serialized_result)
delta = time.time() - start
print(f"[Rank:0, len={args.size}] Recv,deserialize,send: {delta*1000:8.3f}ms")