Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 35 additions & 40 deletions helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@
from io import BytesIO
from PIL import Image

from hopper.client import *
from hopper.common import *

PIPE_DIRECTORY = "/home/pi/pipes"

HOPPER_CLIENT = HopperClient()
LOG_PIPE_NAME = PipeName((PipeType.RECEIVING, "log", "helper"), PIPE_DIRECTORY)
HOPPER_CLIENT.open_pipe(LOG_PIPE_NAME, delete=True, create=True)

CONNECTIONS = set()

# The following sets up the asynchronous waiting for file change
picture_watcher = aionotify.Watcher()
log_watcher = aionotify.Watcher()

img_static_path = "/home/pi/shepherd/shepherd/static/"
img_input_file = img_static_path + "image.jpg"
Expand All @@ -34,13 +41,17 @@
log_static_path = "/media/RobotUSB/"
log_input_file = log_static_path + "logs.txt"

log_buffer = []
ERASE_ESCAPE_SEQUENCE = "\033[2J"

file_open_attempts = 10
wait_between_attempts = 0.1

picture_watcher.watch(
alias="image", path=img_input_file, flags=aionotify.Flags.MODIFY
) # sets up watcher
log_watcher.watch(alias="logs", path=log_input_file, flags=aionotify.Flags.MODIFY)




def shrink_image(img):
Expand Down Expand Up @@ -110,53 +121,33 @@ async def wait_for_picture_change():

async def wait_for_log_change():
loop = asyncio.get_event_loop()

websockets.broadcast(CONNECTIONS, ERASE_ESCAPE_SEQUENCE + "\n")

bypass = False # so first image is not ignored.
while not os.path.exists(log_input_file):
await asyncio.sleep(0.5) # twiddle thumbs :)
if not bypass:
bypass = True
await log_watcher.setup(loop)
print("Log change watcher is running.")
while True:
d = HOPPER_CLIENT.read(LOG_PIPE_NAME)

while True: # for all events
if not bypass:
event = await log_watcher.get_event() # blocks until file changed
else:
bypass = False # reset bypass
if d is not None:
ds = d.decode("utf-8")
if ERASE_ESCAPE_SEQUENCE in ds:
log_buffer.clear()
websockets.broadcast(CONNECTIONS, ERASE_ESCAPE_SEQUENCE + "\n")
print("Received erase sequence")
else:
websockets.broadcast(CONNECTIONS, "[LOGS]" + ds)
log_buffer.append("[LOGS]" + ds)
print("[LOGS]" + ds, end="")

with open(log_input_file, "r") as l:
old_logs = l.read()
for c in range(file_open_attempts):
await asyncio.sleep(wait_between_attempts) # give it time to write the file.
try: # this runs until the bot has finished writing the logs
with open(log_input_file, "r") as l:
new_logs = l.read()
print("Opened logs successfully")
break
except:
print("Error opening logs: attempt \#" + str(c))

if c >= 9:
continue # error with this file, go back and wait for next change.

new_logs.replace(old_logs, "") # only new logs remain.
index = len(new_logs) - len(old_logs)
old_logs = new_logs
new_logs = new_logs[index:]
await asyncio.sleep(0.1)

websockets.broadcast(CONNECTIONS, "[LOGS]" + new_logs) # sends new logs.
print("Logs broadcast.")

# politely stops watching file system.
log_watcher.close()
loop.stop()
loop.close()


async def register(websocket): # Runs every time someone connects
CONNECTIONS.add(websocket)
print("Someone has connected to the websocket.")

for c in range(file_open_attempts):
time.sleep(wait_between_attempts) # give it time to write the file.
try: # this runs until the bot has finished writing the image
Expand All @@ -172,7 +163,12 @@ async def register(websocket): # Runs every time someone connects
if not bypass:
img = shrink_image(img)
img_b64 = im_2_b64(img).decode()
await websocket.send(img_b64)
await websocket.send("[CAMERA]" + img_b64)

# Send previous logs
for l in log_buffer:
await websocket.send(l)

try:
await websocket.wait_closed()
finally:
Expand All @@ -185,6 +181,5 @@ async def main():
wait_for_picture_change(), wait_for_log_change()
) # runs the file change checker and webserver at the same time.


asyncio.run(main())
print("Goodbye.")
12 changes: 12 additions & 0 deletions runner/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from enum import Enum

class State(Enum):
# Once shepherd is up, we are by definition ready to run code, so
# there's no need for a "booting" state.
ready = object()
running = object()
post_run = object()

class Mode(Enum):
dev = "dev"
comp = "comp"
56 changes: 56 additions & 0 deletions runner/reaper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from enums import State
import threading
import errno

class Reaper:
@staticmethod
def reap(state, user_code, output_file, reason="", reap_grace_time=5):
if reason is None:
print("Reaping user code")
else:
print("Reaping user code ({})".format(reason))
if state != State.running:
print("Warning: told to stop code, but state is {}, not State.running!".format(state))
try:
user_code.terminate()
except OSError as e:
if e.errno == errno.ESRCH: # No such process
pass
else:
raise
if user_code.poll() is None:
butcher_thread = threading.Timer(reap_grace_time, Reaper.butcher, [user_code])
butcher_thread.daemon = True
butcher_thread.start()
try:
user_code.communicate()
except Exception as e:
print("death: Caught an error while killing user code, sod Python's I/O handling...")
print("death: The error was: {}: {}".format(type(e), e))
butcher_thread.cancel()
if output_file is not None:
try:
output_file.write("\n==== END OF ROUND ====\n\n")
except Exception:
pass
try:
output_file.close()
except Exception as e:
print("death: Caught an error while closing user code's output.")
print("death: The error was: {}: {}".format(type(e).__name__, e))

print("Done reaping user code")
return State.post_run

@staticmethod
def butcher(user_code):
if user_code.poll() is None:
print("Butchering user code")
try:
user_code.kill()
except OSError as e:
if e.errno == errno.ESRCH: # No such process
pass
else:
raise
print("Done butchering user code")
Loading
Loading