forked from mstoeckl/pascapalyze
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.py
More file actions
318 lines (279 loc) · 13.2 KB
/
index.py
File metadata and controls
318 lines (279 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
import os, sys
from typing import Self
from struct import unpack
import numpy as np
import matplotlib.pyplot as plt
# Parsing libraries
from zipfile import ZipFile
import xml.etree.ElementTree as ET
def grok(file_name: str, data_size: int, archive: ZipFile):
    """
    Read one column of numbers from a sub-file of an opened archive.

    The sub-file is read as ``data_size`` fixed-width records of 12 bytes each.
    The first 4 bytes of every record are skipped (this function does not use
    them) and the remaining 8 bytes are decoded as a double in native byte order.

    :param file_name: Path of the sub-file inside the archive. Backslashes are converted to forward slashes before the lookup.
    :type file_name: str
    :param data_size: Number of 12-byte records to read.
    :type data_size: int
    :param archive: The opened archive containing the sub-file.
    :type archive: ZipFile
    :returns: The decoded values, or an empty list when ``data_size`` is not positive, when the sub-file is missing, or when it holds fewer bytes than requested.
    :rtype: list[float]
    """
    # Local import: only ``unpack`` is imported from struct at file level.
    from struct import iter_unpack

    if data_size <= 0:
        return []

    # Archive member names always use forward slashes.
    member_name = os.path.normpath(file_name).replace("\\", "/")
    expected_length = 12 * data_size

    # Only the lookup of the member can raise KeyError, so keep the try minimal.
    try:
        with archive.open(member_name, "r") as source:
            binary_data = source.read(expected_length)
    except KeyError:
        print("Subfile", file_name, "not available in archive")
        return []

    if len(binary_data) != expected_length:
        print(
            "Data set did not contain advertised number of elements:",
            binary_data is None,
            len(binary_data),
            expected_length,
        )
        return []

    # "=4xd": native byte order without alignment, 4 ignored bytes, 1 double.
    return [value for (value,) in iter_unpack("=4xd", binary_data)]
def frange(start: int | float, stop: int | float, step: int | float) -> list:
"""Range, but for floats"""
list_of_items = []
for index in range(int((stop - start) / step)):
list_of_items.append(round(start + step * index, 12))
return list_of_items
class DataSet:
"""Represents a single data set within a PASCO Capstone file."""
def __init__(
self,
name: str,
x_values: str | int,
y_values: str,
data_size: int,
channel_id_name: str | None = None,
archive: ZipFile | None = None,
):
"""
Initialize a DataSet object.
:param name: The name of the data set (e.g., MeasurementName-ChannelIDName).
:type name: str
:param x_values: The path to the independent data file or the time step (if constant).
:type x_values: str | int
:param y_values: The path to the dependent data file.
:type y_values: str
:param data_size: The size of the data in the dependent file.
:type data_size: int
"""
self.name = name
if archive:
if type(x_values) == str:
self.x_values = grok(x_values, data_size, archive)
elif type(x_values) == float:
self.x_values = list(frange(0, x_values * data_size, (x_values)))
else:
self.x_values = x_values
self.y_values = grok(y_values, data_size, archive)
else:
self.x_values = x_values
self.y_values = y_values
self.data_size = data_size
self.channel_id_name = channel_id_name
def plot(self, show: bool = True):
"""
Uses MatPlotLib to plot the data on a graph.
"""
if not (type(self.x_values) == list and type(self.y_values) == list):
raise TypeError(
f"The given data is in the wrong format. Expected list (or Array like), got {type(self.x_values)} and {type(self.y_values)}."
)
plt.plot(self.x_values, self.y_values)
if show:
plt.show()
def __repr__(self) -> str:
"""
Return a string representation of the DataSet object.
:return: A string describing the DataSet.
:rtype: str
"""
representation = f'DataSet(name="{self.name}", '
if type(self.x_values) == float:
representation += f"x_values={self.x_values}, "
elif type(self.x_values) == list:
representation += f"x_values={self.x_values if len(self.x_values) <= 2 else f"[{self.x_values[0]},...,{self.x_values[-1]}]"}, "
else:
representation += f'x_values="{self.x_values}", '
if type(self.y_values) == float:
representation += f"y_values={self.y_values}, "
elif type(self.y_values) == list:
representation += f"y_values={self.y_values if len(self.y_values) <= 2 else f"[{self.y_values[0]}, ..., {self.y_values[-1]}]"}, "
else:
representation += f'y_values="{self.y_values}", '
representation += f"data_size={self.data_size})"
return representation
def __str__(self) -> str:
return self.__repr__()
class CapstoneFile:
    """Represents a PASCO Capstone file and its data.

    The archive is opened by the constructor and stays open until ``close`` is
    called; the object can also be used as a context manager.
    """

    def __init__(self, file_path: str):
        """Load a PASCO Capstone file and initialize the object.

        :param file_path: The path to the PASCO Capstone file (.cap)
        :type file_path: str
        :raises FileNotFoundError: Try double-checking the given file path."""
        if (
            not os.path.isfile(file_path)
            or os.path.splitext(file_path)[1].lower() != ".cap"
        ):
            raise FileNotFoundError(
                f"Unable to find the PASCO Capstone (.cap) file at {file_path}. Verify the existance and the extension of your file."
            )
        self.archive_path = file_path
        self.archive = ZipFile(file_path, "r")
        try:
            self.data_sets: dict[int, list[DataSet]] = self.process_archive()
        except Exception:
            # Do not leak the open archive when its content cannot be parsed.
            self.archive.close()
            raise

    def close(self) -> None:
        """Close the underlying archive."""
        self.archive.close()

    def __enter__(self) -> Self:
        """Return the object itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the archive when leaving the ``with`` statement."""
        self.close()

    def process_archive(capstone_file: ZipFile | Self) -> dict[int, list[DataSet]]:
        """Load the data from the PASCO Capstone archive.

        Reads ``main.xml`` from the archive and builds one DataSet for every
        ``DataRepository/DataSource/DataSet`` element. The dependent values come
        from the ``FileName`` of ``DependentStorageElement``; the independent
        values come from the ``FileName`` of ``IndependentStorageElement`` or,
        when that is missing, from its ``IntervalCacheInterval`` used as a step.

        :param capstone_file: The object itself for a normal method call, or an opened archive when the method is called on the class.
        :type capstone_file: ZipFile|CapstoneFile
        :returns: A dictionary with a list of data sets associated with their group number.
        :rtype: dict[int, list[DataSet]]"""
        if isinstance(capstone_file, ZipFile):
            archive = capstone_file
        else:
            archive = capstone_file.archive
        main_xml_content = ET.fromstring(archive.read("main.xml"))

        data_sets: dict[int, list[DataSet]] = {}
        data_repository = main_xml_content.find("DataRepository")
        if data_repository is None:
            return data_sets

        for data_source_element in data_repository.findall("DataSource"):
            measurment_name = data_source_element.get("MeasurementName")
            channel_id_name = data_source_element.get("ChannelIDName")
            for data_set_element in data_source_element.findall("DataSet"):
                data_segment_element = data_set_element.find("DataSegmentElement")
                if data_segment_element is None:
                    continue
                dependent_file = data_segment_element.find("DependentStorageElement")
                independent_file = data_segment_element.find(
                    "IndependentStorageElement"
                )
                if dependent_file is None or independent_file is None:
                    # Without both storage elements there is nothing to load.
                    continue

                group_number = int(data_set_element.get("DataGroupNumber"))
                data_size = int(dependent_file.get("DataCacheDataSize"))
                dependent_file_name = dependent_file.get("FileName")
                independent_source = independent_file.get("FileName")
                if not independent_source:
                    independent_source = float(
                        independent_file.get("IntervalCacheInterval")
                    )

                data_sets.setdefault(group_number, []).append(
                    DataSet(
                        measurment_name,
                        independent_source,
                        dependent_file_name,
                        data_size,
                        channel_id_name if channel_id_name else None,
                        # Use the resolved archive so that a plain ZipFile
                        # argument works as well as a CapstoneFile.
                        archive,
                    )
                )
        return data_sets

    def plot(self, series: list[int] | None = None):
        """Plot the selected data sets on one figure and display it.

        :param series: Positions of the data sets to plot, counted across all groups in iteration order. Every data set is plotted when this is None or empty.
        :type series: list[int] | None
        """
        every_data_set = [
            data_set for group in self.data_sets.values() for data_set in group
        ]
        for position, data_set in enumerate(every_data_set):
            if not series or position in series:
                data_set.plot(False)
        plt.show()

    def to_csv(self, decimal_separator: str = ".", cell_separator: str = ";"):
        """Render all the data sets as delimiter-separated text.

        Every data set contributes an x column and a y column, each starting
        with three header cells; the first column of a group carries the group
        label. Shorter columns are padded with empty cells and every row ends
        with a trailing separator. No quoting or escaping is applied to cells.

        :param decimal_separator: Replaces the "." in the text of float cells.
        :type decimal_separator: str
        :param cell_separator: Placed after every cell of a row.
        :type cell_separator: str
        :returns: The rows joined with newlines.
        :rtype: str
        """
        table: list[list] = []
        for group_id, group in self.data_sets.items():
            columns = []
            for data_set in group:
                column_x = ["", data_set.name, "x-axis"]
                column_y = ["", "", "y-axis"]
                if isinstance(data_set.x_values, list) and isinstance(
                    data_set.y_values, list
                ):
                    column_x.extend(data_set.x_values)
                    column_y.extend(data_set.y_values)
                columns.append(column_x)
                columns.append(column_y)
            if columns:
                columns[0][0] = f"Group {group_id}"
            else:
                columns.append([f"Group {group_id} (empty)"])
            table.extend(columns)

        # At least one row is always produced, even without any column.
        row_count = max([1, *(len(column) for column in table)])

        def format_cell(value) -> str:
            text = str(value)
            if isinstance(value, float):
                return text.replace(".", decimal_separator)
            return text

        rows = []
        for row_index in range(row_count):
            cells = [
                format_cell(column[row_index]) if row_index < len(column) else ""
                for column in table
            ]
            rows.append(cell_separator.join(cells) + cell_separator)
        return "\n".join(rows)

    def __repr__(self):
        """Return the file name and path followed by every group and its data sets."""
        lines = [
            f"{os.path.basename(self.archive_path)} at {os.path.realpath(self.archive_path)}:"
        ]
        for group_number, data_sets in self.data_sets.items():
            lines.append(f"Group {group_number}:")
            lines.extend(f"{data_set}" for data_set in data_sets)
        return "\n".join(lines)

    def __str__(self):
        """Return the same text as ``__repr__``."""
        return self.__repr__()
def _option_value(arguments: list[str], flag: str) -> str | None:
    """Return the argument following ``flag``, or None when the flag is absent or last."""
    if flag in arguments:
        position = arguments.index(flag)
        if position < len(arguments) - 1:
            return arguments[position + 1]
    return None


# Command-line entry point:
#   <file.cap> [-to-csv <output path> [-csv-sep <char>] [-csv-dec <char>]] [-plot]
if __name__ == "__main__":
    arguments = sys.argv
    # Check if a file path is provided as a command-line argument
    if len(arguments) > 1 and os.path.isfile(arguments[1]):
        try:
            # Create a CapstoneFile object from the provided file path
            capstone_file = CapstoneFile(arguments[1])
        except FileNotFoundError as error:
            # Raised for an existing file that does not have the .cap extension.
            print(f"Error: {error}")
        else:
            # Check if the user wants to export the data to a CSV file
            csv_output_path = _option_value(arguments, "-to-csv")
            if csv_output_path is not None:
                # Separators must be exactly one character, otherwise the
                # defaults are used.
                cell_separator = _option_value(arguments, "-csv-sep")
                if cell_separator is None or len(cell_separator) != 1:
                    cell_separator = ";"
                decimal_separator = _option_value(arguments, "-csv-dec")
                if decimal_separator is None or len(decimal_separator) != 1:
                    decimal_separator = "."
                # Open the specified output file for writing
                with open(csv_output_path, "w") as capstone_csv_output:
                    capstone_file_csv = capstone_file.to_csv(
                        cell_separator=cell_separator,
                        decimal_separator=decimal_separator,
                    )
                    # Write the CSV content to the output file
                    capstone_csv_output.write(capstone_file_csv)
            elif "-to-csv" in arguments:
                print("Error: Missing output file path for CSV export.")
            # Check if the user wants to plot the data
            if "-plot" in arguments:
                capstone_file.plot([0])
            elif "-to-csv" not in arguments:
                print("Error: No valid operation specified. Use '-to-csv' or '-plot'.")
    else:
        print("Error: No valid file path provided or file does not exist.")