Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "ndjson-rpc-fdpass"
name = "jsonrpc-fdpass"
version = "0.1.0"
edition = "2021"
description = "NDJSON JSON-RPC 2.0 with File Descriptor Passing implementation"
description = "JSON-RPC 2.0 with Unix file descriptor passing"
authors = ["Colin Walters <walters@verbum.org>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/cgwalters/spec-json-rpc-fdpass"
Expand All @@ -15,5 +15,8 @@ thiserror = "1.0"
tokio = { version = "1.40", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"
jsonrpsee = { version = "0.24", features = ["server", "client-core", "async-client"], default-features = false }

[dev-dependencies]
tempfile = "3.0"
jsonrpsee = { version = "0.24", features = ["server", "client-core"], default-features = false }
async-trait = "0.1"
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# A Specification for NDJSON JSON-RPC 2.0 with File Descriptor Passing (NDJSON-RPC-FD)
# JSON-RPC 2.0 with Unix File Descriptor Passing

This repository contains both a protocol specification and a Rust implementation
(`jsonrpc-fdpass` crate) for JSON-RPC 2.0 with file descriptor passing over Unix
domain sockets.

## 1. Overview

Expand Down
11 changes: 5 additions & 6 deletions src/main.rs → examples/demo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ndjson_rpc_fdpass::{Client, Result, Server};
use jsonrpc_fdpass::{Client, Result, Server};
use serde_json::Value;
use std::fs::File;
use std::io::{Read, Write};
Expand Down Expand Up @@ -33,7 +33,7 @@ async fn run_server(listener: UnixListener) -> Result<()> {
// Register a method that reads from a file descriptor
server.register_method("read_file", |_method, _params, fds| {
if fds.is_empty() {
return Err(ndjson_rpc_fdpass::Error::InvalidMessage(
return Err(jsonrpc_fdpass::Error::InvalidMessage(
"Expected file descriptor".to_string(),
));
}
Expand All @@ -43,7 +43,7 @@ async fn run_server(listener: UnixListener) -> Result<()> {
let mut contents = String::new();

file.read_to_string(&mut contents)
.map_err(|e| ndjson_rpc_fdpass::Error::Io(e))?;
.map_err(jsonrpc_fdpass::Error::Io)?;

info!("Server read from file: {}", contents.trim());
Ok((Some(Value::String(contents)), Vec::new()))
Expand All @@ -63,7 +63,7 @@ async fn run_server(listener: UnixListener) -> Result<()> {

// Accept one connection and handle it
if let Ok((stream, _)) = listener.accept().await {
let transport = ndjson_rpc_fdpass::UnixSocketTransport::new(stream);
let transport = jsonrpc_fdpass::UnixSocketTransport::new(stream)?;
let (mut sender, mut receiver) = transport.split();

// Handle messages from this connection
Expand All @@ -85,8 +85,7 @@ async fn run_client(socket_path: PathBuf) -> Result<()> {
let mut client = Client::connect(&socket_path).await?;

// Create a temporary file to send to the server
let mut temp_file =
tempfile::NamedTempFile::new().map_err(|e| ndjson_rpc_fdpass::Error::Io(e))?;
let mut temp_file = tempfile::NamedTempFile::new().map_err(jsonrpc_fdpass::Error::Io)?;

write!(temp_file, "Hello from client file!").unwrap();
temp_file.flush().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Client {
impl Client {
pub async fn connect<P: AsRef<Path>>(path: P) -> Result<Self> {
let stream = UnixStream::connect(path).await?;
let transport = UnixSocketTransport::new(stream);
let transport = UnixSocketTransport::new(stream)?;
let (sender, _receiver) = transport.split();

Ok(Self { sender, next_id: 1 })
Expand Down
28 changes: 15 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! # NDJSON JSON-RPC 2.0 with File Descriptor Passing
//! # JSON-RPC 2.0 with Unix File Descriptor Passing
//!
//! This crate provides an implementation of the NDJSON JSON-RPC 2.0 with File Descriptor Passing
//! specification. It enables reliable inter-process communication (IPC) over Unix domain sockets
//! with the ability to pass file descriptors alongside JSON-RPC messages.
//! This crate provides an implementation of JSON-RPC 2.0 with file descriptor passing over Unix
//! domain sockets. It enables reliable inter-process communication (IPC) with the ability to
//! pass file descriptors alongside JSON-RPC messages.
//!
//! ## Features
//!
//! - **JSON-RPC 2.0 compliance**: Full support for requests, responses, and notifications
//! - **File descriptor passing**: Pass file descriptors using Unix socket ancillary data
//! - **NDJSON framing**: Newline-delimited JSON for reliable message boundaries
//! - **Streaming JSON parsing**: Self-delimiting JSON messages without newline requirements
//! - **Async support**: Built on tokio for high-performance async I/O
//! - **Type-safe**: Rust's type system ensures correct message handling
//!
Expand All @@ -17,7 +17,7 @@
//! ### Server Example
//!
//! ```rust,no_run
//! use ndjson_rpc_fdpass::{Server, Result};
//! use jsonrpc_fdpass::{Server, Result};
//! use std::fs::File;
//! use serde_json::Value;
//!
Expand All @@ -33,7 +33,7 @@
//! file.read_to_string(&mut contents).unwrap();
//! Ok((Some(Value::String(contents)), Vec::new()))
//! } else {
//! Err(ndjson_rpc_fdpass::Error::InvalidMessage("No FD provided".into()))
//! Err(jsonrpc_fdpass::Error::InvalidMessage("No FD provided".into()))
//! }
//! });
//!
Expand All @@ -44,7 +44,7 @@
//! ### Client Example
//!
//! ```rust,no_run
//! use ndjson_rpc_fdpass::{Client, Result};
//! use jsonrpc_fdpass::{Client, Result};
//! use std::fs::File;
//! use std::os::unix::io::OwnedFd;
//! use serde_json::json;
Expand All @@ -71,13 +71,15 @@
//!
//! ## Protocol Details
//!
//! This implementation follows the NDJSON JSON-RPC with File Descriptor Passing specification:
//! This implementation is a minimal extension to JSON-RPC 2.0 that adds file descriptor
//! passing over Unix domain sockets:
//!
//! - Uses Unix domain sockets (SOCK_STREAM)
//! - Messages are framed using newline-delimited JSON (NDJSON)
//! - File descriptors are passed using ancillary data via sendmsg(2)/recvmsg(2)
//! - Each sendmsg() call contains exactly one complete NDJSON message
//! - File descriptors are represented in JSON using placeholder objects
//! - Standard JSON-RPC 2.0 message format with no additional framing requirements
//! - JSON objects are self-delimiting; no newline or length-prefix framing is required
//! - File descriptors are passed as ancillary data via sendmsg(2)/recvmsg(2)
//! - Each sendmsg() call contains exactly one complete JSON-RPC message
//! - File descriptors are represented in JSON using placeholder objects (see below)
//!
//! ### File Descriptor Placeholders
//!
Expand Down
96 changes: 60 additions & 36 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,41 @@ use jsonrpsee::types::error::ErrorObject as JsonRpcError;
use serde::{Deserialize, Serialize};
use std::os::unix::io::OwnedFd;

/// The JSON key used to identify file descriptor placeholders.
pub const FD_PLACEHOLDER_KEY: &str = "__jsonrpc_fd__";
/// The JSON key for the file descriptor index within a placeholder.
pub const FD_INDEX_KEY: &str = "index";
/// The JSON-RPC protocol version.
pub const JSONRPC_VERSION: &str = "2.0";

/// Count file descriptor placeholders in a JSON value.
pub fn count_fd_placeholders(value: &serde_json::Value) -> usize {
fn count_inner(value: &serde_json::Value, count: &mut usize) {
match value {
serde_json::Value::Object(map) => {
if let (Some(serde_json::Value::Bool(true)), Some(_)) =
(map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY))
{
*count += 1;
} else {
for v in map.values() {
count_inner(v, count);
}
}
}
serde_json::Value::Array(arr) => {
for v in arr {
count_inner(v, count);
}
}
_ => {}
}
}
let mut count = 0;
count_inner(value, &mut count);
count
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileDescriptorPlaceholder {
#[serde(rename = "__jsonrpc_fd__")]
Expand Down Expand Up @@ -55,7 +90,7 @@ pub enum JsonRpcMessage {
impl JsonRpcRequest {
pub fn new(method: String, params: Option<serde_json::Value>, id: serde_json::Value) -> Self {
Self {
jsonrpc: "2.0".to_string(),
jsonrpc: JSONRPC_VERSION.to_string(),
method,
params,
id,
Expand All @@ -66,7 +101,7 @@ impl JsonRpcRequest {
impl JsonRpcResponse {
pub fn success(result: serde_json::Value, id: serde_json::Value) -> Self {
Self {
jsonrpc: "2.0".to_string(),
jsonrpc: JSONRPC_VERSION.to_string(),
result: Some(result),
error: None,
id,
Expand All @@ -75,7 +110,7 @@ impl JsonRpcResponse {

pub fn error(error: JsonRpcError<'static>, id: serde_json::Value) -> Self {
Self {
jsonrpc: "2.0".to_string(),
jsonrpc: JSONRPC_VERSION.to_string(),
result: None,
error: Some(error),
id,
Expand All @@ -86,7 +121,7 @@ impl JsonRpcResponse {
impl JsonRpcNotification {
pub fn new(method: String, params: Option<serde_json::Value>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
jsonrpc: JSONRPC_VERSION.to_string(),
method,
params,
}
Expand Down Expand Up @@ -137,18 +172,30 @@ impl MessageWithFds {
}

pub fn serialize_with_placeholders(&self) -> Result<String> {
self.serialize_with_placeholders_impl(false)
}

pub fn serialize_with_placeholders_pretty(&self) -> Result<String> {
self.serialize_with_placeholders_impl(true)
}

fn serialize_with_placeholders_impl(&self, pretty: bool) -> Result<String> {
let mut message_json = self.message.to_json_value()?;
self.insert_placeholders(&mut message_json)?;

let json_str = serde_json::to_string(&message_json)?;
Ok(format!("{}\n", json_str))
let json_str = if pretty {
serde_json::to_string_pretty(&message_json)?
} else {
serde_json::to_string(&message_json)?
};
Ok(json_str)
}

fn insert_placeholders(&self, value: &mut serde_json::Value) -> Result<()> {
let fd_count = self.file_descriptors.len();
let mut placeholder_indices = Vec::new();

self.collect_placeholder_indices(value, &mut placeholder_indices);
Self::collect_placeholder_indices(value, &mut placeholder_indices);

if placeholder_indices.len() != fd_count {
return Err(Error::MismatchedCount {
Expand All @@ -166,26 +213,26 @@ impl MessageWithFds {
Ok(())
}

fn collect_placeholder_indices(&self, value: &serde_json::Value, indices: &mut Vec<usize>) {
fn collect_placeholder_indices(value: &serde_json::Value, indices: &mut Vec<usize>) {
match value {
serde_json::Value::Object(map) => {
if let (
Some(serde_json::Value::Bool(true)),
Some(serde_json::Value::Number(index)),
) = (map.get("__jsonrpc_fd__"), map.get("index"))
) = (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY))
{
if let Some(index) = index.as_u64() {
indices.push(index as usize);
}
} else {
for v in map.values() {
self.collect_placeholder_indices(v, indices);
Self::collect_placeholder_indices(v, indices);
}
}
}
serde_json::Value::Array(arr) => {
for v in arr {
self.collect_placeholder_indices(v, indices);
Self::collect_placeholder_indices(v, indices);
}
}
_ => {}
Expand All @@ -195,8 +242,7 @@ impl MessageWithFds {
pub fn from_json_with_fds(json_str: &str, fds: Vec<OwnedFd>) -> Result<Self> {
let message_json: serde_json::Value = serde_json::from_str(json_str)?;

let mut placeholder_count = 0;
Self::count_placeholders(&message_json, &mut placeholder_count);
let placeholder_count = count_fd_placeholders(&message_json);

if placeholder_count != fds.len() {
return Err(Error::MismatchedCount {
Expand All @@ -215,28 +261,6 @@ impl MessageWithFds {
Ok(Self::new(message, fds))
}

fn count_placeholders(value: &serde_json::Value, count: &mut usize) {
match value {
serde_json::Value::Object(map) => {
if let (Some(serde_json::Value::Bool(true)), Some(_)) =
(map.get("__jsonrpc_fd__"), map.get("index"))
{
*count += 1;
} else {
for v in map.values() {
Self::count_placeholders(v, count);
}
}
}
serde_json::Value::Array(arr) => {
for v in arr {
Self::count_placeholders(v, count);
}
}
_ => {}
}
}

fn validate_placeholder_indices(
value: &serde_json::Value,
expected_count: usize,
Expand All @@ -260,7 +284,7 @@ impl MessageWithFds {
if let (
Some(serde_json::Value::Bool(true)),
Some(serde_json::Value::Number(index)),
) = (map.get("__jsonrpc_fd__"), map.get("index"))
) = (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY))
{
if let Some(index) = index.as_u64() {
indices.push(index as usize);
Expand Down
2 changes: 1 addition & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Server {
}

async fn handle_connection(&self, stream: UnixStream) -> Result<()> {
let transport = UnixSocketTransport::new(stream);
let transport = UnixSocketTransport::new(stream)?;
let (mut sender, mut receiver) = transport.split();

debug!("New connection established");
Expand Down
Loading