-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
128 lines (103 loc) · 4.06 KB
/
main.py
File metadata and controls
128 lines (103 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import json
import quart
import asyncio
import os
import sys
import traceback
import tempfile
import mmap
# --- Configuration bootstrap -------------------------------------------------
# On first run, write a template config.json and exit so the user can fill in
# the DBP installation path before the server is started.
if not os.path.exists("config.json"):
    default_config = {
        "server": {
            "host": "0.0.0.0",
            "port": 8014
        },
        "dbp": {
            "path": "<path to Dark Basic Professional (Online) root directory>",
            "compiler_timeout": 3,
            "program_timeout": 5
        }
    }
    # Context manager ensures the file is flushed and closed before we exit
    # (the original `open(...).write(...)` leaked the handle).
    with open("config.json", "wb") as f:
        f.write(json.dumps(default_config, indent=2).encode("utf-8"))
    print("Created file config.json. Please edit it with the correct settings now, then run the script again")
    sys.exit(1)

# Load settings; a malformed file raises json.JSONDecodeError loudly here.
with open("config.json", "rb") as f:
    config = json.loads(f.read().decode("utf-8"))

app = quart.Quart(__name__)
# The DBP compiler reports through a single named shared-memory region, so
# only one compile may be in flight at a time.
compiler_lock = asyncio.Lock()
def line_endings_to_dos(code):
    """Normalize *code* to CRLF ("\\r\\n") line endings.

    Existing carriage returns are stripped first so already-CRLF input is
    not doubled up.
    """
    lf_only = code.replace("\r", "")
    return lf_only.replace("\n", "\r\n")
def line_endings_to_unix(code):
    """Drop every carriage return, leaving LF-only line endings."""
    return "".join(ch for ch in code if ch != "\r")
async def compile_dbp_source(code):
    """Compile DBP source *code* and run the produced executable.

    Returns a ``(success, output)`` tuple: on success ``output`` is the
    program's stdout, on failure it is the compiler's error message (read
    from the compiler's named shared-memory region) or a timeout note.
    Serialized via ``compiler_lock`` because the compiler reports through a
    single shared-memory name.
    """
    compiler = os.path.join(config["dbp"]["path"], "Compiler", "DBPCompiler.exe")
    async with compiler_lock:
        with tempfile.TemporaryDirectory() as tmpdir:
            with open(os.path.join(tmpdir, "source.dba"), "wb") as f:
                f.write(code.encode("utf-8"))
            # Windows named shared memory the compiler writes its error text
            # into. The context manager releases the mapping on every path
            # (the original leaked it on each call).
            with mmap.mmap(0, 256, "DBPROEDITORMESSAGE") as mm:
                compiler_process = await asyncio.create_subprocess_exec(compiler, "source.dba", cwd=tmpdir)
                try:
                    await asyncio.wait_for(compiler_process.wait(), config["dbp"]["compiler_timeout"])
                except asyncio.TimeoutError:
                    error_msg = mm.read().decode("utf-8").strip("\r\n\0")
                    compiler_process.terminate()
                    await asyncio.sleep(1)  # have to wait for the process to actually terminate, or windows won't delete tmpdir
                    return False, error_msg
                if not os.path.exists(os.path.join(tmpdir, "default.exe")):
                    error_msg = mm.read().decode("utf-8").strip("\r\n\0")
                    await asyncio.sleep(1)  # have to wait for the process to actually terminate, or windows won't delete tmpdir
                    return False, error_msg
            program_process = await asyncio.create_subprocess_exec(
                os.path.join(tmpdir, "default.exe"),
                stdout=asyncio.subprocess.PIPE,
                cwd=tmpdir)
            try:
                # communicate() drains stdout while waiting. The original
                # wait()-then-read() order could deadlock (and then hit the
                # timeout) once the child filled the pipe buffer.
                out, _ = await asyncio.wait_for(
                    program_process.communicate(), config["dbp"]["program_timeout"])
                return True, out.decode("utf-8")
            except asyncio.TimeoutError:
                program_process.terminate()
                await asyncio.sleep(1)  # have to wait for the process to actually terminate, or windows won't delete tmpdir
                return False, f"Executable didn't terminate after {config['dbp']['program_timeout']}s"
@app.route("/update")
async def do_update():
    """Update endpoint: a no-op that always reports success."""
    response = {"success": True, "message": ""}
    return response
@app.route("/commit_hash")
async def commit_hash():
    """Report the deployed revision; this service is unversioned, so "0"."""
    return {"commit_hash": "0"}
@app.route("/compile", methods=["POST"])
async def do_compile():
    """Compile one snippet posted as JSON ``{"code": ...}`` and return the result."""
    raw_body = await quart.request.get_data()
    snippet = json.loads(raw_body.decode("utf-8"))
    # Compiler wants CRLF input; clients get LF-normalized output back.
    ok, out = await compile_dbp_source(line_endings_to_dos(snippet["code"]))
    return {"success": ok, "output": line_endings_to_unix(out)}
@app.route("/compile_multi", methods=["POST"])
async def do_compile_multi():
    """Compile a JSON array of snippets; respond with one result per snippet, in order."""
    raw_body = await quart.request.get_data()
    snippets = json.loads(raw_body.decode("utf-8"))
    results = []
    for entry in snippets:
        # Compiler wants CRLF input; clients get LF-normalized output back.
        ok, out = await compile_dbp_source(line_endings_to_dos(entry["code"]))
        results.append({"success": ok, "output": line_endings_to_unix(out)})
    return quart.jsonify(results)
# --- Entry point -------------------------------------------------------------
loop = asyncio.get_event_loop()
try:
    app.run(loop=loop, host=config["server"]["host"], port=config["server"]["port"])
except KeyboardInterrupt:
    pass  # Ctrl+C is a normal shutdown, not an error
except Exception:
    # Narrowed from a bare `except:`, which also swallowed SystemExit and
    # GeneratorExit; unexpected server errors are still logged here.
    traceback.print_exc()
finally:
    loop.close()