diff --git a/.gitignore b/.gitignore index 0712f6d7b..73655cf7e 100644 --- a/.gitignore +++ b/.gitignore @@ -188,3 +188,14 @@ cache # IntelliJ IDEA files .idea *.iml + +# VS-Code files +.vscode/ + +# Local config +config/config.json +config/redis + +# Benchmark Results +results/ +experiments.json \ No newline at end of file diff --git a/benchmarks-data b/benchmarks-data index 7c7f67be6..0c2cf44f2 160000 --- a/benchmarks-data +++ b/benchmarks-data @@ -1 +1 @@ -Subproject commit 7c7f67be6d6efd94a5de10607136ce237a673ef7 +Subproject commit 0c2cf44f27d214bb38a5bf69357f9dc3d18c4bb3 diff --git a/benchmarks/600.workflows/610.gen/config.json b/benchmarks/600.workflows/610.gen/config.json index 8eae08240..8ff6eec59 100644 --- a/benchmarks/600.workflows/610.gen/config.json +++ b/benchmarks/600.workflows/610.gen/config.json @@ -1,5 +1,6 @@ { "timeout": 120, "memory": 128, - "languages": ["python"] + "languages": ["python"], + "modules": [] } diff --git a/benchmarks/600.workflows/610.gen/definition.json b/benchmarks/600.workflows/610.gen/definition.json index fcdf203bc..4ccb52abe 100644 --- a/benchmarks/600.workflows/610.gen/definition.json +++ b/benchmarks/600.workflows/610.gen/definition.json @@ -37,10 +37,10 @@ "map_astros": { "type": "map", "array": "astros.people", - "root": "map_astros", + "root": "map", "next": "process_astros", "states": { - "map_astros": { + "map": { "type": "task", "func_name": "map_astros" } diff --git a/benchmarks/600.workflows/610.gen/input.py b/benchmarks/600.workflows/610.gen/input.py index 68f82e81f..f4073b711 100644 --- a/benchmarks/600.workflows/610.gen/input.py +++ b/benchmarks/600.workflows/610.gen/input.py @@ -1,5 +1,5 @@ def buckets_count(): return (0, 0) -def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): +def generate_input(data_dir, size, benchmarks_bucket,input_buckets, output_buckets, upload_func, nosql_func): return dict() \ No newline at end of file diff --git a/benchmarks/600.workflows/610.gen/python/map_astros.py b/benchmarks/600.workflows/610.gen/python/map_astros.py index b98b5e9d7..54e530ff2 100644 --- a/benchmarks/600.workflows/610.gen/python/map_astros.py +++ b/benchmarks/600.workflows/610.gen/python/map_astros.py @@ -1,7 +1,6 @@ def handler(elem): - name = elem["name"] - fn, ln = name.split(" ") - name = " ".join([ln, fn]) - elem["name_rev"] = name - + full_name:str = elem["name"] + names = full_name.split(" ") + names.reverse() + elem["name_rev"] = " ".join(names) return elem \ No newline at end of file diff --git a/benchmarks/600.workflows/610.gen/python/requirements.txt b/benchmarks/600.workflows/610.gen/python/requirements.txt new file mode 100644 index 000000000..663bd1f6a --- /dev/null +++ b/benchmarks/600.workflows/610.gen/python/requirements.txt @@ -0,0 +1 @@ +requests \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/.gitignore b/benchmarks/600.workflows/6300.sda-workflow/.gitignore new file mode 100644 index 000000000..98a3081b2 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/.gitignore @@ -0,0 +1,2 @@ +cfg/ +dev/ \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/config.json b/benchmarks/600.workflows/6300.sda-workflow/config.json new file mode 100644 index 000000000..f7e9577d9 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/config.json @@ -0,0 +1,7 @@ +{ + "timeout": 500, + "memory": 128, + "languages": ["python"], + "modules": ["storage"], + "container-image": "logru/sda-no-db:latest" +} diff --git a/benchmarks/600.workflows/6300.sda-workflow/definition.json b/benchmarks/600.workflows/6300.sda-workflow/definition.json new file mode 100644 index 000000000..76915b068 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/definition.json @@ -0,0 +1,70 @@ +{ + "root": "clearDB", + "states": { + "clearDB":{ + "type": "task", + "func_name": "clearDB", + "next": "split" + }, + "split": { + "type": "task", + "func_name": "split", + "next": "filter-map" + }, + "filter-map":{ + "type": "map", + "root": "filter", + "array": "filter_workloads", + "next": "prepare-neighbors", + "states": { + "filter": { + "type": "task", + "func_name": "filter" + } + } + }, + "prepare-neighbors": { + "type": "task", + "func_name": "pre_neighbors", + "next": "neighbors-map" + }, + "neighbors-map":{ + "type": "map", + "root": "neighbors", + "array": "neighbors_workloads", + "next": "components", + "states": { + "neighbors": { + "type": "task", + "func_name": "neighbors" + } + } + }, + "components": { + "type": "task", + "func_name": "components", + "next": "cluster-analyze-map" + }, + "cluster-analyze-map":{ + "type": "map", + "root": "clustering", + "array": "cluster_workloads", + "next": "merge-results", + "states": { + "clustering": { + "type": "task", + "func_name": "clustering", + "next": "analysis" + }, + "analysis": { + "type": "task", + "func_name": "analysis" + } + } + }, + "merge-results": { + "type": "task", + "func_name": "merge" + } + } +} diff --git a/benchmarks/600.workflows/6300.sda-workflow/input.py b/benchmarks/600.workflows/6300.sda-workflow/input.py new file mode 100644 index 000000000..9057a34dd --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/input.py @@ -0,0 +1,135 @@ +import json +import os + +class MemgraphConfig: + def __init__(self, host:str, port:int, username:str, password:str): + self.host = host + self.port = port + self.username = username + self.password = password + if self.host == "localhost": + raise ValueError("Memgraph database for SDA workflow has to be reachable from the internet. Set the following environment variables via the command line or the SeBS .env file: \n\tMEMGRAPH_HOST\n\tMEMGRAPH_PORT\n\tMEMGRAPH_USER\n\tMEMGRAPH_PASSWORD") + + @staticmethod + def from_env(): + with open(".env", "r") as f: + for line in f: + key, value = line.strip().split("=", 1) + os.environ[key] = value + return MemgraphConfig( + host=os.getenv("MEMGRAPH_HOST", "localhost"), + port=int(os.getenv("MEMGRAPH_PORT", 7687)), + username=os.getenv("MEMGRAPH_USER", ""), + password=os.getenv("MEMGRAPH_PASSWORD", "") + ) + +class SDAConfig: + def __init__(self, memgraph_config: MemgraphConfig, splits:int=0, required_area:float=0.0, + max_edge_distance:float=0.0, max_neighbours: int = 5, clustering_distance: float = 500.0,merge_workers:int=2, visualize_edges:bool= True): + self.memgraph_config = memgraph_config + self.splits = splits + self.required_area = required_area + self.max_edge_distance = max_edge_distance + self.max_neighbours = max_neighbours + self.clustering_distance = clustering_distance + self.merge_workers = merge_workers + self.visualize_edges = visualize_edges + + def get(self): + return { + "binary-filters": [ + { + "name": "InsidePolygonFilter" + } + ], + "centrality-measures": [ + { + "name": "DegreeCentrality" + }, + { + "name": "MeanLocalSignificance" + }, + { + "name": "SmallerNeighboursRatio" + } + ], + "contraction-predicates": [ + { + "distance": self.clustering_distance, + "name": "DistanceBiPredicate" + } + ], + "maxDistanceMeters": self.max_edge_distance, + "maxNeighbours": self.max_neighbours, + "memgraph-host": self.memgraph_config.host, + "memgraph-port": self.memgraph_config.port, + "memgraph-user": self.memgraph_config.username, + "memgraph-password": self.memgraph_config.password, + "merge-workers": self.merge_workers, + "neighbouring-predicates": [], + "splits": self.splits, + "unary-filters": [ + { + "name": "ApproxAreaFilter", + "requiredArea": self.required_area + } + ], + "visualize-edges": self.visualize_edges + } + + @staticmethod + def from_benchmark_size(size:str): + memgraph_config:MemgraphConfig = MemgraphConfig.from_env() + configs = { + "test": SDAConfig(memgraph_config, splits=0, required_area=5000.0, max_edge_distance=3000.0, max_neighbours=5, clustering_distance=500.0, merge_workers=1, visualize_edges=False), + "small": SDAConfig(memgraph_config, splits=1, required_area=500.0, max_edge_distance=1000.0, max_neighbours=5, clustering_distance=500.0, merge_workers=2, visualize_edges=True), + "large": SDAConfig(memgraph_config, splits=2, required_area=500.0, max_edge_distance=500.0, max_neighbours=5, clustering_distance=200.0, merge_workers=2, visualize_edges=True), + } + return configs[size] + +def get_config_file_name(size): + return f"sda-config-{size}.json" + +def create_config_file(size): + config = SDAConfig.from_benchmark_size(size) + cfg_dir = os.path.join(os.path.dirname(__file__), "cfg") + os.makedirs(cfg_dir, exist_ok=True) + config_file_path = os.path.join(cfg_dir, get_config_file_name(size)) + with open(config_file_path, "w") as f: + json.dump(config.get(), f, indent=4) + return config_file_path + +def get_input_file(size): + input_files = { + "test" : "Corvara_IT.tiff", + "small": "Corvara_IT.tiff", + "large": "Wuerzburg_DE.tiff", + } + return input_files[size] + +def buckets_count(): + return (1, 5) + +def upload_all_data(upload_func,data_dir): + sizes=["test", "small", "large"] + for size in sizes: + input_file = get_input_file(size) + config_path = create_config_file(size) + upload_func(0, input_file, os.path.join(data_dir, input_file)) + upload_func(0, get_config_file_name(size), config_path) + + + +def generate_input(data_dir, size, benchmarks_bucket,input_buckets, output_buckets, upload_func, nosql_func): + upload_all_data(upload_func,data_dir) + return { + "config_file": get_config_file_name(size), + "input_file": get_input_file(size), + "input_bucket": input_buckets[0], + "split_output_bucket": output_buckets[0], + "filter_output_bucket": output_buckets[1], + "cluster_output_bucket": output_buckets[2], + "analysis_output_bucket": output_buckets[3], + "final_output_bucket": output_buckets[4], + "benchmark_bucket": benchmarks_bucket + } \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/SDAHelper.py b/benchmarks/600.workflows/6300.sda-workflow/python/SDAHelper.py new file mode 100644 index 000000000..ab7a71e59 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/SDAHelper.py @@ -0,0 +1,40 @@ +import os +import uuid +from pathlib import Path +from . import storage + +storage_client = storage.storage.get_instance() + +SHP_SUFFIX = [".shp", ".shx", ".dbf", ".prj"] + +def download_file(benchmark_bucket, path_in_bucket, dest_dir): + path = Path(dest_dir) / Path(path_in_bucket).name + storage_client.download(benchmark_bucket, path_in_bucket, path) + return path + +def download_file_bucket(benchmark_bucket, bucket, basename, dest_dir): + return download_file(benchmark_bucket, bucket + '/' + basename, dest_dir) + +def download_shp_file(benchmark_bucket, bucket ,shp_file, dest_dir): + files = [Path(shp_file).with_suffix(suffix) for suffix in filter(lambda x: x is not ".shp",SHP_SUFFIX)] + for f in files: + download_file_bucket(benchmark_bucket, bucket, f.name, dest_dir) + return download_file_bucket(benchmark_bucket, bucket, shp_file, dest_dir) + +def load_config(event,directory): + return download_file_bucket(event["benchmark_bucket"], event["input_bucket"], event["config_file"], directory) + +def upload_shp_file(benchmark_bucket, bucket, shp_basename): + shp_dir = Path(shp_basename).parent + for f in shp_dir.iterdir(): + if Path(shp_basename).stem == Path(f).stem and any(f.name.endswith(suffix) for suffix in SHP_SUFFIX): + full_path = shp_dir / f.name + storage_client.upload(benchmark_bucket, bucket + '/' + f.name, full_path,False) + +def download_directory(benchmark_bucket, bucket, dest_dir): + storage_client.download_directory(benchmark_bucket, bucket, dest_dir) + +def create_tmp_dir(): + tmp_dir = os.path.join("/tmp",str(uuid.uuid4())) + os.makedirs(tmp_dir, exist_ok=True) + return tmp_dir \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/analysis.py b/benchmarks/600.workflows/6300.sda-workflow/python/analysis.py new file mode 100644 index 000000000..11128472b --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/analysis.py @@ -0,0 +1,22 @@ +import subprocess +from .SDAHelper import * + +def handler(event): + benchmark_bucket = event["benchmark_bucket"] + cluster_output_bucket = event["cluster_output_bucket"] + TMP_DIR = create_tmp_dir() + analysis_input_file = download_shp_file(benchmark_bucket,cluster_output_bucket,event["cluster_output_file"],TMP_DIR) + config_file = load_config(event, TMP_DIR) + OUTPUT_STEM = "Analysis_"+Path(analysis_input_file).stem + command = ["SettlementDelineationAnalysis", "-i", str(analysis_input_file), "-c", str(config_file), "--outputStem", OUTPUT_STEM] + result = subprocess.run(command,capture_output=True,text=True, cwd=TMP_DIR) + if result.returncode != 0: + event["stdout"] = result.stdout + event["stderr"] = result.stderr + return event + event.pop("cluster_output_file", None) + event["analysis_output_files"]= [] + for file in Path(TMP_DIR).glob(f"{OUTPUT_STEM}*.shp"): + upload_shp_file(benchmark_bucket, event["analysis_output_bucket"],file) + event["analysis_output_files"].append(file.name) + return event diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/clearDB.py b/benchmarks/600.workflows/6300.sda-workflow/python/clearDB.py new file mode 100644 index 000000000..e0b57702a --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/clearDB.py @@ -0,0 +1,11 @@ +from .SDAHelper import * +import subprocess + +def handler(event): + TMP_DIR = create_tmp_dir() + config = load_config(event, TMP_DIR) + result = subprocess.run([f"AfricapolisClearDatabase", "-c", str(config)],capture_output=True,text=True) + if result.returncode != 0: + event["stdout"] = result.stdout + event["stderr"] = result.stderr + return event \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/clustering.py b/benchmarks/600.workflows/6300.sda-workflow/python/clustering.py new file mode 100644 index 000000000..c7907ce9a --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/clustering.py @@ -0,0 +1,28 @@ +import subprocess +from .SDAHelper import * + +def handler(event): + + benchmark_bucket = event["benchmark_bucket"] + filter_output_bucket = event["filter_output_bucket"] + TMP_DIR = create_tmp_dir() + input_files = [download_shp_file(benchmark_bucket, filter_output_bucket ,shp_file, TMP_DIR ) for shp_file in event["cluster_input_files"]] + config = load_config(event, TMP_DIR) + components = event["cluster_components"] + OUTPUT_STEM = "Cluster"+str(components[0]) + # Store workflow data in /tmp due to read only filesystem restriction + command = [f"SettlementDelineationContraction", "-i"] + command.extend([str(file) for file in input_files]) + command.extend(["-c", str(config),"--outputStem",OUTPUT_STEM,"--components"]) + command.extend([str(comp) for comp in components]) + result = subprocess.run(command,capture_output=True,text=True, cwd=TMP_DIR) + if result.returncode != 0: + event["stdout"] = result.stdout + event["stderr"] = result.stderr + return event + output_file = Path(TMP_DIR).glob(f"{OUTPUT_STEM}*.shp").__next__() + upload_shp_file(benchmark_bucket, event["cluster_output_bucket"],output_file) + event.pop("cluster_input_files", None) + event.pop("cluster_components", None) + event["cluster_output_file"] = output_file.name + return {"payload":event,"request_id":event.get("request-id","0")} \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/components.py b/benchmarks/600.workflows/6300.sda-workflow/python/components.py new file mode 100644 index 000000000..48f87e3f2 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/components.py @@ -0,0 +1,45 @@ +import subprocess +from .SDAHelper import * +import json + +def build_cluster_workload(components_files): + workloads = [] + for comp_file in components_files: + with open(comp_file, 'r') as f: + components_data = json.load(f) + workload = { + "cluster_input_files": [Path(f).name for f in components_data["files"]], + "cluster_components": components_data["components"] + } + workloads.append(workload) + return workloads + +def handler(event): + TMP_DIR = create_tmp_dir() + config_file = load_config(event, TMP_DIR) + COMPONENT_FILE_PREFIX = "components" + result = subprocess.run( + ["AfricapolisGraphComponents", "-c", str(config_file), "-o", COMPONENT_FILE_PREFIX], + cwd=TMP_DIR, + capture_output=True, + text=True + ) + if result.returncode != 0: + event["stdout"] = result.stdout + event["stderr"] = result.stderr + return event + event.pop("stdout", None) + event.pop("stderr", None) + event.pop("neighbors_workloads", None) + components_files = sorted(str(p) for p in Path(TMP_DIR).glob(f"{COMPONENT_FILE_PREFIX}*.json")) + workloads = build_cluster_workload(components_files) + return { + "cluster_workloads":[ + { + "cluster_input_files": workload["cluster_input_files"], + "cluster_components": workload["cluster_components"], + **event + } for workload in workloads + ], + **event + } \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/filter.py b/benchmarks/600.workflows/6300.sda-workflow/python/filter.py new file mode 100644 index 000000000..0f547d443 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/filter.py @@ -0,0 +1,18 @@ +import subprocess +from .SDAHelper import * + +def handler(event): + benchmark_bucket = event["benchmark_bucket"] + input = event["filter_input_file"] + TMP_DIR = create_tmp_dir() + input_path = download_shp_file(benchmark_bucket, event["split_output_bucket"], input, TMP_DIR) + config_path = load_config(event, TMP_DIR) + # Store workflow data in /tmp due to read only filesystem restriction + result = subprocess.run([f"SettlementDelineationFilter", "-i", str(input_path), "-c", str(config_path), "-o", TMP_DIR],capture_output=True,text=True) + if result.returncode != 0: + event["stdout"] = result.stdout + event["stderr"] = result.stderr + return event + output_file = Path(TMP_DIR).glob("*_filtered.shp").__next__() + upload_shp_file(benchmark_bucket, event["filter_output_bucket"], output_file) + return {"filtered_shp_file":Path(output_file).name,**event} \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/merge.py b/benchmarks/600.workflows/6300.sda-workflow/python/merge.py new file mode 100644 index 000000000..eb9583a1c --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/merge.py @@ -0,0 +1,38 @@ +import subprocess +from .SDAHelper import * + +def run_merge(input_files, output_file, directory,event): + output_location = Path(directory)/output_file + command = [f"SettlementDelineationMerge", "-i"] + command.extend([str(file) for file in input_files]) + command.extend(["--output",str(output_location)]) + res = subprocess.run(command,capture_output=True,text=True, cwd=directory) + upload_shp_file(event["benchmark_bucket"], event["final_output_bucket"],output_location) + return res + +def load_input(event,directory): + benchmark_bucket = event["benchmark_bucket"] + analysis_output_bucket = event["analysis_output_bucket"] + merge_input_files = [] + merge_edges_input_files = [] + for workload in event["cluster_workloads"]: + for file in workload["analysis_output_files"]: + shp_file = download_shp_file(benchmark_bucket,analysis_output_bucket,file,directory) + if "_edges" in shp_file.stem: + merge_edges_input_files.append(shp_file) + else: + merge_input_files.append(shp_file) + return merge_input_files, merge_edges_input_files + +def handler(event): + TMP_DIR = create_tmp_dir() + merge_input_files, merge_edges_input_files = load_input(event, TMP_DIR) + OUTPUT_FILE_STEM = Path(event["input_file"]).stem + "_SDA" + OUTPUT_FILE_NAME = OUTPUT_FILE_STEM + ".shp" + EDGE_OUTPUT_FILE_NAME = OUTPUT_FILE_STEM + "_edges.shp" + merge_output_result = run_merge(merge_input_files, OUTPUT_FILE_NAME, TMP_DIR,event) + if merge_output_result.returncode != 0: + return {"stdout": merge_output_result.stdout, "stderr": merge_output_result.stderr,"command": merge_output_result.args} + if len(merge_edges_input_files) > 0: + merge__edge_output_result = run_merge(merge_edges_input_files, EDGE_OUTPUT_FILE_NAME, TMP_DIR,event) + return {"OutputFile":OUTPUT_FILE_NAME,"EdgeOutputFile":EDGE_OUTPUT_FILE_NAME} \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/neighbors.py b/benchmarks/600.workflows/6300.sda-workflow/python/neighbors.py new file mode 100644 index 000000000..96a375560 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/neighbors.py @@ -0,0 +1,22 @@ +import subprocess +from .SDAHelper import * + +def handler(event): + benchmark_bucket = event["benchmark_bucket"] + filter_output_bucket = event["filter_output_bucket"] + TMP_DIR = create_tmp_dir() + input_path = download_shp_file(benchmark_bucket, filter_output_bucket ,event["filtered_shp_file"], TMP_DIR) + adjacent_input_paths = [download_shp_file(benchmark_bucket, filter_output_bucket ,f, TMP_DIR) for f in event.get("adjacent_files",[])] + config_path = load_config(event, TMP_DIR) + # Store workflow data in /tmp due to read only filesystem restriction + command = ["SettlementDelineationNeighbours", "-i", str(input_path), "-c", str(config_path)] + if len(adjacent_input_paths) > 0: + command.append("-a") + command.extend([str(p) for p in adjacent_input_paths]) + result = subprocess.run(command,capture_output=True,text=True) + if result.returncode != 0: + event["stdout"] = result.stdout + event["stderr"] = result.stderr + event["command"] = " ".join(command) + return event + return {} \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/pre_neighbors.py b/benchmarks/600.workflows/6300.sda-workflow/python/pre_neighbors.py new file mode 100644 index 000000000..4c74a20d3 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/pre_neighbors.py @@ -0,0 +1,54 @@ +import re + +class Coordinate: + def __init__(self,x:int,y:int): + self.x = x + self.y = y + + def chebyshev_distance(self, other) -> int: + return max(abs(self.x - other.x), abs(self.y - other.y)) + + @staticmethod + def from_filename(filename: str): + match = re.search(r'.+_(\d+)_(\d+)_filtered.shp', filename) + if match: + filename = match.group(1) + coord = Coordinate(int(match.group(1)), int(match.group(2))) + return coord + else: + raise ValueError("Filename does not match expected pattern") + +class SpatialFile: + def __init__(self, filename: str): + self.filename = filename + self.coordinate = Coordinate.from_filename(filename) + self.neighbors = [] + + def add_neighbor(self, neighbor_filename: str): + self.neighbors.append(neighbor_filename) + + def __eq__(self, other) -> bool: + if not isinstance(other, SpatialFile): + return False + return self.filename == other.filename + + def is_adjacent(self, other) -> bool: + return self.coordinate.chebyshev_distance(other.coordinate) <= 1 + +def handler(event): + files = [SpatialFile(file_workload["filtered_shp_file"]) for file_workload in event["filter_workloads"]] + for file in files: + for other_file in files: + if file != other_file and file.is_adjacent(other_file): + file.add_neighbor(other_file.filename) + event.pop("filter_workloads", None) + return { + "neighbors_workloads":[ + { + "filtered_shp_file": file.filename, + "adjacent_files": file.neighbors, + **event + } for file in files + ], + **event + } \ No newline at end of file diff --git a/benchmarks/600.workflows/6300.sda-workflow/python/split.py b/benchmarks/600.workflows/6300.sda-workflow/python/split.py new file mode 100644 index 000000000..dbb85bd57 --- /dev/null +++ b/benchmarks/600.workflows/6300.sda-workflow/python/split.py @@ -0,0 +1,35 @@ +import subprocess +import json +from .SDAHelper import * + +def get_splits(config_path:Path)->int: + with open(config_path,"r") as f: + config = json.load(f) + return int(config.get("splits",0)) + +def handler(event): + benchmark_bucket = event["benchmark_bucket"] + INPUT_DIR = create_tmp_dir() + OUTPUT_DIR = create_tmp_dir() + input_path = download_file_bucket(benchmark_bucket, event["input_bucket"], event["input_file"], INPUT_DIR) + splits = get_splits(load_config(event, INPUT_DIR)) + command = ["FishnetShapefileSplitter","-i", str(input_path), "-o", str(OUTPUT_DIR),"-s", str(splits)] + result = subprocess.run(command,capture_output=True,text=True,cwd=INPUT_DIR) + if result.returncode != 0: + event["stdout"] = result.stdout + event["stderr"] = result.stderr + event["command"] = " ".join(command) + return event + split_output_files = [] + for file in Path(OUTPUT_DIR).glob("*.shp"): + upload_shp_file(benchmark_bucket, event["split_output_bucket"], file) + split_output_files.append(Path(file).name) + return { + "filter_workloads":[ + { + "filter_input_file": split_file, + **event + }for split_file in split_output_files + ], + **event + } \ No newline at end of file diff --git a/config/systems.json b/config/systems.json index 9acc1dd2d..aacf58f11 100644 --- a/config/systems.json +++ b/config/systems.json @@ -100,7 +100,11 @@ "packages": [ "redis" ], - "module_packages": {} + "module_packages": { + "storage": [ + "boto3" + ] + } } }, "nodejs": { diff --git a/dockerfiles/aws/python/Dockerfile.base b/dockerfiles/aws/python/Dockerfile.base new file mode 100644 index 000000000..5f6289594 --- /dev/null +++ b/dockerfiles/aws/python/Dockerfile.base @@ -0,0 +1,23 @@ +# Define custom base image +ARG BASE_IMAGE="python:3.12" + +FROM ${BASE_IMAGE} AS build-image +RUN apt-get update && apt-get install -y python3-pip + +# Include global arg in this stage of the build +ARG LAMBDA_DIR="/lambda" + +# Copy function code +RUN mkdir -p ${LAMBDA_DIR} +RUN mkdir -p ${LAMBDA_DIR}/function +# Install the function's dependencies +RUN pip install \ + --target ${LAMBDA_DIR} \ + awslambdaric + +COPY . ${LAMBDA_DIR}/function +WORKDIR ${LAMBDA_DIR} +# Set runtime interface client as default command for the container runtime +ENTRYPOINT [ "/usr/bin/python", "-m", "awslambdaric" ] +# Pass the name of the function handler as an argument to the runtime +CMD [ "function/handler.handler" ] \ No newline at end of file diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index 18706d20e..e1dda2cbd 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -165,9 +165,7 @@ def package_code( "{{REDIS_PASSWORD}}", f'"{self.config.resources.redis_password}"', ) - - # if the containerized deployment is set to True - if code_package.container_deployment: + if code_package.container_deployment and code_package.benchmark_config.container_image is None: # build base image and upload to ECR _, container_uri = self.ecr_client.build_base_image( directory, @@ -177,6 +175,15 @@ def package_code( code_package.benchmark, is_cached, ) + if code_package.benchmark_config.container_image is not None: + _, container_uri =self.ecr_client.build_custom_image( + code_package.benchmark_config.container_image, + directory, + code_package.language_name, + code_package.language_version, + code_package.architecture, + code_package.benchmark, + ) package_config = CONFIG_FILES[code_package.language_name] function_dir = os.path.join(directory, "function") @@ -453,7 +460,7 @@ def create_workflow(self, code_package: Benchmark, workflow_name: str) -> "SFNWo code_files = list(code_package.get_code_files(include_config=False)) func_names = [os.path.splitext(os.path.basename(p))[0] for p in code_files] funcs = [ - self.create_function(code_package, workflow_name + "___" + fn) for fn in func_names + self.create_function(code_package, workflow_name + "___" + fn, code_package.container_deployment,code_package.container_uri if code_package.container_deployment else None) for fn in func_names ] # Generate workflow definition.json @@ -527,7 +534,7 @@ def update_workflow(self, workflow: Workflow, code_package: Benchmark): code_files = list(code_package.get_code_files(include_config=False)) func_names = [os.path.splitext(os.path.basename(p))[0] for p in code_files] funcs = [ - self.create_function(code_package, workflow.name + "___" + fn) for fn in func_names + self.create_function(code_package, workflow.name + "___" + fn,code_package.container_deployment,code_package.container_uri if code_package.container_deployment else None) for fn in func_names ] # Generate workflow definition.json diff --git a/sebs/aws/container.py b/sebs/aws/container.py index 6a2f20e0c..4bf8ad91d 100644 --- a/sebs/aws/container.py +++ b/sebs/aws/container.py @@ -38,17 +38,19 @@ def client(self) -> ECRClient: def registry_name( self, benchmark: str, language_name: str, language_version: str, architecture: str ) -> Tuple[str, str, str, str]: + image_tag = self.system_config.benchmark_image_tag( + self.name(), benchmark, language_name, language_version, architecture + ) + return self.registry(image_tag) + def registry( + self, image_tag: str + ) -> Tuple[str, str, str, str]: account_id = self.config.credentials.account_id region = self.config.region registry_name = f"{account_id}.dkr.ecr.{region}.amazonaws.com" - repository_name = self.config.resources.get_ecr_repository(self.client) - image_tag = self.system_config.benchmark_image_tag( - self.name(), benchmark, language_name, language_version, architecture - ) image_uri = f"{registry_name}/{repository_name}:{image_tag}" - return registry_name, repository_name, image_tag, image_uri def find_image(self, repository_name, image_tag) -> bool: @@ -80,4 +82,4 @@ def push_image(self, repository_uri, image_tag): except docker.errors.APIError as e: self.logging.error(f"Failed to push the image to registry {repository_uri}.") self.logging.error(f"Error: {str(e)}") - raise RuntimeError("Couldn't push to Docker registry") + raise RuntimeError("Couldn't push to Docker registry") \ No newline at end of file diff --git a/sebs/aws/generator.py b/sebs/aws/generator.py index 602bc3998..6b941deb7 100644 --- a/sebs/aws/generator.py +++ b/sebs/aws/generator.py @@ -117,7 +117,7 @@ def _encode_case(self, case: Switch.Case) -> dict: } cond = type + comp[case.op] - return {"Variable": "$.payload" + case.var, cond: case.val, "Next": case.next} + return {"Variable": "$.payload." + case.var, cond: case.val, "Next": case.next} def encode_map(self, state: Map) -> Union[dict, List[dict]]: states = {n: State.deserialize(n, s) for n, s in state.funcs.items()} diff --git a/sebs/benchmark.py b/sebs/benchmark.py index dbcae6b43..2bc1bcad8 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -29,11 +29,13 @@ def __init__( memory: int, languages: List["Language"], modules: List[BenchmarkModule], + container_image: Optional[str] = None ): self._timeout = timeout self._memory = memory self._languages = languages self._modules = modules + self._container_image = container_image @property def timeout(self) -> int: @@ -59,6 +61,10 @@ def languages(self) -> List["Language"]: def modules(self) -> List[BenchmarkModule]: return self._modules + @property + def container_image(self) -> Optional[str]: + return self._container_image + # FIXME: 3.7+ python with future annotations @staticmethod def deserialize(json_object: dict) -> "BenchmarkConfig": @@ -69,6 +75,7 @@ def deserialize(json_object: dict) -> "BenchmarkConfig": json_object["memory"], [Language.deserialize(x) for x in json_object["languages"]], [BenchmarkModule(x) for x in json_object["modules"]], + json_object.get("container-image",None) ) @@ -294,8 +301,7 @@ def serialize(self) -> dict: return {"size": self.code_size, "hash": self.hash} def query_cache(self): - - if self.container_deployment: + if self.container_deployment or self.benchmark_config.container_image is not None: self._code_package = self._cache_client.get_container( deployment=self._deployment_name, benchmark=self._benchmark, @@ -633,7 +639,6 @@ def build( ], is_workflow: bool, ) -> Tuple[bool, str, bool, str]: - # Skip build if files are up to date and user didn't enforce rebuild if self.is_cached and self.is_cached_valid: self.logging.info( @@ -681,6 +686,7 @@ def build( self.is_cached_valid, ) ) + self._container_deployment = self._container_uri != "" self.logging.info( ( "Created code package (source hash: {hash}), for run on {deployment}" diff --git a/sebs/faas/container.py b/sebs/faas/container.py index af24d8bd5..97e1bc449 100644 --- a/sebs/faas/container.py +++ b/sebs/faas/container.py @@ -130,6 +130,10 @@ def registry_name( ) -> Tuple[str, str, str, str]: pass + @abstractmethod + def registry(image_tag:str) -> Tuple[str,str,str,str]: + pass + def build_base_image( self, directory: str, @@ -217,3 +221,53 @@ def build_base_image( self.push_image(image_uri, image_tag) return True, image_uri + + @abstractmethod + def build_custom_image(self, base_image_tag: str, package_directory: str, language_name: str, language_version: str, architecture: str,benchmark:str) -> Tuple[bool, str]: + image_tag = f"sebs.{base_image_tag.replace(':','_').replace('/','.')}.{benchmark}.{language_name}.{language_version}.{architecture}" + registry_name, repository_name, image_tag, image_uri = self.registry(image_tag) + if self.find_image(registry_name,image_tag): + self.logging.info( + f"Skipping building custom Docker image for {benchmark}, using " + f"Docker image {image_uri} from registry: {registry_name}." + ) + return False, image_uri + build_dir = os.path.join(package_directory, "build") + os.makedirs(build_dir, exist_ok=True) + + shutil.copy( + os.path.join(DOCKER_DIR, self.name(), language_name, "Dockerfile.base"), + os.path.join(build_dir, "Dockerfile"), + ) + for fn in os.listdir(package_directory): + if fn not in ("index.js", "__main__.py"): + file = os.path.join(package_directory, fn) + shutil.move(file, build_dir) + + with open(os.path.join(build_dir, ".dockerignore"), "w") as f: + f.write("Dockerfile") + + self.logging.info(f"Building custom image from {base_image_tag}.") + + isa = platform.processor() + if (isa == "x86_64" and architecture != "x64") or ( + isa == "arm64" and architecture != "arm64" + ): + self.logging.warning( + f"Building image for architecture: {architecture} on CPU architecture: {isa}. " + "This step requires configured emulation. If the build fails, please consult " + "our documentation. We recommend QEMU as it can be configured to run automatically." + ) + + buildargs = { + "BASE_IMAGE": base_image_tag, + } + image, _ = self.docker_client.images.build( + tag=image_uri, path=build_dir, buildargs=buildargs + ) + self.logging.info( + f"Push the custom benchmark image {repository_name}:{image_tag} " + f"to registry: {registry_name}." + ) + self.push_image(image_uri, image_tag) + return True, image_uri \ No newline at end of file diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 0adcfc1d7..0b2b35b20 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -363,7 +363,7 @@ def update_workflow(self, workflow: Workflow, code_package: Benchmark): def get_workflow(self, code_package: Benchmark, workflow_name: Optional[str] = None): if code_package.language_version not in self.system_config.supported_language_versions( - self.name(), code_package.language_name + self.name(), code_package.language_name, code_package.architecture ): raise Exception( "Unsupported {language} version {version} in {system}!".format( @@ -375,7 +375,7 @@ def get_workflow(self, code_package: Benchmark, workflow_name: Optional[str] = N if not workflow_name: workflow_name = self.default_function_name(code_package) - rebuilt, _ = code_package.build(self.package_code, True) + rebuilt, _ , _ ,_= code_package.build(self.package_code, True) """ There's no function with that name?