-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
169 lines (150 loc) · 5.41 KB
/
main.py
File metadata and controls
169 lines (150 loc) · 5.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# -*- coding: utf-8 -*-
"""
license-classifier: a license scanning and classification tool
Function: code analysis
Usage: python3 main.py
"""
import os
import json
import subprocess
import sys
import platform
from license_type import *
# Numeric severity levels assigned to a license by LicenseClassifier.license_severity().
CRITICAL, HIGH, MEDIUM, LOW, UNKNOWN = range(1, 6)

# Rule name reported for each severity level.
Severity2Rule = dict(zip(
    (CRITICAL, HIGH, MEDIUM, LOW, UNKNOWN),
    ("critical-risk", "high-risk", "medium-risk", "low-risk", "unknown-risk"),
))

# Classifications whose "Confidence" is below this threshold are skipped.
CONFIDENCE = 0.9
class LicenseClassifier(object):
    """Run the bundled ``identify_license`` binary and turn its output into issues.

    Configuration comes from the environment:

    * ``SOURCE_DIR``   - directory to scan.
    * ``RESULT_DIR``   - directory receiving ``result.json`` plus the
      intermediate ``license.json`` and ``output`` files (must be set).
    * ``TASK_REQUEST`` - path of the task request JSON file (must be set).
    """

    # Byte values considered "text" when sniffing the head of a file.
    _TEXT_CHARS = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7f})

    def __get_task_params(self):
        """Load the task request file and return its ``task_params`` entry.

        :return: the value stored under ``"task_params"`` in the JSON file
            named by the ``TASK_REQUEST`` environment variable
        """
        task_request_file = os.environ.get("TASK_REQUEST")
        with open(task_request_file, "r") as rf:
            task_request = json.load(rf)
        return task_request["task_params"]

    @staticmethod
    def _write_result(result_path, result):
        """Serialize the issue list to ``result_path`` as indented JSON."""
        with open(result_path, "w") as fp:
            json.dump(result, fp, indent=2)

    @classmethod
    def _is_binary(cls, head):
        """Return True when ``head`` contains a byte outside the text byte set."""
        return bool(head.translate(None, cls._TEXT_CHARS))

    @staticmethod
    def _tool_path():
        """Return the relative path of the scanner binary for this platform.

        :raises RuntimeError: when no binary is bundled for ``sys.platform``
        """
        if sys.platform == "darwin":
            return "./tool/mac/identify_license"
        if sys.platform.startswith("linux"):
            arch = platform.machine().lower()
            if arch in ("aarch64", "arm64", "armv8"):
                return "./tool/linux_arm64/identify_license"
            return "./tool/linux/identify_license"
        if sys.platform == "win32":
            return "./tool/windows/identify_license.exe"
        raise RuntimeError("Unsupported platform: %s" % sys.platform)

    @staticmethod
    def _run_tool(cmd, log_path):
        """Run ``cmd`` with stdout and stderr redirected to ``log_path``.

        A failure to launch the binary is written to the log instead of being
        raised, so the caller falls through to its "no result file" handling.
        """
        with open(log_path, "w") as log:
            try:
                subproc = subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT)
                subproc.communicate()
            except OSError as err:
                log.write("failed to launch %s: %s\n" % (cmd[0], err))

    def run(self):
        """Scan ``SOURCE_DIR`` and write the matching issues to ``result.json``.

        An empty list is written when there is nothing to scan or when the
        scanner's JSON output is missing or unreadable.

        :return: None
        """
        # The source directory comes straight from the environment.
        source_dir = os.environ.get("SOURCE_DIR", None)
        print("[debug] source_dir: %s" % source_dir)
        work_dir = os.environ.get("RESULT_DIR", None)
        # Everything else comes from the task request file.
        task_params = self.__get_task_params()
        rules = task_params["rules"]
        # Path filters; .git is always excluded.
        re_exclude = [".*/.git/.*"]
        re_exclude.extend(task_params["path_filters"]["re_exclusion"])

        result = []
        result_path = os.path.join(work_dir, "result.json")

        scan_files = [source_dir] if source_dir else []
        if not scan_files:
            print("[error] To-be-scanned files is empty, return empty result")
            self._write_result(result_path, result)
            return
        print("[debug] scan files: %s" % len(scan_files))

        error_output = os.path.join(work_dir, "license.json")
        outfile = os.path.join(work_dir, "output")

        # The command is an argument list executed without a shell, so paths
        # and regexes containing spaces or shell metacharacters pass verbatim.
        cmd = [self._tool_path(), "-headers", "-json", error_output]
        cmd.extend(["-ignore_paths_re", ",".join(re_exclude)])
        cmd.extend(scan_files)
        print("[debug] cmd: %s" % " ".join(cmd))
        self._run_tool(cmd, outfile)

        print("start data handle")
        # Post-process the scanner output.
        try:
            with open(error_output, "r") as f:
                outputs_data = json.load(f)
        except (IOError, OSError, ValueError):
            print("[error] Resulting file not found or cannot be loaded, return empty result")
            with open(outfile, "r") as log:
                print(log.read())
            self._write_result(result_path, result)
            return

        for file_res in outputs_data or []:
            path = file_res["Filepath"]
            # Skip binary files, judged from the first 1024 bytes.
            try:
                with open(path, "rb") as src:
                    head = src.read(1024)
            except (IOError, OSError) as e:
                print("error: %s" % e)
                continue
            if self._is_binary(head):
                print("skip binary file: %s" % path)
                continue

            # A missing or null classification list is treated as "no matches".
            for item in file_res.get("Classifications") or []:
                confidence = item["Confidence"]
                if confidence < CONFIDENCE:
                    continue
                license_name = item["Name"]
                rule_name = Severity2Rule.get(self.license_severity(license_name), None)
                if rule_name not in rules:
                    continue
                result.append({
                    "path": path,
                    "line": item["StartLine"],
                    "column": 0,
                    "msg": "License: %s; Confidence: %s; Link: https://spdx.org/licenses/%s.html"
                           % (license_name, confidence, license_name),
                    "rule": rule_name,
                    "refs": [],
                })

        self._write_result(result_path, result)

    def license_severity(self, name):
        """Return the severity level for the license called ``name``.

        The name is looked up in the license-type collections provided by
        ``license_type``; a name found in none of them yields ``UNKNOWN``.
        """
        if name in forbiddenType:
            return CRITICAL
        if name in restrictedType:
            return HIGH
        if name in reciprocalType:
            return MEDIUM
        if name in (noticeType + permissiveType + unencumberedType):
            return LOW
        return UNKNOWN
if __name__ == "__main__":
    # Script entry point: run one scan, bracketed by start/end markers.
    print("-- start run tool ...")
    classifier = LicenseClassifier()
    classifier.run()
    print("-- end ...")