-
Notifications
You must be signed in to change notification settings - Fork 65
/
one_shot_learning.py
156 lines (140 loc) · 7 KB
/
one_shot_learning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from utils import OmniglotDataLoader, one_hot_decode, five_hot_decode
import tensorflow as tf
import argparse
import numpy as np
from model import NTMOneShotLearningModel
from tensorflow.python import debug as tf_debug
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--mode', default="train")
parser.add_argument('--restore_training', default=False)
parser.add_argument('--debug', default=False)
parser.add_argument('--label_type', default="one_hot", help='one_hot or five_hot')
parser.add_argument('--n_classes', default=5)
parser.add_argument('--seq_length', default=50)
parser.add_argument('--augment', default=True)
parser.add_argument('--model', default="MANN", help='LSTM, MANN, MANN2 or NTM')
parser.add_argument('--read_head_num', default=4)
parser.add_argument('--batch_size', default=16)
parser.add_argument('--num_epoches', default=100000)
parser.add_argument('--learning_rate', default=1e-3)
parser.add_argument('--rnn_size', default=200)
parser.add_argument('--image_width', default=20)
parser.add_argument('--image_height', default=20)
parser.add_argument('--rnn_num_layers', default=1)
parser.add_argument('--memory_size', default=128)
parser.add_argument('--memory_vector_dim', default=40)
parser.add_argument('--shift_range', default=1, help='Only for model=NTM')
parser.add_argument('--write_head_num', default=1, help='Only for model=NTM. For MANN #(write_head) = #(read_head)')
parser.add_argument('--test_batch_num', default=100)
parser.add_argument('--n_train_classes', default=1200)
parser.add_argument('--n_test_classes', default=423)
parser.add_argument('--save_dir', default='./save/one_shot_learning')
parser.add_argument('--tensorboard_dir', default='./summary/one_shot_learning')
args = parser.parse_args()
if args.mode == 'train':
train(args)
elif args.mode == 'test':
test(args)
def train(args):
model = NTMOneShotLearningModel(args)
data_loader = OmniglotDataLoader(
image_size=(args.image_width, args.image_height),
n_train_classses=args.n_train_classes,
n_test_classes=args.n_test_classes
)
with tf.Session() as sess:
if args.debug:
sess = tf_debug.LocalCLIDebugWrapperSession(sess)
if args.restore_training:
saver = tf.train.Saver()
ckpt = tf.train.get_checkpoint_state(args.save_dir + '/' + args.model)
saver.restore(sess, ckpt.model_checkpoint_path)
else:
saver = tf.train.Saver(tf.global_variables())
tf.global_variables_initializer().run()
train_writer = tf.summary.FileWriter(args.tensorboard_dir + '/' + args.model, sess.graph)
print(args)
print("1st\t2nd\t3rd\t4th\t5th\t6th\t7th\t8th\t9th\t10th\tbatch\tloss")
for b in range(args.num_epoches):
# Test
if b % 100 == 0:
x_image, x_label, y = data_loader.fetch_batch(args.n_classes, args.batch_size, args.seq_length,
type='test',
augment=args.augment,
label_type=args.label_type)
feed_dict = {model.x_image: x_image, model.x_label: x_label, model.y: y}
output, learning_loss = sess.run([model.o, model.learning_loss], feed_dict=feed_dict)
merged_summary = sess.run(model.learning_loss_summary, feed_dict=feed_dict)
train_writer.add_summary(merged_summary, b)
# state_list = sess.run(model.state_list, feed_dict=feed_dict) # For debugging
# with open('state_long.txt', 'w') as f:
# print(state_list, file=f)
accuracy = test_f(args, y, output)
for accu in accuracy:
print('%.4f' % accu, end='\t')
print('%d\t%.4f' % (b, learning_loss))
# Save model
if b % 5000 == 0 and b > 0:
saver.save(sess, args.save_dir + '/' + args.model + '/model.tfmodel', global_step=b)
# Train
x_image, x_label, y = data_loader.fetch_batch(args.n_classes, args.batch_size, args.seq_length,
type='train',
augment=args.augment,
label_type=args.label_type)
feed_dict = {model.x_image: x_image, model.x_label: x_label, model.y: y}
sess.run(model.train_op, feed_dict=feed_dict)
def test(args):
model = NTMOneShotLearningModel(args)
data_loader = OmniglotDataLoader(
image_size=(args.image_width, args.image_height),
n_train_classses=args.n_train_classes,
n_test_classes=args.n_test_classes
)
saver = tf.train.Saver()
ckpt = tf.train.get_checkpoint_state(args.save_dir + '/' + args.model)
with tf.Session() as sess:
saver.restore(sess, ckpt.model_checkpoint_path)
print("Test Result\n1st\t2nd\t3rd\t4th\t5th\t6th\t7th\t8th\t9th\t10th\tloss")
y_list = []
output_list = []
loss_list = []
for b in range(args.test_batch_num):
x_image, x_label, y = data_loader.fetch_batch(args.n_classes, args.batch_size, args.seq_length,
type='test',
augment=args.augment,
label_type=args.label_type)
feed_dict = {model.x_image: x_image, model.x_label: x_label, model.y: y}
output, learning_loss = sess.run([model.o, model.learning_loss], feed_dict=feed_dict)
y_list.append(y)
output_list.append(output)
loss_list.append(learning_loss)
accuracy = test_f(args, np.concatenate(y_list, axis=0), np.concatenate(output_list, axis=0))
for accu in accuracy:
print('%.4f' % accu, end='\t')
print(np.mean(loss_list))
def test_f(args, y, output):
correct = [0] * args.seq_length
total = [0] * args.seq_length
if args.label_type == 'one_hot':
y_decode = one_hot_decode(y)
output_decode = one_hot_decode(output)
elif args.label_type == 'five_hot':
y_decode = five_hot_decode(y)
output_decode = five_hot_decode(output)
for i in range(np.shape(y)[0]):
y_i = y_decode[i]
output_i = output_decode[i]
# print(y_i)
# print(output_i)
class_count = {}
for j in range(args.seq_length):
if y_i[j] not in class_count:
class_count[y_i[j]] = 0
class_count[y_i[j]] += 1
total[class_count[y_i[j]]] += 1
if y_i[j] == output_i[j]:
correct[class_count[y_i[j]]] += 1
return [float(correct[i]) / total[i] if total[i] > 0. else 0. for i in range(1, 11)]
if __name__ == '__main__':
main()