diff --git a/README.md b/README.md index 98c90d9..2394838 100644 --- a/README.md +++ b/README.md @@ -36,4 +36,20 @@ client2.start_stream() client3.start_stream() ``` +# Vidstream-Updated- +### Updating Vidstream package +- Making server to handle multiple clients connection +- Server broadcast received frame to all connected clients except the one who send the frame +- The video to be displayed to the client and not server + +#### Example + +![Server Image](./img/server.png) + + +![Client 1 Image](./img/client1.png) + + +![Client 2 Image](./img/client2.png) + Check out: https://www.youtube.com/c/NeuralNine \ No newline at end of file diff --git a/img/client1.png b/img/client1.png new file mode 100644 index 0000000..db5a31d Binary files /dev/null and b/img/client1.png differ diff --git a/img/client2.png b/img/client2.png new file mode 100644 index 0000000..0e30810 Binary files /dev/null and b/img/client2.png differ diff --git a/img/server.png b/img/server.png new file mode 100644 index 0000000..86788bd Binary files /dev/null and b/img/server.png differ diff --git a/vidstream/streaming.py b/vidstream/streaming.py index 55f5c2b..5b18e4d 100644 --- a/vidstream/streaming.py +++ b/vidstream/streaming.py @@ -78,6 +78,7 @@ def __init__(self, host, port, slots=8, quit_key='q'): quit_key : chr key that has to be pressed to close connection (default = 'q') """ + self.__connections = [] # list to store client connections self.__host = host self.__port = port self.__slots = slots @@ -120,6 +121,8 @@ def __server_listening(self): continue else: self.__used_slots += 1 + self.__connections.append(connection) + print(f"New connection from {address[0]}:{address[1]}") self.__block.release() thread = threading.Thread(target=self.__client_connection, args=(connection, address,)) thread.start() @@ -173,15 +176,21 @@ def __client_connection(self, connection, address): frame_data = data[:msg_size] data = data[msg_size:] - frame = pickle.loads(frame_data, fix_imports=True, encoding="bytes") - frame = cv2.imdecode(frame, cv2.IMREAD_COLOR) - cv2.imshow(str(address), frame) + # Broadcast the frame to all connected clients except the sender + for client_conn in self.__connections: + if client_conn is not connection: + try: + client_conn.sendall(struct.pack('>L', msg_size) + frame_data) + except ConnectionResetError: + self.__connections.remove(client_conn) + print(f"Connection to {address[0]}:{address[1]} lost!") + client_conn.close() + if cv2.waitKey(1) == ord(self.__quit_key): connection.close() self.__used_slots -= 1 break - class StreamingClient: """ Abstract class for the generic streaming client. @@ -221,7 +230,7 @@ class StreamingClient: start_stream : starts the client stream in a new thread """ - def __init__(self, host, port): + def __init__(self, host, port, quit_key='q'): """ Creates a new instance of StreamingClient. @@ -237,7 +246,9 @@ def __init__(self, host, port): self.__port = port self._configure() self.__running = False + self.__quit_key = quit_key self.__client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + def _configure(self): """ @@ -262,6 +273,47 @@ def _cleanup(self): """ cv2.destroyAllWindows() + + def __receive_frames(self): + """ + Receives and displays the video frames. + """ + self.__client_socket.connect((self.__host, self.__port)) + while self.__running: + payload_size = struct.calcsize(">L") + data = b"" + + while self.__running: + while len(data) < payload_size: + received = self.__client_socket.recv(4096) + if received == b"": + break + data += received + + if len(data) < payload_size: + break + + packed_msg_size = data[:payload_size] + data = data[payload_size:] + + msg_size = struct.unpack(">L", packed_msg_size)[0] + + while len(data) < msg_size: + data += self.__client_socket.recv(4096) + + frame_data = data[:msg_size] + data = data[msg_size:] + + # Decode and display the frame + frame = pickle.loads(frame_data, fix_imports=True, encoding="bytes") + frame = cv2.imdecode(frame, cv2.IMREAD_COLOR) + + cv2.imshow("Video Stream", frame) + if cv2.waitKey(1) == ord(self.__quit_key): + break + + self.stop_stream() + def __client_streaming(self): """ Main method for streaming the client data. @@ -284,18 +336,30 @@ def __client_streaming(self): self._cleanup() - def start_stream(self): + + def start_stream(self, mode): """ Starts client stream if it is not already running. + Parameters + ---------- + mode : str + The mode of operation: 'send' or 'receive'. """ if self.__running: print("Client is already streaming!") else: self.__running = True - client_thread = threading.Thread(target=self.__client_streaming) + if mode.lower() == 'send': + client_thread = threading.Thread(target=self.__client_streaming) + elif mode == 'receive': + client_thread = threading.Thread(target=self.__receive_frames) + else: + raise ValueError("Invalid mode. Must be 'send' or 'receive'.") + client_thread.start() + def stop_stream(self): """ Stops client stream if running