-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshared_tuple_list.py
More file actions
237 lines (193 loc) · 8.95 KB
/
shared_tuple_list.py
File metadata and controls
237 lines (193 loc) · 8.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""
SharedTupleList (perhaps better named ShareableTupleList) uses ShareableList elements to emulate an array of tuples
that can be shared across multiple processes via SharedMemory.
Note: requires Python >= 3.8 due to usage of multiprocessing.shared_memory
(c) 2020 Gerd Langer
"""
from multiprocessing.shared_memory import ShareableList
class SharedTupleList:
    """
    SharedTupleList uses a list of ShareableList to emulate a list of tuples.

    The fields tuple passed is used as prototype and default value.
    Each field of the tuple ends up in a separate ShareableList with its
    own SharedMemory, i.e. the data is stored column-wise: one ShareableList
    per tuple field, one slot per tuple.
    Optionally, each tuple field can be given a name via field_names.
    """

    # Constructor and convenience factory method

    def __init__(self, fields, field_names=None, initializer=None, size=None, shm_names=None):
        """
        Create a SharedTupleList newly or as reference to an existing one.

        Either pass the size to create the SharedTupleList in new SharedMemory,
        or pass a function as initializer, which takes the index of a tuple field as parameter and
        returns a sequence with the initial values of that field for every tuple.
        The initializer must produce the same number of elements for each tuple field, e.g.
        initializer=(lambda idx:[(x, x+1)[idx] for x in range(100)]) for 100 two-member tuples.
        Alternatively pass the list of names of existing SharedMemory in the same order
        as the elements of the fields parameter; that SharedMemory is then shared, not created.

        :param fields: prototype tuple; its values are the defaults when only size is given
        :param field_names: optional names (one per field) to get/set fields of a tuple by name
        :param initializer: callable taking the field index and returning that field's initial values
        :param size: number of tuples in the list (storage capacity), used to create a list
        :param shm_names: names of the SharedMemory of an existing list, one per field
        :raises AssertionError: if fields is not a tuple, or none of size, initializer, shm_names is passed
        :raises ValueError: if the per-field lists do not all have the same length
        """
        # Raised explicitly (not via the ``assert`` statement) so the validation survives ``python -O``
        # while keeping the exception type existing callers may already catch.
        if not isinstance(fields, tuple):
            raise AssertionError("passed fields must be a tuple")
        if size is None and shm_names is None and initializer is None:
            raise AssertionError("pass size, initializer or shm_names")

        self._create_shm = shm_names is None
        self._size = size
        self._prototype = fields
        self._tuple_len = len(fields)
        self._fields_type = [type(f) for f in fields]
        self._fields_smm_name = []
        self._fields_list = []
        # Keep a private list copy: accepts any iterable of names (list, tuple, ...) and
        # stays consistent with the name->index map even if the caller mutates its own list.
        self._fields_name = None if field_names is None else list(field_names)
        # Always defined, so name lookups on an unnamed list fail with a clear KeyError.
        self._fields_names_map = {}
        if self._fields_name is not None:
            self._fields_names_map = {n: i for i, n in enumerate(self._fields_name)}

        try:
            for i, f in enumerate(fields):
                if not self._create_shm:
                    shl = ShareableList(name=shm_names[i])
                elif initializer is None:
                    shl = ShareableList([f] * size)
                else:
                    shl = ShareableList(initializer(i))
                # Register before validating so the cleanup below also covers this list.
                self._fields_list.append(shl)
                self._fields_smm_name.append(shl.shm.name)
                expected = len(self._fields_list[0])
                if len(shl) != expected:
                    raise ValueError(
                        f"field {i} holds {len(shl)} elements, but field 0 holds {expected}"
                    )
                if self._size is None:
                    self._size = len(shl)
        except BaseException:
            # Do not leak the segments opened so far; they are only unlinked if this instance created them.
            self.close()
            self.unlink()
            raise

        if self._size is None:
            # Only reachable with an empty prototype tuple: there is no list to take a length from.
            self._size = 0

    @staticmethod
    def create_by_ref(stl):
        """
        create_by_ref generates a new SharedTupleList based on the passed stl.
        The newly created list uses the SharedMemory of the passed stl.

        :param stl: the stl used to make a copy of it using the same shared memory
        :return: a new SharedTupleList
        """
        return SharedTupleList(stl.get_prototype(), stl.get_field_names(), shm_names=stl.get_shm_names())

    # internal methods for field name to index handling

    def _item2idx(self, item):
        """
        _item2idx returns the item itself, if it is an int value, or the index of the passed field name.

        :raises KeyError: if item is not an int and not a known field name
        """
        if isinstance(item, int):
            return item
        try:
            return self._fields_names_map[item]
        except KeyError:
            raise KeyError(f"unknown field name: {item!r}") from None

    def _get_list(self, item):
        """_get_list returns the ShareableList of the given index or field_name"""
        return self._fields_list[self._item2idx(item)]

    # shared memory name retrieval

    def get_shm_name(self, item):
        """get_shm_name returns the name of the underlying shared memory for the index or field_name"""
        return self._fields_smm_name[self._item2idx(item)]

    def get_shm_names(self):
        """get_shm_names returns all names of the underlying shared memories in order of the tuple fields"""
        return list(self._fields_smm_name)

    # meta info retrieval

    def get_prototype(self):
        """get_prototype returns the original tuple used to define and initiate the SharedTupleList"""
        return self._prototype

    def get_field_name(self, item):
        """
        get_field_name returns the name of the tuple's field at the given index (or name ... itself :-))

        :raises TypeError: if an index is passed but no field names were given at construction
        """
        return self._fields_name[self._item2idx(item)]

    def get_field_names(self):
        """
        get_field_names returns a new list with all names of the tuple's fields in order of the tuple fields,
        or None if no field names were given at construction
        """
        if self._fields_name is None:
            return None
        return list(self._fields_name)

    def get_field_type(self, item):
        """get_field_type returns the type of the tuple's field at the given index or name"""
        return self._fields_type[self._item2idx(item)]

    def get_field_types(self):
        """get_field_types returns all types of the tuple's fields in order of the tuple fields"""
        return list(self._fields_type)

    def get_tuple_len(self):
        """get_tuple_len returns the number of fields of the original tuple (the prototype)"""
        return self._tuple_len

    def get_width(self):
        """get_width is the same as get_tuple_len"""
        return self.get_tuple_len()

    def __len__(self):
        """__len__ is the storage capacity in number of tuples in this SharedTupleList"""
        return self._size

    # index-based getting and setting of whole tuples

    def __getitem__(self, idx):
        """__getitem__ allows to get the tuple at the given index.
        Note: as the result is a tuple, the returned tuple cannot be used to change values in the list."""
        # reconstruct the tuple from the different internal ShareableLists
        return tuple(shl[idx] for shl in self._fields_list)

    def __setitem__(self, idx, value):
        """__setitem__ sets the values of the tuple at the given index.
        Only the first get_tuple_len() elements of value are used."""
        # Read every element first: a too-short value then fails before anything is written,
        # instead of leaving the stored tuple half-updated.
        values = [value[i] for i in range(self._tuple_len)]
        # distribute the tuple across the different internal ShareableLists
        for shl, field_value in zip(self._fields_list, values):
            shl[idx] = field_value

    # index-based getting and setting of tuple fields

    def get(self, idx, item):
        """
        get returns the value of the passed field (index or name) of the tuple at the given idx

        :param idx: the tuple's index
        :param item: the field index inside the tuple or field name
        :return: the field of the addressed tuple by index or field name
        """
        return self._get_list(item)[idx]

    def set(self, idx, item, value) -> None:
        """
        set sets the value of the passed field (index or name) of the tuple at the given idx to the passed value

        :param idx: the tuple's index
        :param item: the field index inside the tuple or field name
        :param value: the value
        """
        self._get_list(item)[idx] = value

    # shared memory clean ups

    def close(self):
        """close the attached shared memory of all referenced ShareableLists"""
        for shl in self._fields_list:
            shl.shm.close()

    def unlink(self):
        """unlink the attached shared memory of all referenced ShareableLists,
        if the shared memory was created by this instance"""
        if self._create_shm:
            for shl in self._fields_list:
                shl.shm.unlink()
