-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
158 lines (135 loc) · 4.31 KB
/
test.py
File metadata and controls
158 lines (135 loc) · 4.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
''' Test
Crawler and peripheral testing.
TODO:
- Finish writing test cases.
- Automate testing using unit testing modules (optional).
- Implement logging system to log all testing information (optional).
'''
#from picamera import PiCamera
from time import sleep
from modules.controller import DS4
from modules.messaging import OutboundMessaging, InboundMessaging, SerialMessaging, SerialPort
from crawler import Crawler
import serial
import requests
import configparser
import logging
def serial_messaging_test():
options = {
'Baudrate' = 115200,
'Timeout' = 3.0,
'Device' = '/dev/ttyUSB0'
}
messenger = SerialMessaging(options)
while True:
messenger.connect():
while messenger.connect():
msg, time = recieve_message()
print(msg)
sleep(2)
def outbound_test():
port = serial.Serial('/dev/ttyUSB0', baudrate=115200, timeout=3.0)
port.write('*'.encode())
port.write("Hello".encode())
'''
message = ""
outbound_thread = OutboundMessaging(port, 0.1, message)
print(outbound_thread.message)
for i in range(30):
outbound_thread.set_message(str(i))
outbound_thread.send_message()
sleep(1)
'''
outbound_test()
def inbound_test():
port = serial.Serial('dev/ttyUSB0', baudrate=115200, timeout=3.0)
message = ""
inbound_thread = InboundMessaging(port, 0.1, message)
inbound_thread.start()
for i in range(30):
outbound_thread.get_message()
delay(1)
#inbound_test()
def crawler_test():
#Attempts to connect with the crawler and send instructions.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Crawler Test')
logger.setLevel(logging.INFO)
fh = logging.FileHandler('logs/drive.log')
fh.setLevel(logging.INFO)
fh.setFormatter(formatter)
logger.addHandler(fh)
config = configparser.ConfigParser()
config.read('config.ini')
crawler = Crawler(logger, config['COMMUNICATION'])
controller = DS4()
try:
controller.connect()
#controller.start()
#controller.join()
crawler.connect()
while True:
print('.')
controller.get_buttons()
controller.get_axes()
crawler.set_motor_instruction(controller.axes[controller.RIGHT_Y_AXIS])
crawler.set_steering_instruction(controller.axes[controller.LEFT_X_AXIS])
crawler.set_brake_instruction(controller.buttons[controller.L2])
crawler.set_instruction_message()
print(crawler.instructions)
sleep(0.2)
finally:
print('Crawler Test Done.')
crawler.disconnect()
controller.disconnect()
#crawler_test()
def controller_test():
''' Polls connected bluetooth controller for some amount of time '''
print("Controller test started..")
controller = DS4()
controller.run()
'''
controller.connect()
print(controller.name)
print(controller.connected)
try:
while controller.is_connected():
print("R2: ", controller.get_button(controller.R2))
print("X: ", controller.get_axes()[controller.LEFT_X_AXIS])
print(controller.connected)
sleep(0.5)
except KeyboardInterrupt:
controller.connected = False
finally:
controller.disconnect()
print("Controller test ended.")
'''
#controller_test()
def camera_preview():
''' Quick picamera preview '''
print("Camera preview Started, please have monitor connected..")
camera = PiCamera()
camera.start_preview()
sleep(10)
camera.stop_preview()
print("Camera preview has ended.")
#camera_preview()
def serial_test():
''' Serial test used to ensure proper setup between de10 and RPI '''
device = '/dev/ttyUSB0'
port = serial.Serial(device, baudrate=115200, timeout=3.0)
try:
while True:
port.write('*'.encode())
sleep(1)
except KeyboardInterrupt:
print("Ending session.")
finally:
port.close()
print("Completed serial test.")
#serial_test()
def request_test():
''' API update test '''
UPDATE_URL = "http://192.168.0.4:8080/api/update/"
r = requests.post(UPDATE_URL, data={'crawler': 'crawler data here'})
#request_test()