-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplay_pong.py
More file actions
184 lines (148 loc) · 6.37 KB
/
play_pong.py
File metadata and controls
184 lines (148 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import numpy as np
from kivy.app import App
from kivy.clock import Clock
from kivy.vector import Vector
from kivy.uix.widget import Widget
from kivy.core.window import Window
from pylsl import StreamInlet, resolve_stream
from kivy.properties import NumericProperty, ReferenceListProperty, ObjectProperty
def get_lsl_signal_stream(player_1_stream="Player_1", player_2_stream="PongPlayer2"):
    """
    Uses pylsl to resolve the EEG streams and open one inlet per player.

    Streams of type 'EEG' are resolved and matched to the players by stream name. If several
    streams share a name, the last one resolved wins.

    :param player_1_stream: name of the LSL stream that controls player 1.
    :param player_2_stream: name of the LSL stream that controls player 2.
    :return: tuple ``(inlet_1, inlet_2)``; an entry is ``None`` when no stream with the
        corresponding name was found.
    """
    print("looking for an EEG stream...")
    streams = resolve_stream('type', 'EEG')
    print("Received no of streams: {}".format(len(streams)))

    inlet_1 = None
    inlet_2 = None
    for s in streams:
        # The resolved stream descriptor already carries the name, so no temporary
        # inlet has to be opened (and left dangling) just to read it.
        stream_name = s.name()
        if stream_name == player_1_stream:
            inlet_1 = StreamInlet(s)
        elif stream_name == player_2_stream:
            inlet_2 = StreamInlet(s)

    if inlet_1 is None:
        print("Stream for player 1 not found. Please ensure that the scenario is running.")
    if inlet_2 is None:
        print("Stream for player 2 not found. Please ensure that the scenario is running.")
    return inlet_1, inlet_2
class PongPaddle(Widget):
    """
    Paddle widget: holds a player's score and deflects the ball when they collide.
    """
    score = NumericProperty(0)

    def bounce_ball(self, ball):
        """
        Reflect ``ball`` horizontally if it currently collides with this paddle.

        The reflected velocity is scaled by 1.1, and the vertical component is shifted by
        the ball's vertical distance from the paddle centre, expressed in half paddle heights.
        """
        if not self.collide_widget(ball):
            return

        speed_x, speed_y = ball.velocity
        half_height = self.height / 2
        offset = (ball.center_y - self.center_y) / half_height

        reflected = Vector(-1 * speed_x, speed_y) * 1.1
        ball.velocity = reflected.x, reflected.y + offset
class PongBall(Widget):
    """
    Ball widget carrying a 2D velocity.
    """
    # Per-axis velocity components; ``velocity`` groups them as an (x, y) pair.
    velocity_x = NumericProperty(0)
    velocity_y = NumericProperty(0)
    velocity = ReferenceListProperty(velocity_x, velocity_y)

    def move(self):
        """
        Advance the ball's position by its current velocity.
        """
        step = Vector(*self.velocity)
        self.pos = step + self.pos
class PongGame(Widget):
    """
    Root game widget. Handles keyboard and touch input, advances the ball, keeps score and
    moves the paddles according to the EEG samples pulled on every update.

    ``update`` reads the module-level names ``inlet_1`` and ``inlet_2`` (assigned in this
    file's ``__main__`` block). An inlet that is ``None`` is skipped, so the corresponding
    paddle is then driven by keyboard/touch input only.
    """
    ball = ObjectProperty(None)
    player1 = ObjectProperty(None)
    player2 = ObjectProperty(None)
    game = ObjectProperty(None)
    ball_velocity = 10
    # Stepwise paddle moves (keyboard / EEG) stop once the paddle centre is within this
    # distance of the bottom of the window or of ``game.height``.
    paddle_margin = 100

    def __init__(self, **kwargs):
        super(PongGame, self).__init__(**kwargs)
        self._keyboard = Window.request_keyboard(self._keyboard_closed, self)
        self._keyboard.bind(on_key_down=self._on_keyboard_down)
        # Distance a paddle travels per key press / per EEG-triggered step.
        self.speed = 25
        # Threshold on the mean of a sample with more than two channels.
        self.beta_threshold = 2
        # Thresholds on channel 0 (down) and channel 1 (up) of a two-channel sample.
        self.left_movement_threshold = 0.2
        self.right_movement_threshold = 0.2

    def _keyboard_closed(self):
        """
        Release the keyboard binding when the keyboard is closed.
        """
        self._keyboard.unbind(on_key_down=self._on_keyboard_down)
        self._keyboard = None

    def _can_move_up(self, paddle):
        """
        Return True while ``paddle`` is still below the upper movement bound.
        """
        return paddle.center_y < self.game.height - self.paddle_margin

    def _can_move_down(self, paddle):
        """
        Return True while ``paddle`` is still above the lower movement bound.
        """
        return paddle.center_y > self.paddle_margin

    def _on_keyboard_down(self, keyboard, keycode, text, modifiers):
        """
        Move player 1 with 'w'/'s' and player 2 with the 'up'/'down' arrow keys.

        Always returns True.
        """
        key = keycode[1]
        if key == 'w' and self._can_move_up(self.player1):
            print("Up")
            self.player1.center_y += self.speed
        elif key == 's' and self._can_move_down(self.player1):
            print("Down")
            self.player1.center_y -= self.speed
        elif key == 'up' and self._can_move_up(self.player2):
            self.player2.center_y += self.speed
        elif key == 'down' and self._can_move_down(self.player2):
            self.player2.center_y -= self.speed
        return True

    def serve_ball(self, vel=(ball_velocity, 0)):
        """
        Put the ball at the centre of the widget and launch it with velocity ``vel``.
        """
        self.ball.center = self.center
        self.ball.velocity = vel

    @staticmethod
    def _pull_sample(inlet):
        """
        Return the next sample from ``inlet``, or None when there is no inlet.
        """
        if inlet is None:
            return None
        # NOTE(review): pull_sample() is called without a timeout; per the pylsl docs this
        # presumably blocks until a sample arrives - confirm that is acceptable inside the
        # per-frame update.
        sample, _ = inlet.pull_sample()
        return sample

    def _steer_paddle(self, paddle, sample):
        """
        Move ``paddle`` one step according to an EEG ``sample``.

        Samples with more than two channels are averaged and compared against
        ``beta_threshold``: above moves up, otherwise down. Two-channel samples use
        channel 1 for "up" and channel 0 for "down", each against its own threshold.
        Missing samples and samples with fewer than two channels are ignored.
        """
        if sample is None:
            return

        if len(sample) > 2:
            avg_beta_signal = np.mean(sample)
            if avg_beta_signal > self.beta_threshold and self._can_move_up(paddle):
                paddle.center_y += self.speed
            if avg_beta_signal <= self.beta_threshold and self._can_move_down(paddle):
                paddle.center_y -= self.speed
        elif len(sample) == 2:
            # Both channels may fire in the same frame; the checks are intentionally
            # independent, and the second sees the position left by the first.
            if sample[1] > self.right_movement_threshold and self._can_move_up(paddle):
                paddle.center_y += self.speed
            if sample[0] > self.left_movement_threshold and self._can_move_down(paddle):
                paddle.center_y -= self.speed

    def update(self, dt):
        """
        Advance the game by one frame: move the ball, resolve bounces and scoring, then
        apply the EEG-driven paddle movement for both players.
        """
        self.ball.move()

        sample_1 = self._pull_sample(inlet_1)
        sample_2 = self._pull_sample(inlet_2)

        # bounce off paddles
        self.player1.bounce_ball(self.ball)
        self.player2.bounce_ball(self.ball)

        # bounce ball off bottom or top
        if (self.ball.y < self.y) or (self.ball.top > self.top):
            self.ball.velocity_y *= -1

        # went off to a side to score a point?
        if self.ball.x < self.x:
            self.player2.score += 1
            self.serve_ball(vel=(self.ball_velocity, 0))
        if self.ball.x > self.width:
            self.player1.score += 1
            self.serve_ball(vel=(-self.ball_velocity, 0))

        self._steer_paddle(self.player1, sample_1)
        self._steer_paddle(self.player2, sample_2)

    def on_touch_move(self, touch):
        """
        Drag a paddle to the touch height: left third of the widget controls player 1,
        right third controls player 2.
        """
        if touch.x < self.width / 3:
            self.player1.center_y = touch.y
        if touch.x > self.width - self.width / 3:
            self.player2.center_y = touch.y
class PongApp(App):
    """
    Application class that creates the game widget and starts its update loop.
    """

    def build(self):
        """
        Create the game, serve the first ball, schedule ``update`` at 60 calls per second
        and return the game as the root widget.
        """
        pong = PongGame()
        pong.serve_ball()

        frame_interval = 1.0 / 60.0
        Clock.schedule_interval(pong.update, frame_interval)

        print("The game's running")
        return pong
if __name__ == "__main__":
    # Open the EEG inlets before starting the app: PongGame.update reads these two
    # module-level names on every frame.
    inlet_1, inlet_2 = get_lsl_signal_stream()
    app = PongApp()
    app.run()