if __name__ == '__main__':
    # Some dummy, test and example code: create one list that owns its shared memory,
    # attach two more views to the same memory, and show that writes through any of
    # them are visible through the others.
    tpl = (1, 2, 3, 'name', 1.0)
    names = ['a', 'b', 'c', 'txt', 'f']
    owner = SharedTupleList(tpl, names, size=100)
    attached = []  # views that only reference the owner's shared memory
    try:
        cp = SharedTupleList(tpl, names, shm_names=owner.get_shm_names())
        attached.append(cp)
        cp2 = SharedTupleList.create_by_ref(owner)
        attached.append(cp2)

        print('len(l)=', len(owner))
        print('len(cp)=', len(cp))
        print(owner[0])
        for e in owner[0]:
            print(e, end=' | ')
        print()
        for n in names:
            print(n, '=', owner.get(0, n), end=';')
        print()
        for n in names:
            print(n, '=', cp.get(0, n), end=';')
        print()

        print('change')
        # Each write goes through a different instance, by index and by name.
        owner.set(0, 0, 37)
        cp.set(0, 'b', -27)
        cp2.set(0, 'txt', 'abcdefgh')
        for n in names:
            print(n, '=', owner.get(0, n), end=';')
        print()
        for n in names:
            print(n, '=', cp.get(0, n), end=';')
        print()

        # Both instances report the same shared memory name for each field.
        for i, v in enumerate(tpl):
            print(i, ': ', v, '(l) =>', owner.get_shm_name(i))
            print(i, ': ', v, '(cp)=>', cp.get_shm_name(i))

        for x in range(len(owner)):
            owner.set(x, 2, x)
        owner[0] = (-1, -1, -1, 'start', -1.0)
        cp[len(owner) - 1] = (0, 0, 0, 'end', 0.0)
        for x in range(len(owner)):
            print(cp2[x])
    finally:
        # Always release the shared memory, even if the demo above fails part-way.
        owner.close()
        for view in attached:
            view.close()
        owner.unlink()