From 8772d12dc8b416bea1bdc96f1e781a753983bb7b Mon Sep 17 00:00:00 2001
From: LadyChristina
Date: Fri, 13 Jun 2025 20:23:20 +0100
Subject: [PATCH] Add input paths to config file

---
 config.yaml                                   |  6 ++
 consensus_decentralization/helper.py          | 10 +++-
 consensus_decentralization/parse.py           | 10 ++--
 .../parsers/default_parser.py                 | 34 +++++++++---
 .../parsers/dummy_parser.py                   |  4 +-
 .../parsers/ethereum_parser.py                | 10 ++--
 data_collection_scripts/collect_block_data.py | 23 ++++----
 run.py                                        | 55 +++++++++++--------
 tests/test_mappings.py                        | 26 ++++-----
 tests/test_parsers.py                         | 22 ++++----
 10 files changed, 120 insertions(+), 80 deletions(-)

diff --git a/config.yaml b/config.yaml
index 7cd06cf..796c3f3 100644
--- a/config.yaml
+++ b/config.yaml
@@ -58,3 +58,9 @@ population_windows: 1
 plot_parameters:
   plot: false
   animated: false
+
+# List of paths specifying where to look for raw block data, relative to the root directory of the repository.
+# The first item in the list is the directory where the `collect_block_data` script writes newly fetched data;
+# it is also the directory where the tests expect the sample data to be found.
+input_directories:
+  - raw_block_data
diff --git a/consensus_decentralization/helper.py b/consensus_decentralization/helper.py
index 443073a..00d215f 100644
--- a/consensus_decentralization/helper.py
+++ b/consensus_decentralization/helper.py
@@ -13,7 +13,6 @@ from yaml import safe_load
 
 ROOT_DIR = pathlib.Path(__file__).resolve().parent.parent
-RAW_DATA_DIR = ROOT_DIR / 'raw_block_data'
 INTERIM_DIR = ROOT_DIR / 'processed_data'
 MAPPING_INFO_DIR = ROOT_DIR / 'mapping_information'
 RESULTS_DIR = ROOT_DIR / 'results'
@@ -464,3 +463,12 @@ def get_mapped_data_filename(clustering_flag):
     :returns: str
     """
     return 'mapped_data_' + ('clustered' if clustering_flag else 'non_clustered') + '.json'
+
+
+def get_input_directories():
+    """
+    Reads the config file and retrieves the directories in which to look for raw block data
+    :returns: a list of directories that may contain the raw block data
+    """
+    config = get_config_data()
+    return [ROOT_DIR / input_dir for input_dir in config['input_directories']]
diff --git a/consensus_decentralization/parse.py b/consensus_decentralization/parse.py
index 17de475..fd799a2 100644
--- a/consensus_decentralization/parse.py
+++ b/consensus_decentralization/parse.py
@@ -16,13 +16,13 @@
 }
 
 
-def parse(project, input_dir):
+def parse(ledger, input_dirs):
     """
     Parses raw data
-    :param project: string that corresponds to the ledger whose data should be parsed
-    :param input_dir: path to the directory of the raw block data
+    :param ledger: string that corresponds to the ledger whose data should be parsed
+    :param input_dirs: list of paths that point to the directories that contain raw block data
     :returns: list of dictionaries (the parsed data of the project)
     """
-    logging.info(f'Parsing {project} data..')
-    parser = ledger_parser[project](project_name=project, input_dir=input_dir)
+    logging.info(f'Parsing {ledger} data..')
+    parser = ledger_parser[ledger](ledger=ledger, input_dirs=input_dirs)
     return parser.parse()
diff --git a/consensus_decentralization/parsers/default_parser.py b/consensus_decentralization/parsers/default_parser.py
index b8a3ada..a407c5d 100644
--- a/consensus_decentralization/parsers/default_parser.py
+++ b/consensus_decentralization/parsers/default_parser.py
@@ -9,12 +9,13 @@ class DefaultParser:
     The default parser, used for Bitcoin, Litecoin, Zcash and others.
     Any project that requires different parsing must use a parser class that inherits from this one.
 
-    :ivar project_name: the name of the project associated with a specific parser instance
+    :ivar ledger: the name of the ledger associated with a specific parser instance
+    :ivar input_dirs: the directories where the raw block data are stored
     """
 
-    def __init__(self, project_name, input_dir):
-        self.project_name = project_name
-        self.input_dir = input_dir
+    def __init__(self, ledger, input_dirs):
+        self.ledger = ledger
+        self.input_dirs = input_dirs
 
     @staticmethod
     def parse_identifiers(block_identifiers):
@@ -26,13 +27,30 @@ def parse_identifiers(block_identifiers):
         """
         return str(codecs.decode(block_identifiers, 'hex'))
 
+    def get_input_file(self):
+        """
+        Determines the file that contains the raw data for the ledger. The file is expected to be named
+        <ledger>_raw_data.json and to be located in (exactly) one of the input directories.
+        :returns: a Path object that corresponds to the file containing the raw data
+        :raises FileNotFoundError: if the file does not exist in any of the input directories
+        """
+        filename = f'{self.ledger}_raw_data.json'
+        for input_dir in self.input_dirs:
+            filepath = input_dir / filename
+            if filepath.is_file():
+                return filepath
+        raise FileNotFoundError(f'File {self.ledger}_raw_data.json not found in the input directories. Skipping '
+                                f'{self.ledger}..')
+
     def read_and_sort_data(self):
         """
         Reads the "raw" block data associated with the project
         :returns: a list of dictionaries (block data) sorted by timestamp
         """
-        filename = f'{self.project_name}_raw_data.json'
-        filepath = self.input_dir / filename
+        try:
+            filepath = self.get_input_file()
+        except FileNotFoundError as e:
+            raise e
         with open(filepath) as f:
             contents = f.read()
         data = [json.loads(item) for item in contents.strip().split('\n')]
@@ -48,8 +66,8 @@ def parse(self):
         data = self.read_and_sort_data()
 
         for block in data:
-            block['reward_addresses'] = ','.join(sorted([tx['addresses'][0] for tx in block['outputs']
-                                                         if (tx['addresses'] and int(tx['value']) > MIN_TX_VALUE)]))
+            block['reward_addresses'] = ','.join(sorted([tx['addresses'][0] for tx in block['outputs'] if
+                                                         (tx['addresses'] and int(tx['value']) > MIN_TX_VALUE)]))
             del block['outputs']
             block['identifiers'] = self.parse_identifiers(block['identifiers'])
         return data
diff --git a/consensus_decentralization/parsers/dummy_parser.py b/consensus_decentralization/parsers/dummy_parser.py
index 57ea3a6..75ce8ce 100644
--- a/consensus_decentralization/parsers/dummy_parser.py
+++ b/consensus_decentralization/parsers/dummy_parser.py
@@ -6,8 +6,8 @@ class DummyParser(DefaultParser):
     Dummy parser that only sorts the raw data. Used when the data are already in the required format.
     """
 
-    def __init__(self, project_name, input_dir):
-        super().__init__(project_name, input_dir)
+    def __init__(self, ledger, input_dirs):
+        super().__init__(ledger, input_dirs)
 
     @staticmethod
     def parse_identifiers(block_identifiers):
diff --git a/consensus_decentralization/parsers/ethereum_parser.py b/consensus_decentralization/parsers/ethereum_parser.py
index 5739ebb..2a76e8c 100644
--- a/consensus_decentralization/parsers/ethereum_parser.py
+++ b/consensus_decentralization/parsers/ethereum_parser.py
@@ -7,8 +7,8 @@ class EthereumParser(DummyParser):
     Parser for Ethereum. Inherits from DummyParser class.
""" - def __init__(self, project_name, input_dir): - super().__init__(project_name, input_dir) + def __init__(self, ledger, input_dirs): + super().__init__(ledger, input_dirs) @staticmethod def parse_identifiers(block_identifiers): @@ -29,8 +29,10 @@ def read_and_sort_data(self): Note that the current version does not sort the data (because it is too memory-intensive) but assumes that the data are already sorted (which is generally the case given the suggested queries). """ - filename = f'{self.project_name}_raw_data.json' - filepath = self.input_dir / filename + try: + filepath = self.get_input_file() + except FileNotFoundError as e: + raise e def generate_data(): with open(filepath) as f: diff --git a/data_collection_scripts/collect_block_data.py b/data_collection_scripts/collect_block_data.py index 6e4b697..1125b55 100644 --- a/data_collection_scripts/collect_block_data.py +++ b/data_collection_scripts/collect_block_data.py @@ -16,13 +16,10 @@ from yaml import safe_load from datetime import datetime -from consensus_decentralization.helper import ROOT_DIR, RAW_DATA_DIR +from consensus_decentralization.helper import ROOT_DIR -def collect_data(ledgers, from_block, to_date): - if not RAW_DATA_DIR.is_dir(): - RAW_DATA_DIR.mkdir() - +def collect_data(raw_data_dir, ledgers, from_block, to_date): data_collection_dir = ROOT_DIR / "data_collection_scripts" with open(data_collection_dir / "queries.yaml") as f: @@ -31,7 +28,7 @@ def collect_data(ledgers, from_block, to_date): client = bq.Client.from_service_account_json(json_credentials_path=data_collection_dir / "google-service-account-key.json") for ledger in ledgers: - file = RAW_DATA_DIR / f'{ledger}_raw_data.json' + file = raw_data_dir / f'{ledger}_raw_data.json' logging.info(f"Querying {ledger} from block {from_block[ledger]} until {to_date}..") query = (queries[ledger]).replace("{{block_number}}", str(from_block[ledger]) if from_block[ledger] else "-1").replace("{{timestamp}}", to_date) @@ -56,14 +53,13 @@ def collect_data(ledgers, from_block, to_date): logging.info(f'Done writing {ledger} data to file.\n') -def get_last_block_collected(ledger): +def get_last_block_collected(file): """ Get the last block collected for a ledger. This is useful for knowing where to start collecting data from. Assumes that the data is stored in a json lines file, ordered in increasing block number. 
-    :param ledger: the ledger to get the last block collected for
-    :returns: the number of the last block collected for the specified ledger
+    :param file: path to the raw data file of the ledger whose last collected block is requested
+    :returns: the number of the last block collected in the given file, or None if the file does not exist
     """
-    file = RAW_DATA_DIR / f'{ledger}_raw_data.json'
     if not file.is_file():
         return None
     with open(file) as f:
@@ -95,5 +91,8 @@ def get_last_block_collected(ledger):
     )
     args = parser.parse_args()
 
-    from_block = {ledger: get_last_block_collected(ledger) for ledger in args.ledgers}
-    collect_data(ledgers=args.ledgers, from_block=from_block, to_date=args.to_date)
+    raw_data_dir = hlp.get_input_directories()[0]
+    if not raw_data_dir.is_dir():
+        raw_data_dir.mkdir()
+    from_block = {ledger: get_last_block_collected(file=raw_data_dir / f'{ledger}_raw_data.json') for ledger in args.ledgers}
+    collect_data(raw_data_dir=raw_data_dir, ledgers=args.ledgers, from_block=from_block, to_date=args.to_date)
diff --git a/run.py b/run.py
index 2883bb3..84cef8b 100644
--- a/run.py
+++ b/run.py
@@ -13,7 +13,8 @@ def process_data(force_map, ledger_dir, ledger, output_dir):
     clustering_flag = hlp.get_clustering_flag()
     mapped_data_file = ledger_dir / hlp.get_mapped_data_filename(clustering_flag)
     if force_map or not mapped_data_file.is_file():
-        parsed_data = parse(ledger, input_dir=hlp.RAW_DATA_DIR)
+        raw_data_dirs = hlp.get_input_directories()
+        parsed_data = parse(ledger=ledger, input_dirs=raw_data_dirs)
         return apply_mapping(ledger, parsed_data=parsed_data, output_dir=output_dir)
     return None
 
@@ -36,11 +37,16 @@ def main(ledgers, timeframe, estimation_window, frequency, population_windows, i
 
     force_map = hlp.get_force_map_flag()
 
-    for ledger in ledgers:
+    for ledger in list(ledgers):
         ledger_dir = interim_dir / ledger
         ledger_dir.mkdir(parents=True, exist_ok=True)  # create ledger output directory if it doesn't already exist
 
-        mapped_data = process_data(force_map, ledger_dir, ledger, interim_dir)
+        try:
+            mapped_data = process_data(force_map, ledger_dir, ledger, interim_dir)
+        except FileNotFoundError as e:
+            logging.error(repr(e))
+            ledgers.remove(ledger)
+            continue
 
         aggregate(
             ledger,
@@ -52,30 +58,31 @@ def main(ledgers, timeframe, estimation_window, frequency, population_windows, i
             mapped_data=mapped_data
         )
 
-    aggregated_data_filename = hlp.get_blocks_per_entity_filename(timeframe, estimation_window, frequency)
-    metrics_dir = results_dir / 'metrics'
-    metrics_dir.mkdir(parents=True, exist_ok=True)
-
-    used_metrics = analyze(
-        projects=ledgers,
-        aggregated_data_filename=aggregated_data_filename,
-        population_windows=population_windows,
-        input_dir=interim_dir,
-        output_dir=metrics_dir
-    )
-
-    if hlp.get_plot_flag():
-        figures_dir = results_dir / 'figures'
-        figures_dir.mkdir(parents=True, exist_ok=True)
-        plot(
-            ledgers=ledgers,
-            metrics=used_metrics,
-            aggregated_data_filename=aggregated_data_filename,
-            animated=hlp.get_plot_config_data()['animated'],
-            metrics_dir=metrics_dir,
-            figures_dir=figures_dir
-        )
+    if ledgers:
+        aggregated_data_filename = hlp.get_blocks_per_entity_filename(timeframe, estimation_window, frequency)
+        metrics_dir = results_dir / 'metrics'
+        metrics_dir.mkdir(parents=True, exist_ok=True)
+
+        used_metrics = analyze(
+            projects=ledgers,
+            aggregated_data_filename=aggregated_data_filename,
+            population_windows=population_windows,
+            input_dir=interim_dir,
+            output_dir=metrics_dir
+        )
+
+        if hlp.get_plot_flag():
+            figures_dir = results_dir / 'figures'
+            figures_dir.mkdir(parents=True, exist_ok=True)
+            plot(
+                ledgers=ledgers,
+                metrics=used_metrics,
+                aggregated_data_filename=aggregated_data_filename,
+                animated=hlp.get_plot_config_data()['animated'],
+                metrics_dir=metrics_dir,
+                figures_dir=figures_dir
+            )
 
 
 if __name__ == '__main__':
     ledgers = hlp.get_ledgers()
diff --git a/tests/test_mappings.py b/tests/test_mappings.py
index b161e0f..cbcb913 100644
--- a/tests/test_mappings.py
+++ b/tests/test_mappings.py
@@ -12,7 +12,7 @@
 from consensus_decentralization.mappings.ethereum_mapping import EthereumMapping
 from consensus_decentralization.mappings.cardano_mapping import CardanoMapping
 from consensus_decentralization.mappings.tezos_mapping import TezosMapping
-from consensus_decentralization.helper import RAW_DATA_DIR, INTERIM_DIR, get_clustering_flag
+from consensus_decentralization.helper import INTERIM_DIR, get_clustering_flag, get_input_directories
 
 
 @pytest.fixture
@@ -31,7 +31,7 @@ def setup_and_cleanup():
     ledger_parser['sample_cardano'] = DummyParser
     ledger_mapping['sample_tezos'] = TezosMapping
     ledger_parser['sample_tezos'] = DummyParser
-    test_raw_data_dir = RAW_DATA_DIR
+    test_raw_data_dirs = get_input_directories()
     test_output_dir = INTERIM_DIR / "test_output"
     # Create the output directory for each project (as this is typically done in the run.py script before parsing or
     # mapping takes place)
@@ -41,7 +41,7 @@ def setup_and_cleanup():
     mapping_info_dir = pathlib.Path(__file__).resolve().parent.parent / 'mapping_information'
     # Mock return value of get_clustering_flag
     get_clustering_flag.return_value = True
-    yield mapping_info_dir, test_raw_data_dir, test_output_dir
+    yield mapping_info_dir, test_raw_data_dirs, test_output_dir
     # Clean up
     shutil.rmtree(test_output_dir)
 
@@ -95,9 +95,9 @@ def prep_sample_tezos_mapping_info():
 
 
 def test_map(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup
 
-    parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_bitcoin', parsed_data=parsed_data, output_dir=test_output_dir)
 
     mapped_data_file = test_output_dir / 'sample_bitcoin/mapped_data_clustered.json'
@@ -105,14 +105,14 @@ def test_map(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
 
 def test_bitcoin_mapping(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup
 
     with open(mapping_info_dir / 'addresses/sample_bitcoin.json') as f:
         pool_addresses = json.load(f)
     pool_addresses['0000000000000000000000000000000000000000'] = {'name': 'TEST2', 'source': ''}
     with open(mapping_info_dir / 'addresses/sample_bitcoin.json', 'w') as f:
         f.write(json.dumps(pool_addresses))
 
-    parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_bitcoin', parsed_data=parsed_data, output_dir=test_output_dir)
     expected_block_creators = {
@@ -136,7 +136,7 @@ def test_bitcoin_mapping(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
 
 
 def test_ethereum_mapping(setup_and_cleanup, prep_sample_ethereum_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup
 
     with open(mapping_info_dir / 'addresses/sample_ethereum.json') as f:
         addresses = json.load(f)
@@ -144,7 +144,7 @@ def test_ethereum_mapping(setup_and_cleanup, prep_sample_ethereum_mapping_info):
     with open(mapping_info_dir / 'addresses/sample_ethereum.json', 'w') as f:
         f.write(json.dumps(addresses))
 
-    parsed_data = parse(project='sample_ethereum', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_ethereum', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_ethereum', parsed_data=parsed_data, output_dir=test_output_dir)
 
     expected_block_creators = {
@@ -169,9 +169,9 @@ def test_ethereum_mapping(setup_and_cleanup, prep_sample_ethereum_mapping_info):
 
 
 def test_cardano_mapping(setup_and_cleanup, prep_sample_cardano_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup
 
-    parsed_data = parse(project='sample_cardano', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_cardano', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_cardano', parsed_data=parsed_data, output_dir=test_output_dir)
 
     expected_block_creators = {
@@ -193,9 +193,9 @@ def test_cardano_mapping(setup_and_cleanup, prep_sample_cardano_mapping_info):
 
 
 def test_tezos_mapping(setup_and_cleanup, prep_sample_tezos_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup
 
-    parsed_data = parse(project='sample_tezos', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_tezos', input_dirs=test_raw_data_dirs)
    apply_mapping(project='sample_tezos', parsed_data=parsed_data, output_dir=test_output_dir)
 
     expected_block_creators = {
diff --git a/tests/test_parsers.py b/tests/test_parsers.py
index 3b5b813..981f245 100644
--- a/tests/test_parsers.py
+++ b/tests/test_parsers.py
@@ -3,13 +3,13 @@
 from consensus_decentralization.parsers.default_parser import DefaultParser
 from consensus_decentralization.parsers.dummy_parser import DummyParser
 from consensus_decentralization.parsers.ethereum_parser import EthereumParser
-from consensus_decentralization.helper import RAW_DATA_DIR
+from consensus_decentralization.helper import get_input_directories
 
 
 @pytest.fixture
 def setup():
-    test_raw_data_dir = RAW_DATA_DIR
-    return test_raw_data_dir
+    test_raw_data_dirs = get_input_directories()
+    return test_raw_data_dirs
 
 
 def compare_parsed_samples(correct_data, parsed_data):
@@ -25,7 +25,7 @@ def compare_parsed_samples(correct_data, parsed_data):
 
 
 def test_default_parser(setup):
-    test_raw_data_dir = setup
+    test_raw_data_dirs = setup
 
     sample_parsed_data = [
         {"number": "507516", "timestamp": "2018-02-04 02:36:23 UTC", "identifiers": 'b"\\x03|\\xbe\\x07A\\xd6\\x9d\\x9cj\\xcc\\xe4\\xd1A\\xd6\\x9d\\x9ci\\xf9\\xbe\\xf5/BTC.TOP/\\xfa\\xbemm\\x141 \\xf7\\xb3\\xda\\x91\\x8f\\x12\\xff\\xb3(\\xab\\x93_\\xbf\\xe2\\xd1\\xcd\\x9b\\xb4pre\\xd7\\xfe\\xe2?\\xd6\\xcf7\'\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x00ZD\\xca\\xcf\\x00\\x00\\xf8\\xa4A \\x00\\x00"', "reward_addresses": "137YB5cpBLxLKvy8T6qXsycJ699iJjWCHH,1FVKW4rp5rN23dqFVk2tYGY4niAXMB8eZC"},
         {"number": "507715", "timestamp": "2018-02-05 04:54:34 UTC", "identifiers": "b'\\x03C\\xbf\\x07\\x13/mined by gbminers/,\\xfa\\xbemm\\x94\\x97n\\xce\\xbb\\xc7;=B\\x14\\xb3\\xd7\\xab3\\r\\xca!)\\xeb\\xfc\\xc8c\\xfaubAb\\x0f\\xe9\\x10\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x10Tb\\xa2\\x0f\\xc2\\x15\\x91\\xf7\\x0ei\\x19\\x05f\\x0b\\x00\\x00'", "reward_addresses": "18cBEMRxXHqzWWCxZNtU91F5sbUNKhL5PX"}
     ]
 
-    parser = DefaultParser(project_name='sample_bitcoin', input_dir=test_raw_data_dir)
+    parser = DefaultParser(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
 
     parsed_data = parser.parse()
 
     compare_parsed_samples(sample_parsed_data, parsed_data)
 
 
 def test_dummy_parser(setup):
-    test_raw_data_dir = setup
+    test_raw_data_dirs = setup
 
     sample_parsed_data = [
         {"number": "1649812", "timestamp": "2021-08-30 00:36:18 UTC", "identifiers": None, "reward_addresses": "tz1Kf25fX1VdmYGSEzwFy1wNmkbSEZ2V83sY"},
         {"number": "1650474", "timestamp": "2021-08-30 06:11:58 UTC", "identifiers": None, "reward_addresses": "tz1Vd1rXpV8hTHbFXCXN3c3qzCsgcU5BZw1e"},
@@ -56,21 +56,21 @@ def test_dummy_parser(setup):
         {"number": "0000000", "timestamp": "2018-08-30 00:36:18 UTC", "identifiers": None, "reward_addresses": "tz0000000000000000000000000000000000"},
     ]
 
-    parser = DummyParser(project_name='sample_tezos', input_dir=test_raw_data_dir)
+    parser = DummyParser(ledger='sample_tezos', input_dirs=test_raw_data_dirs)
 
     parsed_data = parser.parse()
 
     compare_parsed_samples(sample_parsed_data, parsed_data)
 
 
 def test_parse(setup):
-    test_raw_data_dir = setup
+    test_raw_data_dirs = setup
 
     sample_block = {"number": "682736", "timestamp": "2021-05-09 11:12:32 UTC", "identifiers": "03f06a0a202f5669614254432f4d696e6564206279206a617669647361656964373037332f2cfabe6d6d6e43ef2e06f7137b897180388403ee5019b8ff0ca4a045ea3cd82e3e41620fe91000000000000000105462a20fc21591f70e691905660b0000", "reward_addresses": "18cBEMRxXHqzWWCxZNtU91F5sbUNKhL5PX"}
 
     ledger_parser['sample_bitcoin'] = DefaultParser
 
-    input_file = test_raw_data_dir / 'sample_bitcoin_raw_data.json'
+    input_file = test_raw_data_dirs[0] / 'sample_bitcoin_raw_data.json'
 
-    parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
     for item in parsed_data:
         if item['number'] == '682736':
             assert item['reward_addresses'] == sample_block['reward_addresses']
@@ -81,7 +81,7 @@ def test_parse(setup):
     with open(input_file, 'w') as f:
         f.write(sample_data)
 
-    parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
     for item in parsed_data:
         if item['number'] == '682736':
             assert item['reward_addresses'] == "----------------------------------"
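
Note (not part of the patch): the change introduces one lookup rule that is easy to miss in the diff. Parsers scan the configured `input_directories` in order and use the first `<ledger>_raw_data.json` they find, while `collect_block_data` always writes to the first entry, which is also where the tests expect sample data. The standalone sketch below mirrors that behaviour; the inline config and the second directory name (`archived_block_data`) are illustrative assumptions, not values from the repository.

import pathlib

from yaml import safe_load

# Hypothetical config excerpt; 'archived_block_data' is an invented second entry.
CONFIG = safe_load("""
input_directories:
  - raw_block_data
  - archived_block_data
""")

ROOT_DIR = pathlib.Path('.')  # stands in for the repository root used in helper.py


def get_input_directories():
    # Mirrors helper.get_input_directories: config paths are resolved against the root.
    return [ROOT_DIR / input_dir for input_dir in CONFIG['input_directories']]


def find_raw_data_file(ledger):
    # Mirrors DefaultParser.get_input_file: the first directory that contains
    # <ledger>_raw_data.json wins; later directories serve only as read fallbacks.
    for input_dir in get_input_directories():
        filepath = input_dir / f'{ledger}_raw_data.json'
        if filepath.is_file():
            return filepath
    raise FileNotFoundError(f'{ledger}_raw_data.json not found in any input directory')


if __name__ == '__main__':
    print('new data is written to:', get_input_directories()[0])
    print(find_raw_data_file('bitcoin'))  # raises unless some directory has the file

Pinning the write target to the first entry keeps the collection script and the test fixtures deterministic even when extra archive directories are configured for reading.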