-
Notifications
You must be signed in to change notification settings - Fork 97
Add SDA Workflow #277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/workflows
Are you sure you want to change the base?
Add SDA Workflow #277
Changes from all commits
9b30024
93c1fe5
c4ef107
c9fa196
5de04ed
60143bf
fab742a
3c846d0
19027fe
790ad27
c954fd5
e10af40
39af6e5
840a76f
a9f882a
f64a661
88ca99f
c3cf46c
bc197ad
96605d2
00ca219
e0b24f7
7ecc0be
9a3dacb
0d08618
360f3ef
4850ef3
856bd9a
0bdd2da
e429c8a
bcac8e9
1f15d22
a35cce5
634a960
8696869
b5735ec
90e6528
5ca3c46
4a3ec64
9f6979b
af79eb4
ea636c6
3a17d79
eb805d0
af35601
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| { | ||
| "timeout": 120, | ||
| "memory": 128, | ||
| "languages": ["python"] | ||
| "languages": ["python"], | ||
| "modules": [] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| def buckets_count(): | ||
| return (0, 0) | ||
|
|
||
| def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): | ||
| def generate_input(data_dir, size, benchmarks_bucket,input_buckets, output_buckets, upload_func, nosql_func): | ||
| return dict() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,6 @@ | ||
| def handler(elem): | ||
| name = elem["name"] | ||
| fn, ln = name.split(" ") | ||
| name = " ".join([ln, fn]) | ||
| elem["name_rev"] = name | ||
|
|
||
| full_name:str = elem["name"] | ||
| names = full_name.split(" ") | ||
| names.reverse() | ||
| elem["name_rev"] = " ".join(names) | ||
| return elem |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| requests |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| cfg/ | ||
| dev/ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| { | ||
| "timeout": 500, | ||
| "memory": 128, | ||
| "languages": ["python"], | ||
| "modules": ["storage"], | ||
| "container-image": "logru/sda-no-db:latest" | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is good, but it would be great if we had a clear path to reproduce these images - @LoloGruber, can you please point me to this Dockerfile? On the main branch, we now have C++ support that gives us a way to add "dependencies" images, and I can integrate that.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I added code to container.py to build an image by injecting AWS bootstrap code into a user-defined base image. This is the related Dockerfile for AWS
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I meant the Dockerfile that builds SDA :-) I don't see it in the Dockerfile you linked.
Collaborator
| There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| { | ||
| "root": "clearDB", | ||
| "states": { | ||
| "clearDB":{ | ||
| "type": "task", | ||
| "func_name": "clearDB", | ||
| "next": "split" | ||
| }, | ||
| "split": { | ||
| "type": "task", | ||
| "func_name": "split", | ||
| "next": "filter-map" | ||
| }, | ||
| "filter-map":{ | ||
| "type": "map", | ||
| "root": "filter", | ||
| "array": "filter_workloads", | ||
| "next": "prepare-neighbors", | ||
| "states": { | ||
| "filter": { | ||
| "type": "task", | ||
| "func_name": "filter" | ||
| } | ||
| } | ||
| }, | ||
| "prepare-neighbors": { | ||
| "type": "task", | ||
| "func_name": "pre_neighbors", | ||
| "next": "neighbors-map" | ||
| }, | ||
| "neighbors-map":{ | ||
| "type": "map", | ||
| "root": "neighbors", | ||
| "array": "neighbors_workloads", | ||
| "next": "components", | ||
| "states": { | ||
| "neighbors": { | ||
| "type": "task", | ||
| "func_name": "neighbors" | ||
| } | ||
| } | ||
| }, | ||
| "components": { | ||
| "type": "task", | ||
| "func_name": "components", | ||
| "next": "cluster-analyze-map" | ||
| }, | ||
| "cluster-analyze-map":{ | ||
| "type": "map", | ||
| "root": "clustering", | ||
| "array": "cluster_workloads", | ||
| "next": "merge-results", | ||
| "states": { | ||
| "clustering": { | ||
| "type": "task", | ||
| "func_name": "clustering", | ||
| "next": "analysis" | ||
| }, | ||
| "analysis": { | ||
| "type": "task", | ||
| "func_name": "analysis" | ||
| } | ||
| } | ||
| }, | ||
| "merge-results": { | ||
| "type": "task", | ||
| "func_name": "merge" | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| import json | ||
| import os | ||
|
|
||
| class MemgraphConfig: | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a comment for me - we need to integrate it internally, just like we did with the config for Redis. |
||
| def __init__(self, host:str, port:int, username:str, password:str): | ||
| self.host = host | ||
| self.port = port | ||
| self.username = username | ||
| self.password = password | ||
| if self.host == "localhost": | ||
| raise ValueError("Memgraph database for SDA workflow has to be reachable from the internet. Set the following environment variables via the command line or the SeBS .env file: \n\tMEMGRAPH_HOST\n\tMEMGRAPH_PORT\n\tMEMGRAPH_USER\n\tMEMGRAPH_PASSWORD") | ||
|
|
||
| @staticmethod | ||
| def from_env(): | ||
| with open(".env", "r") as f: | ||
| for line in f: | ||
| key, value = line.strip().split("=", 1) | ||
| os.environ[key] = value | ||
| return MemgraphConfig( | ||
| host=os.getenv("MEMGRAPH_HOST", "localhost"), | ||
| port=int(os.getenv("MEMGRAPH_PORT", 7687)), | ||
| username=os.getenv("MEMGRAPH_USER", ""), | ||
| password=os.getenv("MEMGRAPH_PASSWORD", "") | ||
| ) | ||
|
|
||
| class SDAConfig: | ||
| def __init__(self, memgraph_config: MemgraphConfig, splits:int=0, required_area:float=0.0, | ||
| max_edge_distance:float=0.0, max_neighbours: int = 5, clustering_distance: float = 500.0,merge_workers:int=2, visualize_edges:bool= True): | ||
| self.memgraph_config = memgraph_config | ||
| self.splits = splits | ||
| self.required_area = required_area | ||
| self.max_edge_distance = max_edge_distance | ||
| self.max_neighbours = max_neighbours | ||
| self.clustering_distance = clustering_distance | ||
| self.merge_workers = merge_workers | ||
| self.visualize_edges = visualize_edges | ||
|
|
||
| def get(self): | ||
| return { | ||
| "binary-filters": [ | ||
| { | ||
| "name": "InsidePolygonFilter" | ||
| } | ||
| ], | ||
| "centrality-measures": [ | ||
| { | ||
| "name": "DegreeCentrality" | ||
| }, | ||
| { | ||
| "name": "MeanLocalSignificance" | ||
| }, | ||
| { | ||
| "name": "SmallerNeighboursRatio" | ||
| } | ||
| ], | ||
| "contraction-predicates": [ | ||
| { | ||
| "distance": self.clustering_distance, | ||
| "name": "DistanceBiPredicate" | ||
| } | ||
| ], | ||
| "maxDistanceMeters": self.max_edge_distance, | ||
| "maxNeighbours": self.max_neighbours, | ||
| "memgraph-host": self.memgraph_config.host, | ||
| "memgraph-port": self.memgraph_config.port, | ||
| "memgraph-user": self.memgraph_config.username, | ||
| "memgraph-password": self.memgraph_config.password, | ||
| "merge-workers": self.merge_workers, | ||
| "neighbouring-predicates": [], | ||
| "splits": self.splits, | ||
| "unary-filters": [ | ||
| { | ||
| "name": "ApproxAreaFilter", | ||
| "requiredArea": self.required_area | ||
| } | ||
| ], | ||
| "visualize-edges": self.visualize_edges | ||
| } | ||
|
|
||
| @staticmethod | ||
| def from_benchmark_size(size:str): | ||
| memgraph_config:MemgraphConfig = MemgraphConfig.from_env() | ||
| configs = { | ||
| "test": SDAConfig(memgraph_config, splits=0, required_area=5000.0, max_edge_distance=3000.0, max_neighbours=5, clustering_distance=500.0, merge_workers=1, visualize_edges=False), | ||
| "small": SDAConfig(memgraph_config, splits=1, required_area=500.0, max_edge_distance=1000.0, max_neighbours=5, clustering_distance=500.0, merge_workers=2, visualize_edges=True), | ||
| "large": SDAConfig(memgraph_config, splits=2, required_area=500.0, max_edge_distance=500.0, max_neighbours=5, clustering_distance=200.0, merge_workers=2, visualize_edges=True), | ||
| } | ||
| return configs[size] | ||
|
|
||
| def get_config_file_name(size): | ||
| return f"sda-config-{size}.json" | ||
|
|
||
| def create_config_file(size): | ||
| config = SDAConfig.from_benchmark_size(size) | ||
| cfg_dir = os.path.join(os.path.dirname(__file__), "cfg") | ||
| os.makedirs(cfg_dir, exist_ok=True) | ||
| config_file_path = os.path.join(cfg_dir, get_config_file_name(size)) | ||
| with open(config_file_path, "w") as f: | ||
| json.dump(config.get(), f, indent=4) | ||
| return config_file_path | ||
|
|
||
| def get_input_file(size): | ||
| input_files = { | ||
| "test" : "Corvara_IT.tiff", | ||
| "small": "Corvara_IT.tiff", | ||
| "large": "Wuerzburg_DE.tiff", | ||
| } | ||
| return input_files[size] | ||
|
|
||
| def buckets_count(): | ||
| return (1, 5) | ||
|
|
||
| def upload_all_data(upload_func,data_dir): | ||
| sizes=["test", "small", "large"] | ||
| for size in sizes: | ||
| input_file = get_input_file(size) | ||
| config_path = create_config_file(size) | ||
| upload_func(0, input_file, os.path.join(data_dir, input_file)) | ||
| upload_func(0, get_config_file_name(size), config_path) | ||
|
|
||
|
|
||
|
|
||
| def generate_input(data_dir, size, benchmarks_bucket,input_buckets, output_buckets, upload_func, nosql_func): | ||
| upload_all_data(upload_func,data_dir) | ||
| return { | ||
| "config_file": get_config_file_name(size), | ||
| "input_file": get_input_file(size), | ||
| "input_bucket": input_buckets[0], | ||
| "split_output_bucket": output_buckets[0], | ||
| "filter_output_bucket": output_buckets[1], | ||
| "cluster_output_bucket": output_buckets[2], | ||
| "analysis_output_bucket": output_buckets[3], | ||
| "final_output_bucket": output_buckets[4], | ||
| "benchmark_bucket": benchmarks_bucket | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| import os | ||
| import uuid | ||
| from pathlib import Path | ||
| from . import storage | ||
|
|
||
| storage_client = storage.storage.get_instance() | ||
|
|
||
| SHP_SUFFIX = [".shp", ".shx", ".dbf", ".prj"] | ||
|
|
||
| def download_file(benchmark_bucket, path_in_bucket, dest_dir): | ||
| path = Path(dest_dir) / Path(path_in_bucket).name | ||
| storage_client.download(benchmark_bucket, path_in_bucket, path) | ||
| return path | ||
|
|
||
| def download_file_bucket(benchmark_bucket, bucket, basename, dest_dir): | ||
| return download_file(benchmark_bucket, bucket + '/' + basename, dest_dir) | ||
|
|
||
| def download_shp_file(benchmark_bucket, bucket ,shp_file, dest_dir): | ||
| files = [Path(shp_file).with_suffix(suffix) for suffix in filter(lambda x: x is not ".shp",SHP_SUFFIX)] | ||
| for f in files: | ||
| download_file_bucket(benchmark_bucket, bucket, f.name, dest_dir) | ||
| return download_file_bucket(benchmark_bucket, bucket, shp_file, dest_dir) | ||
|
|
||
| def load_config(event,directory): | ||
| return download_file_bucket(event["benchmark_bucket"], event["input_bucket"], event["config_file"], directory) | ||
|
|
||
| def upload_shp_file(benchmark_bucket, bucket, shp_basename): | ||
| shp_dir = Path(shp_basename).parent | ||
| for f in shp_dir.iterdir(): | ||
| if Path(shp_basename).stem == Path(f).stem and any(f.name.endswith(suffix) for suffix in SHP_SUFFIX): | ||
| full_path = shp_dir / f.name | ||
| storage_client.upload(benchmark_bucket, bucket + '/' + f.name, full_path,False) | ||
|
|
||
| def download_directory(benchmark_bucket, bucket, dest_dir): | ||
| storage_client.download_directory(benchmark_bucket, bucket, dest_dir) | ||
|
|
||
| def create_tmp_dir(): | ||
| tmp_dir = os.path.join("/tmp",str(uuid.uuid4())) | ||
| os.makedirs(tmp_dir, exist_ok=True) | ||
| return tmp_dir |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| import subprocess | ||
| from .SDAHelper import * | ||
|
|
||
| def handler(event): | ||
| benchmark_bucket = event["benchmark_bucket"] | ||
| cluster_output_bucket = event["cluster_output_bucket"] | ||
| TMP_DIR = create_tmp_dir() | ||
| analysis_input_file = download_shp_file(benchmark_bucket,cluster_output_bucket,event["cluster_output_file"],TMP_DIR) | ||
| config_file = load_config(event, TMP_DIR) | ||
| OUTPUT_STEM = "Analysis_"+Path(analysis_input_file).stem | ||
| command = ["SettlementDelineationAnalysis", "-i", str(analysis_input_file), "-c", str(config_file), "--outputStem", OUTPUT_STEM] | ||
| result = subprocess.run(command,capture_output=True,text=True, cwd=TMP_DIR) | ||
| if result.returncode != 0: | ||
| event["stdout"] = result.stdout | ||
| event["stderr"] = result.stderr | ||
| return event | ||
| event.pop("cluster_output_file", None) | ||
| event["analysis_output_files"]= [] | ||
| for file in Path(TMP_DIR).glob(f"{OUTPUT_STEM}*.shp"): | ||
| upload_shp_file(benchmark_bucket, event["analysis_output_bucket"],file) | ||
| event["analysis_output_files"].append(file.name) | ||
| return event |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| from .SDAHelper import * | ||
| import subprocess | ||
|
|
||
| def handler(event): | ||
| TMP_DIR = create_tmp_dir() | ||
| config = load_config(event, TMP_DIR) | ||
| result = subprocess.run([f"AfricapolisClearDatabase", "-c", str(config)],capture_output=True,text=True) | ||
| if result.returncode != 0: | ||
| event["stdout"] = result.stdout | ||
| event["stderr"] = result.stderr | ||
| return event |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| import subprocess | ||
| from .SDAHelper import * | ||
|
|
||
| def handler(event): | ||
|
|
||
| benchmark_bucket = event["benchmark_bucket"] | ||
| filter_output_bucket = event["filter_output_bucket"] | ||
| TMP_DIR = create_tmp_dir() | ||
| input_files = [download_shp_file(benchmark_bucket, filter_output_bucket ,shp_file, TMP_DIR ) for shp_file in event["cluster_input_files"]] | ||
| config = load_config(event, TMP_DIR) | ||
| components = event["cluster_components"] | ||
| OUTPUT_STEM = "Cluster"+str(components[0]) | ||
| # Store workflow data in /tmp due to read only filesystem restriction | ||
| command = [f"SettlementDelineationContraction", "-i"] | ||
| command.extend([str(file) for file in input_files]) | ||
| command.extend(["-c", str(config),"--outputStem",OUTPUT_STEM,"--components"]) | ||
| command.extend([str(comp) for comp in components]) | ||
| result = subprocess.run(command,capture_output=True,text=True, cwd=TMP_DIR) | ||
| if result.returncode != 0: | ||
| event["stdout"] = result.stdout | ||
| event["stderr"] = result.stderr | ||
| return event | ||
| output_file = Path(TMP_DIR).glob(f"{OUTPUT_STEM}*.shp").__next__() | ||
| upload_shp_file(benchmark_bucket, event["cluster_output_bucket"],output_file) | ||
| event.pop("cluster_input_files", None) | ||
| event.pop("cluster_components", None) | ||
| event["cluster_output_file"] = output_file.name | ||
| return {"payload":event,"request_id":event.get("request-id","0")} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why this change is necessary - need to check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran into issue trying to run this on AWS since the task name shadows the function name