Skip to content

Commit c2d3d49

Browse files
committed
transport: Fix partial write handling for large messages
The sender was ignoring the return value from sendmsg/send, which indicates how many bytes were actually written. For messages larger than the kernel socket buffer (~200KB), this caused message truncation. Fix by looping until all bytes are sent: - Track bytes_sent offset and continue from where we left off - Send FDs only with the first sendmsg call (they're queued by receiver) - Use regular send() for subsequent chunks Also update README.md to clarify transmission rules: - FDs MUST arrive before/with final message bytes Add tests for 1MB messages with and without file descriptors. Signed-off-by: Colin Walters <walters@verbum.org>
1 parent 6e92db4 commit c2d3d49

3 files changed

Lines changed: 201 additions & 36 deletions

File tree

README.md

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,11 @@ JSON is a self-delimiting format—a compliant parser can determine where one JS
2828
* Each message MUST be a complete, valid JSON object.
2929
* Whitespace between messages is permitted but not required.
3030

31-
### 2.3. Transmission Rule
31+
### 2.3. Transmission Rules
3232

33-
To unambiguously associate file descriptors with their corresponding message, a sending party **MUST** adhere to the following rule:
33+
To ensure file descriptors are correctly associated with their corresponding messages, a sending party MUST adhere to the following rules:
3434

35-
**A single sendmsg(2) system call MUST contain exactly one and only one complete JSON-RPC message.**
36-
37-
File descriptors intended for that message MUST be included as ancillary data in that same sendmsg() call. A sendmsg() call that includes file descriptors MUST also contain a complete JSON-RPC message.
38-
39-
* **Rationale:** This strict 1:1 mapping between a sendmsg() call, a single JSON-RPC message, and its associated file descriptors is the core of the protocol. It leverages the kernel's guarantee that data and ancillary data from a single sendmsg call are delivered atomically to the underlying transport. This allows the receiver to reliably associate FDs with messages even if multiple messages are coalesced in the stream.
35+
1. **File Descriptor Ordering:** All file descriptors referenced by a message MUST be sent (via ancillary data) before or with the final bytes of that message. The receiver dequeues FDs in order as complete messages are parsed; if the required FDs have not yet arrived, the connection is terminated with a Mismatched Count error.
4036

4137
## 3. Message Format
4238

@@ -138,4 +134,4 @@ The security considerations are identical to those for other Unix domain socket
138134

139135
* **Socket Permissions:** Filesystem permissions on the socket file are the primary access control mechanism.
140136
* **Trust Boundary:** The communicating processes must have a degree of mutual trust, as passing a file descriptor is a grant of capability.
141-
* **Resource Management:** The receiving process is responsible for closing all file descriptors it receives to prevent resource leaks. If a connection is terminated due to a protocol error, the receiver MUST ensure that any FDs remaining in its queue are closed.
137+
* **Resource Management:** The receiving process is responsible for closing all file descriptors it receives to prevent resource leaks. If a connection is terminated due to a protocol error, the receiver MUST ensure that any FDs remaining in its queue are closed.

src/transport.rs

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -75,36 +75,66 @@ impl Sender {
7575

7676
let fds = message_with_fds.file_descriptors;
7777

78-
self.stream
79-
.async_io(Interest::WRITABLE, || {
80-
let sockfd = self.stream.as_fd();
81-
82-
if fds.is_empty() {
83-
// No file descriptors to send - use regular send
84-
rustix::net::send(sockfd, &data, SendFlags::empty())
85-
.map_err(|e| to_io_error(e, "send"))?;
86-
} else {
87-
// Convert OwnedFd to BorrowedFd for sending
88-
let borrowed_fds: Vec<_> = fds.iter().map(|fd| fd.as_fd()).collect();
89-
90-
let mut buffer: Vec<u8> =
91-
vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))];
92-
let mut control = SendAncillaryBuffer::new(buffer.as_mut_slice());
93-
94-
if !control.push(SendAncillaryMessage::ScmRights(&borrowed_fds)) {
95-
return Err(io::Error::other(
96-
"Failed to add file descriptors to control message",
97-
));
78+
// Track how many bytes we've sent so far
79+
let mut bytes_sent = 0usize;
80+
// FDs are only sent with the first sendmsg call
81+
let mut fds_sent = false;
82+
83+
while bytes_sent < data.len() {
84+
let remaining = &data[bytes_sent..];
85+
86+
let sent = self
87+
.stream
88+
.async_io(Interest::WRITABLE, || {
89+
let sockfd = self.stream.as_fd();
90+
91+
if !fds_sent && !fds.is_empty() {
92+
// First chunk with FDs: use sendmsg with ancillary data
93+
let borrowed_fds: Vec<_> = fds.iter().map(|fd| fd.as_fd()).collect();
94+
95+
let mut buffer: Vec<u8> =
96+
vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))];
97+
let mut control = SendAncillaryBuffer::new(buffer.as_mut_slice());
98+
99+
if !control.push(SendAncillaryMessage::ScmRights(&borrowed_fds)) {
100+
return Err(io::Error::other(
101+
"Failed to add file descriptors to control message",
102+
));
103+
}
104+
105+
let iov = [IoSlice::new(remaining)];
106+
let sent =
107+
rustix::net::sendmsg(sockfd, &iov, &mut control, SendFlags::empty())
108+
.map_err(|e| to_io_error(e, "sendmsg"))?;
109+
110+
Ok(sent)
111+
} else {
112+
// No FDs or FDs already sent: use regular send
113+
let sent = rustix::net::send(sockfd, remaining, SendFlags::empty())
114+
.map_err(|e| to_io_error(e, "send"))?;
115+
Ok(sent)
98116
}
117+
})
118+
.await
119+
.map_err(Error::Io)?;
99120

100-
let iov = [IoSlice::new(&data)];
101-
rustix::net::sendmsg(sockfd, &iov, &mut control, SendFlags::empty())
102-
.map_err(|e| to_io_error(e, "sendmsg"))?;
103-
}
104-
Ok(())
105-
})
106-
.await
107-
.map_err(Error::Io)
121+
bytes_sent += sent;
122+
123+
// Mark FDs as sent after first successful sendmsg
124+
if !fds_sent && !fds.is_empty() {
125+
fds_sent = true;
126+
trace!("Sent {} FDs with first chunk ({} bytes)", fds.len(), sent);
127+
}
128+
129+
trace!(
130+
"Sent {}/{} bytes (this chunk: {})",
131+
bytes_sent,
132+
data.len(),
133+
sent
134+
);
135+
}
136+
137+
Ok(())
108138
}
109139
}
110140

