Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,20 @@ client2.start_stream()
client3.start_stream()
```

# Vidstream-Updated-
### Updating Vidstream package
- Making server to handle multiple clients connection
- Server broadcast received frame to all connected clients except the one who send the frame
- The video to be displayed to the client and not server

#### Example

![Server Image](./img/server.png)


![Client 1 Image](./img/client1.png)


![Client 2 Image](./img/client2.png)

Check out: https://www.youtube.com/c/NeuralNine
Binary file added img/client1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/client2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/server.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
78 changes: 71 additions & 7 deletions vidstream/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, host, port, slots=8, quit_key='q'):
quit_key : chr
key that has to be pressed to close connection (default = 'q')
"""
self.__connections = [] # list to store client connections
self.__host = host
self.__port = port
self.__slots = slots
Expand Down Expand Up @@ -120,6 +121,8 @@ def __server_listening(self):
continue
else:
self.__used_slots += 1
self.__connections.append(connection)
print(f"New connection from {address[0]}:{address[1]}")
self.__block.release()
thread = threading.Thread(target=self.__client_connection, args=(connection, address,))
thread.start()
Expand Down Expand Up @@ -173,15 +176,21 @@ def __client_connection(self, connection, address):
frame_data = data[:msg_size]
data = data[msg_size:]

frame = pickle.loads(frame_data, fix_imports=True, encoding="bytes")
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
cv2.imshow(str(address), frame)
# Broadcast the frame to all connected clients except the sender
for client_conn in self.__connections:
if client_conn is not connection:
try:
client_conn.sendall(struct.pack('>L', msg_size) + frame_data)
except ConnectionResetError:
self.__connections.remove(client_conn)
print(f"Connection to {address[0]}:{address[1]} lost!")
client_conn.close()

if cv2.waitKey(1) == ord(self.__quit_key):
connection.close()
self.__used_slots -= 1
break


class StreamingClient:
"""
Abstract class for the generic streaming client.
Expand Down Expand Up @@ -221,7 +230,7 @@ class StreamingClient:
start_stream : starts the client stream in a new thread
"""

def __init__(self, host, port):
def __init__(self, host, port, quit_key='q'):
"""
Creates a new instance of StreamingClient.

Expand All @@ -237,7 +246,9 @@ def __init__(self, host, port):
self.__port = port
self._configure()
self.__running = False
self.__quit_key = quit_key
self.__client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


def _configure(self):
"""
Expand All @@ -262,6 +273,47 @@ def _cleanup(self):
"""
cv2.destroyAllWindows()


def __receive_frames(self):
"""
Receives and displays the video frames.
"""
self.__client_socket.connect((self.__host, self.__port))
while self.__running:
payload_size = struct.calcsize(">L")
data = b""

while self.__running:
while len(data) < payload_size:
received = self.__client_socket.recv(4096)
if received == b"":
break
data += received

if len(data) < payload_size:
break

packed_msg_size = data[:payload_size]
data = data[payload_size:]

msg_size = struct.unpack(">L", packed_msg_size)[0]

while len(data) < msg_size:
data += self.__client_socket.recv(4096)

frame_data = data[:msg_size]
data = data[msg_size:]

# Decode and display the frame
frame = pickle.loads(frame_data, fix_imports=True, encoding="bytes")
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)

cv2.imshow("Video Stream", frame)
if cv2.waitKey(1) == ord(self.__quit_key):
break

self.stop_stream()

def __client_streaming(self):
"""
Main method for streaming the client data.
Expand All @@ -284,18 +336,30 @@ def __client_streaming(self):

self._cleanup()

def start_stream(self):

def start_stream(self, mode):
"""
Starts client stream if it is not already running.
Parameters
----------
mode : str
The mode of operation: 'send' or 'receive'.
"""

if self.__running:
print("Client is already streaming!")
else:
self.__running = True
client_thread = threading.Thread(target=self.__client_streaming)
if mode.lower() == 'send':
client_thread = threading.Thread(target=self.__client_streaming)
elif mode == 'receive':
client_thread = threading.Thread(target=self.__receive_frames)
else:
raise ValueError("Invalid mode. Must be 'send' or 'receive'.")

client_thread.start()


def stop_stream(self):
"""
Stops client stream if running
Expand Down