Skip to content

Commit 4340aa9

Browse files
Implement Symmetric Send/Receive Architecture with Persistent Worker Threads (#35)
* feat: add support for bp_sendmsg flags Signed-off-by: Sylvain Pierrot <pierrot.sylvain14@gmail.com> * feat: implement symmetric send/receive architecture with persistent worker threads Signed-off-by: Sylvain Pierrot <pierrot.sylvain14@gmail.com> --------- Signed-off-by: Sylvain Pierrot <pierrot.sylvain14@gmail.com>
1 parent 6866a40 commit 4340aa9

16 files changed

Lines changed: 509 additions & 371 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ demo-app-bp-recv
5959
.vagrant
6060
sender
6161
receiver
62+
bp_client
6263
tests/sender
6364
tests/receiver
6465
daemon/bp_daemon

bp_client.c

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
#include "include/bp_socket.h"
2+
#include <errno.h>
3+
#include <pthread.h>
4+
#include <signal.h>
5+
#include <stdint.h>
6+
#include <stdio.h>
7+
#include <stdlib.h>
8+
#include <string.h>
9+
#include <sys/socket.h>
10+
#include <sys/types.h>
11+
#include <time.h>
12+
#include <unistd.h>
13+
14+
#define BUFFER_SIZE 1024
15+
#define AF_BP 28
16+
17+
volatile int running = 1;
18+
19+
struct client_data {
20+
int fd;
21+
struct sockaddr_bp dest_addr;
22+
struct sockaddr_bp src_addr;
23+
};
24+
25+
void handle_sigint(int sig) {
26+
printf("\nInterrupt received, shutting down...\n");
27+
running = 0;
28+
}
29+
30+
void *send_thread(void *arg) {
31+
struct client_data *data = (struct client_data *)arg;
32+
char send_buffer[BUFFER_SIZE];
33+
int message_count = 0;
34+
35+
printf("Send thread started\n");
36+
37+
while (running) {
38+
message_count++;
39+
snprintf(send_buffer, sizeof(send_buffer), "Hello from client! Message #%d",
40+
message_count);
41+
42+
int flags = 0;
43+
flags |= MSG_ACK_REQUESTED;
44+
45+
int ret =
46+
sendto(data->fd, send_buffer, strlen(send_buffer) + 1, flags,
47+
(struct sockaddr *)&data->dest_addr, sizeof(data->dest_addr));
48+
if (ret < 0) {
49+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
50+
continue;
51+
}
52+
printf("sendto failed for ipn:%u.%u: %s\n",
53+
data->dest_addr.bp_addr.ipn.node_id,
54+
data->dest_addr.bp_addr.ipn.service_id, strerror(errno));
55+
break;
56+
}
57+
58+
printf("Message sent: %s\n", send_buffer);
59+
}
60+
61+
printf("Send thread exiting\n");
62+
return NULL;
63+
}
64+
65+
void *receive_thread(void *arg) {
66+
struct client_data *data = (struct client_data *)arg;
67+
struct msghdr msg;
68+
struct iovec iov;
69+
char buffer[BUFFER_SIZE];
70+
struct sockaddr_bp src_addr;
71+
72+
// Set up message structure for receiving
73+
iov.iov_base = buffer;
74+
iov.iov_len = sizeof(buffer);
75+
memset(&msg, 0, sizeof(msg));
76+
msg.msg_iov = &iov;
77+
msg.msg_iovlen = 1;
78+
memset(&src_addr, 0, sizeof(src_addr));
79+
msg.msg_name = &src_addr;
80+
msg.msg_namelen = sizeof(src_addr);
81+
82+
printf("Receive thread started\n");
83+
84+
while (running) {
85+
ssize_t n = recvmsg(data->fd, &msg, 0);
86+
if (n < 0) {
87+
if (errno == EINTR) {
88+
printf("\nInterrupted by signal, exiting...\n");
89+
break;
90+
}
91+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
92+
// Timeout reached, continue loop to check running flag
93+
continue;
94+
}
95+
perror("recvmsg failed");
96+
break;
97+
}
98+
99+
printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer);
100+
if (msg.msg_namelen >= sizeof(struct sockaddr_bp)) {
101+
printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id,
102+
src_addr.bp_addr.ipn.service_id);
103+
}
104+
105+
memset(&msg, 0, sizeof(msg));
106+
msg.msg_iov = &iov;
107+
msg.msg_iovlen = 1;
108+
memset(&src_addr, 0, sizeof(src_addr));
109+
msg.msg_name = &src_addr;
110+
msg.msg_namelen = sizeof(src_addr);
111+
}
112+
113+
printf("Receive thread exiting\n");
114+
return NULL;
115+
}
116+
117+
int main(int argc, char *argv[]) {
118+
struct sockaddr_bp dest_addr, src_addr;
119+
int fd;
120+
uint32_t node_id, service_id;
121+
int ret = 0;
122+
pthread_t send_tid, recv_tid;
123+
struct client_data data;
124+
125+
if (argc < 3) {
126+
printf("Usage: %s <node_id> <service_id>\n", argv[0]);
127+
return EXIT_FAILURE;
128+
}
129+
130+
signal(SIGINT, handle_sigint);
131+
132+
node_id = (uint32_t)atoi(argv[1]);
133+
service_id = (uint32_t)atoi(argv[2]);
134+
135+
if (service_id < 1 || service_id > 255) {
136+
fprintf(stderr, "Invalid service_id (must be in 1-255)\n");
137+
return EXIT_FAILURE;
138+
}
139+
140+
if (node_id == 0) {
141+
fprintf(stderr, "Invalid node_id (cannot be 0)\n");
142+
return EXIT_FAILURE;
143+
}
144+
145+
fd = socket(AF_BP, SOCK_DGRAM, 0);
146+
if (fd < 0) {
147+
perror("socket creation failed");
148+
return EXIT_FAILURE;
149+
}
150+
151+
src_addr.bp_family = AF_BP;
152+
src_addr.bp_scheme = BP_SCHEME_IPN;
153+
src_addr.bp_addr.ipn.node_id = 10;
154+
src_addr.bp_addr.ipn.service_id = 2;
155+
if (bind(fd, (struct sockaddr *)&src_addr, sizeof(src_addr)) == -1) {
156+
perror("Failed to bind socket");
157+
ret = EXIT_FAILURE;
158+
goto out;
159+
}
160+
161+
struct timeval timeout;
162+
timeout.tv_sec = 1; // 1 second timeout
163+
timeout.tv_usec = 0;
164+
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
165+
perror("Failed to set socket timeout");
166+
ret = EXIT_FAILURE;
167+
goto out;
168+
}
169+
170+
dest_addr.bp_family = AF_BP;
171+
dest_addr.bp_scheme = BP_SCHEME_IPN;
172+
dest_addr.bp_addr.ipn.node_id = node_id;
173+
dest_addr.bp_addr.ipn.service_id = service_id;
174+
175+
data.fd = fd;
176+
data.dest_addr = dest_addr;
177+
data.src_addr = src_addr;
178+
179+
printf("BP Client started - sending messages to ipn:%u.%u\n", node_id,
180+
service_id);
181+
printf("Press Ctrl+C to exit.\n");
182+
183+
if (pthread_create(&send_tid, NULL, send_thread, &data) != 0) {
184+
perror("Failed to create send thread");
185+
ret = EXIT_FAILURE;
186+
goto out;
187+
}
188+
189+
if (pthread_create(&recv_tid, NULL, receive_thread, &data) != 0) {
190+
perror("Failed to create receive thread");
191+
running = 0;
192+
pthread_join(send_tid, NULL);
193+
ret = EXIT_FAILURE;
194+
goto out;
195+
}
196+
197+
pthread_join(send_tid, NULL);
198+
pthread_join(recv_tid, NULL);
199+
200+
out:
201+
close(fd);
202+
printf("Socket closed.\n");
203+
return ret;
204+
}