tests/integration_tests.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1885,3 +1885,142 @@ async fn test_sender_pretty_mode() -> Result<()> {
18851885

18861886
Ok(())
18871887
}
1888+
1889+
/// Test that large messages exceeding kernel buffer size are sent correctly.
1890+
///
1891+
/// This reproduces a bug where partial writes from sendmsg() were not handled,
1892+
/// causing large messages to be truncated.
1893+
#[tokio::test]
1894+
async fn test_large_message_exceeds_kernel_buffer() -> Result<()> {
1895+
// Create a large payload that will exceed the typical kernel socket buffer
1896+
// (usually ~200KB). We'll use 1MB to be safe.
1897+
let large_data = "x".repeat(1024 * 1024);
1898+
1899+
let (client_stream, server_stream) = tokio::net::UnixStream::pair().unwrap();
1900+
1901+
let expected_data = large_data.clone();
1902+
let server_handle = tokio::spawn(async move {
1903+
let transport = UnixSocketTransport::new(server_stream).unwrap();
1904+
let (_sender, mut receiver) = transport.split();
1905+
1906+
let message_with_fds = receiver.receive().await?;
1907+
let message = message_with_fds.message;
1908+
1909+
// Verify we received the complete message
1910+
if let JsonRpcMessage::Request(req) = message {
1911+
let params = req.params.unwrap();
1912+
let received_data = params["data"].as_str().unwrap();
1913+
assert_eq!(
1914+
received_data.len(),
1915+
expected_data.len(),
1916+
"Message was truncated! Expected {} bytes, got {} bytes",
1917+
expected_data.len(),
1918+
received_data.len()
1919+
);
1920+
assert_eq!(received_data, expected_data);
1921+
} else {
1922+
panic!("Expected request message");
1923+
}
1924+
1925+
Ok::<(), jsonrpc_fdpass::Error>(())
1926+
});
1927+
1928+
let transport = UnixSocketTransport::new(client_stream).unwrap();
1929+
let (mut sender, _receiver) = transport.split();
1930+
1931+
let request = JsonRpcRequest::new(
1932+
"large_data".to_string(),
1933+
Some(serde_json::json!({
1934+
"data": large_data
1935+
})),
1936+
Value::Number(1.into()),
1937+
);
1938+
let message = JsonRpcMessage::Request(request);
1939+
let message_with_fds = MessageWithFds::new(message, vec![]);
1940+
1941+
sender.send(message_with_fds).await?;
1942+
1943+
// Wait for server to process and verify
1944+
server_handle.await.unwrap()?;
1945+
1946+
Ok(())
1947+
}
1948+
1949+
/// Test that large messages with file descriptors work correctly.
1950+
///
1951+
/// This tests the case where FDs must be sent with the first chunk,
1952+
/// and remaining data sent in subsequent chunks.
1953+
#[tokio::test]
1954+
async fn test_large_message_with_fd() -> Result<()> {
1955+
// Create a large payload
1956+
let large_data = "y".repeat(1024 * 1024);
1957+
1958+
// Create a temp file to pass
1959+
let mut temp_file = NamedTempFile::new().unwrap();
1960+
write!(temp_file, "FD test content").unwrap();
1961+
temp_file.flush().unwrap();
1962+
temp_file.seek(SeekFrom::Start(0)).unwrap();
1963+
let fd: OwnedFd = temp_file.into_file().into();
1964+
1965+
let (client_stream, server_stream) = tokio::net::UnixStream::pair().unwrap();
1966+
1967+
let expected_data = large_data.clone();
1968+
let server_handle = tokio::spawn(async move {
1969+
let transport = UnixSocketTransport::new(server_stream).unwrap();
1970+
let (_sender, mut receiver) = transport.split();
1971+
1972+
let message_with_fds = receiver.receive().await?;
1973+
let message = message_with_fds.message;
1974+
let fds = message_with_fds.file_descriptors;
1975+
1976+
// Verify we received the FD
1977+
assert_eq!(fds.len(), 1, "Expected 1 file descriptor");
1978+
1979+
// Verify we can read from the FD
1980+
let mut file = File::from(fds.into_iter().next().unwrap());
1981+
let mut contents = String::new();
1982+
file.read_to_string(&mut contents).unwrap();
1983+
assert_eq!(contents, "FD test content");
1984+
1985+
// Verify we received the complete message
1986+
if let JsonRpcMessage::Request(req) = message {
1987+
let params = req.params.unwrap();
1988+
let received_data = params["data"].as_str().unwrap();
1989+
assert_eq!(
1990+
received_data.len(),
1991+
expected_data.len(),
1992+
"Message was truncated! Expected {} bytes, got {} bytes",
1993+
expected_data.len(),
1994+
received_data.len()
1995+
);
1996+
} else {
1997+
panic!("Expected request message");
1998+
}
1999+
2000+
Ok::<(), jsonrpc_fdpass::Error>(())
2001+
});
2002+
2003+
let transport = UnixSocketTransport::new(client_stream).unwrap();
2004+
let (mut sender, _receiver) = transport.split();
2005+
2006+
let request = JsonRpcRequest::new(
2007+
"large_data_with_fd".to_string(),
2008+
Some(serde_json::json!({
2009+
"data": large_data,
2010+
"file": {
2011+
"__jsonrpc_fd__": true,
2012+
"index": 0
2013+
}
2014+
})),
2015+
Value::Number(1.into()),
2016+
);
2017+
let message = JsonRpcMessage::Request(request);
2018+
let message_with_fds = MessageWithFds::new(message, vec![fd]);
2019+
2020+
sender.send(message_with_fds).await?;
2021+
2022+
// Wait for server to process and verify
2023+
server_handle.await.unwrap()?;
2024+
2025+
Ok(())
2026+
}

0 commit comments

Comments
 (0)