This repository was archived by the owner on Jun 19, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapp.py
More file actions
138 lines (105 loc) · 4.56 KB
/
app.py
File metadata and controls
138 lines (105 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import re
import subprocess
import tempfile
from tempfile import _TemporaryFileWrapper
from typing import Tuple, Union
from PIL import Image
import json
import gradio as gr
import os
import zipfile
# Version string; not referenced elsewhere in the visible code.
VERSION = "0.1"
# Matches a bracketed span on a single line, from a "[" to the last "]" on
# that line ("." does not cross newlines). The pattern contains no ^ or $
# anchors, so the re.MULTILINE flag has no effect on what it matches.
REGEXP_BASIC_JSON_PARSER = re.compile(r"\[.*\]", re.MULTILINE)
def get_temporary_filename() -> str:
    """Return a fresh, randomly generated temporary file name (no directory).

    A named temporary file is created only to borrow the unique name that
    ``tempfile`` generates for it. The file itself is deleted when the
    ``with`` block exits, so only the bare name is handed back.

    Returns:
        The base name of the temporary file, e.g. ``"tmpab12cd34"``.
    """
    # NamedTemporaryFile always exposes a filesystem path in ``.name``. The
    # anonymous ``tempfile.TemporaryFile`` reports an integer file descriptor
    # there on POSIX, which makes ``os.path.basename`` raise ``TypeError``.
    with tempfile.NamedTemporaryFile() as tmp:
        filename = os.path.basename(tmp.name)
    return filename
def parse_images(images: list[_TemporaryFileWrapper]):
    """Return the ``name`` (path) of every uploaded file.

    Yields an empty list when nothing was uploaded. A falsy entry in
    ``images`` is represented by an empty string.
    """
    if not images:
        return []

    paths = []
    for upload in images:
        paths.append(upload.name if upload else "")
    return paths
# TODO: Make this more secure.
def upload_to_folder(images: list[_TemporaryFileWrapper]):
    """Re-save every uploaded image into ``./inputs`` under a fresh name.

    Args:
        images: Uploaded files; each must expose ``name`` (its path) and
            ``file`` (an open handle that PIL can read). ``None`` is treated
            as "no uploads".

    Returns:
        The paths of the saved copies, in the same order as ``images``.
    """
    input_folder = os.path.join(".", "inputs")
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(input_folder, exist_ok=True)

    file_paths = []
    for image in images or []:
        temp_name = get_temporary_filename()
        # splitext takes the last suffix and never raises, so names with
        # several dots ("my.photo.png") keep the real extension and names
        # without any dot fall back to "jpg" instead of an IndexError.
        suffix = os.path.splitext(os.path.basename(image.name))[1]
        image_extension = suffix.lstrip(".") or "jpg"
        image_path = os.path.join(
            input_folder, "{}.{}".format(temp_name, image_extension))
        file_paths.append(image_path)
        print(image_path)
        with Image.open(image.file) as image_file:
            image_file.save(image_path)
    return file_paths
# TODO: Write Unit Tests...
def parse_jsons(stdout: str):
    """Decode the first bracketed line of *stdout* as JSON.

    Each line is tested with ``REGEXP_BASIC_JSON_PARSER`` anchored at the
    start of the line; the first line that matches is decoded.

    Args:
        stdout: Captured text output to scan.

    Returns:
        The decoded JSON value, or ``[]`` when *stdout* is empty or no line
        matches the pattern.

    Raises:
        json.JSONDecodeError: If the matched text is not valid JSON.
    """
    if not stdout:
        return []

    matches = (
        REGEXP_BASIC_JSON_PARSER.match(line) for line in stdout.splitlines()
    )
    # next() with a default yields None when nothing matched; indexing the
    # filtered list with [0] raised IndexError in that case, which made the
    # empty-list fallback unreachable.
    json_match = next((m for m in matches if m is not None), None)
    if json_match is None:
        return []
    return json.loads(json_match.group(0))
def download_zip(image_tuples: list[Tuple[_TemporaryFileWrapper, str, int, str]]):
    """Pack the captioned images into ``./downloads/temp.zip``.

    Each tuple is read as ``(file, caption, index, filetype)``. The file at
    ``file.name`` is stored in the archive as
    ``images/<index>-<caption><filetype>``.

    Returns a dict that maps the ``zip_file`` component to an update which
    points it at the archive and makes it visible.
    """
    downloads_path = os.path.join(".", "downloads")
    if not os.path.exists(downloads_path):
        os.mkdir(downloads_path)

    # TODO: Write better duplicate handling.
    # TODO: Add a datetime or some other UID to the name, or the temp file
    # may get accidentally deleted on the next download button click?
    file_path = os.path.join(downloads_path, "temp.zip")

    with zipfile.ZipFile(file_path, 'w') as archive:
        for entry in image_tuples:
            source_path = entry[0].name
            member_name = str(entry[2]) + "-" + entry[1] + entry[3]
            archive.write(source_path, os.path.join("images", member_name))

    return {zip_file: gr.update(value=file_path, visible=True)}
# Gradio UI: the components are declared here; the callbacks and the event
# wiring that use them follow further down.
with gr.Blocks() as app:
# Multi-file upload limited to image file types.
files = gr.Files(file_types=["image"])
# Gallery styled with grid=4.
gallery = gr.Gallery().style(grid=4)
# Whenever the upload list changes, show the uploaded files in the gallery.
files.change(fn=parse_images, inputs=[files], outputs=[gallery])
button_interrogate = gr.Button("Interrogate")
# Created hidden; interrogate_images returns an update that makes it visible.
button_download_zip = gr.Button("Download .ZIP", visible=False)
# Created hidden; download_zip returns an update that sets its value and
# makes it visible.
zip_file = gr.File(visible=False)
captions_json = gr.JSON()
# Carries the (file, caption, index, filetype) tuples from interrogate_images
# to download_zip; starts out as None.
caption_state = gr.State(value=None) # type: ignore
def interrogate_images(images: list[_TemporaryFileWrapper]):
    """Caption the uploaded images and build the UI updates for the results.

    The uploads are copied with ``upload_to_folder`` and captioned with
    ``interrogate_uploaded_images``. The JSON that ``parse_jsons`` finds in
    that output is then matched back to the uploads. The copies are removed
    again whether or not captioning succeeded.

    Args:
        images: Uploaded files from the ``files`` component.

    Returns:
        A dict keyed by output component: ``(path, caption)`` pairs for
        ``gallery``, ``(file name, caption)`` pairs for ``captions_json``,
        an update that reveals ``button_download_zip``, and
        ``(file, caption, index, filetype)`` tuples for ``caption_state``.
    """
    output_images = []
    caption_state_value = []

    uploaded_file_paths = upload_to_folder(images)
    try:
        parsed_stdout = interrogate_uploaded_images(uploaded_file_paths)
        parsed_jsons = parse_jsons(parsed_stdout)
    finally:
        # Clean up in ``finally`` so a failed interrogation does not leave
        # the copies behind; a plain loop because this is a side effect.
        for uploaded_file_path in uploaded_file_paths:
            os.remove(uploaded_file_path)

    for json_item in parsed_jsons:
        index = json_item["index"]  # type: ignore
        caption = json_item["caption"]  # type: ignore
        filetype = json_item["filetype"]  # type: ignore
        # "index" is treated as 1-based into ``images``.
        image = images[int(index) - 1]  # type: ignore
        output_images.append((image.name, caption))
        caption_state_value.append((image, caption, index, filetype))

    # Build a real list: a lazy ``map`` object is single-use and is not
    # something the ``json`` module can serialise.
    output_json = [
        (os.path.basename(path), caption) for path, caption in output_images
    ]

    return {
        gallery: output_images,
        captions_json: output_json,
        button_download_zip: gr.update(visible=True),
        caption_state: caption_state_value,
    }
def interrogate_uploaded_images(file_paths: list[str]):
    """Run ``python main.py interrogate`` on *file_paths* and return its stdout.

    Args:
        file_paths: Paths of the images to pass to the command, one
            command-line argument each.

    Returns:
        Everything the child process wrote to standard output, as text.

    Raises:
        FileNotFoundError: If no ``python`` executable can be found.
    """
    # Pass an argument list and skip the shell: every path reaches main.py as
    # exactly one argument, so spaces or shell metacharacters in a file name
    # can neither split the command line nor inject extra commands.
    command = ["python", "main.py", "interrogate", *file_paths]
    result = subprocess.run(
        command, stdout=subprocess.PIPE, universal_newlines=True)
    return result.stdout
# "Interrogate" click: run interrogate_images on the uploaded files and route
# its result to the gallery, the JSON view, the download button and the state.
button_interrogate.click(fn=interrogate_images, inputs=[
files], outputs=[gallery, captions_json, button_download_zip, caption_state])
# "Download .ZIP" click: build the archive from the stored caption state and
# hand it to the zip_file component.
button_download_zip.click(fn=download_zip, inputs=[
caption_state], outputs=[zip_file])
# Start the Gradio app.
app.launch()