bp_socket/af_bp.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size)
285285
}
286286

287287
if (size > 0) {
288-
payload = kmalloc(size, GFP_KERNEL);
288+
payload = kmalloc(size, GFP_ATOMIC);
289289
if (!payload) {
290290
pr_err("bp_sendmsg: failed to allocate memory\n");
291291
ret = -ENOMEM;
@@ -299,17 +299,14 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size)
299299
}
300300

301301
ret = send_bundle_doit(payload, size, dest_node_id,
302-
dest_service_id, bp->bp_node_id, bp->bp_service_id, 8443);
302+
dest_service_id, bp->bp_node_id, bp->bp_service_id,
303+
msg->msg_flags, 8443);
303304
if (ret < 0) {
304305
pr_err(
305306
"bp_sendmsg: send_bundle_doit failed (%d)\n", ret);
306307
goto err_free;
307308
}
308309

309-
pr_info("bp_sendmsg: bundle sent for endpoint ipn:%u.%u (size: "
310-
"%zu)\n",
311-
bp->bp_node_id, bp->bp_service_id, size);
312-
313310
kfree(payload);
314311
}
315312

bp_socket/bp_genl.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = {
1212
[BP_GENL_A_DEST_SERVICE_ID] = { .type = NLA_U32 },
1313
[BP_GENL_A_PAYLOAD] = { .type = NLA_BINARY },
1414
[BP_GENL_A_ADU] = { .type = NLA_U64 },
15+
[BP_GENL_A_FLAGS] = { .type = NLA_U32 },
1516
};
1617

