-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcollisearch_bash.py
More file actions
174 lines (151 loc) · 5.46 KB
/
collisearch_bash.py
File metadata and controls
174 lines (151 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python3.7
import argparse
import os
import subprocess
from multiprocessing import Process, Queue
from time import perf_counter as clock
import requests
from shinjuku import lt
from shinjuku.search import dijkstra
from shinjuku.transcode import encode_comp, decode_comp, realise_comp
# apgcodes that getsortedsls() files under its non-"pseudo" branch; the main
# loop consults this set to label a reported object as "true" or "pseudo".
true = set([])
def getsortedsls():
    """Download Catagolue's b3s23 synthesis costs and list still lifes by cost.

    Reads and updates the module-level ``min_paths`` mapping (apgcode ->
    ``(cost, ..., ...)``) and adds to the module-level ``true`` set, so both
    must exist before this function is called.

    Returns:
        A list of ``(apgcode, cost)`` tuples sorted by ascending cost.

    Raises:
        requests.HTTPError: if the census request does not succeed.
    """
    r = requests.get("https://catagolue.appspot.com/textcensus/b3s23/synthesis-costs")
    # Fail loudly on an HTTP error instead of trying to parse an error page.
    r.raise_for_status()
    # The first line of the response is skipped (presumably a header row).
    lines = r.content.decode().partition("\n")[2].splitlines()

    objs = []
    for line in lines:
        apgcode, cost = [x.strip('"') for x in line.split(",")]
        if not apgcode.startswith("xs"):
            continue  # not a still life
        cost = int(cost)
        if cost == 999999_999999_999999:
            min_paths[apgcode] = (9999, None, None)  # infinity
            cost = 9999
        if cost >= 100000_000000_000002:
            # Anything below this threshold is treated as pseudo and kept
            # as-is; everything else is recorded as a true object.
            true.add(apgcode)
            cost -= 100000_000000_000000  # cost according to Cata
        if apgcode not in min_paths:
            min_paths[apgcode] = (cost, None, None)
        known = min_paths[apgcode]
        if known[0] > cost:
            # Entries are stored as tuples, so the cost cannot be assigned in
            # place (that raises TypeError); rebuild the entry instead and
            # keep its remaining fields untouched.
            min_paths[apgcode] = (cost, *known[1:])
        objs.append((apgcode, cost))
    objs.sort(key=lambda x: x[1])
    return objs
def cost(apgcode):
    """Return the cost stored for ``apgcode`` in ``min_paths``, or -1 if absent.

    Looks the object up in the module-level ``min_paths`` mapping and returns
    the first field of its entry.
    """
    return min_paths[apgcode][0] if apgcode in min_paths else -1
def workerfunc(outqueue, inqueue, id):
    """Run one ``./GameOfLife`` process and forward its output records.

    The child's stdout is read line by line. Lines are accumulated until a
    blank line arrives; the accumulated text (newlines removed) is then split
    on single spaces, ``id`` is appended, and the resulting list is put on
    ``outqueue``.

    Args:
        outqueue: queue that receives one list per completed record.
        inqueue: control queue; any message on it makes the worker stop.
        id: value appended to every record so the consumer can tell workers
            apart.
    """
    # Imported locally so this function does not depend on a new file-level
    # import; multiprocessing queues raise queue.Empty from get_nowait().
    from queue import Empty

    # NOTE: this changes the working directory of the whole worker process.
    os.chdir("./C++")
    with subprocess.Popen("./GameOfLife", stdout=subprocess.PIPE, stdin=subprocess.PIPE) as proc:
        pending = []
        try:
            while True:
                try:
                    inqueue.get_nowait()
                except Empty:
                    pass  # no stop request yet, keep reading
                else:
                    return
                line = proc.stdout.readline().decode("utf-8")
                if not line:
                    # EOF: the child closed its stdout. Without this check
                    # the loop would spin forever on empty reads.
                    return
                if line != "\n":
                    pending.append(line)
                    continue
                # A blank line terminates the current record.
                record = "".join(pending).replace("\n", "")
                outqueue.put(record.split(" ") + [id])
                pending = []
        finally:
            # Do not leave the child running once this worker stops; the
            # enclosing "with" then closes the pipes and reaps the process.
            proc.terminate()
def getargs():
    """Parse the command line and return the resulting namespace.

    The only option is ``-d``/``--out-dir``, the directory to store synthesis
    RLEs in, which defaults to ``./collisearch_results``.
    """
    cli = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    cli.add_argument(
        "-d",
        "--out-dir",
        default="./collisearch_results",
        help="directory to store synthesis rles in",
    )
    return cli.parse_args()
def makemosaic(synths, targetdir):
    """Tile every recorded synthesis into one pattern and write it as RLE.

    Patterns are laid out on a grid 50 wide with a pitch of 100 in both
    directions, in the iteration order of ``synths``. The result is written to
    ``<targetdir>/<number of synths>.rle``.

    Args:
        synths: mapping whose values are sequences with the pattern's RLE
            string as their first element.
        targetdir: directory to write the mosaic into; created if missing.
    """
    # The output directory is not created anywhere else, so make sure it
    # exists before writing into it.
    if targetdir:
        os.makedirs(targetdir, exist_ok=True)
    mosaic = lt.pattern()
    nsynths = len(synths)
    for i, entry in enumerate(synths.values()):
        xoff = (i % 50) * 100
        yoff = (i // 50) * 100
        # Calling a pattern with two offsets presumably returns a shifted
        # copy; confirm against the lifelib pattern API.
        mosaic += lt.pattern(entry[0])(xoff, yoff)
    mosaic.write_rle(f"{targetdir}/{nsynths}.rle")
def backtrack(apgcode, sofar, used_by, min_paths):
    """Follow the predecessor chain of ``apgcode`` and record its dependants.

    Starting at ``apgcode``, repeatedly step to the predecessor stored in the
    second field of its ``min_paths`` entry until an entry with no predecessor
    is reached. At every step the current object is added to ``sofar`` and
    every object in ``sofar`` is recorded in ``used_by[predecessor]``.

    Implemented as a loop rather than recursion so that chains longer than the
    interpreter's recursion limit do not raise RecursionError.

    Args:
        apgcode: object to start from; must be a key of ``min_paths``.
        sofar: set of objects already visited on this chain; mutated in place.
        used_by: dict mapping an object to the set of objects whose chain
            passes through it; mutated in place.
        min_paths: mapping from object to a sequence whose second element is
            the predecessor object, or ``None`` at the end of a chain.

    Returns:
        None.
    """
    pred = min_paths[apgcode][1]
    while pred is not None:
        sofar.add(apgcode)
        used_by.setdefault(pred, set()).update(sofar)
        apgcode = pred
        pred = min_paths[apgcode][1]
def getuses(min_paths):
    """Build the ``used_by`` mapping for every object in ``min_paths``.

    Runs ``backtrack`` once per key, each time with a fresh visited set, and
    returns the dict that those calls fill in.
    """
    dependants = {}
    for apgcode in min_paths:
        backtrack(apgcode, set(), dependants, min_paths)
    return dependants
def printuses(used_by, apgcode):
    """Describe how many objects use ``apgcode``, with up to five examples.

    Returns ``"no uses"`` when ``apgcode`` is not a key of ``used_by``;
    otherwise a summary with the size of its entry and the first five items in
    iteration order.
    """
    if apgcode in used_by:
        dependants = used_by[apgcode]
        preview = list(dependants)[:5]
        return f"used by {len(dependants)}, first 5 uses: {preview}"
    return "no uses"
if __name__ == "__main__":
    # Parse the command line first so "--help" or a bad flag exits before the
    # expensive path search and census download below.
    args = getargs()

    # Deliberately module-level: getsortedsls() and cost() read this global.
    min_paths = dijkstra()
    objs = getsortedsls()

    # Write the cost table and close it before any worker starts, so the
    # complete file is on disk (presumably ./GameOfLife reads it; previously
    # the handle was never flushed or closed).
    with open("./C++/costs.txt", "w") as costfile:
        costfile.writelines(f"{x[0]} {x[1]}\n" for x in objs)

    used_by = getuses(min_paths)

    nthreads = 6
    resqueue = Queue()   # records produced by the workers
    workqueue = Queue()  # control messages sent to the workers
    workers = [Process(target=workerfunc, args=(resqueue, workqueue, i)) for i in range(nthreads)]
    upto = [0] * nthreads  # last progress count reported by each worker
    synths = {}            # apgcode -> (collision string, cost) of best find so far
    prevmax = 0            # total tested count at the last progress report

    with open("./collisearch_out.sjk", "a") as compfile:
        compfile.write("\n")
        starttime = clock()
        for w in workers:
            w.start()
        try:
            while True:
                synth = resqueue.get()
                if len(synth) == 6:
                    # Result record:
                    # [apgcode, known cost, new cost, collision, tested count, worker id]
                    apgcode, known_cost, new_cost, collision, tested, worker_id = synth
                    upto[worker_id] = int(tested)
                    if apgcode in synths:
                        prevcost = synths[apgcode][1]
                    else:
                        prevcost = min(9999, int(known_cost))
                    if int(new_cost) < prevcost:
                        comp = encode_comp(collision)
                        print(comp)
                        # Flush every component so results survive an abrupt stop.
                        compfile.write(comp + "\n")
                        compfile.flush()
                        truestr = "true" if apgcode in true else "pseudo"
                        filename = f"{os.getpid()}pattern.rle"
                        rle_text = lt.pattern(collision).rle_string(filename=filename)
                        msg = f"Object {apgcode} ({truestr}) (previous cost {prevcost}, this cost {new_cost}) " \
                              f"({printuses(used_by, apgcode)}) produced by the following collision: \n{rle_text}"
                        print(msg)
                        synths[apgcode] = (collision, int(new_cost))
                        print(f"{len(synths)} total synths\n")
                        makemosaic(synths, args.out_dir)
                else:
                    # Progress record: [tested count, worker id]
                    upto[synth[1]] = int(synth[0])
                total = sum(upto)
                if total >= prevmax + 10000:
                    prevmax = total
                    print(f"tested {total} in {clock() - starttime:.2f} seconds, per-thread {upto}\n")
        finally:
            # The loop above only ends through an exception (normally Ctrl-C).
            # Stop the workers on every exit path so none are left running,
            # then let the original exception propagate with its traceback.
            for w in workers:
                workqueue.put("terminate")
            for w in workers:
                w.terminate()