-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
125 lines (104 loc) · 4.27 KB
/
example.py
File metadata and controls
125 lines (104 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import guardx
import guardx.analysis
import guardx.analysis.types
from guardx.analysis.types import AnalysisType
import guardx.policy
import io
import json
import logging
import re
import tarfile
from typing import Optional, TypedDict, Set, TYPE_CHECKING
import docker
if TYPE_CHECKING:
from docker.models.containers import ExecResult
class ExecutionResult(TypedDict):
    """Dictionary shape returned by ``PythonExecutes.__call__``.

    When the docker exit code is non-zero, ``success`` is False and
    ``result``, ``locals`` and ``stdout`` are all None; the raw docker
    result is still attached so the caller can inspect it.
    """

    # True only when the docker exit code was 0.
    success: bool
    # Value reported by the wrapper script, or the last non-blank stdout
    # line when the wrapper reported no result.
    result: Optional[str]
    # The "locals" entry of the wrapper's JSON metadata.
    # NOTE(review): the wrapper output is described as `locals: {...}`, so
    # this may be a mapping rather than a str -- confirm.
    locals: Optional[str]
    # Program output that preceded the metadata separator line.
    stdout: Optional[str]
    # Raw result object from docker, kept for debugging failed runs.
    docker_exec_result: 'ExecResult'
class PythonExecutes:
    """Callable that runs Python source through guardx and reports the outcome.

    ``PythonExecutes()(code)`` performs a sandboxed execution and returns an
    ``ExecutionResult``; ``analyze(code)`` runs guardx static analysis
    instead of executing the code.
    """

    def __init__(self):
        """TODO make it a parameter whether we re-create the container each time or re-use the existing one. If re-use, use context manager."""
        pass

    def __call__(
        self,
        python_code: str,
    ) -> "ExecutionResult":  # TODO other ones return bool. what to do?
        """Execute ``python_code`` via guardx and summarize the outcome.

        Args:
            python_code: Source code passed to ``guardx.Guardx().execute``.

        Returns:
            The formatted wrapper output when the docker exit code is 0.
            Otherwise a dict with ``success`` set to False, ``result``,
            ``locals`` and ``stdout`` set to None, and the raw docker result
            under ``docker_exec_result``.
        """
        # TODO: run static analysis first (guardx.Guardx().analyze) and
        # prefilter on its result before the sandboxed execution below.
        result = guardx.Guardx().execute(python_code).get_docker_result()
        if result.exit_code == 0:
            return self._format_result(result)
        return {
            "success": False,
            "result": None,
            "locals": None,
            "stdout": None,
            "docker_exec_result": result,
        }

    def analyze(self, python_code: str) -> "guardx.analysis.types.AnalysisResults":
        """Run guardx secret-detection and unsafe-code analysis on ``python_code``.

        The analysis result is printed to stdout and also returned.
        """
        result = guardx.Guardx().analyze(
            python_code, {AnalysisType.DETECT_SECRET, AnalysisType.UNSAFE_CODE}
        )
        print(result)
        return result

    def _format_result(self, docker_result: "ExecResult") -> "ExecutionResult":
        """Formats the result from the wrapper script.

        The output from the wrapper script should look something like:
        ```
        lines of stdout
        -- THIS LINE IS METADATA --
        JSON encoding of {result: "...", locals: {...}}
        ```
        where 'result' is the result of `eval`ing the last line.

        We return the JSON that is returned by the wrapper, with a couple of
        modifications:
        1. we add an "stdout" entry holding the stdout text, i.e. the lines
           before the separator joined with newlines.
        2. if the `result` is None, then we set `result` to the last
           non-whitespace line of stdout before the meta separator line.

        Raises:
            AssertionError: if ``docker_result.exit_code`` is not 0.
            IndexError: if the output is empty.
            ValueError: if the separator line is missing or the last line is
                not valid JSON.
        """
        assert docker_result.exit_code == 0, (
            "Expected to receive an ExecResult with a successful exit code, "
            f"but got: {docker_result}"
        )

        separator = "-- THIS LINE IS METADATA --"

        # Parse the JSON and STDOUT from the result.
        lines = docker_result.output.decode().strip().splitlines()
        json_from_eval_wrapper = json.loads(lines[-1])

        # Use the LAST separator: the wrapper prints it after the program's
        # own output, so an earlier identical line belongs to the program.
        separator_index = len(lines) - 1 - lines[::-1].index(separator)
        stdout_lines = lines[:separator_index]
        stdout = "\n".join(stdout_lines)

        # The wrapper's result may be None if the last thing the script did
        # was print something. In that case report the last non-blank line
        # of stdout as the result, when there is one.
        if json_from_eval_wrapper["result"] is None:
            non_whitespace_lines = [line for line in stdout_lines if line.strip()]
            if non_whitespace_lines:
                json_from_eval_wrapper["result"] = non_whitespace_lines[-1]

        return {
            "success": True,
            "result": json_from_eval_wrapper["result"],
            "locals": json_from_eval_wrapper["locals"],
            "stdout": stdout,
            "docker_exec_result": docker_result,
        }
if __name__ == "__main__":
    # Command-line entry point: execute the file given by --file in the
    # sandbox, optionally printing a static analysis of it first.
    import argparse

    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("--file", required=True)
    argument_parser.add_argument("--analyze", action=argparse.BooleanOptionalAction)
    args = argument_parser.parse_args()

    # Close the file deterministically; Python source is UTF-8 by default,
    # so do not depend on the locale's preferred encoding.
    with open(args.file, encoding="utf-8") as source_file:
        contents = source_file.read()

    logging.getLogger().setLevel(logging.INFO)

    v = PythonExecutes()
    if args.analyze:
        # analyze() prints its findings itself; the return value is not
        # needed here because the execution result below is what we report.
        v.analyze(contents)

    result = v(contents)
    if result["success"]:
        print(result["result"])
        print("SUCCESS")
    else:
        print(result["docker_exec_result"])
        print("DID NOT SUCCEED")