1718
static struct genl_ops genl_ops[] = { {
@@ -41,14 +42,14 @@ struct genl_family genl_fam = {
4142

4243
int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id,
4344
u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id,
44-
int port_id)
45+
u_int32_t flags, int port_id)
4546
{
4647
void* msg_head;
4748
struct sk_buff* msg;
4849
size_t msg_size;
4950
int ret;
5051

51-
msg_size = 4 * nla_total_size(sizeof(u_int32_t))
52+
msg_size = 5 * nla_total_size(sizeof(u_int32_t))
5253
+ nla_total_size(payload_size);
5354
msg = genlmsg_new(msg_size, GFP_KERNEL);
5455
if (!msg) {
@@ -104,6 +105,13 @@ int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id,
104105
goto err_cancel;
105106
}
106107

108+
ret = nla_put_u32(msg, BP_GENL_A_FLAGS, flags);
109+
if (ret) {
110+
pr_err(
111+
"send_bundle: failed to put BP_GENL_A_FLAGS (%d)\n", ret);
112+
goto err_cancel;
113+
}
114+
107115
genlmsg_end(msg, msg_head);
108116
return genlmsg_unicast(&init_net, msg, port_id);
109117

bp_socket/bp_genl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ int open_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id);
99
int close_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id);
1010
int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id,
1111
u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id,
12-
int port_id);
12+
u_int32_t flags, int port_id);
1313
int enqueue_bundle_doit(struct sk_buff* skb, struct genl_info* info);
1414
int destroy_bundle_doit(uint64_t adu, int port_id);
1515

daemon/bp_genl_handlers.c

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,15 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) {
6767
u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id;
6868
char dest_eid[64];
6969
int written;
70-
pthread_t thread;
71-
struct ion_send_args *args;
72-
struct endpoint_ctx *ctx;
73-
void *payload_copy;
70+
u_int32_t flags;
71+
int ret;
7472

7573
if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] ||
7674
!attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_SRC_NODE_ID] ||
77-
!attrs[BP_GENL_A_SRC_SERVICE_ID]) {
75+
!attrs[BP_GENL_A_SRC_SERVICE_ID] || !attrs[BP_GENL_A_FLAGS]) {
7876
log_error(
7977
"handle_send_bundle: missing attribute(s) in SEND_BUNDLE command (payload, node ID, "
80-
"service ID)");
78+
"service ID, flags)");
8179
return -EINVAL;
8280
}
8381

@@ -87,6 +85,7 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) {
8785
dest_service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]);
8886
src_node_id = nla_get_u32(attrs[BP_GENL_A_SRC_NODE_ID]);
8987
src_service_id = nla_get_u32(attrs[BP_GENL_A_SRC_SERVICE_ID]);
88+
flags = nla_get_u32(attrs[BP_GENL_A_FLAGS]);
9089

9190
written = snprintf(dest_eid, sizeof(dest_eid), "ipn:%u.%u", dest_node_id, dest_service_id);
9291
if (written < 0 || written >= (int)sizeof(dest_eid)) {
@@ -95,42 +94,16 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) {
9594
return -EINVAL;
9695
}
9796

98-
ctx = endpoint_registry_get(src_node_id, src_service_id);
99-
if (!ctx) {
100-
log_error("[ipn:%u.%u] handle_send_bundle: no endpoint for ipn:%u.%u", src_node_id,
101-
src_service_id, src_node_id, src_service_id);
102-
return -ENODEV;
97+
ret = endpoint_registry_enqueue_send(src_node_id, src_service_id, dest_eid, payload,
98+
payload_size, flags);
99+
if (ret < 0) {
100+
log_error("[ipn:%u.%u] handle_send_bundle: failed to enqueue send (error: %d)", src_node_id,
101+
src_service_id, ret);
102+
return ret;
103103
}
104104

105-
payload_copy = malloc(payload_size);
106-
if (!payload_copy) {
107-
log_error("[ipn:%u.%u] handle_send_bundle: failed to allocate payload", src_node_id,
108-
src_service_id);
109-
return -ENOMEM;
110-
}
111-
memcpy(payload_copy, payload, payload_size);
112-
113-
// Enqueue to send thread using source endpoint SAP
114-
// Launch async send thread
115-
args = malloc(sizeof(struct ion_send_args));
116-
if (!args) return -ENOMEM;
117-
args->node_id = src_node_id;
118-
args->service_id = src_service_id;
119-
args->dest_eid = strndup(dest_eid, sizeof(dest_eid));
120-
args->netlink_sock = daemon->genl_bp_sock;
121-
args->netlink_mutex = &daemon->netlink_mutex;
122-
args->netlink_family = daemon->genl_bp_family_id;
123-
args->payload = payload_copy;
124-
args->payload_size = payload_size;
125-
126-
if (pthread_create(&thread, NULL, ion_send_thread, args) != 0) {
127-
log_error("[ipn:%u.%u] handle_send_bundle: failed to create send thread", src_node_id,
128-
src_service_id);
129-
free(args->dest_eid);
130-
free(args->payload);
131-
free(args);
132-
return -errno;
133-
}
105+
log_info("[ipn:%u.%u] SEND_BUNDLE: bundle queued for sending to EID %s, size %zu (bytes)",
106+
src_node_id, src_service_id, dest_eid, payload_size);
134107

135108
return 0;
136109
}

0 commit comments

Comments
 (0)