-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy path__init__.py
More file actions
127 lines (91 loc) · 3.69 KB
/
__init__.py
File metadata and controls
127 lines (91 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import json
import logging
from urllib.parse import quote, urlencode

import socketdev
from socketdev.tools import load_files
# Module-level logger registered under the fixed name "socketdev".
log = logging.getLogger("socketdev")
class FullScans:
    """Static helpers for the ``orgs/<org_slug>/full-scans`` API endpoints.

    Every request is sent through ``socketdev.do_request``; the object it
    returns is used here only via ``status_code``, ``text`` and ``json()``.
    """

    @staticmethod
    def create_params_string(params: dict) -> str:
        """Build a URL query string (with leading ``?``) from ``params``.

        Entries whose value is falsy (``None``, ``""``, ``0``, ``False``,
        empty containers) are omitted. Names and values are percent-encoded
        so characters such as ``&``, ``=``, ``#`` or spaces inside a value
        cannot corrupt the query string.

        Args:
            params: Mapping of query parameter names to values.

        Returns:
            ``"?name=value&..."``, or just ``"?"`` when nothing is kept.
        """
        kept = {name: value for name, value in params.items() if value}
        # quote (not the default quote_plus) so spaces become %20, not "+".
        return "?" + urlencode(kept, quote_via=quote)

    @staticmethod
    def _error_result(response) -> dict:
        """Return the failure dictionary shared by ``get``, ``post`` and ``stream``."""
        return {
            "success": False,
            "status": response.status_code,
            "message": response.text,
        }

    @staticmethod
    def _json_or_empty(response, action: str) -> dict:
        """Return the decoded JSON body on HTTP 200, else log and return ``{}``."""
        if response.status_code == 200:
            return response.json()
        # Keep the historical ``{}`` return value, but do not hide the failure.
        log.error(
            "%s failed with status %s: %s",
            action,
            response.status_code,
            response.text,
        )
        return {}

    @staticmethod
    def _parse_stream_text(text: str) -> dict:
        """Parse a newline-delimited JSON body into a dict keyed by each item's ``id``.

        Surrounding whitespace and wrapping double quotes are removed from
        the body; blank lines and lines holding only a stray ``"`` are
        skipped. A later item replaces an earlier one with the same ``id``.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON.
            KeyError: If a decoded item has no ``"id"`` key.
        """
        items = {}
        # str.strip returns a new string, so the result has to be used.
        body = text.strip().strip('"')
        # Split on "\n" only: splitlines() would also break on characters
        # such as U+2028, which may legally appear inside a JSON string.
        for raw_line in body.split("\n"):
            line = raw_line.strip()
            if not line or line == '"':
                continue
            item = json.loads(line)
            items[item["id"]] = item
        return items

    @staticmethod
    def get(org_slug: str, params: dict) -> dict:
        """Send a GET to ``orgs/<org_slug>/full-scans`` with ``params`` as the query.

        Args:
            org_slug: Organization slug placed in the request path.
            params: Query parameters; falsy values are dropped.

        Returns:
            On HTTP 200, the decoded JSON body with ``"success": True`` and
            ``"status": 200`` added. Otherwise a dict with ``"success": False``,
            the HTTP ``"status"`` and the response text as ``"message"``.
        """
        path = "orgs/" + org_slug + "/full-scans" + FullScans.create_params_string(params)
        response = socketdev.do_request(path=path, headers=None, payload=None)
        if response.status_code != 200:
            return FullScans._error_result(response)
        result = response.json()
        result["success"] = True
        result["status"] = 200
        return result

    @staticmethod
    def post(files: list, params: dict, workspace: str = None) -> dict:
        """POST ``files`` to ``orgs/<org_slug>/full-scans``.

        Args:
            files: File references handed to ``load_files``.
            params: Must contain ``"org_slug"`` (used in the path); the
                remaining truthy entries are sent as query parameters.
            workspace: Passed through to ``load_files`` unchanged.

        Returns:
            On HTTP 201, the decoded JSON body. Otherwise a dict with
            ``"success": False``, the HTTP ``"status"`` and the response text
            as ``"message"``.

        Raises:
            KeyError: If ``params`` has no ``"org_slug"`` entry.
        """
        loaded_files = load_files(files, [], workspace)
        org_slug = str(params["org_slug"])
        # org_slug is already part of the path; keep it out of the query
        # string, and build a new dict so the caller's mapping is untouched.
        query = {name: value for name, value in params.items() if name != "org_slug"}
        path = "orgs/" + org_slug + "/full-scans" + FullScans.create_params_string(query)
        response = socketdev.do_request(path=path, method="POST", files=loaded_files)
        if response.status_code == 201:
            return response.json()
        log.error("Error posting %s to the Fullscans API", files)
        log.error(response.text)
        return FullScans._error_result(response)

    @staticmethod
    def delete(org_slug: str, full_scan_id: str) -> dict:
        """Send a DELETE to ``orgs/<org_slug>/full-scans/<full_scan_id>``.

        Returns:
            The decoded JSON body on HTTP 200; ``{}`` (after logging the
            failure) for any other status.
        """
        path = "orgs/" + org_slug + "/full-scans/" + full_scan_id
        response = socketdev.do_request(path=path, method="DELETE")
        return FullScans._json_or_empty(response, "Full scan delete")

    @staticmethod
    def stream_diff(org_slug: str, before: str, after: str, preview: bool = False) -> dict:
        """Send a GET to the ``stream-diff`` endpoint for two full scans.

        Args:
            org_slug: Organization slug placed in the request path.
            before: Value sent as the ``before`` query parameter.
            after: Value sent as the ``after`` query parameter.
            preview: Sent as the ``preview`` query parameter.

        Returns:
            The decoded JSON body on HTTP 200; ``{}`` (after logging the
            failure) for any other status.
        """
        # NOTE(review): a bool ``preview`` is rendered as "True"/"False"
        # (Python spelling) - confirm the API accepts that form.
        path = f"orgs/{org_slug}/full-scans/stream-diff?before={before}&after={after}&preview={preview}"
        response = socketdev.do_request(path=path, method="GET")
        return FullScans._json_or_empty(response, "Full scan stream-diff")

    @staticmethod
    def stream(org_slug: str, full_scan_id: str) -> dict:
        """GET ``orgs/<org_slug>/full-scans/<full_scan_id>`` and index the items by id.

        The response text is treated as newline-delimited JSON: each
        non-blank line is decoded and stored under its ``"id"`` value.

        Returns:
            On HTTP 200, a dict mapping each item's ``id`` to the item, plus
            ``"success": True`` and ``"status": 200`` (which would replace
            items whose id is literally ``"success"`` or ``"status"``).
            Otherwise a dict with ``"success": False``, the HTTP ``"status"``
            and the response text as ``"message"``.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON.
            KeyError: If a decoded item has no ``"id"`` key.
        """
        path = "orgs/" + org_slug + "/full-scans/" + full_scan_id
        response = socketdev.do_request(path=path, method="GET")
        if response.status_code != 200:
            return FullScans._error_result(response)
        stream_dict = FullScans._parse_stream_text(response.text)
        stream_dict["success"] = True
        stream_dict["status"] = 200
        return stream_dict

    @staticmethod
    def metadata(org_slug: str, full_scan_id: str) -> dict:
        """Send a GET to ``orgs/<org_slug>/full-scans/<full_scan_id>/metadata``.

        Returns:
            The decoded JSON body on HTTP 200; ``{}`` (after logging the
            failure) for any other status.
        """
        path = "orgs/" + org_slug + "/full-scans/" + full_scan_id + "/metadata"
        response = socketdev.do_request(path=path, method="GET")
        return FullScans._json_or_empty(response, "Full scan metadata")