diff --git a/.gitignore b/.gitignore index eea778f..7ae8d42 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,7 @@ output raw_block_data/*_raw_data.json .coverage site +.ipynb_checkpoints/ +*.ipynb +processed_data +results \ No newline at end of file diff --git a/README.md b/README.md index b179df7..1f1358b 100644 --- a/README.md +++ b/README.md @@ -58,8 +58,8 @@ analyzed. If only the timeframe is specified, all ledgers will be analyzed for the given timeframe. If no arguments are given, all ledgers will be analyzed for all months since January 2018. -Three files `nc.csv`, `gini.csv`, `entropy.csv` are also created in the `output` directory, containing the data from the -last execution of `run.py`. +Three files `nc.csv`, `gini.csv`, `entropy.csv` are also created in the `results` directory, containing the data from +the last execution of `run.py`. ## Contributing diff --git a/config.yaml b/config.yaml index 3a094bd..7cd06cf 100644 --- a/config.yaml +++ b/config.yaml @@ -8,7 +8,9 @@ metrics: hhi: nakamoto_coefficient: theil_index: - max_power_ratio: + concentration_ratio: + - 1 + - 3 tau_index: - 0.33 - 0.66 @@ -36,8 +38,8 @@ analyze_flags: # The timeframe for which an analysis should be performed. # Each date is a string of the form YYYY-MM-DD. timeframe: - start_date: 2011-01-01 - end_date: 2023-12-31 + start_date: 2018-01-01 + end_date: 2025-03-01 # The number of days to use for the estimation window, i.e.how many days of blocks to use for each data point. # If left empty, then the entire time frame will be used (only valid when combined with empty frequency). @@ -46,19 +48,13 @@ estimation_window: 30 # How frequently to sample the data, in days # If left empty, then only one data point will be analyzed (snapshot instead of longitudinal analysis), but this is # only valid when combined with an empty estimation_window. -frequency: 30 # todo maybe add hadrcoded values for day, week, month, year (in the code that parses this) + for the estimation window - - -input_directories: # Paths to directories that contain raw input data - - ./input - -# Paths to directories of snapshot db files; either absolute or relative from run.py. -# The first path will be used to write newly created dbs and the output of runs -output_directories: - - ./output +frequency: 30 +# A number that specifies how many windows to look back and forward when deciding whether an entity is active on a +# given time period, or 'all' to count all entities that have produced blocks in the entire observation period. +population_windows: 1 # Plot flags plot_parameters: plot: false - animated: true + animated: false diff --git a/consensus_decentralization/aggregate.py b/consensus_decentralization/aggregate.py index 166f51e..76dbbe8 100644 --- a/consensus_decentralization/aggregate.py +++ b/consensus_decentralization/aggregate.py @@ -12,16 +12,16 @@ class Aggregator: blocks they produced """ - def __init__(self, project, io_dir): + def __init__(self, project, io_dir, mapped_data=None): """ :param project: str. Name of the project :param io_dir: Path. 
Path to the project's output directory """ self.project = project - self.data_to_aggregate = hlp.read_mapped_project_data(io_dir) + self.data_to_aggregate = hlp.read_mapped_project_data(io_dir) if mapped_data is None else mapped_data self.data_start_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(self.data_to_aggregate[0])) self.data_end_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(self.data_to_aggregate[-1])) - self.aggregated_data_dir = io_dir / 'blocks_per_entity' + self.aggregated_data_dir = io_dir / hlp.get_aggregated_data_dir_name(hlp.get_clustering_flag()) self.aggregated_data_dir.mkdir(parents=True, exist_ok=True) self.monthly_data_breaking_points = [(self.data_start_date.strftime('%Y-%m'), 0)] @@ -89,7 +89,7 @@ def divide_timeframe(timeframe, estimation_window, frequency): return time_chunks -def aggregate(project, output_dir, timeframe, estimation_window, frequency, force_aggregate): +def aggregate(project, output_dir, timeframe, estimation_window, frequency, force_aggregate, mapped_data=None): """ Aggregates the results of the mapping process for the given project and timeframe. The results are saved in a csv file in the project's output directory. Note that the output file is created (just with the headers) even if there @@ -113,7 +113,7 @@ def aggregate(project, output_dir, timeframe, estimation_window, frequency, forc raise ValueError('The estimation window is too large for the given timeframe') project_io_dir = output_dir / project - aggregator = Aggregator(project, project_io_dir) + aggregator = Aggregator(project, project_io_dir, mapped_data=mapped_data) filename = hlp.get_blocks_per_entity_filename(timeframe=timeframe, estimation_window=estimation_window, frequency=frequency) output_file = aggregator.aggregated_data_dir / filename diff --git a/consensus_decentralization/analyze.py b/consensus_decentralization/analyze.py index 66af0e8..044dc28 100644 --- a/consensus_decentralization/analyze.py +++ b/consensus_decentralization/analyze.py @@ -6,20 +6,25 @@ from consensus_decentralization.metrics.entropy import compute_entropy, compute_entropy_percentage # noqa: F401 from consensus_decentralization.metrics.herfindahl_hirschman_index import compute_hhi # noqa: F401 from consensus_decentralization.metrics.theil_index import compute_theil_index # noqa: F401 -from consensus_decentralization.metrics.max_power_ratio import compute_max_power_ratio # noqa: F401 +from consensus_decentralization.metrics.concentration_ratio import compute_concentration_ratio # noqa: F401 from consensus_decentralization.metrics.tau_index import compute_tau_index # noqa: F401 from consensus_decentralization.metrics.total_entities import compute_total_entities # noqa: F401 -def analyze(projects, aggregated_data_filename, output_dir): +def analyze(projects, aggregated_data_filename, input_dir, output_dir, population_windows): """ Calculates all available metrics for the given ledgers and timeframes. Outputs one file for each metric. 
:param projects: list of strings that correspond to the ledgers whose data should be analyzed :param aggregated_data_filename: string that corresponds to the name of the file that contains the aggregated data + :param input_dir: the directory where the aggregated data is located + :param output_dir: the directory to save the results in + :param population_windows: the number of windows to look backwards and forwards to determine the population of + active block producers for a given time period :returns: a list with the names of all the metrics that were used Using multiple projects and timeframes is necessary here to produce collective csv files. """ + logging.info('Calculating metrics on aggregated data..') metrics = hlp.get_metrics_config() metric_params = [] @@ -30,6 +35,7 @@ def analyze(projects, aggregated_data_filename, output_dir): else: metric_params.append((key, key, None)) metric_names = [name for name, _, _ in metric_params] + clustering_flag = hlp.get_clustering_flag() aggregate_output = {} @@ -42,8 +48,9 @@ def analyze(projects, aggregated_data_filename, output_dir): for column_index, project in enumerate(projects): logging.info(f'Calculating {project} metrics') aggregate_output[project] = {} - aggregated_data_dir = output_dir / project / 'blocks_per_entity' - dates, blocks_per_entity = hlp.get_blocks_per_entity_from_file(aggregated_data_dir / aggregated_data_filename) + aggregated_data_dir = input_dir / project / hlp.get_aggregated_data_dir_name(clustering_flag) + dates, blocks_per_entity = hlp.get_blocks_per_entity_from_file(aggregated_data_dir / + aggregated_data_filename, population_windows) for date in dates: aggregate_output[project][date] = {} @@ -80,7 +87,6 @@ def analyze(projects, aggregated_data_filename, output_dir): csv_writer = csv.writer(f) csv_writer.writerows(csv_contents[metric]) - clustering_flag = hlp.get_config_data()['analyze_flags']['clustering'] aggregate_csv_output = [['ledger', 'date', 'clustering'] + metric_names] for project, timeframes in aggregate_output.items(): for date, results in timeframes.items(): diff --git a/consensus_decentralization/helper.py b/consensus_decentralization/helper.py index 9400d4b..443073a 100644 --- a/consensus_decentralization/helper.py +++ b/consensus_decentralization/helper.py @@ -14,8 +14,9 @@ ROOT_DIR = pathlib.Path(__file__).resolve().parent.parent RAW_DATA_DIR = ROOT_DIR / 'raw_block_data' -OUTPUT_DIR = ROOT_DIR / 'output' +INTERIM_DIR = ROOT_DIR / 'processed_data' MAPPING_INFO_DIR = ROOT_DIR / 'mapping_information' +RESULTS_DIR = ROOT_DIR / 'results' with open(ROOT_DIR / "config.yaml") as f: config = safe_load(f) @@ -190,11 +191,13 @@ def write_blocks_per_entity_to_file(output_dir, blocks_per_entity, dates, filena csv_writer.writerow(entity_row) -def get_blocks_per_entity_from_file(filepath): +def get_blocks_per_entity_from_file(filepath, population_windows): """ Retrieves information about the number of blocks that each entity produced over some timeframe for some project. :param filepath: the path to the file with the relevant information. It can be either an absolute or a relative path in either a pathlib.PosixPath object or a string. 
+ :param population_windows: int representing the number of windows to look back and forward when determining if an + entity is active during a certain time frame :returns: a tuple of length 2 where the first item is a list of time chunks (strings) and the second item is a dictionary with entities (keys) and a list of the number of blocks they produced during each time chunk (values) """ @@ -206,7 +209,17 @@ def get_blocks_per_entity_from_file(filepath): for row in csv_reader: entity = row[0] for idx, item in enumerate(row[1:]): - if item != '0': + if item == '0': + if population_windows == 'all': + blocks_per_entity[entity][dates[idx]] = 0 + else: + # If the entity hasn't produced any blocks in the current time chunk, we only consider it as + # active if it has produced at least one block in population_windows time chunks before or after + # (otherwise it's not considered part of the population for this time frame) + for i in range(max(0, idx - population_windows), min(len(row) - 1, idx + population_windows + 1)): + if row[i + 1] != '0': + blocks_per_entity[entity][dates[idx]] = 0 + else: blocks_per_entity[entity][dates[idx]] = int(item) return dates, blocks_per_entity @@ -294,7 +307,7 @@ def read_mapped_project_data(project_dir): :param project_dir: pathlib.PosixPath object of the output directory corresponding to the project :returns: a dictionary with the mapped data """ - with open(project_dir / 'mapped_data.json') as f: + with open(project_dir / get_mapped_data_filename(get_clustering_flag())) as f: data = json.load(f) return data @@ -309,6 +322,15 @@ def get_representative_dates(time_chunks): return [str(chunk[0] + (chunk[1] - chunk[0]) // 2) for chunk in time_chunks] +def get_aggregated_data_dir_name(clustering_flag): + """ + Determines the name of the directory that will contain the aggregated data + :param clustering_flag: boolean that determines whether the data is clustered or not + :returns: str that corresponds to the name of the directory + """ + return 'blocks_per_entity_' + ('clustered' if clustering_flag else 'non_clustered') + + def get_blocks_per_entity_filename(timeframe, estimation_window, frequency): """ Determines the filename of the csv file that contains the aggregated data @@ -363,6 +385,21 @@ def get_estimation_window_and_frequency(): raise ValueError('"estimation_window" or "frequency" missing from config file') +def get_population_windows(): + """ + Retrieves the number of windows to be used for estimating the population of block producers + :returns: int representing the number of windows to look back and forward when determining if an entity is active + during a certain time frame + :raises ValueError: if the population_windows field is missing from the config file + """ + try: + config = get_config_data() + population_windows = config['population_windows'] + return population_windows + except KeyError: + raise ValueError('"population_windows" missing from config file') + + def get_plot_flag(): """ Gets the flag that determines whether generate plots for the output @@ -395,3 +432,35 @@ def get_force_map_flag(): return config['execution_flags']['force_map'] except KeyError: raise ValueError('Flag "force_map" missing from config file') + + +def get_clustering_flag(): + """ + Gets the flag that determines whether to perform clustering + :returns: boolean + :raises ValueError: if the flag is not set in the config file + """ + config = get_config_data() + try: + return config['analyze_flags']['clustering'] + except KeyError: + raise ValueError('Flag "clustering" 
missing from config file') + + +def get_results_dir(estimation_window, frequency, population_windows): + """ + Retrieves the path to the results directory for the specific config parameters + :returns: pathlib.PosixPath object + """ + results_dir_name = (f'{estimation_window}_day_window_with_{population_windows}_population_windows_sampled_every' + f'_{frequency}_days') + return RESULTS_DIR / results_dir_name + + +def get_mapped_data_filename(clustering_flag): + """ + Retrieves the filename of the mapped data file + :param clustering_flag: boolean that determines whether the data is clustered or not + :returns: str + """ + return 'mapped_data_' + ('clustered' if clustering_flag else 'non_clustered') + '.json' diff --git a/consensus_decentralization/mappings/default_mapping.py b/consensus_decentralization/mappings/default_mapping.py index 87d7497..83b9ddc 100644 --- a/consensus_decentralization/mappings/default_mapping.py +++ b/consensus_decentralization/mappings/default_mapping.py @@ -1,4 +1,5 @@ import json + import consensus_decentralization.helper as hlp @@ -9,8 +10,6 @@ class DefaultMapping: :ivar project_name: the name of the project associated with a specific mapping instance :ivar output_dir: the directory that includes the parsed data related to the project - :ivar mapped_data_dir: the directory to save the mapped data files in - :ivar multi_pool_dir: the directory to save the multi pool data files in :ivar data_to_map: a list with the parsed data of the project (list of dictionaries with block information :ivar special_addresses: a set with the special addresses of the project (addresses that don't count in the context of out analysis) @@ -45,7 +44,7 @@ def perform_mapping(self): project. :returns: a list of dictionaries (mapped block data) """ - clustering_flag = hlp.get_config_data()['analyze_flags']['clustering'] + clustering_flag = hlp.get_clustering_flag() for block in self.data_to_map: if not clustering_flag: entity = self.fallback_mapping(block) @@ -83,7 +82,7 @@ def perform_mapping(self): }) if len(self.mapped_data) > 0: - self.write_mapped_data() + self.write_mapped_data(clustering_flag) self.write_multi_pool_files() return self.mapped_data @@ -187,11 +186,12 @@ def write_multi_pool_files(self): with open(self.output_dir / 'multi_pool_blocks.csv', 'w') as f: f.write('Block No,Timestamp,Entities\n' + '\n'.join(self.multi_pool_blocks)) - def write_mapped_data(self): + def write_mapped_data(self, clustering_flag): """ Writes the mapped data into a file in a directory associated with the mapping instance. 
Specifically, into a folder named after the project, inside the general output directory + :param clustering_flag: boolean, indicating whether clustering was used in the mapping process """ - filename = 'mapped_data.json' + filename = hlp.get_mapped_data_filename(clustering_flag) with open(self.output_dir / filename, 'w') as f: json.dump(self.mapped_data, f, indent=4) diff --git a/consensus_decentralization/mappings/dummy_mapping.py b/consensus_decentralization/mappings/dummy_mapping.py index f435813..91443dc 100644 --- a/consensus_decentralization/mappings/dummy_mapping.py +++ b/consensus_decentralization/mappings/dummy_mapping.py @@ -1,4 +1,5 @@ from consensus_decentralization.mappings.default_mapping import DefaultMapping +import consensus_decentralization.helper as hlp class DummyMapping(DefaultMapping): @@ -28,6 +29,6 @@ def perform_mapping(self): }) if len(self.mapped_data) > 0: - self.write_mapped_data() + self.write_mapped_data(hlp.get_clustering_flag()) return self.mapped_data diff --git a/consensus_decentralization/metrics/concentration_ratio.py b/consensus_decentralization/metrics/concentration_ratio.py new file mode 100644 index 0000000..816bb89 --- /dev/null +++ b/consensus_decentralization/metrics/concentration_ratio.py @@ -0,0 +1,9 @@ +def compute_concentration_ratio(block_distribution, topn): + """ + Calculates the n-concentration ratio of a distribution of balances + :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order + :param topn: the number of top block producers to consider + :returns: float that represents the ratio of blocks produced by the top n block producers (0 if there weren't any) + """ + total_blocks = sum(block_distribution) + return sum(block_distribution[:topn]) / total_blocks if total_blocks else 0 diff --git a/consensus_decentralization/metrics/max_power_ratio.py b/consensus_decentralization/metrics/max_power_ratio.py deleted file mode 100644 index 520970c..0000000 --- a/consensus_decentralization/metrics/max_power_ratio.py +++ /dev/null @@ -1,8 +0,0 @@ -def compute_max_power_ratio(block_distribution): - """ - Calculates the maximum power ratio of a distribution of balances - :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order - :returns: float that represents the maximum power ratio among all block producers (0 if there weren't any) - """ - total_blocks = sum(block_distribution) - return block_distribution[0] / total_blocks if total_blocks else 0 diff --git a/consensus_decentralization/parsers/dummy_parser.py b/consensus_decentralization/parsers/dummy_parser.py index 6023fd0..57ea3a6 100644 --- a/consensus_decentralization/parsers/dummy_parser.py +++ b/consensus_decentralization/parsers/dummy_parser.py @@ -24,6 +24,7 @@ def parse(self): directory associated with the parser instance (specifically in /) """ data = self.read_and_sort_data() + for block in data: if 'identifiers' not in block.keys(): block['identifiers'] = None @@ -31,4 +32,4 @@ def parse(self): block['identifiers'] = self.parse_identifiers(block['identifiers']) if 'reward_addresses' not in block.keys(): block['reward_addresses'] = None - return data + yield block diff --git a/consensus_decentralization/parsers/ethereum_parser.py b/consensus_decentralization/parsers/ethereum_parser.py index 987845b..5739ebb 100644 --- a/consensus_decentralization/parsers/ethereum_parser.py +++ b/consensus_decentralization/parsers/ethereum_parser.py @@ -1,4 +1,5 @@ 
from consensus_decentralization.parsers.dummy_parser import DummyParser +import json class EthereumParser(DummyParser): @@ -20,3 +21,20 @@ def parse_identifiers(block_identifiers): return bytes.fromhex(block_identifiers[2:]).decode('utf-8') except (UnicodeDecodeError, ValueError): return block_identifiers + + def read_and_sort_data(self): + """ + Reads the "raw" block data associated with the project + :returns: a list of dictionaries (block data) sorted by timestamp + Note that the current version does not sort the data (because it is too memory-intensive) but assumes that the + data are already sorted (which is generally the case given the suggested queries). + """ + filename = f'{self.project_name}_raw_data.json' + filepath = self.input_dir / filename + + def generate_data(): + with open(filepath) as f: + for line in f: + yield json.loads(line.strip()) + + return generate_data() diff --git a/consensus_decentralization/plot.py b/consensus_decentralization/plot.py index 8a9e48a..39d50a6 100644 --- a/consensus_decentralization/plot.py +++ b/consensus_decentralization/plot.py @@ -123,7 +123,8 @@ def plot_animated_stack_area_chart(values, execution_id, path, ylabel, legend_la plt.close(fig) -def plot_dynamics_per_ledger(ledgers, aggregated_data_filename, top_k=-1, unit='relative', animated=False, legend=False): +def plot_dynamics_per_ledger(ledgers, aggregated_data_filename, output_dir, top_k=-1, unit='relative', animated=False, + legend=False): """ Plots the dynamics of pools for each ledger in terms of produced blocks :param ledgers: list of strings representing the ledgers whose data will be plotted @@ -137,13 +138,12 @@ def plot_dynamics_per_ledger(ledgers, aggregated_data_filename, top_k=-1, unit=' :param legend: bool that specifies whether the plots to be generated will include a legend or not """ for ledger in ledgers: - ledger_path = hlp.OUTPUT_DIR / ledger - figures_path = ledger_path / 'figures' - if not figures_path.is_dir(): - figures_path.mkdir() + ledger_path = hlp.INTERIM_DIR / ledger + figures_path = output_dir / ledger + figures_path.mkdir(parents=True, exist_ok=True) time_chunks, blocks_per_entity = hlp.get_blocks_per_entity_from_file( - filepath=ledger_path / "blocks_per_entity" / aggregated_data_filename + filepath=ledger_path / hlp.get_aggregated_data_dir_name(hlp.get_clustering_flag()) / aggregated_data_filename ) total_blocks_per_time_chunk = [0] * len(time_chunks) @@ -213,13 +213,10 @@ def plot_dynamics_per_ledger(ledgers, aggregated_data_filename, top_k=-1, unit=' ) -def plot_comparative_metrics(ledgers, metrics, animated=False): +def plot_comparative_metrics(ledgers, metrics, metrics_dir, output_dir, animated=False): for metric in metrics: - figures_path = hlp.OUTPUT_DIR / 'figures' - if not figures_path.is_dir(): - figures_path.mkdir() - filename = f'{metric}.csv' - metric_df = pd.read_csv(hlp.OUTPUT_DIR / filename) + metric_filepath = metrics_dir / f'{metric}.csv' + metric_df = pd.read_csv(metric_filepath) # only keep rows that contain at least one (non-nan) value in the columns that correspond to the ledgers metric_df = metric_df[metric_df.iloc[:, 1:].notna().any(axis=1)] ledger_columns_to_keep = [col for col in metric_df.columns if col in ledgers] @@ -233,7 +230,7 @@ def plot_comparative_metrics(ledgers, metrics, animated=False): x_label='Time', y_label=metric, filename=f"{metric}_{'_'.join(ledger_columns_to_keep)}", - path=figures_path, + path=output_dir, colors=colors ) else: @@ -242,24 +239,38 @@ def plot_comparative_metrics(ledgers, metrics, animated=False): 
x_label='Time', y_label=metric, filename=f"{metric}_{'_'.join(ledger_columns_to_keep)}", - path=figures_path, + path=output_dir, xtick_labels=metric_df['timeframe'], colors=colors ) -def plot(ledgers, metrics, aggregated_data_filename, animated): +def plot(ledgers, metrics, aggregated_data_filename, animated, metrics_dir, figures_dir, plot_dynamics=False): logging.info("Creating plots..") - plot_dynamics_per_ledger(ledgers=ledgers, aggregated_data_filename=aggregated_data_filename, animated=False, legend=True) - plot_comparative_metrics(ledgers=ledgers, metrics=metrics, animated=False) + if plot_dynamics: + plot_dynamics_per_ledger(ledgers=ledgers, aggregated_data_filename=aggregated_data_filename, + output_dir=figures_dir, animated=False, legend=True) + plot_comparative_metrics(ledgers=ledgers, metrics=metrics, animated=False, metrics_dir=metrics_dir, + output_dir=figures_dir) if animated: - plot_dynamics_per_ledger(ledgers=ledgers, aggregated_data_filename=aggregated_data_filename, animated=True) - plot_comparative_metrics(ledgers=ledgers, metrics=metrics, animated=True) + if plot_dynamics: + plot_dynamics_per_ledger(ledgers=ledgers, aggregated_data_filename=aggregated_data_filename, + output_dir=figures_dir, animated=True) + plot_comparative_metrics(ledgers=ledgers, metrics=metrics, animated=True, metrics_dir=metrics_dir, output_dir=figures_dir) if __name__ == '__main__': ledgers = hlp.get_ledgers() - default_metrics = hlp.get_metrics_config().keys() + + metrics = hlp.get_metrics_config() + metric_params = [] + for key, args in metrics.items(): + if args: + for val in args: + metric_params.append((f'{key}={val}', key, val)) + else: + metric_params.append((key, key, None)) + default_metrics = [name for name, _, _ in metric_params] default_start_date, default_end_date = hlp.get_start_end_dates() timeframe_start = hlp.get_timeframe_beginning(default_start_date) @@ -296,4 +307,4 @@ def plot(ledgers, metrics, aggregated_data_filename, animated): help='Flag to specify whether to also generate animated plots.' ) args = parser.parse_args() - plot(ledgers=args.ledgers, metrics=args.metrics, aggregated_data_filename=args.filename, animated=args.animated) + plot(ledgers=args.ledgers, metrics=args.metrics, aggregated_data_filename=args.filename, animated=args.animated, results_dir=hlp.RESULTS_DIR) diff --git a/data_collection_scripts/collect_block_data.py b/data_collection_scripts/collect_block_data.py index 6bf7ccd..6e4b697 100644 --- a/data_collection_scripts/collect_block_data.py +++ b/data_collection_scripts/collect_block_data.py @@ -32,7 +32,7 @@ def collect_data(ledgers, from_block, to_date): for ledger in ledgers: file = RAW_DATA_DIR / f'{ledger}_raw_data.json' - logging.info(f"Querying {ledger}..") + logging.info(f"Querying {ledger} from block {from_block[ledger]} until {to_date}..") query = (queries[ledger]).replace("{{block_number}}", str(from_block[ledger]) if from_block[ledger] else "-1").replace("{{timestamp}}", to_date) query_job = client.query(query) @@ -40,9 +40,13 @@ def collect_data(ledgers, from_block, to_date): rows = query_job.result() logging.info(f'Done querying {ledger}') except Exception as e: - logging.info(f'{ledger} query failed, please make sure it is properly defined.') - logging.info(f'The following exception was raised: {repr(e)}') - continue + if 'Quota exceeded' in repr(e): + logging.info('Quota exceeded for this service account key. 
Aborting..') + break + else: + logging.info(f'{ledger} query failed, please make sure it is properly defined.') + logging.info(f'The following exception was raised: {repr(e)}\n') + continue logging.info(f"Writing {ledger} data to file..") # Append result to file diff --git a/data_collection_scripts/queries.yaml b/data_collection_scripts/queries.yaml index d7e817e..4a2aaf7 100644 --- a/data_collection_scripts/queries.yaml +++ b/data_collection_scripts/queries.yaml @@ -45,7 +45,6 @@ ethereum: AND number > {{block_number}} ORDER BY timestamp - litecoin: SELECT block_number as number, block_timestamp as timestamp, coinbase_param as identifiers, `bigquery-public-data.crypto_litecoin.transactions`.outputs FROM `bigquery-public-data.crypto_litecoin.transactions` @@ -56,7 +55,6 @@ litecoin: AND timestamp < '{{timestamp}}' ORDER BY timestamp - tezos: SELECT level as number, timestamp, baker as reward_addresses FROM `public-data-finance.crypto_tezos.blocks` @@ -74,4 +72,3 @@ zcash: AND timestamp > '2018-01-01' AND timestamp < '{{timestamp}}' ORDER BY timestamp - diff --git a/docs/aggregator.md b/docs/aggregator.md index 8905769..312d66e 100644 --- a/docs/aggregator.md +++ b/docs/aggregator.md @@ -1,10 +1,10 @@ # Aggregator -The aggregator obtains the mapped data of a ledger (from `output//mapped_data.json`) and aggregates it -over units of time that are determined based on the given `timeframe` and `aggregate_by` parameters. +The aggregator obtains the mapped data of a ledger (from `processed_data//mapped_data_<(non_)clustered>.json`) +and aggregates it over units of time that are determined based on the given `timeframe` and `aggregate_by` parameters. It then outputs a `csv` file with the distribution of blocks to entities for each time unit under consideration. -This file is saved in the directory `output//blocks_per_entity/` and is named based on the `timeframe` -and `aggregate_by` parameters. +This file is saved in the directory `processed_data//blocks_per_entity/` and is named based on the +`timeframe` and `aggregate_by` parameters. For example, if the specified timeframe is from June 2023 to September 2023 and the aggregation is by month, then the output file would be named `monthly_from_2023-06-01_to_2023-09-30.csv` and would be structured as follows: ``` diff --git a/docs/mappings.md b/docs/mappings.md index d8ee30c..19ab4da 100644 --- a/docs/mappings.md +++ b/docs/mappings.md @@ -4,8 +4,8 @@ A mapping is responsible for linking blocks to the entities that created them. W information about the addresses that received rewards for producing some block or identifiers that are related to them, it does not contain information about the entities that control these addresses, which is where the mapping comes in. -The mapping takes as input the parsed data and outputs a file (`output//mapped_data.json`), which is -structured as follows: +The mapping takes as input the parsed data and outputs a file (`processed_data//mapped_data.json`), +which is structured as follows: ``` [ diff --git a/docs/metrics.md b/docs/metrics.md index 25f29d7..54cc3a8 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -29,8 +29,9 @@ The metrics that have been implemented so far are the following: or the redundancy, in a population. In practice, it is calculated as the maximum possible entropy minus the observed entropy. The output is a real number. Values close to 0 indicate equality and values towards infinity indicate inequality. Therefore, a high Theil Index suggests a population that is highly centralized. -6. 
**Max power ratio**: The max power ratio represents the share of blocks that are produced by the most "powerful" - entity, i.e. the entity that produces the most blocks. The output of the metric is a decimal number in [0,1]. +6. **Concentration ratio**: The n-concentration ratio represents the share of blocks that are produced by the n most + "powerful" entities, i.e. the entities that produce the most blocks. The output of the metric is a decimal + number in [0,1]. Values typically used are the 1-concentration ratio and the 3-concentration ratio. 7. **Tau-decentralization index**: The tau-decentralization index is a generalization of the Nakamoto coefficient. It is defined as the minimum number of entities that collectively produce more than a given threshold of the total blocks within a given timeframe. The threshold parameter is a decimal in [0, 1] (0.66 by default) and the output of diff --git a/docs/setup.md b/docs/setup.md index 0362085..eea3ac2 100644 --- a/docs/setup.md +++ b/docs/setup.md @@ -28,42 +28,41 @@ in the `raw_block_data/` directory, each file named as `_raw_data. By default, there is a (very small) sample input file for some supported projects; to use it, remove the prefix `sample_`. -Run `python run.py --ledgers --timeframe --estimation-window --frequency ` to analyze the n specified ledgers for the given timeframe, -aggregated using the given estimation window and frequency. -All arguments are optional, so it's possible to omit any of them; in this case, the default values -will be used. Specifically: - -- `ledgers` accepts any number of the supported ledgers (case-insensitive). For example, `--ledgers bitcoin` - would run the analysis for Bitcoin, while `--ledgers Bitcoin Ethereum Cardano` would run the analysis for Bitcoin, - Ethereum and Cardano. If the `ledgers` argument is omitted, then the analysis is performed for the ledgers - specified in the `config.yaml` file, which are typically all supported ledgers. -- The `timeframe` argument accepts one or two values of the form `YYYY-MM-DD` (month and day can be - omitted), which indicate the beginning and end of the time period that will be analyzed. For example, - `--timeframe 2022` would run the analysis for the year 2022 (so from January 1st 2022 to - December 31st 2022), while we could also get the same result using `--timeframe 2022-01 2022-12` or - `--timeframe 2022-01-01 2022-12-31`. Similarly, `--timeframe 2022-02` or `--timeframe 2022-02-01 2022-02-28` would - do it for the month of February 2022 (February 1st 2022 to February 28th 2022), while `--timeframe 2022-02-03` - would do it for a single day (Feburary 3rd 2022). Last, `--timeframe 2018 2022` would run the analysis for the - entire time period between January 1st 2018 and December 31st 2022. If the `timeframe` argument is omitted, then - the start date and end dates of the time frame are sourced from the `config.yaml` file. -- `estimation_window` corresponds to the number of days that will be used to aggregate the data. For example, - `--estimation_window 7` means that every data point will use 7 days of blocks to calculate the distribution of - blocks to entities. If left empty, then the entire time frame will be used (only valid when combined with empty frequency). - - `frequency` determines how frequently to sample the data, in days. If left empty, then only one data point will be - analyzed (snapshot instead of longitudinal analysis), but this is only valid when combined with an empty estimation_window. 
- -Additionally, there are three flags that can be used to customize an execution: - -- `--force-map` forces the parsing, mapping and aggregation to be performed on all data, even if the relevant output - files already exist. This can be useful for when mapping info is updated for some blockchain. By default, this flag is - set to False and the tool only performs the mapping and aggregation when the relevant output files do not exist. -- `--plot` enables the generation of graphs at the end of the execution. Specifically, the output of each +Run `python run.py` to run the analysis with the parameters specified in the `config.yaml` file. + +The parameters that can be specified in the `config.yaml` file are: + +- `metrics`: a list with the metrics that will be calculated. By default, includes all implemented metrics. +- `ledgers`: a list with the ledgers that will be analyzed. By default, includes all supported ledgers. +- `force-map`: a flag that can force the parsing, mapping and aggregation to be performed on all data, even if the + relevant output files already exist. This can be useful for when mapping info is updated for some blockchain. By + default, this flag is set to False and the tool only performs the mapping and aggregation when the relevant output + files do not exist. +- `clustering`: a flag that specifies whether block producers will be clustered based on the available mapping + information. By default, this flag is set to True. +- `start_date`: a value of the form `YYYY-MM-DD` (month and day can be omitted), which indicates the beginning of the + time period that will be analyzed. +- `end_date`: a value of the form `YYYY-MM-DD` (month and day can be omitted), which indicates the end of the time + period that will be analyzed. +- `estimation_window`: the number of days that will be used to aggregate the data. For example, + `estimation_window 7` means that every data point will use 7 days of blocks to calculate the distribution of + blocks to entities. If left empty, then the entire time frame will be used (only valid when combined with empty + frequency). +- `frequency`: number of days that determines how frequently to sample the data. If left empty, then only one data + point will be analyzed (snapshot instead of longitudinal analysis), but this is only valid when combined with an + empty estimation_window. +- `population_windows`: number that defines the number of windows to look back and forward when calculating the + population of block producers. For example, `population_windows 3`, combined with `estimation_window 7` means that the + population of block producers will be calculated using the blocks produced in the 3 weeks before and after each + week under consideration. If `all` is specified, then the entire time frame will be used to determine the population. +- `plot`: a flag that enables the generation of graphs at the end of the execution. Specifically, the output of each implemented metric is plotted for the specified ledgers and timeframe, as well as the block production dynamics for each specified ledger. By default, this flag is set to False and no plots are generated. -- `--animated` enables the generation of (additional) animated graphs at the end of the execution. By default, this flag -is set to False and no animated plots are generated. Note that this flag is ignored if `--plot` is set to False. +- `animated`: a flag that enables the generation of (additional) animated graphs at the end of the execution. 
By + default, this flag is set to False and no animated plots are generated. Note that this flag is ignored if `plot` is + set to False. -All output files can then be found under the `output/` directory, which is automatically created the first time the tool -is run. +All output files can then be found under the `results/` directory, which is automatically created the first time the +tool is run. Interim files that are produced by some modules and are used by others can be found under the +`processed_data/` directory. diff --git a/run.py b/run.py index 7a93b5c..2883bb3 100644 --- a/run.py +++ b/run.py @@ -10,13 +10,16 @@ def process_data(force_map, ledger_dir, ledger, output_dir): - mapped_data_file = ledger_dir / 'mapped_data.json' + clustering_flag = hlp.get_clustering_flag() + mapped_data_file = ledger_dir / hlp.get_mapped_data_filename(clustering_flag) if force_map or not mapped_data_file.is_file(): parsed_data = parse(ledger, input_dir=hlp.RAW_DATA_DIR) - apply_mapping(ledger, parsed_data=parsed_data, output_dir=output_dir) + return apply_mapping(ledger, parsed_data=parsed_data, output_dir=output_dir) + return None -def main(ledgers, timeframe, estimation_window, frequency, output_dir=hlp.OUTPUT_DIR): +def main(ledgers, timeframe, estimation_window, frequency, population_windows, interim_dir=hlp.INTERIM_DIR, + results_dir=hlp.RESULTS_DIR): """ Executes the entire pipeline (parsing, mapping, analyzing) for some projects and timeframes. :param ledgers: list of strings that correspond to the ledgers whose data should be analyzed @@ -27,41 +30,50 @@ def main(ledgers, timeframe, estimation_window, frequency, output_dir=hlp.OUTPUT :param frequency: int or None. The number of days to consider for the frequency of the analysis (i.e. the number of days between each data point considered in the analysis). If None, only one data point will be considered, spanning the entire timeframe (i.e. it needs to be combined with None estimation_window). 
- :param output_dir: pathlib.PosixPath object of the directory where the output data will be saved + :param interim_dir: pathlib.PosixPath object of the directory where the output data will be saved """ logging.info(f"The ledgers that will be analyzed are: {','.join(ledgers)}") force_map = hlp.get_force_map_flag() for ledger in ledgers: - ledger_dir = output_dir / ledger + ledger_dir = interim_dir / ledger ledger_dir.mkdir(parents=True, exist_ok=True) # create ledger output directory if it doesn't already exist - process_data(force_map, ledger_dir, ledger, output_dir) + mapped_data = process_data(force_map, ledger_dir, ledger, interim_dir) aggregate( ledger, - output_dir, + interim_dir, timeframe, estimation_window, frequency, - force_map + force_map, + mapped_data=mapped_data ) aggregated_data_filename = hlp.get_blocks_per_entity_filename(timeframe, estimation_window, frequency) + metrics_dir = results_dir / 'metrics' + metrics_dir.mkdir(parents=True, exist_ok=True) used_metrics = analyze( projects=ledgers, aggregated_data_filename=aggregated_data_filename, - output_dir=output_dir + population_windows=population_windows, + input_dir=interim_dir, + output_dir=metrics_dir ) if hlp.get_plot_flag(): + figures_dir = results_dir / 'figures' + figures_dir.mkdir(parents=True, exist_ok=True) plot( ledgers=ledgers, metrics=used_metrics, aggregated_data_filename=aggregated_data_filename, - animated=hlp.get_plot_config_data()['animated'] + animated=hlp.get_plot_config_data()['animated'], + metrics_dir=metrics_dir, + figures_dir=figures_dir ) @@ -69,6 +81,10 @@ def main(ledgers, timeframe, estimation_window, frequency, output_dir=hlp.OUTPUT ledgers = hlp.get_ledgers() estimation_window, frequency = hlp.get_estimation_window_and_frequency() + population_windows = hlp.get_population_windows() + + results_dir = hlp.get_results_dir(estimation_window, frequency, population_windows) + results_dir.mkdir(parents=True, exist_ok=True) start_date, end_date = hlp.get_start_end_dates() timeframe_start = hlp.get_timeframe_beginning(start_date) @@ -78,6 +94,6 @@ def main(ledgers, timeframe, estimation_window, frequency, output_dir=hlp.OUTPUT 'the first date.') timeframe = (timeframe_start, timeframe_end) - main(ledgers, timeframe, estimation_window, frequency) + main(ledgers, timeframe, estimation_window, frequency, population_windows, results_dir=results_dir) logging.info('Done. 
Please check the output directory for results.') diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index fb4d02e..e80e681 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -2,8 +2,9 @@ import json import shutil import pytest -from consensus_decentralization.helper import OUTPUT_DIR +from consensus_decentralization.helper import INTERIM_DIR from consensus_decentralization.aggregate import aggregate, Aggregator, divide_timeframe +from consensus_decentralization.helper import get_clustering_flag @pytest.fixture @@ -14,7 +15,9 @@ def setup_and_cleanup(): after (cleanup) """ # Set up - test_io_dir = OUTPUT_DIR / "test_output" + test_io_dir = INTERIM_DIR / "test_output" + # Mock return value of get_clustering_flag + get_clustering_flag.return_value = True yield test_io_dir # Clean up shutil.rmtree(test_io_dir) @@ -41,7 +44,7 @@ def mock_sample_bitcoin_mapped_data(setup_and_cleanup): '{"number": "649064", "timestamp": "2020-09-20 11:17:00 UTC", "reward_addresses": "0000000000000000000000000000000000000000", "creator": "TEST2", "mapping_method": "known_identifiers"},' \ '{"number": "682736", "timestamp": "2021-05-09 11:12:32 UTC", "reward_addresses": "18cBEMRxXHqzWWCxZNtU91F5sbUNKhL5PX", "creator": "ViaBTC", "mapping_method": "known_identifiers"}' \ ']' - with open(test_bitcoin_dir / 'mapped_data.json', 'w') as f: + with open(test_bitcoin_dir / 'mapped_data_clustered.json', 'w') as f: f.write(mapped_data) return json.loads(mapped_data) @@ -64,7 +67,7 @@ def mock_sample_ethereum_mapped_data(setup_and_cleanup): '{"number":"11184329","timestamp":"2020-11-03 12:56:41 UTC","reward_addresses":"0x8595dd9e0438640b5e1254f9df579ac12a86865f","creator":"TEST", "mapping_method": "known_identifiers"},' \ '{"number":"11183793","timestamp":"2020-11-03 10:56:07 UTC","reward_addresses":"0x8595dd9e0438640b5e1254f9df579ac12a86865f","creator":"TEST", "mapping_method": "known_identifiers"}' \ ']' - with open(test_ethereum_dir / 'mapped_data.json', 'w') as f: + with open(test_ethereum_dir / 'mapped_data_clustered.json', 'w') as f: f.write(mapped_data) @@ -81,7 +84,7 @@ def mock_sample_cardano_mapped_data(setup_and_cleanup): '{"number":"00000000000","timestamp":"2020-12-31T06:42:00","creator":"Arrakis", "mapping_method": "known_identifiers"},' \ '{"number":"55555555555","timestamp":"2020-12-31T06:42:01","creator":"1percentpool", "mapping_method": "known_identifiers"}' \ ']' - with open(test_cardano_dir / 'mapped_data.json', 'w') as f: + with open(test_cardano_dir / 'mapped_data_clustered.json', 'w') as f: f.write(mapped_data) @@ -99,7 +102,7 @@ def mock_sample_tezos_mapped_data(setup_and_cleanup): '{"number": "1650474", "timestamp": "2021-08-30 06:11:58 UTC", "reward_addresses": "tz1Vd1rXpV8hTHbFXCXN3c3qzCsgcU5BZw1e", "creator": "TEST", "mapping_method": "known_addresses"},' \ '{"number": "1651794", "timestamp": "2021-08-30 17:41:08 UTC", "reward_addresses": "None", "creator": "----- UNDEFINED BLOCK PRODUCER -----", "mapping_method": "fallback_mapping"}' \ ']' - with open(test_tezos_dir / 'mapped_data.json', 'w') as f: + with open(test_tezos_dir / 'mapped_data_clustered.json', 'w') as f: f.write(mapped_data) @@ -109,7 +112,7 @@ def test_aggregate(setup_and_cleanup, mock_sample_bitcoin_mapped_data): timeframe = (datetime.date(2010, 1, 1), datetime.date(2010, 12, 31)) aggregate(project='sample_bitcoin', output_dir=test_io_dir, timeframe=timeframe, estimation_window=31, frequency=31, force_aggregate=True) - output_file = test_io_dir / 
('sample_bitcoin/blocks_per_entity/31_day_window_from_2010-01-01_to_2010-12' + output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity_clustered/31_day_window_from_2010-01-01_to_2010-12' '-31_sampled_every_31_days.csv') assert output_file.is_file() # there is no data from 2010 in the sample but the aggregator still creates the file when called with this timeframe @@ -119,20 +122,20 @@ def test_aggregate(setup_and_cleanup, mock_sample_bitcoin_mapped_data): aggregate(project='sample_bitcoin', output_dir=test_io_dir, timeframe=timeframe, estimation_window=30, frequency=30, force_aggregate=True) - output_file = test_io_dir / 'sample_bitcoin/blocks_per_entity/30_day_window_from_2018-02-01_to_2018-02-28_sampled_every_30_days.csv' + output_file = test_io_dir / 'sample_bitcoin/blocks_per_entity_clustered/30_day_window_from_2018-02-01_to_2018-02-28_sampled_every_30_days.csv' assert not output_file.is_file() timeframe = (datetime.date(2018, 3, 1), datetime.date(2018, 3, 31)) aggregate(project='sample_bitcoin', output_dir=test_io_dir, timeframe=timeframe, estimation_window=31, frequency=31, force_aggregate=True) - output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity/31_day_window_from_2018-03-01_to_2018-03' + output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity_clustered/31_day_window_from_2018-03-01_to_2018-03' '-31_sampled_every_31_days.csv') assert output_file.is_file() timeframe = (datetime.date(2021, 1, 1), datetime.date(2021, 12, 31)) aggregate(project='sample_bitcoin', output_dir=test_io_dir, timeframe=timeframe, estimation_window=31, frequency=31, force_aggregate=True) - output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity/31_day_window_from_2021-01-01_to_2021-12' + output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity_clustered/31_day_window_from_2021-01-01_to_2021-12' '-31_sampled_every_31_days.csv') assert output_file.is_file() @@ -172,7 +175,7 @@ def test_bitcoin_aggregation(setup_and_cleanup, mock_sample_bitcoin_mapped_data) 'GBMiners': '2\n' } - output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity/30_day_window_from_2018-02-01_to_2018-03' + output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity_clustered/30_day_window_from_2018-02-01_to_2018-03' '-02_sampled_every_30_days.csv') with open(output_file) as f: for line in f.readlines(): @@ -194,7 +197,7 @@ def test_bitcoin_aggregation(setup_and_cleanup, mock_sample_bitcoin_mapped_data) 'Bitmain': '1\n' } - output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity/all_from_2020-01-01_to_2020-12-31.csv') + output_file = test_io_dir / ('sample_bitcoin/blocks_per_entity_clustered/all_from_2020-01-01_to_2020-12-31.csv') with open(output_file) as f: for line in f.readlines(): col_1, col_2 = line.split(',') @@ -220,7 +223,7 @@ def test_ethereum_aggregation(setup_and_cleanup, mock_sample_ethereum_mapped_dat '0x45133a7e1cc7e18555ae8a4ee632a8a61de90df6': '1\n' } - output_file = test_io_dir / ('sample_ethereum/blocks_per_entity/30_day_window_from_2020-11-01_to_2020-11' + output_file = test_io_dir / ('sample_ethereum/blocks_per_entity_clustered/30_day_window_from_2020-11-01_to_2020-11' '-30_sampled_every_30_days.csv') with open(output_file) as f: for line in f.readlines(): @@ -241,7 +244,7 @@ def test_ethereum_aggregation(setup_and_cleanup, mock_sample_ethereum_mapped_dat 'MEV Builder: 0x3B...436': '1\n' } - output_file = test_io_dir / ('sample_ethereum/blocks_per_entity/365_day_window_from_2023-01-01_to_2023-12' + output_file = test_io_dir / 
('sample_ethereum/blocks_per_entity_clustered/365_day_window_from_2023-01-01_to_2023-12' '-31_sampled_every_365_days.csv') with open(output_file) as f: for line in f.readlines(): @@ -270,7 +273,7 @@ def test_cardano_aggregation(setup_and_cleanup, mock_sample_cardano_mapped_data) '1percentpool': '1\n' } - output_file = test_io_dir / ('sample_cardano/blocks_per_entity/31_day_window_from_2020-12-01_to_2020-12' + output_file = test_io_dir / ('sample_cardano/blocks_per_entity_clustered/31_day_window_from_2020-12-01_to_2020-12' '-31_sampled_every_31_days.csv') with open(output_file) as f: for line in f.readlines(): @@ -298,7 +301,7 @@ def test_tezos_aggregation(setup_and_cleanup, mock_sample_tezos_mapped_data): '----- UNDEFINED BLOCK PRODUCER -----': '1\n' } - output_file = test_io_dir / ('sample_tezos/blocks_per_entity/31_day_window_from_2021-08-01_to_2021-08' + output_file = test_io_dir / ('sample_tezos/blocks_per_entity_clustered/31_day_window_from_2021-08-01_to_2021-08' '-31_sampled_every_31_days.csv') with open(output_file) as f: for line in f.readlines(): @@ -319,7 +322,7 @@ def test_tezos_aggregation(setup_and_cleanup, mock_sample_tezos_mapped_data): 'tz0000000000000000000000000000000000': '1\n' } - output_file = test_io_dir / ('sample_tezos/blocks_per_entity/365_day_window_from_2018-01-01_to_2018-12' + output_file = test_io_dir / ('sample_tezos/blocks_per_entity_clustered/365_day_window_from_2018-01-01_to_2018-12' '-31_sampled_every_365_days.csv') with open(output_file) as f: for line in f.readlines(): diff --git a/tests/test_analyze.py b/tests/test_analyze.py index 0ae323b..5164125 100644 --- a/tests/test_analyze.py +++ b/tests/test_analyze.py @@ -1,6 +1,6 @@ import shutil import pytest -from consensus_decentralization.helper import OUTPUT_DIR +from consensus_decentralization.helper import INTERIM_DIR, get_clustering_flag from consensus_decentralization.analyze import analyze @@ -12,7 +12,7 @@ def setup_and_cleanup(): after (cleanup) """ # Set up - test_io_dir = OUTPUT_DIR / "test_output" + test_io_dir = INTERIM_DIR / "test_output" test_bitcoin_dir = test_io_dir / "sample_bitcoin" test_bitcoin_dir.mkdir(parents=True, exist_ok=True) # create files that would be the output of aggregation @@ -32,11 +32,16 @@ def setup_and_cleanup(): 'year_from_2010-01-01_to_2010-12-31': 'Entity \\ Date,2010\n' } - aggregated_data_path = test_bitcoin_dir / 'blocks_per_entity' + aggregated_data_path = test_bitcoin_dir / 'blocks_per_entity_clustered' aggregated_data_path.mkdir(parents=True, exist_ok=True) for filename, content in csv_per_file.items(): - with open(test_bitcoin_dir / f'blocks_per_entity/{filename}.csv', 'w') as f: + with open(aggregated_data_path / f'{filename}.csv', 'w') as f: f.write(content) + # Create metrics directory + metrics_dir = test_io_dir / "metrics" + metrics_dir.mkdir(parents=True, exist_ok=True) + # Mock return value of get_clustering_flag + get_clustering_flag.return_value = True yield test_io_dir # Clean up shutil.rmtree(test_io_dir) @@ -49,12 +54,14 @@ def test_analyze(setup_and_cleanup): analyze( projects=projects, aggregated_data_filename='year_from_2018-01-01_to_2018-12-31.csv', - output_dir=test_output_dir + input_dir=test_output_dir, + output_dir=test_output_dir / 'metrics', + population_windows=0 ) metrics = ['gini', 'nakamoto_coefficient', 'entropy=1'] for metric in metrics: - output_file = test_output_dir / f'{metric}.csv' + output_file = test_output_dir / 'metrics' / f'{metric}.csv' assert output_file.is_file() with open(output_file) as f: @@ -70,12 +77,14 @@ def 
test_analyze(setup_and_cleanup): analyze( projects=projects, aggregated_data_filename='month_from_2018-02-01_to_2018-03-31.csv', - output_dir=test_output_dir + input_dir=test_output_dir, + output_dir=test_output_dir / 'metrics', + population_windows=0 ) metrics = ['gini', 'nakamoto_coefficient', 'entropy=1'] for metric in metrics: - output_file = test_output_dir / f'{metric}.csv' + output_file = test_output_dir / 'metrics' / f'{metric}.csv' assert output_file.is_file() with open(output_file) as f: @@ -94,12 +103,14 @@ def test_analyze(setup_and_cleanup): analyze( projects=projects, aggregated_data_filename='year_from_2010-01-01_to_2010-12-31.csv', - output_dir=test_output_dir + input_dir=test_output_dir, + output_dir=test_output_dir / 'metrics', + population_windows=0 ) metrics = ['gini', 'nakamoto_coefficient', 'entropy=1'] for metric in metrics: - output_file = test_output_dir / f'{metric}.csv' + output_file = test_output_dir / 'metrics' / f'{metric}.csv' assert output_file.is_file() with open(output_file) as f: diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index 77ea7eb..9cad933 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -9,7 +9,7 @@ from consensus_decentralization.map import ledger_mapping from consensus_decentralization.mappings.default_mapping import DefaultMapping from consensus_decentralization.mappings.cardano_mapping import CardanoMapping -from consensus_decentralization.helper import OUTPUT_DIR, config +from consensus_decentralization.helper import INTERIM_DIR, config import pytest @@ -21,7 +21,8 @@ def setup_and_cleanup(): after (cleanup) """ # Set up - test_output_dir = OUTPUT_DIR / "test_output" + test_output_dir = INTERIM_DIR / "test_output" + test_metrics_subdir = test_output_dir / "metrics" ledger_mapping['sample_bitcoin'] = DefaultMapping ledger_parser['sample_bitcoin'] = DefaultParser ledger_mapping['sample_cardano'] = CardanoMapping @@ -29,6 +30,7 @@ def setup_and_cleanup(): force_map_flag = config['execution_flags']['force_map'] config['execution_flags']['force_map'] = True + config['analyze_flags']['clustering'] = True mapping_info_dir = pathlib.Path(__file__).resolve().parent.parent / 'mapping_information' for project in ['bitcoin', 'cardano']: @@ -53,7 +55,7 @@ def setup_and_cleanup(): ) except FileNotFoundError: pass - yield test_output_dir + yield test_output_dir, test_metrics_subdir # Clean up shutil.rmtree(test_output_dir) for project in ['sample_bitcoin', 'sample_cardano']: @@ -74,21 +76,23 @@ def setup_and_cleanup(): def test_end_to_end(setup_and_cleanup): - test_output_dir = setup_and_cleanup + test_output_dir, test_metrics_dir = setup_and_cleanup main( ['sample_bitcoin', 'sample_cardano'], (datetime.date(2010, 1, 1), datetime.date(2010, 12, 31)), estimation_window=None, frequency=None, - output_dir=test_output_dir + interim_dir=test_output_dir, + results_dir=test_output_dir, + population_windows=0 ) expected_entropy = [ 'timeframe,sample_bitcoin,sample_cardano\n', '2010-07-02,,\n' ] - with open(test_output_dir / 'entropy=1.csv') as f: + with open(test_metrics_dir / 'entropy=1.csv') as f: lines = f.readlines() for idx, line in enumerate(lines): assert line == expected_entropy[idx] @@ -97,7 +101,7 @@ def test_end_to_end(setup_and_cleanup): 'timeframe,sample_bitcoin,sample_cardano\n', '2010-07-02,,\n' ] - with open(test_output_dir / 'gini.csv') as f: + with open(test_metrics_dir / 'gini.csv') as f: lines = f.readlines() for idx, line in enumerate(lines): assert line == expected_gini[idx] @@ -106,7 +110,7 @@ def 
test_end_to_end(setup_and_cleanup): 'timeframe,sample_bitcoin,sample_cardano\n', '2010-07-02,,\n' ] - with open(test_output_dir / 'nakamoto_coefficient.csv') as f: + with open(test_metrics_dir / 'nakamoto_coefficient.csv') as f: lines = f.readlines() for idx, line in enumerate(lines): assert line == expected_nc[idx] @@ -116,7 +120,9 @@ def test_end_to_end(setup_and_cleanup): (datetime.date(2018, 2, 1), datetime.date(2018, 3, 31)), estimation_window=30, frequency=30, - output_dir=test_output_dir + interim_dir=test_output_dir, + results_dir=test_output_dir, + population_windows=0 ) expected_entropy = [ @@ -124,7 +130,7 @@ def test_end_to_end(setup_and_cleanup): '2018-02-15,1.5,\n', '2018-03-17,0.0,\n', ] - with open(test_output_dir / 'entropy=1.csv') as f: + with open(test_metrics_dir / 'entropy=1.csv') as f: lines = f.readlines() for idx, line in enumerate(lines): assert line == expected_entropy[idx] @@ -135,7 +141,7 @@ def test_end_to_end(setup_and_cleanup): # '2018-02-15,0.375,\n', # '2018-03-17,0.75,\n' # ] - # with open(test_output_dir / 'gini.csv') as f: + # with open(test_metrics_dir / 'gini.csv') as f: # lines = f.readlines() # for idx, line in enumerate(lines): # assert line == expected_gini[idx] @@ -144,7 +150,7 @@ def test_end_to_end(setup_and_cleanup): 'timeframe,sample_bitcoin,sample_cardano\n', '2018-02-15,1,\n', '2018-03-17,1,\n' ] - with open(test_output_dir / 'nakamoto_coefficient.csv') as f: + with open(test_metrics_dir / 'nakamoto_coefficient.csv') as f: lines = f.readlines() for idx, line in enumerate(lines): assert line == expected_nc[idx] @@ -154,14 +160,16 @@ def test_end_to_end(setup_and_cleanup): (datetime.date(2020, 12, 1), datetime.date(2020, 12, 31)), estimation_window=31, frequency=31, - output_dir=test_output_dir + interim_dir=test_output_dir, + results_dir=test_output_dir, + population_windows=0 ) expected_entropy = [ 'timeframe,sample_bitcoin,sample_cardano\n', '2020-12-16,,1.9219280948873623\n' ] - with open(test_output_dir / 'entropy=1.csv') as f: + with open(test_metrics_dir / 'entropy=1.csv') as f: lines = f.readlines() for idx, line in enumerate(lines): assert line == expected_entropy[idx] @@ -170,7 +178,7 @@ def test_end_to_end(setup_and_cleanup): 'timeframe,sample_bitcoin,sample_cardano\n', '2020-12-16,,0.15\n' ] - with open(test_output_dir / 'gini.csv') as f: + with open(test_metrics_dir / 'gini.csv') as f: lines = f.readlines() for idx, line in enumerate(lines): assert line == expected_gini[idx] @@ -179,7 +187,7 @@ def test_end_to_end(setup_and_cleanup): 'timeframe,sample_bitcoin,sample_cardano\n', '2020-12-16,,2\n' ] - with open(test_output_dir / 'nakamoto_coefficient.csv') as f: + with open(test_metrics_dir / 'nakamoto_coefficient.csv') as f: lines = f.readlines() for idx, line in enumerate(lines): assert line == expected_nc[idx] diff --git a/tests/test_helper.py b/tests/test_helper.py index 149ebca..1fde0bd 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -4,7 +4,7 @@ import pytest from consensus_decentralization.helper import get_pool_identifiers, get_pool_legal_links, get_known_addresses, \ get_pool_clusters, write_blocks_per_entity_to_file, get_blocks_per_entity_from_file, get_timeframe_beginning, \ - get_timeframe_end, get_time_period, get_ledgers, valid_date, OUTPUT_DIR, get_blocks_per_entity_filename, \ + get_timeframe_end, get_time_period, get_ledgers, valid_date, INTERIM_DIR, get_blocks_per_entity_filename, \ get_representative_dates from consensus_decentralization.map import ledger_mapping @@ -17,7 +17,7 @@ def 
diff --git a/tests/test_helper.py b/tests/test_helper.py
index 149ebca..1fde0bd 100644
--- a/tests/test_helper.py
+++ b/tests/test_helper.py
@@ -4,7 +4,7 @@ import pytest
 
 from consensus_decentralization.helper import get_pool_identifiers, get_pool_legal_links, get_known_addresses, \
     get_pool_clusters, write_blocks_per_entity_to_file, get_blocks_per_entity_from_file, get_timeframe_beginning, \
-    get_timeframe_end, get_time_period, get_ledgers, valid_date, OUTPUT_DIR, get_blocks_per_entity_filename, \
+    get_timeframe_end, get_time_period, get_ledgers, valid_date, INTERIM_DIR, get_blocks_per_entity_filename, \
     get_representative_dates
 from consensus_decentralization.map import ledger_mapping
 
@@ -17,7 +17,7 @@ def setup_and_cleanup():
     after (cleanup)
     """
     # Setting up
-    test_output_dir = OUTPUT_DIR / "test_output"
+    test_output_dir = INTERIM_DIR / "test_output"
     test_output_dir.mkdir(parents=True, exist_ok=True)
     yield test_output_dir
     # Cleaning up
@@ -82,16 +82,25 @@ def test_committed_pool_data():
 
 def test_write_read_blocks_per_entity(setup_and_cleanup):
     output_dir = setup_and_cleanup
 
-    blocks_per_entity = {'Entity 1': {'2018': 1, '2019': 3}, 'Entity 2': {'2018': 2, '2019': 2}}
+    blocks_per_entity = {
+        'Entity 1': {'2018': 1, '2019': 3, '2020': 2, '2021': 3},
+        'Entity 2': {'2018': 2, '2019': 2, '2021': 1},
+        'Entity 3': {'2018': 2},
+        'Entity 4': {'2021': 1}
+    }
 
-    write_blocks_per_entity_to_file(output_dir=output_dir, blocks_per_entity=blocks_per_entity, dates=['2018', '2019'],
-                                    filename='test.csv')
+    write_blocks_per_entity_to_file(output_dir=output_dir, blocks_per_entity=blocks_per_entity,
+                                    dates=['2018', '2019', '2020', '2021'], filename='test.csv')
 
-    dates, bpe = get_blocks_per_entity_from_file(output_dir / 'test.csv')
+    dates, bpe = get_blocks_per_entity_from_file(output_dir / 'test.csv', population_windows=1)
 
-    assert all(len(nblocks) == len(dates) for nblocks in bpe.values())
-    assert dates == ['2018', '2019']
-    assert all([bpe['Entity 1'] == {'2018': 1, '2019': 3}, bpe['Entity 2'] == {'2018': 2, '2019': 2}])
+    assert dates == ['2018', '2019', '2020', '2021']
+    assert all([
+        bpe['Entity 1'] == {'2018': 1, '2019': 3, '2020': 2, '2021': 3},
+        bpe['Entity 2'] == {'2018': 2, '2019': 2, '2020': 0, '2021': 1},
+        bpe['Entity 3'] == {'2018': 2, '2019': 0},
+        bpe['Entity 4'] == {'2020': 0, '2021': 1}
+    ])
 
 def test_valid_date():
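The rewritten assertions above pin down the intended `population_windows` semantics: when blocks per entity are read back, an entity with no blocks in a given window is still reported with a count of 0 as long as it produced blocks at most `population_windows` windows before or after that window; otherwise the window is omitted for that entity. The following standalone sketch reproduces that zero-filling rule under these assumptions; it is an illustration, not the helper module's actual implementation.

```python
def fill_inactive_windows(blocks_per_entity, dates, population_windows):
    """Add explicit zeros for windows in which an entity is considered part of the
    population (active within +/- population_windows windows) but produced no blocks."""
    filled = {}
    for entity, counts in blocks_per_entity.items():
        active = [i for i, date in enumerate(dates) if date in counts]
        filled[entity] = {
            date: counts.get(date, 0)
            for i, date in enumerate(dates)
            if any(abs(i - j) <= population_windows for j in active)
        }
    return filled


# Reproduces the expectations of test_write_read_blocks_per_entity (population_windows=1):
blocks = {
    'Entity 1': {'2018': 1, '2019': 3, '2020': 2, '2021': 3},
    'Entity 2': {'2018': 2, '2019': 2, '2021': 1},
    'Entity 3': {'2018': 2},
    'Entity 4': {'2021': 1},
}
filled = fill_inactive_windows(blocks, ['2018', '2019', '2020', '2021'], 1)
assert filled['Entity 2'] == {'2018': 2, '2019': 2, '2020': 0, '2021': 1}
assert filled['Entity 3'] == {'2018': 2, '2019': 0}
assert filled['Entity 4'] == {'2020': 0, '2021': 1}
```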
diff --git a/tests/test_mappings.py b/tests/test_mappings.py
index 35f9f33..b161e0f 100644
--- a/tests/test_mappings.py
+++ b/tests/test_mappings.py
@@ -12,7 +12,7 @@ from consensus_decentralization.mappings.ethereum_mapping import EthereumMapping
 from consensus_decentralization.mappings.cardano_mapping import CardanoMapping
 from consensus_decentralization.mappings.tezos_mapping import TezosMapping
 
-from consensus_decentralization.helper import RAW_DATA_DIR, OUTPUT_DIR
+from consensus_decentralization.helper import RAW_DATA_DIR, INTERIM_DIR, get_clustering_flag
 
 
 @pytest.fixture
@@ -32,13 +32,15 @@ def setup_and_cleanup():
     ledger_mapping['sample_tezos'] = TezosMapping
     ledger_parser['sample_tezos'] = DummyParser
     test_raw_data_dir = RAW_DATA_DIR
-    test_output_dir = OUTPUT_DIR / "test_output"
+    test_output_dir = INTERIM_DIR / "test_output"
     # Create the output directory for each project (as this is typically done in the run.py script before parsing or
     # mapping takes place)
     for project in ['sample_bitcoin', 'sample_ethereum', 'sample_cardano', 'sample_tezos']:
         test_project_output_dir = test_output_dir / project
         test_project_output_dir.mkdir(parents=True, exist_ok=True)
     mapping_info_dir = pathlib.Path(__file__).resolve().parent.parent / 'mapping_information'
+    # Mock return value of get_clustering_flag
+    get_clustering_flag.return_value = True
     yield mapping_info_dir, test_raw_data_dir, test_output_dir
     # Clean up
     shutil.rmtree(test_output_dir)
@@ -98,7 +100,7 @@ def test_map(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
     parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
     apply_mapping(project='sample_bitcoin', parsed_data=parsed_data, output_dir=test_output_dir)
 
-    mapped_data_file = test_output_dir / 'sample_bitcoin/mapped_data.json'
+    mapped_data_file = test_output_dir / 'sample_bitcoin/mapped_data_clustered.json'
     assert mapped_data_file.is_file()
 
@@ -125,7 +127,7 @@ def test_bitcoin_mapping(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
         '510888': 'known_identifiers',
         '649064': 'known_addresses'
     }
-    with open(test_output_dir / 'sample_bitcoin/mapped_data.json') as f:
+    with open(test_output_dir / 'sample_bitcoin/mapped_data_clustered.json') as f:
         mapped_data = json.load(f)
     for block in mapped_data:
         if block['number'] in expected_block_creators:
@@ -158,7 +160,7 @@ def test_ethereum_mapping(setup_and_cleanup, prep_sample_ethereum_mapping_info):
         '11183793': 'known_identifiers'
     }
 
-    with open(test_output_dir / 'sample_ethereum/mapped_data.json') as f:
+    with open(test_output_dir / 'sample_ethereum/mapped_data_clustered.json') as f:
         mapped_data = json.load(f)
     for block in mapped_data:
         if block['number'] in expected_block_creators:
@@ -182,7 +184,7 @@ def test_cardano_mapping(setup_and_cleanup, prep_sample_cardano_mapping_info):
         '66666666666': 'known_clusters',
         '00000000001': 'known_addresses'
     }
-    with open(test_output_dir / 'sample_cardano/mapped_data.json') as f:
+    with open(test_output_dir / 'sample_cardano/mapped_data_clustered.json') as f:
         mapped_data = json.load(f)
     for block in mapped_data:
         if block['number'] in expected_block_creators:
@@ -208,7 +210,7 @@ def test_tezos_mapping(setup_and_cleanup, prep_sample_tezos_mapping_info):
         '1651794': 'fallback_mapping',
         '0000000': 'fallback_mapping'
     }
-    with open(test_output_dir / 'sample_tezos/mapped_data.json') as f:
+    with open(test_output_dir / 'sample_tezos/mapped_data_clustered.json') as f:
         mapped_data = json.load(f)
     for block in mapped_data:
         if block['number'] in expected_block_creators:
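These mapping tests now expect `mapped_data_clustered.json` because the mapping output filename depends on whether clustering is enabled (the fixture forces `get_clustering_flag` to return True). A minimal naming rule consistent with both the old and the new expectations is sketched below purely for illustration; the helper's real function name and signature may differ.

```python
def mapped_data_filename(clustering_enabled):
    # With clustering on, mapped blocks are written to mapped_data_clustered.json;
    # without it, the previous mapped_data.json name would apply.
    return 'mapped_data_clustered.json' if clustering_enabled else 'mapped_data.json'


assert mapped_data_filename(True) == 'mapped_data_clustered.json'
```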
diff --git a/tests/test_metrics.py b/tests/test_metrics.py
index a14fe73..2fab65c 100644
--- a/tests/test_metrics.py
+++ b/tests/test_metrics.py
@@ -1,5 +1,5 @@
 from consensus_decentralization.metrics import (entropy, gini, nakamoto_coefficient, herfindahl_hirschman_index,
-                                                theil_index, max_power_ratio, tau_index, total_entities)
+                                                theil_index, concentration_ratio, tau_index, total_entities)
 import numpy as np
 
 
@@ -72,6 +72,9 @@ def test_gini():
     g6 = gini.compute_gini([0, 0])
     assert g6 is None
 
+    g7 = gini.compute_gini([1])
+    assert g7 == 0
+
 
 def test_nc():
     nc1 = nakamoto_coefficient.compute_nakamoto_coefficient([3, 2, 1])
@@ -137,21 +140,36 @@ def test_compute_theil_index():
     assert theil_t == 0
 
 
-def test_compute_max_power_ratio():
-    max_mpr = max_power_ratio.compute_max_power_ratio([3, 2, 1])
-    assert max_mpr == 0.5
+def test_compute_concentration_ratio():
+    cr1 = concentration_ratio.compute_concentration_ratio([3, 2, 1], 1)
+    assert cr1 == 0.5
+
+    cr3 = concentration_ratio.compute_concentration_ratio([3, 2, 1], 3)
+    assert cr3 == 1
+
+    cr1 = concentration_ratio.compute_concentration_ratio([3, 2, 1, 1, 1, 1], 1)
+    assert cr1 == 1 / 3
+
+    cr3 = concentration_ratio.compute_concentration_ratio([3, 2, 1, 1, 1, 1], 3)
+    assert cr3 == 6/9
+
+    cr1 = concentration_ratio.compute_concentration_ratio([1], 1)
+    assert cr1 == 1
+
+    cr3 = concentration_ratio.compute_concentration_ratio([1], 3)
+    assert cr3 == 1
 
-    max_mpr = max_power_ratio.compute_max_power_ratio([3, 2, 1, 1, 1, 1])
-    assert max_mpr == 1 / 3
+    cr1 = concentration_ratio.compute_concentration_ratio([1, 1, 1], 1)
+    assert cr1 == 1 / 3
 
-    max_mpr = max_power_ratio.compute_max_power_ratio([1])
-    assert max_mpr == 1
+    cr3 = concentration_ratio.compute_concentration_ratio([1, 1, 1], 3)
+    assert cr3 == 1
 
-    max_mpr = max_power_ratio.compute_max_power_ratio([1, 1, 1])
-    assert max_mpr == 1 / 3
+    cr1 = concentration_ratio.compute_concentration_ratio([], 1)
+    assert cr1 == 0
 
-    max_mpr = max_power_ratio.compute_max_power_ratio([])
-    assert max_mpr == 0
+    cr3 = concentration_ratio.compute_concentration_ratio([], 3)
+    assert cr3 == 0
 
 
 def test_tau_33():
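The new test cases fix the intended behaviour of the concentration ratio: CR_k is the combined share of blocks produced by the k largest entities, it equals 1 whenever there are at most k producers, and it is defined as 0 for an empty distribution. The implementation below is a sketch that satisfies exactly these assertions (parameter names are assumed); the module's actual code may differ.

```python
def compute_concentration_ratio(block_distribution, topk):
    """Combined share of the `topk` largest block producers (CR_k).

    Returns 0 for an empty (or all-zero) distribution and 1 when `topk`
    covers every producer in the distribution.
    """
    total_blocks = sum(block_distribution)
    if total_blocks == 0:
        return 0
    top_blocks = sum(sorted(block_distribution, reverse=True)[:topk])
    return top_blocks / total_blocks


# Matches the expectations in test_compute_concentration_ratio:
assert compute_concentration_ratio([3, 2, 1], 1) == 0.5
assert compute_concentration_ratio([3, 2, 1, 1, 1, 1], 3) == 6 / 9
assert compute_concentration_ratio([], 3) == 0
```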