diff --git a/scripts/fitdump b/scripts/fitdump index 5e4a4b4..700242f 100755 --- a/scripts/fitdump +++ b/scripts/fitdump @@ -37,6 +37,75 @@ def format_message(num, message, options): return "".join(s) +def sort_record_types(record_type: str = '') -> int: + # Header order + record_order = {'file_id': 0, + 'sport': 1, + 'workout': 2, + 'activity': 3, + 'session': 4, + 'lap': 5, + 'device_info': 10, + 'hr_zone': 11, + 'power_zone': 12, + 'record': 20, + 'event': 30, + 'field_description': 40, + 'developer_data_id': 100} + DEFAULT_ORDER = max(record_order.values()) + 1 + try: + return record_order[record_type] + except KeyError: + # Not found, last + return DEFAULT_ORDER + + +def field_header_format(field_data) -> str: + s = field_data.name + if field_data.units: + s += ' [{}]'.format(field_data.units) + return s + + +def field_data_format(messages, message_header): + for message in messages: + # Format this line + s = [''] * len(message_header) + for field_data in message: + s[message_header.index(field_data.name)] = str(field_data.value) + yield ','.join(s) + '\n' + + +def write_csv(options, records): + records = list(records) + # Collect all the message types in the file + record_types = set([message.name for message in records]) + # Two-stage sort, header info, then alphabetic + record_types = list(sorted(sorted(record_types), key=sort_record_types)) + + # Cache for all lines + all_lines = list() + + for write_type in record_types: + # Collect all messages of this type + write_messages = [message for message in records if message.name == write_type] + # Collect the data field information in alphabetic order + field_names_header = sorted(set([field_header_format(field_data) for message in write_messages for field_data in message])) + field_names = sorted(set(field_data.name for message in write_messages for field_data in message)) + # Write the overall header line + all_lines.append(write_type + '\n') + # Write the field names header line + all_lines.append(','.join(field_names_header) + '\n') + # Write the data + all_lines.extend(field_data_format(write_messages, field_names)) + # Write a blank line + all_lines.append('\n\n') + + # Write the actual file + options.output.writelines(all_lines) + pass + + def parse_args(args=None): parser = argparse.ArgumentParser( description='Dump .FIT files to various formats', @@ -48,8 +117,7 @@ def parse_args(args=None): help='File to output data into (defaults to stdout)', ) parser.add_argument( - # TODO: csv - '-t', '--type', choices=('readable', 'json', 'gpx'), default='readable', + '-t', '--type', choices=('readable', 'json', 'gpx', 'csv'), default='readable', help='File type to output. (DEFAULT: %(default)s)', ) parser.add_argument( @@ -181,7 +249,7 @@ def main(args=None): fitfile = fitparse.UncachedFitFile( options.infile, data_processor=fitparse.StandardUnitsDataProcessor(), - check_crc=not(options.ignore_crc), + check_crc=not options.ignore_crc, ) records = fitfile.get_messages( name=options.name, @@ -201,12 +269,15 @@ def main(args=None): if filename: filename = os.path.basename(filename) options.output.writelines(generate_gpx(records, filename)) + elif options.type == 'csv': + write_csv(options, records) finally: try: options.output.close() except IOError: pass + if __name__ == '__main__': try: main() diff --git a/tests/files/ELEMNT_ROAM_Unterminated.fit b/tests/files/ELEMNT_ROAM_Unterminated.fit new file mode 100644 index 0000000..a3a54a6 Binary files /dev/null and b/tests/files/ELEMNT_ROAM_Unterminated.fit differ diff --git a/tests/files/rollers.fit b/tests/files/rollers.fit new file mode 100644 index 0000000..61cbde3 Binary files /dev/null and b/tests/files/rollers.fit differ diff --git a/tests/test.py b/tests/test.py index eea5dc2..77f9b0a 100755 --- a/tests/test.py +++ b/tests/test.py @@ -413,6 +413,10 @@ def test_speed(self): avg_speed = list(f.get_messages('session'))[0].get_values().get('avg_speed') self.assertEqual(avg_speed, 5.86) + def test_rollers(self): + f = FitFile(testfile('rollers.fit')) + f.parse() + def test_mismatched_field_size(self): f = FitFile(testfile('coros-pace-2-cycling-misaligned-fields.fit')) with warnings.catch_warnings(record=True) as w: @@ -422,7 +426,7 @@ def test_mismatched_field_size(self): self.assertEqual(len(f.messages), 11293) def test_unterminated_file(self): - f = FitFile(testfile('nick.fit'), check_crc=False) + f = FitFile(testfile('ELEMNT_ROAM_Unterminated.fit'), check_crc=False) with warnings.catch_warnings(record=True) as w: f.parse()