-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_qredis.py
More file actions
49 lines (40 loc) · 1.36 KB
/
test_qredis.py
File metadata and controls
49 lines (40 loc) · 1.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import argparse,os,sys,time,subprocess
import logging
import qredis
import utils
import redis
logger = logging.getLogger(__name__)
class QpyTester(utils.QueueTester):
    """Queue tester that drives a locally spawned redis-server via ``qredis``.

    Supplies server start/stop and connect/send/recv operations on top of
    ``utils.QueueTester``.
    """

    # Handle of the spawned redis-server process; None until start_server runs.
    server_process = None

    def start_server(self):
        """Spawn ``./servers/redis-server`` and block until it answers.

        The server is polled about once per second for up to 10 seconds by
        creating a ``qredis.Client`` and calling ``stats()`` on it.

        Raises:
            OSError: if no successful ``stats()`` call happens within 10
                seconds.  The spawned process is killed and reaped first.
        """
        self.server_process = subprocess.Popen([
            './servers/redis-server',
            # NOTE(review): empty save schedule, presumably to disable RDB
            # snapshots during tests; confirm against the redis docs.
            '--save', '',
            '--loglevel', 'warning',
        ])
        deadline = time.time() + 10
        while time.time() < deadline:
            try:
                qredis.Client().stats()
            except Exception as error:
                # Any failure is treated as "not ready yet"; keep retrying
                # until the deadline, but leave a trace for debugging.
                logger.debug("Error connecting to redis: %s", error)
                time.sleep(1)
            else:
                return
        self.server_process.kill()
        # Reap the killed child so it does not linger as a zombie process.
        self.server_process.wait()
        raise OSError("Redis not answering after 10 seconds")

    def stop_server(self):
        """Terminate the spawned server, if any, and wait for it to exit.

        Logs a warning when the exit status is anything other than -15.
        """
        if not self.server_process:
            return
        self.server_process.terminate()
        status = self.server_process.wait()
        # Popen reports "killed by signal N" as -N; terminate() sends SIGTERM
        # (15 on POSIX), so -15 is the expected status for a clean shutdown.
        if status != -15:
            logger.warning("redis exited with %d", status)

    def connect(self, queues_to_watch=None):
        """Create a ``qredis.Client`` and remember which queues ``recv`` reads.

        Args:
            queues_to_watch: passed as-is to ``client.reserve`` by ``recv``.
        """
        self.client = qredis.Client()
        self.queues_to_watch = queues_to_watch

    def send(self, queue, message):
        """Put ``message`` on ``queue`` through the connected client."""
        self.client.put(queue, message)

    def recv(self, timeout=0):
        """Reserve and return the next message from the watched queues.

        Args:
            timeout: accepted for interface compatibility but currently
                unused.
        """
        # NOTE(review): `timeout` is not forwarded to reserve(); confirm
        # whether qredis.Client.reserve accepts a timeout before wiring it in.
        return self.client.reserve(self.queues_to_watch)
if __name__ == "__main__":
    # Executed as a script: hand control to the tester's main() entry point.
    QpyTester.main()