Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions groundstation/stream_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from sockets.stream_socket import StreamSocket
from transfer.request import Request
from transfer.notification import Notification
import settings

Expand All @@ -17,11 +16,6 @@ def __init__(self, addr):
self.socket.connect((addr, settings.PORT))
self.socket.setblocking(False)

def begin_handshake(self, station):
request = Request("LISTALLOBJECTS", station=station, stream=self)
station.register_request(request)
self.enqueue(request)

def notify_new_object(self, station, path):
# TODO FSWatcher should probably be responsible for catching these to
# keep signal:noise sane
Expand Down
Empty file.
10 changes: 10 additions & 0 deletions groundstation/transfer/handshake_handlers/naive_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from groundstation.transfer.request import Request

class NaiveHandshakeHandler(object):
def __init__(self, station):
self.station = station

def begin_handshake(self, station):
request = Request("LISTALLOBJECTS", station=station, stream=self)
self.station.register_request(request)
station.enqueue(request)
24 changes: 24 additions & 0 deletions groundstation/utils/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
class Queue(list):
"""A queue structure which maintains a list of N recent items, resizable
dynamically"""

def __init__(self, size):
self._size = size

def append(self, obj):
if obj not in self:
super(Queue, self).append(obj)
self.trim()

def trim(self):
while len(self) > self._size:
self.pop(0)

@property
def size(self):
return self._size

@size.setter
def size(self, size):
self._size = size
self.trim()
10 changes: 8 additions & 2 deletions stationd
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ from groundstation.utils import path2id
import groundstation.utils.pid

import groundstation.events.tcpnetwork_event as tcpnetwork_event
import groundstation.transfer.handshake_handlers as handshake_handlers

import groundstation.deferred

Expand All @@ -50,6 +51,10 @@ peer_sockets = PeerSocketPool()

last_beacon = time.time() - BEACON_TIMEOUT # Gaurantee that we'll announce on the first run

HANDSHAKE_HANDLERS = {
"stationd": handshake_handlers.naive_sync.NaiveHandshakeHandler,
}

def _read_sockets():
read_sockets = []
for i in sockets: read_sockets.append(i)
Expand Down Expand Up @@ -83,11 +88,11 @@ def handle_discoverer_event(sock):
if not station.recently_queried(event.payload):
client = StreamClient(peer[0])
peer_sockets.append(client)
client.begin_handshake(station)
handshake_handler.begin(station)
else: # XXX This should check if we have open transactions.
if not station.recently_queried(event.payload):
client = peer_sockets[peer[0]]
client.begin_handshake(station)
handshake_handler.begin(station)

else:
# Peer's uuid is smaller, we should do nothing and await connection
Expand Down Expand Up @@ -155,6 +160,7 @@ for host in args.hosts:
if args.pidfile:
groundstation.utils.pid.register_pidfile(args.pidfile)

handshake_handler = HANDSHAKE_HANDLERS[sys.argv[0]](station)
while True:
log.info("Current status :: Iterators: %i Deferreds: %i" %
(len(station.iterators), len(station.deferreds)))
Expand Down
24 changes: 24 additions & 0 deletions test/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import unittest

from groundstation.utils.queue import Queue

class TestQueue(unittest.TestCase):
def test_trims_fifo(self):
q = Queue(2)
q.append(0)
q.append(1)
q.append(2)
self.assertNotIn(0, q)
self.assertIn(1, q)
self.assertIn(2, q)

def test_trims_fifo_dynamically(self):
q = Queue(10)
q.append(0)
q.append(1)
q.append(2)

q.size = 2
self.assertNotIn(0, q)
self.assertIn(1, q)
self.assertIn(2, q)
6 changes: 5 additions & 1 deletion test/test_station_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
TestListener, \
TestClient

from groundstation.transfer.handshake_handlers.naive_sync \
import NaiveHandshakeHandler



class StationConnectionTestCase(StationIntegrationFixture):
Expand Down Expand Up @@ -44,7 +47,8 @@ def tick():

self.assertEqual(len(swrite), 1)

client.begin_handshake(self.stations[0])
handshaker = NaiveHandshakeHandler(client)
handshaker.begin_handshake(self.stations[0])
client.send()

(sread, swrite, _) = tick()
Expand Down