-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
133 lines (113 loc) · 4.21 KB
/
app.py
File metadata and controls
133 lines (113 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
import pathlib
import cgi
import secrets
import string
import os
import magic
class ShareService:
def __init__(self, environ, response):
self.environ = environ
self.response = response
self.storage = pathlib.Path(os.getenv('STORAGE', '/tmp/'))
self.default_mime = 'text/plain'
self.default_encoding = 'utf-8'
def _get_blob_path(self, blob_id: str) -> str:
return self.storage.joinpath(blob_id)
def read_blob(self, blob_id: str) -> bytes:
try:
with open(self._get_blob_path(blob_id), 'rb') as fd:
return fd.read()
except (FileNotFoundError, PermissionError, IsADirectoryError):
return
def write_blob(self, blob_id: str, buffer: bytes) -> None:
try:
with open(self._get_blob_path(blob_id), 'wb') as fd:
fd.write(buffer)
except (PermissionError):
return
def generate_blob_id(self, length: int = 8) -> str:
alphabet = string.digits + string.ascii_letters
return ''.join(secrets.choice(alphabet) for _ in range(length))
def generate_unique_blob_id(self, length: int = 8) -> str:
while True:
blob_id = self.generate_blob_id(length)
if not self.storage.joinpath(blob_id).exists():
break
return blob_id
def get_mime(self, buffer: bytes) -> str:
mimes = magic.Magic(mime = True)
mime = mimes.from_buffer(buffer)
badies = ['octet-stream', 'text/']
for bad in badies:
if bad in mime:
mime = 'text/plain'
break
return mime
def get_user_blob(self) -> bytes:
post_data = cgi.FieldStorage(
fp = self.environ.get('wsgi.input'),
environ = self.environ,
keep_blank_values = False,
)
if not post_data:
return
blob = post_data.getvalue(post_data.keys().pop())
if type(blob) == str:
return blob.encode(self.default_encoding)
return blob
def ok(self, buffer: bytes, mime: str = '') -> list:
if not mime:
mime = self.default_mime
self.response('200 OK', [
('Content-Type', mime),
('Content-Length', str(len(buffer))),
])
return [buffer]
def fail(self) -> list:
self.response('451 Unavailable for legal reasons', [
('Content-Type', self.default_mime),
])
return [b'']
def redirect(self, blob_id: str) -> list:
self.response('301 Moved Permanently', [
('Content-Type', self.default_mime),
('Location', f'/{blob_id}'),
])
return [blob_id.encode(self.default_encoding)]
def serve_howto(self) -> list:
manual = pathlib.Path(__file__).parent.joinpath('HOWTO')
url = f'{self.environ.get("REQUEST_SCHEME", "https")}://{self.environ.get("HTTP_HOST","localhost")}/'
with open(manual, 'r') as fd:
buffer = fd.read()
buffer = buffer.replace('__URL__', url)
return self.ok(buffer.encode(self.default_encoding))
def handle_GET(self) -> list:
if self.environ.get('REQUEST_URI') == '/':
return self.serve_howto()
blob_id = self.environ.get('REQUEST_URI', '')
blob_id = pathlib.Path(blob_id).name
if buffer := self.read_blob(blob_id):
mime = self.get_mime(buffer)
return self.ok(buffer, mime)
return self.fail()
def handle_POST(self) -> list:
buffer = self.get_user_blob()
if not buffer:
return self.fail()
if type(buffer) == str:
buffer = buffer.encode(self.default_encoding)
blob_id = self.generate_unique_blob_id()
self.write_blob(blob_id, buffer)
return self.redirect(blob_id)
def handle(self) -> list:
match self.environ.get('REQUEST_METHOD'):
case 'GET':
return self.handle_GET()
case 'POST':
return self.handle_POST()
case _:
return self.fail()
def application(environ, response) -> list:
service = ShareService(environ, response)
return service.handle()