-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple_docker.py
More file actions
123 lines (100 loc) · 3.27 KB
/
simple_docker.py
File metadata and controls
123 lines (100 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from batch_core.types import PathArg, cli_arg, MinimalComputeResources
from batch_worker import Container, function, logging, await_function, TempDisk
class PythonImage(Container):
    """Container declaration for the ``python`` image used by this module."""

    # Image repository name.
    image = "python"
    # Image tag; defaults to "3.11-slim".
    # NOTE(review): the cli_arg annotation presumably exposes this as a
    # command-line override -- confirm against batch_core.types.
    tag: cli_arg[str] = "3.11-slim"
@function(
    compute=MinimalComputeResources,
    containers=[PythonImage],
)
def python_script(script: PathArg):
    """
    Execute a single Python script inside the PythonImage container.

    The script is copied into the container's working directory and
    invoked by file name.

    Args:
        script: Path to the Python script to run

    Returns:
        dict: ``exit_code`` of the container run and the ``result`` it
        reported (the run is asked to read ``result.json``)

    Raises:
        RuntimeError: If the container exits with a non-zero exit code
    """
    script_name = script.path.name
    logging.info("Starting script execution: %s", script_name)

    container_run = PythonImage.run(
        command=f"python {script_name}",
        copy={script.url: "."},
        result_json_path="result.json",
    )

    exit_code = container_run.exit_code
    if exit_code != 0:
        raise RuntimeError(
            f"Container script execution failed with exit code {exit_code}"
        )

    logging.info("Script execution complete")
    return {"exit_code": exit_code, "result": container_run.result}
@function
def N_python_script(script: PathArg, N: int):
    """
    Launch python_script N times asynchronously and collect the outcomes.

    Args:
        script: Path to the Python script to run
        N: Number of times to run the script

    Returns:
        dict: ``total_runs`` (the requested N) and ``results``, a list with
        one ``success`` / ``error`` / ``result`` entry per awaited run
    """
    logging.info("Starting to run the script %d times", N)

    # Submit every run up front, then wait for all of them together.
    pending = [python_script.run_async(script=script) for _ in range(N)]
    finished = await_function(pending)

    summaries = []
    for outcome in finished:
        summaries.append(
            {
                "success": outcome.success,
                "error": outcome.error,
                "result": outcome.result,
            }
        )

    return {"total_runs": N, "results": summaries}
class WRTemp(TempDisk):
    """Temporary disk declaration with a requested size of 1 GB."""

    # Requested disk size in gigabytes.
    size_gb: float = 1.0
@function(
    compute=MinimalComputeResources,
    containers=[PythonImage],
    disks=[WRTemp],
)
def write_read_script(write_script: PathArg, read_script: PathArg):
    """
    Run a writer script, then a reader script, sharing the WRTemp disk.

    Both scripts run in the PythonImage container with WRTemp mounted at
    ``/data``. The writer is invoked with ``--output_path /data/`` and must
    report an ``output_file`` entry in its ``result.json``; that value is
    passed to the reader as ``--input_file``.

    Args:
        write_script: Path to the script that writes data
        read_script: Path to the script that reads data

    Returns:
        dict: ``read_result`` holding the result reported by the read script

    Raises:
        RuntimeError: If either container exits with a non-zero exit code,
            or if the write script's result has no ``output_file`` entry
    """
    logging.info("Running write script: %s", write_script.path.name)
    write_result = PythonImage.run(
        command=f"python {write_script.path.name} --output_path /data/ --result result.json",
        copy={write_script.url: "."},
        result_json_path="result.json",
        mount={WRTemp: "/data"},
        stdout_handler=logging.info,
        stderr_handler=logging.error,
    )
    logging.info("Write script completed with exit code: %d", write_result.exit_code)

    # Fail fast: without a successful write there is nothing valid to read,
    # and the result lookup below would fail with a far less helpful error.
    if write_result.exit_code != 0:
        raise RuntimeError(
            f"Write script failed with exit code {write_result.exit_code}"
        )

    # The result may be absent or lack the key even on a zero exit code.
    try:
        output_file = write_result.result["output_file"]
    except (KeyError, TypeError) as err:
        raise RuntimeError(
            "Write script result does not contain an 'output_file' entry"
        ) from err

    read_result = PythonImage.run(
        command=f"python {read_script.path.name} --input_file {output_file} --result result.json",
        copy={read_script.url: "."},
        result_json_path="result.json",
        mount={WRTemp: "/data"},
        stdout_handler=logging.info,
        stderr_handler=logging.error,
    )
    logging.info("Read script completed with exit code: %d", read_result.exit_code)

    # Same policy as python_script: a non-zero exit is an error, not a result.
    if read_result.exit_code != 0:
        raise RuntimeError(
            f"Read script failed with exit code {read_result.exit_code}"
        )

    return {
        "read_result": read_result.result
    }