-
Notifications
You must be signed in to change notification settings - Fork 0
/
optimize.py
113 lines (77 loc) · 2.31 KB
/
optimize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from __future__ import annotations
import builtins
class FakeOpen:
_open = builtins.open
_lines = []
_line_count = 0
_content = None
_skip = lambda *_: None
write = _skip
close = _skip
__exit__ = _skip
def __iter__(self):
return iter(self._lines)
def __call__(self, *_) -> FakeOpen:
return self
__enter__ = __call__
@classmethod
def get_file_line(cls, lineno):
if lineno > cls._line_count:
return "\n"
return cls._lines[lineno - 1]
@classmethod
def set_size(cls, size):
print('set size to', cls._line_count)
cls._lines = ['\n'] * size
cls._line_count = size
cls._content = None
@classmethod
def write_block_to_disk(cls, blockn, content):
cls._lines[blockn - 1] = str(content or '\n')
cls._content = None
@classmethod
def read(cls):
if cls._content is not None:
return cls._content
cls._content = ''.join(cls._lines)
return cls._content
fo = FakeOpen()
def hook_disk(disk):
def init(func):
def wrapped(disk_name, disk_size, cache_size):
fo.set_size(disk_size)
return func(disk_name, disk_size, cache_size)
return wrapped
disk.init = init(disk.init)
disk.fs.fs.get_file_line = fo.get_file_line
disk.fs.fs.write_block_to_disk = fo.write_block_to_disk
builtins.open = fo
def nuke_blockcache_system(disk):
_blocks = {}
_next_block = 0
def write_block(_, blockn, content):
nonlocal _next_block
if _next_block < blockn:
_next_block = blockn
print("Write block", blockn)
_blocks[blockn] = content
def read_block(self, blockn):
print("Read block", blockn)
blk = _blocks.get(blockn)
if blk is not None:
return blk
blk = self.read_block_from_disk(blockn)
self.write(blockn, blk)
return blk
def find_free_block(_):
return _next_block + 1
disk.fs.fs.read_block = read_block
disk.fs.fs.write_block = write_block
disk.fs.fs.get_cache_for = read_block
disk.fs.fs.find_free_block = find_free_block
def override_copy(disk):
class FakeCopy:
@staticmethod
def deepcopy(n):
return n
disk.fs.copy = FakeCopy