tools.py
import os
import shutil
import subprocess
import sys

import numpy as np
import _pickle as cpickle


def check_directory_exists(directory):
    """Exit the program with an error if the given path doesn't exist."""
    if not os.path.exists(directory):
        print("ERROR: the path {} doesn't exist.".format(directory))
        sys.exit(1)


def create_directory(directory, safe=True):
    """Create the directory, or ask permission to reuse or overwrite it if it already exists."""
    if os.path.exists(directory) and safe:
        ans = input("> WARNING: The folder {} already exists; do you want to add new trials? [y,n]: ".format(directory))
        if ans in ["y", "Y", "yes", "Yes", "YES"]:
            # reuse the existing folder as is
            return True
        ans = input("> WARNING: Do you want to overwrite the folder {}? [y,n]: ".format(directory))
        if ans in ["y", "Y", "yes", "Yes", "YES"]:
            shutil.rmtree(directory)
        else:
            print("exiting the program")
            sys.exit()
    # exist_ok=True covers the case where the folder already exists and safe=False
    os.makedirs(directory, exist_ok=True)
    return True


def get_git_hash():
    """Return the hash of the current git commit, or a placeholder if it cannot be retrieved."""
    try:
        binary_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip()
        commit_hash = binary_hash.decode("utf-8")
    except (subprocess.CalledProcessError, OSError):
        commit_hash = "no git commit"
    return commit_hash


def load_sensorimotor_transitions(data_directory, n_transitions=None):
    """
    Load sensorimotor transitions from a pickle file created by generate_sensorimotor_data.py.
    Returns the data in a dictionary.
    """
    # check that the data file exists
    check_directory_exists(data_directory)
    print("loading sensorimotor data from {}...".format(data_directory))
    with open(data_directory, 'rb') as f:
        data = cpickle.load(f)
    # identify potential NaN entries
    to_discard = np.argwhere(np.logical_or(np.isnan(data["sensor_t"][:, 0]), np.isnan(data["sensor_tp"][:, 0])))
    # remove the NaN entries
    for i in ["motor_t", "sensor_t", "shift_t", "motor_tp", "sensor_tp", "shift_tp"]:
        data[i] = np.delete(data[i], to_discard, axis=0)
    # get the number of transitions
    k = data["motor_t"].shape[0]
    # reduce the size of the dataset if necessary
    if n_transitions is None:
        n_transitions = k
    elif n_transitions < k:
        to_discard = np.arange(n_transitions, k)
        for i in ["motor_t", "sensor_t", "shift_t", "motor_tp", "sensor_tp", "shift_tp"]:
            data[i] = np.delete(data[i], to_discard, axis=0)
    else:
        n_transitions = k
        print("Warning: the requested number of transitions is greater than the size of the dataset.")
    print("loaded {} sensorimotor transitions".format(n_transitions))
    return data


def load_compressed_sensorimotor_transitions(data_directory, n_transitions=None):
    """
    Load compressed sensorimotor transitions from an npz file created by generate_sensorimotor_data.py.
    Returns the data in a dictionary.
    """
    # check that the data file exists
    check_directory_exists(data_directory)
    print("loading sensorimotor data from {}...".format(data_directory))
    with np.load(data_directory) as npzfile:
        data = dict(zip(npzfile.files, [npzfile[x] for x in npzfile.files]))
    # identify potential NaN entries
    to_discard = np.argwhere(np.logical_or(np.isnan(data["sensor_t"][:, 0]), np.isnan(data["sensor_tp"][:, 0])))
    # remove the NaN entries
    for i in ["motor_t", "sensor_t", "shift_t", "motor_tp", "sensor_tp", "shift_tp"]:
        data[i] = np.delete(data[i], to_discard, axis=0)
    # get the number of transitions
    k = data["motor_t"].shape[0]
    # reduce the size of the dataset if necessary
    if n_transitions is None:
        n_transitions = k
    elif n_transitions < k:
        to_discard = np.arange(n_transitions, k)
        for i in ["motor_t", "sensor_t", "shift_t", "motor_tp", "sensor_tp", "shift_tp"]:
            data[i] = np.delete(data[i], to_discard, axis=0)
    else:
        n_transitions = k
        print("Warning: the requested number of transitions is greater than the size of the dataset.")
    print("loaded {} sensorimotor transitions".format(n_transitions))
    return data


def normalize_data(data):
    """
    Normalize the data such that the motor and sensor components lie in [-1, 1].
    The positions of the sensor and the shifts of the environment are not normalized, to keep the real scale of the external space.
    """
    # get the min/max of the motor configurations and sensations
    motor_min = np.nanmin(data["motor_t"], axis=0)
    motor_max = np.nanmax(data["motor_t"], axis=0)
    #
    sensor_min = np.nanmin(data["sensor_t"], axis=0)
    sensor_max = np.nanmax(data["sensor_t"], axis=0)
    # rescale the motor and sensor components to [-1, 1]
    data["motor_t"] = 2 * (data["motor_t"] - motor_min) / (motor_max - motor_min) - 1
    data["motor_tp"] = 2 * (data["motor_tp"] - motor_min) / (motor_max - motor_min) - 1
    #
    data["sensor_t"] = 2 * (data["sensor_t"] - sensor_min) / (sensor_max - sensor_min) - 1
    data["sensor_tp"] = 2 * (data["sensor_tp"] - sensor_min) / (sensor_max - sensor_min) - 1
    # normalize the grid of motor configurations with the same motor scaling
    data["grid_motor"] = 2 * (data["grid_motor"] - motor_min) / (motor_max - motor_min) - 1
    return data
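

if __name__ == "__main__":
    # Minimal usage sketch (an illustrative addition, not part of the original module):
    # load a dataset produced by generate_sensorimotor_data.py, keep a subset of the
    # transitions, and normalize it. The path below is hypothetical, and the sketch
    # assumes the pickle file contains all the keys used above, including "grid_motor".
    print("running on git commit: {}".format(get_git_hash()))
    dataset_path = "dataset/transitions.pkl"  # hypothetical path
    transitions = load_sensorimotor_transitions(dataset_path, n_transitions=1000)
    transitions = normalize_data(transitions)
    print("motor_t is now in [{:.2f}, {:.2f}]".format(transitions["motor_t"].min(), transitions["motor_t"].max()))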