From c8c30600f5c2464d00a7f65d39624ebfc1cca760 Mon Sep 17 00:00:00 2001 From: Jens Buss Date: Wed, 23 May 2018 13:00:04 +0200 Subject: [PATCH 1/4] set max_heap_size to 0.9 vmem and propagate max_heap_size to java call --- erna/scripts/process_fact_data.py | 2 +- erna/scripts/process_fact_mc.py | 4 ++-- erna/scripts/process_fact_run_list.py | 2 +- erna/stream_runner.py | 13 +++++++++++-- erna/stream_runner_local_output.py | 13 +++++++++++-- 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py index 60154c9..3a40545 100755 --- a/erna/scripts/process_fact_data.py +++ b/erna/scripts/process_fact_data.py @@ -22,7 +22,7 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, for num, df in df_mapping.groupby("bunch_index"): df=df.copy() df["bunch_index"] = num - job = Job(stream_runner.run, [jar, xml, df, aux_source_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem)) + job = Job(stream_runner.run, [jar, xml, df, aux_source_path, '{}mb'.format(0.9*vmem)], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem)) jobs.append(job) return jobs diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py index 9a445a1..70669a9 100644 --- a/erna/scripts/process_fact_mc.py +++ b/erna/scripts/process_fact_mc.py @@ -47,10 +47,10 @@ def make_jobs(jar, xml, data_paths, drs_paths, file_name, _ = path.splitext(path.basename(output_path)) file_name = create_filename_from_format(filename_format, file_name, num) out_path = path.dirname(output_path) - run = [jar, xml, df, path.join(out_path, file_name)] + run = [jar, xml, df, path.join(out_path, file_name), None, '{}mb'.format(0.9*vmem)] stream_runner = stream_runner_local else: - run = [jar, xml, df] + run = [jar, xml, df, None, '{mb}'.format(0.9*vmem)] stream_runner = stream_runner_std jobs.append( diff --git a/erna/scripts/process_fact_run_list.py b/erna/scripts/process_fact_run_list.py index c83c80e..340382c 100644 --- a/erna/scripts/process_fact_run_list.py +++ b/erna/scripts/process_fact_run_list.py @@ -21,7 +21,7 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, for num, indices in enumerate(split_indices): df = df_mapping[indices.min(): indices.max()] - job = Job(stream_runner.run, [jar, xml, df, aux_source_path], + job = Job(stream_runner.run, [jar, xml, df, aux_source_path, '{}mb'.format(0.9*vmem)], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem)) jobs.append(job) diff --git a/erna/stream_runner.py b/erna/stream_runner.py index 0a6c8c0..1e94015 100644 --- a/erna/stream_runner.py +++ b/erna/stream_runner.py @@ -11,7 +11,9 @@ ) -def run(jar, xml, input_files_df, aux_source_path=None): +def run(jar, xml, input_files_df, aux_source_path=None, + max_heap_size='2014m' + ): ''' This is what will be executed on the cluster ''' @@ -23,7 +25,14 @@ def run(jar, xml, input_files_df, aux_source_path=None): output_path = os.path.join(output_directory, "output.json") input_files_df.to_json(input_path, orient='records', date_format='epoch') - call = assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path) + call = assemble_facttools_call( + jar, + xml, + input_path, + output_path, + aux_source_path, + max_heap_size, + ) check_environment_on_node() diff --git a/erna/stream_runner_local_output.py b/erna/stream_runner_local_output.py index 747d842..d04ba6b 100644 --- a/erna/stream_runner_local_output.py +++ b/erna/stream_runner_local_output.py @@ -12,7 +12,9 @@ ) -def run(jar, xml, input_files_df, output_path, aux_source_path=None): +def run(jar, xml, input_files_df, output_path, aux_source_path=None + max_heap_size='2014m' + ): ''' This is a version of ernas stream runner that will be executed on the cluster, but writes its results directly to disk without sending them @@ -26,7 +28,14 @@ def run(jar, xml, input_files_df, output_path, aux_source_path=None): tmp_output_path = os.path.join(output_directory, "output.json") input_files_df.to_json(input_path, orient='records', date_format='epoch') - call = assemble_facttools_call(jar, xml, input_path, tmp_output_path, aux_source_path) + call = assemble_facttools_call( + jar, + xml, + input_path, + tmp_output_path, + aux_source_path, + max_heap_size, + ) check_environment_on_node() From a9a0c25036354fc55aee594e458bc959a37cc762 Mon Sep 17 00:00:00 2001 From: Jens Buss Date: Wed, 23 May 2018 13:00:55 +0200 Subject: [PATCH 2/4] let JVM memory sizes be defined in function call --- erna/utils.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/erna/utils.py b/erna/utils.py index 9a4cad8..49d8787 100644 --- a/erna/utils.py +++ b/erna/utils.py @@ -65,17 +65,23 @@ def date_to_night_int(night): return 10000 * night.year + 100 * night.month + night.day -def assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path=None): +def assemble_facttools_call(jar, xml, input_path, output_path, + aux_source_path=None, + max_heap_size='2014m', + initial_heap_size='512m', + compressed_class_space_size='64m', + max_meta_size='128m' + ): ''' Assemble the call for fact-tools with the given combinations of jar, xml, input_path and output_path. The db_path is optional for the case where a db_file is needed ''' call = [ 'java', - '-XX:MaxHeapSize=1024m', - '-XX:InitialHeapSize=512m', - '-XX:CompressedClassSpaceSize=64m', - '-XX:MaxMetaspaceSize=128m', + '-XX:MaxHeapSize={}'.format(max_heap_size), + '-XX:InitialHeapSize={}'.format(initial_heap_size), + '-XX:CompressedClassSpaceSize={}'.format(compressed_class_space_size), + '-XX:MaxMetaspaceSize={}'.format(max_meta_size), '-XX:+UseConcMarkSweepGC', '-XX:+UseParNewGC', '-jar', From 8a5060cf599372c469fe81c6f80473f39acd4ea6 Mon Sep 17 00:00:00 2001 From: Jens Buss Date: Wed, 23 May 2018 17:39:23 +0200 Subject: [PATCH 3/4] fix typos --- erna/scripts/process_fact_mc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py index 70669a9..67232c5 100644 --- a/erna/scripts/process_fact_mc.py +++ b/erna/scripts/process_fact_mc.py @@ -36,7 +36,7 @@ def make_jobs(jar, xml, data_paths, drs_paths, data_partitions = np.array_split(data_paths, num_jobs) drs_partitions = np.array_split(drs_paths, num_jobs) if output_path: - logger.info("Using stream runner für local output") + logger.info("Using stream runner for local output") else: logger.debug("Using std stream runner gathering output from all nodes") @@ -50,7 +50,7 @@ def make_jobs(jar, xml, data_paths, drs_paths, run = [jar, xml, df, path.join(out_path, file_name), None, '{}mb'.format(0.9*vmem)] stream_runner = stream_runner_local else: - run = [jar, xml, df, None, '{mb}'.format(0.9*vmem)] + run = [jar, xml, df, None, '{}mb'.format(0.9*vmem)] stream_runner = stream_runner_std jobs.append( From 497e7f4c5e26ed4f6e806723f59d6f0bd19f8c07 Mon Sep 17 00:00:00 2001 From: Jens Buss Date: Wed, 27 Jun 2018 11:10:44 +0200 Subject: [PATCH 4/4] add range --- erna/scripts/process_fact_run_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erna/scripts/process_fact_run_list.py b/erna/scripts/process_fact_run_list.py index 340382c..18ed4d3 100644 --- a/erna/scripts/process_fact_run_list.py +++ b/erna/scripts/process_fact_run_list.py @@ -41,7 +41,7 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, @click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00') @click.option('--engine', help='Name of the grid engine used by the cluster.', type=click.Choice(['PBS', 'SGE',]), default='SGE') @click.option('--num_jobs', help='Number of jobs to start on the cluster.', default='4', type=click.INT) -@click.option('--vmem', help='Amount of memory to use per node in MB.', default='400', type=click.INT) +@click.option('--vmem', help='Amount of memory to use per node in MB.', default='400', type=click.IntRange(2000, 1000000)) @click.option("--log_level", type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO') @click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int) @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed localy .')