From c00e86a7335b222365fc744390e1057b22248ba1 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sat, 6 Dec 2025 16:04:58 -0700 Subject: [PATCH 01/24] Start ChromaDBService impl --- Cargo.lock | 30 +++++++ rholang/Cargo.toml | 2 + .../src/rust/interpreter/chromadb_service.rs | 89 +++++++++++++++++++ rholang/src/rust/interpreter/errors.rs | 3 + rholang/src/rust/interpreter/mod.rs | 1 + 5 files changed, 125 insertions(+) create mode 100644 rholang/src/rust/interpreter/chromadb_service.rs diff --git a/Cargo.lock b/Cargo.lock index 55dca876d..c73e899f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -668,6 +668,21 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chromadb" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6a1f527377f3d6e7c775ab985417e207a027663e15fd8555a8eb58ec8a62b53" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.1", + "minreq", + "reqwest 0.11.27", + "serde", + "serde_json", +] + [[package]] name = "chrono" version = "0.4.42" @@ -2604,6 +2619,19 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "minreq" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05015102dad0f7d61691ca347e9d9d9006685a64aefb3d79eecf62665de2153d" +dependencies = [ + "rustls 0.21.12", + "rustls-webpki 0.101.7", + "serde", + "serde_json", + "webpki-roots 0.25.4", +] + [[package]] name = "mio" version = "1.0.4" @@ -3898,6 +3926,7 @@ version = "0.1.0" dependencies = [ "bincode", "cc", + "chromadb", "clap", "crypto", "dotenv", @@ -3917,6 +3946,7 @@ dependencies = [ "regex", "rspace_plus_plus", "serde", + "serde_json", "shared", "tempfile", "thiserror 2.0.17", diff --git a/rholang/Cargo.toml b/rholang/Cargo.toml index f26def1bd..c263e3049 100644 --- a/rholang/Cargo.toml +++ 
b/rholang/Cargo.toml @@ -45,6 +45,8 @@ tempfile = "3.19.1" clap = { version = "4.5", features = ["derive"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } +chromadb = "2.3.0" +serde_json = { workspace = true } [build-dependencies] cc = "1.0" diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs new file mode 100644 index 000000000..f68c0d1dc --- /dev/null +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -0,0 +1,89 @@ +use chromadb::{client::ChromaClientOptions, ChromaClient, ChromaCollection}; +use futures::TryFutureExt; +use serde_json; + +use super::errors::InterpreterError; + +pub type CollectionMetadata = serde_json::Map; + +pub struct ChromaDBService { + client: ChromaClient, +} + +impl ChromaDBService { + pub async fn new() -> Self { + // TODO (chase): Do we need custom options? i.e custom database name, authentication method, and url? + // If the chroma db is hosted alongside the node locally, custom options don't make much sense. + let client = ChromaClient::new(ChromaClientOptions::default()) + .await + .expect("Failed to build ChromaDB client"); + + Self { client } + } + + /// Creates a collection with given name and metadata. Semantics follow [`ChromaClient::create_collection`]. + /// Also see [`ChromaCollection::modify`] + /// + /// # Arguments + /// + /// * `name` - The name of the collection to create + /// * `metadata` - Optional metadata to associate with the collection. + /// Must be a JSON object with keys and values that are either numbers, strings or floats. + /// * `update_if_exists` - If true, update collection metadata if it already exists. Otherwise, error if exists. 
+ pub async fn create_collection( + &self, + name: &str, + metadata: Option, + update_if_exists: bool, + ) -> Result<(), InterpreterError> { + let metadata_ref = metadata.as_ref(); + self.client + .create_collection(name, metadata.clone(), update_if_exists) + .and_then(async move |collection| { + /* Ideally there ought to be a way to check whether the returned collection + from create_collection already existed or not (without extra API calls). + + However, such functionality does not currently exist - so we resort to testing + whether or not the metadata of the returned collection is the same as the one provided. + + If not, clearly this collection already existed (with a different metadata), and we must + update it. + */ + if update_if_exists && collection.metadata() != metadata_ref { + // Update the collection metadata if required. + return collection.modify(None, metadata_ref).await; + } + Ok(()) + }) + .await + .map_err(|err| { + InterpreterError::ChromaDBError(format!("Failed to create collection: {}", err)) + }) + } + + /// Gets the metadata of an existing collection. + pub async fn get_collection_meta( + &self, + name: &str, + ) -> Result, InterpreterError> { + self.get_collection(name) + .map_ok(|collection| collection.metadata().cloned()) + .await + } + + /* TODO (chase): Other potential collection related methods: + - rename collection (not that necessary?) + - list collections (bad idea probably) + - delete collection (should blockchain data really be deleted?) + */ + + /// Helper for getting a collection - not be exposed as a service method. 
+ async fn get_collection(&self, name: &str) -> Result { + self.client.get_collection(name).await.map_err(|err| { + InterpreterError::ChromaDBError(format!( + "Failed to get collection with name {name}: {}", + err + )) + }) + } +} diff --git a/rholang/src/rust/interpreter/errors.rs b/rholang/src/rust/interpreter/errors.rs index 7e81d5ff9..43f3627e1 100644 --- a/rholang/src/rust/interpreter/errors.rs +++ b/rholang/src/rust/interpreter/errors.rs @@ -80,6 +80,7 @@ pub enum InterpreterError { col: usize, }, OpenAIError(String), + ChromaDBError(String), IllegalArgumentError(String), IoError(String), } @@ -272,6 +273,8 @@ impl fmt::Display for InterpreterError { InterpreterError::OpenAIError(msg) => write!(f, "OpenAI error: {}", msg), + InterpreterError::ChromaDBError(msg) => write!(f, "ChromaDB error: {}", msg), + InterpreterError::IllegalArgumentError(msg) => write!(f, "Illegal argument: {}", msg), InterpreterError::IoError(msg) => write!(f, "IO error: {}", msg), diff --git a/rholang/src/rust/interpreter/mod.rs b/rholang/src/rust/interpreter/mod.rs index 264f93d1f..f98d7d0a3 100644 --- a/rholang/src/rust/interpreter/mod.rs +++ b/rholang/src/rust/interpreter/mod.rs @@ -1,6 +1,7 @@ use errors::InterpreterError; pub mod accounting; +pub mod chromadb_service; pub mod compiler; pub mod contract_call; pub mod deploy_parameters; From 991571f397ea561bb6f375d85c9d12aed001e8cc Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sat, 6 Dec 2025 18:36:47 -0700 Subject: [PATCH 02/24] Stricter type for collection metadata --- .../src/rust/interpreter/chromadb_service.rs | 81 ++++++++++++++++++- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index f68c0d1dc..64e58d43b 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -1,10 +1,68 @@ +use 
std::collections::HashMap; + use chromadb::{client::ChromaClientOptions, ChromaClient, ChromaCollection}; use futures::TryFutureExt; use serde_json; use super::errors::InterpreterError; -pub type CollectionMetadata = serde_json::Map; +#[derive(Clone, PartialEq, Eq, Debug)] +pub enum MetadataValue { + StringMeta(String), + NumberMeta(i64), + NullMeta, + // TODO (chase): Support floating point numbers once Rholang does? +} + +impl MetadataValue { + /// Private helper that expects a valid json_val to be transformed. + /// We know that the metadata values returned by the ChromaDB API will be well-formed. + fn from_value(json_val: serde_json::Value) -> Result { + match json_val { + serde_json::Value::Null => Ok(Self::NullMeta), + serde_json::Value::Number(number) => + // TODO (chase): Must handle floats if/when supported. + { + number + .as_i64() + .map(Self::NumberMeta) + .ok_or(InterpreterError::ChromaDBError( + format!( + "Only i64 numbers are supported for ChromaDB collection metadata value + Encountered: {number:?}" + ) + .to_string(), + )) + } + serde_json::Value::String(str) => Ok(Self::StringMeta(str)), + _ => Err(InterpreterError::ChromaDBError(format!( + "Unsupported collection metadata Value\nEncountered: {json_val:?}" + ))), + } + } +} + +impl Into for MetadataValue { + fn into(self) -> serde_json::Value { + match self { + MetadataValue::NullMeta => serde_json::Value::Null, + MetadataValue::StringMeta(str) => serde_json::Value::String(str), + MetadataValue::NumberMeta(num) => serde_json::Value::Number(num.into()), + } + } +} + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct CollectionMetadata(HashMap); + +impl Into> for CollectionMetadata { + fn into(self) -> serde_json::Map { + self.0 + .into_iter() + .map(|(meta_key, meta_val)| (meta_key, meta_val.into())) + .collect::>() + } +} pub struct ChromaDBService { client: ChromaClient, @@ -33,9 +91,11 @@ impl ChromaDBService { pub async fn create_collection( &self, name: &str, - metadata: Option, + 
smart_metadata: Option, update_if_exists: bool, ) -> Result<(), InterpreterError> { + let metadata: Option> = + smart_metadata.map(|x| x.into()); let metadata_ref = metadata.as_ref(); self.client .create_collection(name, metadata.clone(), update_if_exists) @@ -66,9 +126,22 @@ impl ChromaDBService { &self, name: &str, ) -> Result, InterpreterError> { - self.get_collection(name) + let metadata = self + .get_collection(name) .map_ok(|collection| collection.metadata().cloned()) - .await + .await?; + match metadata { + Some(meta) => { + let res = meta + .into_iter() + .map(|(key, val)| { + MetadataValue::from_value(val).map(move |res| (key.clone(), res)) + }) + .collect::, _>>()?; + Ok(Some(CollectionMetadata(res))) + } + None => Ok(None), + } } /* TODO (chase): Other potential collection related methods: From 9a0998cef753124b81421498b581986704db6f5f Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sat, 6 Dec 2025 23:02:12 -0700 Subject: [PATCH 03/24] Remove reduntant type param from `Extractor` --- rholang/src/rust/interpreter/rho_type.rs | 36 ++++++++++++------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/rholang/src/rust/interpreter/rho_type.rs b/rholang/src/rust/interpreter/rho_type.rs index e57ae89ea..9f023a83c 100644 --- a/rholang/src/rust/interpreter/rho_type.rs +++ b/rholang/src/rust/interpreter/rho_type.rs @@ -257,13 +257,13 @@ impl RhoSysAuthToken { } } -pub trait Extractor { +pub trait Extractor { type RustType; fn unapply(p: &Par) -> Option; } -impl Extractor for RhoBoolean { +impl Extractor for RhoBoolean { type RustType = bool; fn unapply(p: &Par) -> Option { @@ -271,7 +271,7 @@ impl Extractor for RhoBoolean { } } -impl Extractor for RhoString { +impl Extractor for RhoString { type RustType = String; fn unapply(p: &Par) -> Option { @@ -279,7 +279,7 @@ impl Extractor for RhoString { } } -impl Extractor for RhoNil { +impl Extractor for RhoNil { type RustType = (); fn unapply(p: 
&Par) -> Option { @@ -291,7 +291,7 @@ impl Extractor for RhoNil { } } -impl Extractor for RhoByteArray { +impl Extractor for RhoByteArray { type RustType = Vec; fn unapply(p: &Par) -> Option { @@ -299,7 +299,7 @@ impl Extractor for RhoByteArray { } } -impl Extractor for RhoDeployerId { +impl Extractor for RhoDeployerId { type RustType = Vec; fn unapply(p: &Par) -> Option { @@ -307,7 +307,7 @@ impl Extractor for RhoDeployerId { } } -impl Extractor for RhoName { +impl Extractor for RhoName { type RustType = GPrivate; fn unapply(p: &Par) -> Option { @@ -315,7 +315,7 @@ impl Extractor for RhoName { } } -impl Extractor for RhoNumber { +impl Extractor for RhoNumber { type RustType = i64; fn unapply(p: &Par) -> Option { @@ -323,7 +323,7 @@ impl Extractor for RhoNumber { } } -impl Extractor for RhoUri { +impl Extractor for RhoUri { type RustType = String; fn unapply(p: &Par) -> Option { @@ -331,7 +331,7 @@ impl Extractor for RhoUri { } } -impl Extractor for RhoUnforgeable { +impl Extractor for RhoUnforgeable { type RustType = GUnforgeable; fn unapply(p: &Par) -> Option { @@ -339,7 +339,7 @@ impl Extractor for RhoUnforgeable { } } -impl Extractor for RhoExpression { +impl Extractor for RhoExpression { type RustType = Expr; fn unapply(p: &Par) -> Option { @@ -347,7 +347,7 @@ impl Extractor for RhoExpression { } } -impl Extractor for RhoSysAuthToken { +impl Extractor for RhoSysAuthToken { type RustType = GSysAuthToken; fn unapply(p: &Par) -> Option { @@ -355,10 +355,10 @@ impl Extractor for RhoSysAuthToken { } } -impl Extractor<(A, B)> for (A, B) +impl Extractor for (A, B) where - A: Extractor, - B: Extractor, + A: Extractor, + B: Extractor, { type RustType = (A::RustType, B::RustType); @@ -372,10 +372,10 @@ where } } -impl Extractor> for Either +impl Extractor for Either where - A: Extractor, - B: Extractor, + A: Extractor, + B: Extractor, { type RustType = Either; From 046572134761772561ed186ae252cce286a2d232 Mon Sep 17 00:00:00 2001 From: TotallyNotChase 
<44284917+TotallyNotChase@users.noreply.github.com> Date: Sat, 6 Dec 2025 23:56:41 -0700 Subject: [PATCH 04/24] Update service and introduce helper trait impls --- .../src/rust/interpreter/chromadb_service.rs | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index 64e58d43b..2d0e529d2 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -2,8 +2,11 @@ use std::collections::HashMap; use chromadb::{client::ChromaClientOptions, ChromaClient, ChromaCollection}; use futures::TryFutureExt; +use models::rhoapi::Par; use serde_json; +use crate::rust::interpreter::rho_type::{Extractor, RhoNumber, RhoString}; + use super::errors::InterpreterError; #[derive(Clone, PartialEq, Eq, Debug)] @@ -14,6 +17,16 @@ pub enum MetadataValue { // TODO (chase): Support floating point numbers once Rholang does? } +impl Extractor for MetadataValue { + type RustType = MetadataValue; + + fn unapply(p: &Par) -> Option { + RhoNumber::unapply(p) + .map(MetadataValue::NumberMeta) + .or_else(|| RhoString::unapply(p).map(MetadataValue::StringMeta)) + } +} + impl MetadataValue { /// Private helper that expects a valid json_val to be transformed. /// We know that the metadata values returned by the ChromaDB API will be well-formed. @@ -64,6 +77,14 @@ impl Into> for CollectionMetadata { } } +impl Extractor for CollectionMetadata { + type RustType = CollectionMetadata; + + fn unapply(p: &Par) -> Option { + as Extractor>::unapply(p).map(CollectionMetadata) + } +} + pub struct ChromaDBService { client: ChromaClient, } @@ -85,20 +106,21 @@ impl ChromaDBService { /// # Arguments /// /// * `name` - The name of the collection to create + /// * `ignore_or_update_if_exists` - If true, update collection metadata (if provided, else ignore) + /// if it already exists. Otherwise, error if exists. 
/// * `metadata` - Optional metadata to associate with the collection. /// Must be a JSON object with keys and values that are either numbers, strings or floats. - /// * `update_if_exists` - If true, update collection metadata if it already exists. Otherwise, error if exists. pub async fn create_collection( &self, name: &str, + ignore_or_update_if_exists: bool, smart_metadata: Option, - update_if_exists: bool, ) -> Result<(), InterpreterError> { let metadata: Option> = smart_metadata.map(|x| x.into()); let metadata_ref = metadata.as_ref(); self.client - .create_collection(name, metadata.clone(), update_if_exists) + .create_collection(name, metadata.clone(), ignore_or_update_if_exists) .and_then(async move |collection| { /* Ideally there ought to be a way to check whether the returned collection from create_collection already existed or not (without extra API calls). @@ -109,7 +131,7 @@ impl ChromaDBService { If not, clearly this collection already existed (with a different metadata), and we must update it. */ - if update_if_exists && collection.metadata() != metadata_ref { + if ignore_or_update_if_exists && collection.metadata() != metadata_ref { // Update the collection metadata if required. 
return collection.modify(None, metadata_ref).await; } From 18c910c0ecef23887df0c48e209e12a64270f864 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sat, 6 Dec 2025 23:58:14 -0700 Subject: [PATCH 05/24] Add helpers for parsing Rholang list and map types --- rholang/src/rust/interpreter/rho_type.rs | 96 ++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/rholang/src/rust/interpreter/rho_type.rs b/rholang/src/rust/interpreter/rho_type.rs index 9f023a83c..8ea1affb4 100644 --- a/rholang/src/rust/interpreter/rho_type.rs +++ b/rholang/src/rust/interpreter/rho_type.rs @@ -1,12 +1,19 @@ // See rholang/src/main/scala/coop/rchain/rholang/interpreter/RhoType.scala +use std::collections::HashMap; +use std::hash::Hash; + use models::rhoapi::g_unforgeable::UnfInstance; +use models::rhoapi::EList; use models::rhoapi::ETuple; use models::rhoapi::GPrivate; use models::rhoapi::GSysAuthToken; use models::rhoapi::GUnforgeable; use models::rhoapi::{expr::ExprInstance, Expr, GDeployerId, Par}; +use models::rust::par_map::ParMap; +use models::rust::par_map_type_mapper::ParMapTypeMapper; use models::rust::rholang::implicits::{single_expr, single_unforgeable}; +use models::rust::sorted_par_map::SortedParMap; use rspace_plus_plus::rspace::history::Either; pub struct RhoNil; @@ -143,6 +150,57 @@ impl RhoTuple2 { } } +pub struct RhoList; + +impl RhoList { + pub fn create_par(list: Vec) -> Par { + Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::EListBody(EList { + ps: list, + locally_free: Vec::new(), + connective_used: false, + remainder: None, + })), + }]) + } + + pub fn unapply(p: Par) -> Option> { + if let Some(expr) = single_expr(&p) { + if let Expr { + expr_instance: Some(ExprInstance::EListBody(EList { ps, .. 
})), + } = expr + { + return Some(ps); + } + } + None + } +} + +pub struct RhoMap; + +impl RhoMap { + pub fn create_par(hash_map: HashMap) -> Par { + Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::EMapBody(ParMapTypeMapper::par_map_to_emap( + ParMap::create_from_sorted_par_map(SortedParMap::create_from_map(hash_map)), + ))), + }]) + } + + pub fn unapply(p: Par) -> Option> { + if let Some(expr) = single_expr(&p) { + if let Expr { + expr_instance: Some(ExprInstance::EMapBody(emap)), + } = expr + { + return Some(ParMapTypeMapper::emap_to_par_map(emap).ps.ps); + } + } + None + } +} + pub struct RhoUri; impl RhoUri { @@ -372,6 +430,44 @@ where } } +impl Extractor for Vec +where + A: Extractor, +{ + type RustType = Vec; + + fn unapply(p: &Par) -> Option { + if let Some(plist) = RhoList::unapply(p.clone()) { + return plist.into_iter().map(|par| A::unapply(&par)).collect(); + } + None + } +} + +impl Extractor for HashMap +where + A: Extractor, + B: Extractor, + A::RustType: Eq + Hash, +{ + type RustType = HashMap; + + fn unapply(p: &Par) -> Option { + if let Some(pmap) = RhoMap::unapply(p.clone()) { + return pmap + .into_iter() + .map( + |(pkey, pvalue)| match (A::unapply(&pkey), B::unapply(&pvalue)) { + (Some(key), Some(value)) => Some((key, value)), + _ => None, + }, + ) + .collect(); + } + None + } +} + impl Extractor for Either where A: Extractor, From b9a1b0228f1fb73d79476b1c8f6718c25ad84d03 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sat, 6 Dec 2025 23:59:04 -0700 Subject: [PATCH 06/24] Hook up `create_collection` as a system process --- rholang/src/rust/interpreter/rho_runtime.rs | 51 ++++++- .../src/rust/interpreter/system_processes.rs | 130 +++++++++++++++--- 2 files changed, 157 insertions(+), 24 deletions(-) diff --git a/rholang/src/rust/interpreter/rho_runtime.rs b/rholang/src/rust/interpreter/rho_runtime.rs index 055f03bc2..fbd51fc20 100644 --- 
a/rholang/src/rust/interpreter/rho_runtime.rs +++ b/rholang/src/rust/interpreter/rho_runtime.rs @@ -25,6 +25,7 @@ use rspace_plus_plus::rspace::tuplespace_interface::Tuplespace; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use crate::rust::interpreter::chromadb_service::ChromaDBService; use crate::rust::interpreter::openai_service::OpenAIService; use crate::rust::interpreter::system_processes::{BodyRefs, FixedChannels}; @@ -815,6 +816,28 @@ fn std_rho_ai_processes() -> Vec { ] } +fn std_rho_chroma_processes() -> Vec { + vec![ + Definition { + urn: "rho:chroma:collection:new".to_string(), + fixed_channel: FixedChannels::chroma_create_collection(), + // TODO (chase): How to define overloads? + // This function can support 3 or 2 arguments (last one is optional). + arity: 3, + body_ref: BodyRefs::CHROMA_CREATE_COLLECTION, + handler: Box::new(|ctx| { + Box::new(move |args| { + let ctx = ctx.clone(); + Box::pin( + async move { ctx.system_processes.clone().chroma_create_collection(args).await }, + ) + }) + }), + remainder: None, + }, + ] +} + fn dispatch_table_creator( space: RhoISpace, dispatcher: RhoDispatch, @@ -822,6 +845,7 @@ fn dispatch_table_creator( invalid_blocks: InvalidBlocks, extra_system_processes: &mut Vec, openai_service: Arc>, + chromadb_service: Arc>, ) -> RhoDispatchMap { let mut dispatch_table = HashMap::new(); @@ -829,6 +853,7 @@ fn dispatch_table_creator( std_rho_crypto_processes() .iter_mut() .chain(std_rho_ai_processes().iter_mut()) + .chain(std_rho_chroma_processes().iter_mut()) .chain(extra_system_processes.iter_mut()), ) { // TODO: Remove cloning every time @@ -838,6 +863,7 @@ fn dispatch_table_creator( block_data.clone(), invalid_blocks.clone(), openai_service.clone(), + chromadb_service.clone(), )); dispatch_table.insert(tuple.0, tuple.1); @@ -888,6 +914,7 @@ async fn setup_reducer( merge_chs: Arc>>, mergeable_tag_name: Par, openai_service: Arc>, + chromadb_service: Arc>, cost: _cost, ) -> DebruijnInterpreter { // 
println!("\nsetup_reducer"); @@ -906,6 +933,7 @@ async fn setup_reducer( invalid_blocks, extra_system_processes, openai_service, + chromadb_service, ); let dispatcher = Arc::new(RholangAndScalaDispatcher { @@ -941,10 +969,12 @@ fn setup_maps_and_refs( let system_binding = std_system_processes(); let rho_crypto_binding = std_rho_crypto_processes(); let rho_ai_binding = std_rho_ai_processes(); + let rho_chroma_binding = std_rho_chroma_processes(); let combined_processes = system_binding .iter() .chain(rho_crypto_binding.iter()) .chain(rho_ai_binding.iter()) + .chain(rho_chroma_binding.iter()) .chain(extra_system_processes.iter()) .collect::>(); @@ -996,6 +1026,7 @@ where ))); let openai_service = Arc::new(tokio::sync::Mutex::new(OpenAIService::new())); + let chromadb_service = Arc::new(tokio::sync::Mutex::new(ChromaDBService::new().await)); let reducer = setup_reducer( charging_rspace, block_data_ref.clone(), @@ -1005,6 +1036,7 @@ where merge_chs, mergeable_tag_name, openai_service, + chromadb_service, cost, ) .await; @@ -1101,7 +1133,11 @@ where /// # Returns /// /// A configured `RhoRuntimeImpl` instance ready for executing Rholang code. -#[tracing::instrument(name = "create-play-runtime", target = "f1r3fly.rholang.runtime", skip_all)] +#[tracing::instrument( + name = "create-play-runtime", + target = "f1r3fly.rholang.runtime", + skip_all +)] pub async fn create_rho_runtime( rspace: T, mergeable_tag_name: Par, @@ -1136,7 +1172,11 @@ where /// # Returns /// /// A configured `RhoRuntimeImpl` instance with replay capabilities. 
-#[tracing::instrument(name = "create-replay-runtime", target = "f1r3fly.rholang.runtime", skip_all)] +#[tracing::instrument( + name = "create-replay-runtime", + target = "f1r3fly.rholang.runtime", + skip_all +)] pub async fn create_replay_rho_runtime( rspace: T, mergeable_tag_name: Par, @@ -1197,7 +1237,11 @@ where (rho_runtime, replay_rho_runtime) } -#[tracing::instrument(name = "create-play-runtime", target = "f1r3fly.rholang.runtime.create-play", skip_all)] +#[tracing::instrument( + name = "create-play-runtime", + target = "f1r3fly.rholang.runtime.create-play", + skip_all +)] pub async fn create_runtime_from_kv_store( stores: RSpaceStore, mergeable_tag_name: Par, @@ -1205,7 +1249,6 @@ pub async fn create_runtime_from_kv_store( additional_system_processes: &mut Vec, matcher: Arc>>, ) -> RhoRuntimeImpl { - let space: RSpace = RSpace::create(stores, matcher).unwrap(); diff --git a/rholang/src/rust/interpreter/system_processes.rs b/rholang/src/rust/interpreter/system_processes.rs index 02c3a71d5..3f52502cc 100644 --- a/rholang/src/rust/interpreter/system_processes.rs +++ b/rholang/src/rust/interpreter/system_processes.rs @@ -1,3 +1,6 @@ +use crate::rust::interpreter::chromadb_service::{ChromaDBService, CollectionMetadata}; +use crate::rust::interpreter::rho_type::Extractor; + use super::contract_call::ContractCall; use super::dispatch::RhoDispatch; use super::errors::{illegal_argument_error, InterpreterError}; @@ -37,8 +40,10 @@ use std::sync::Arc; // NOTE: Not implementing Logger pub type RhoSysFunction = Box< dyn Fn( - (Vec, bool, Vec), - ) -> Pin, InterpreterError>> + Send>> + Send + Sync, + (Vec, bool, Vec), + ) -> Pin, InterpreterError>> + Send>> + + Send + + Sync, >; pub type RhoDispatchMap = Arc>>; pub type Name = Par; @@ -170,6 +175,15 @@ impl FixedChannels { pub fn dev_null() -> Par { byte_name(24) } + + // ChromaDB section start + + // these bytes may need to change during finalization. 
+ pub fn chroma_create_collection() -> Par { + byte_name(25) + } + + // ChromaDB section end } pub struct BodyRefs; @@ -196,6 +210,7 @@ impl BodyRefs { pub const RANDOM: i64 = 20; pub const GRPC_TELL: i64 = 21; pub const DEV_NULL: i64 = 22; + pub const CHROMA_CREATE_COLLECTION: i64 = 25; } pub fn non_deterministic_ops() -> HashSet { @@ -204,6 +219,7 @@ pub fn non_deterministic_ops() -> HashSet { BodyRefs::DALLE3, BodyRefs::TEXT_TO_AUDIO, BodyRefs::RANDOM, + BodyRefs::CHROMA_CREATE_COLLECTION, ]) } @@ -223,6 +239,7 @@ impl ProcessContext { block_data: Arc>, invalid_blocks: InvalidBlocks, openai_service: Arc>, + chromadb_service: Arc>, ) -> Self { ProcessContext { space: space.clone(), @@ -234,6 +251,7 @@ impl ProcessContext { space, block_data, openai_service, + chromadb_service, ), } } @@ -246,13 +264,15 @@ pub struct Definition { pub body_ref: BodyRef, pub handler: Box< dyn FnMut( - ProcessContext, - ) -> Box< - dyn Fn( - (Vec, bool, Vec), - ) - -> Pin, InterpreterError>> + Send>> + Send + Sync, - > + Send, + ProcessContext, + ) -> Box< + dyn Fn( + (Vec, bool, Vec), + ) + -> Pin, InterpreterError>> + Send>> + + Send + + Sync, + > + Send, >, pub remainder: Remainder, } @@ -265,13 +285,15 @@ impl Definition { body_ref: BodyRef, handler: Box< dyn FnMut( - ProcessContext, - ) -> Box< - dyn Fn( - (Vec, bool, Vec), - ) - -> Pin, InterpreterError>> + Send>> + Send + Sync, - > + Send, + ProcessContext, + ) -> Box< + dyn Fn( + (Vec, bool, Vec), + ) -> Pin< + Box, InterpreterError>> + Send>, + > + Send + + Sync, + > + Send, >, remainder: Remainder, ) -> Self { @@ -292,9 +314,11 @@ impl Definition { BodyRef, Box< dyn Fn( - (Vec, bool, Vec), - ) - -> Pin, InterpreterError>> + Send>> + Send + Sync, + (Vec, bool, Vec), + ) + -> Pin, InterpreterError>> + Send>> + + Send + + Sync, >, ) { (self.body_ref, (self.handler)(context)) @@ -355,6 +379,7 @@ pub struct SystemProcesses { pub space: RhoISpace, pub block_data: Arc>, openai_service: Arc>, + chromadb_service: Arc>, 
pretty_printer: PrettyPrinter, } @@ -364,12 +389,14 @@ impl SystemProcesses { space: RhoISpace, block_data: Arc>, openai_service: Arc>, + chromadb_service: Arc>, ) -> Self { SystemProcesses { dispatcher, space, block_data, openai_service, + chromadb_service, pretty_printer: PrettyPrinter::new(), } } @@ -1281,6 +1308,67 @@ impl SystemProcesses { Err(illegal_argument_error("casper_invalid_blocks_set")) } } + + // ChromaDB section start + + /// This supports two overloads: + /// - (collection_name: &str, ignore_if_exists: bool) + /// - (collection_name: &str, update_if_exists: bool, metadata: CollectionMetadata) + pub async fn chroma_create_collection( + &self, + contract_args: (Vec, bool, Vec), + ) -> Result, InterpreterError> { + let Some((produce, is_replay, previous_output, args)) = + self.is_contract_call().unapply(contract_args) + else { + return Err(illegal_argument_error("chroma_create_collection")); + }; + + let (collection_name, ignore_or_update_if_exists, metadata, ack) = match args.as_slice() { + [collection_name_par, update_if_exists_par, metadata_par, ack] => { + let (Some(collection_name), Some(update_if_exists), Some(metadata)) = ( + RhoString::unapply(collection_name_par), + RhoBoolean::unapply(update_if_exists_par), + ::unapply(metadata_par), + ) else { + return Err(illegal_argument_error("chroma_create_collection")); + }; + Ok((collection_name, update_if_exists, Some(metadata), ack)) + } + [collection_name_par, ignore_if_exists_par, ack] => { + let (Some(collection_name), Some(ignore_if_exists)) = ( + RhoString::unapply(collection_name_par), + RhoBoolean::unapply(ignore_if_exists_par), + ) else { + return Err(illegal_argument_error("chroma_create_collection")); + }; + Ok((collection_name, ignore_if_exists, None, ack)) + } + _ => Err(illegal_argument_error("chroma_create_collection")), + }?; + + // Common piece of code. 
+ if is_replay { + produce(&previous_output, ack).await?; + return Ok(previous_output); + } + + let chromadb_service = self.chromadb_service.lock().await; + match chromadb_service + .create_collection(&collection_name, ignore_or_update_if_exists, metadata) + .await + { + Ok(_) => Ok(vec![]), + Err(e) => { + // TODO (chase): Is this right? It seems like other service methods do something similar. + let p = RhoString::create_par(collection_name); + produce(&[p], ack).await?; + return Err(e); + } + } + } + + // ChromaDB section end } // See casper/src/test/scala/coop/rchain/casper/helper/RhoSpec.scala @@ -1398,7 +1486,9 @@ pub fn test_framework_contracts() -> Vec { Box::new(move |args| { let sp = sp.clone(); let invalid_blocks = invalid_blocks.clone(); - Box::pin(async move { sp.casper_invalid_blocks_set(args, &invalid_blocks).await }) + Box::pin( + async move { sp.casper_invalid_blocks_set(args, &invalid_blocks).await }, + ) }) }), remainder: None, From edc809ae581cf5aa9cf2c4ca6eb3a06c5f16d237 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 13:43:15 -0700 Subject: [PATCH 07/24] Fix create_collection arity declaration --- rholang/src/rust/interpreter/rho_runtime.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rholang/src/rust/interpreter/rho_runtime.rs b/rholang/src/rust/interpreter/rho_runtime.rs index fbd51fc20..da8df3a82 100644 --- a/rholang/src/rust/interpreter/rho_runtime.rs +++ b/rholang/src/rust/interpreter/rho_runtime.rs @@ -822,8 +822,8 @@ fn std_rho_chroma_processes() -> Vec { urn: "rho:chroma:collection:new".to_string(), fixed_channel: FixedChannels::chroma_create_collection(), // TODO (chase): How to define overloads? - // This function can support 3 or 2 arguments (last one is optional). - arity: 3, + // This function can support 4 or 3 arguments (including ack) (second to last one is optional). 
+ arity: 4, body_ref: BodyRefs::CHROMA_CREATE_COLLECTION, handler: Box::new(|ctx| { Box::new(move |args| { From ed285d38cfd0011384194630b653c0be35889cea Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 13:43:38 -0700 Subject: [PATCH 08/24] No overloading support at the moment --- .../src/rust/interpreter/chromadb_service.rs | 8 ++++--- .../src/rust/interpreter/system_processes.rs | 22 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index 2d0e529d2..d6292ab48 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -106,8 +106,10 @@ impl ChromaDBService { /// # Arguments /// /// * `name` - The name of the collection to create - /// * `ignore_or_update_if_exists` - If true, update collection metadata (if provided, else ignore) - /// if it already exists. Otherwise, error if exists. + /// * `ignore_or_update_if_exists` - + /// If true and a non-empty collection metadata is proivded, update any existing metadata. + /// If true and no metadata is provided, ignore existing collection. + /// If false, error if a collection with the same name already exists. /// * `metadata` - Optional metadata to associate with the collection. /// Must be a JSON object with keys and values that are either numbers, strings or floats. 
pub async fn create_collection( @@ -117,7 +119,7 @@ impl ChromaDBService { smart_metadata: Option, ) -> Result<(), InterpreterError> { let metadata: Option> = - smart_metadata.map(|x| x.into()); + smart_metadata.and_then(|x| if x.0.is_empty() { None } else { Some(x.into()) }); let metadata_ref = metadata.as_ref(); self.client .create_collection(name, metadata.clone(), ignore_or_update_if_exists) diff --git a/rholang/src/rust/interpreter/system_processes.rs b/rholang/src/rust/interpreter/system_processes.rs index 3f52502cc..119125ad6 100644 --- a/rholang/src/rust/interpreter/system_processes.rs +++ b/rholang/src/rust/interpreter/system_processes.rs @@ -1311,9 +1311,6 @@ impl SystemProcesses { // ChromaDB section start - /// This supports two overloads: - /// - (collection_name: &str, ignore_if_exists: bool) - /// - (collection_name: &str, update_if_exists: bool, metadata: CollectionMetadata) pub async fn chroma_create_collection( &self, contract_args: (Vec, bool, Vec), @@ -1335,15 +1332,16 @@ impl SystemProcesses { }; Ok((collection_name, update_if_exists, Some(metadata), ack)) } - [collection_name_par, ignore_if_exists_par, ack] => { - let (Some(collection_name), Some(ignore_if_exists)) = ( - RhoString::unapply(collection_name_par), - RhoBoolean::unapply(ignore_if_exists_par), - ) else { - return Err(illegal_argument_error("chroma_create_collection")); - }; - Ok((collection_name, ignore_if_exists, None, ack)) - } + // TODO (chase): If overloading is supported for system processes - support the below method as well. 
+ // [collection_name_par, ignore_if_exists_par, ack] => { + // let (Some(collection_name), Some(ignore_if_exists)) = ( + // RhoString::unapply(collection_name_par), + // RhoBoolean::unapply(ignore_if_exists_par), + // ) else { + // return Err(illegal_argument_error("chroma_create_collection")); + // }; + // Ok((collection_name, ignore_if_exists, None, ack)) + // } _ => Err(illegal_argument_error("chroma_create_collection")), }?; From 4297d882afdd32f2e2886160b65dd4a5c9c6c9bb Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 13:43:48 -0700 Subject: [PATCH 09/24] Add examples for create-collection usage --- .../system-contract/chroma-db/01-create-collection.rho | 6 ++++++ .../system-contract/chroma-db/02-create-collection-meta.rho | 6 ++++++ 2 files changed, 12 insertions(+) create mode 100644 rholang/examples/system-contract/chroma-db/01-create-collection.rho create mode 100644 rholang/examples/system-contract/chroma-db/02-create-collection-meta.rho diff --git a/rholang/examples/system-contract/chroma-db/01-create-collection.rho b/rholang/examples/system-contract/chroma-db/01-create-collection.rho new file mode 100644 index 000000000..81781f1b9 --- /dev/null +++ b/rholang/examples/system-contract/chroma-db/01-create-collection.rho @@ -0,0 +1,6 @@ +new createCollection(`rho:chroma:collection:new`), stdout(`rho:io:stdout`), retCh in { + createCollection!("foo", true, {}, *retCh) | + for(@res <- retCh) { + stdout!("Creation succeeded!") + } +} \ No newline at end of file diff --git a/rholang/examples/system-contract/chroma-db/02-create-collection-meta.rho b/rholang/examples/system-contract/chroma-db/02-create-collection-meta.rho new file mode 100644 index 000000000..e654244da --- /dev/null +++ b/rholang/examples/system-contract/chroma-db/02-create-collection-meta.rho @@ -0,0 +1,6 @@ +new createCollection(`rho:chroma:collection:new`), stdout(`rho:io:stdout`), retCh in { + createCollection!("foo", 
true, {"meta1" : 1, "two" : "42", "three" : 42, "meta2": "bar"}, *retCh) | + for(@res <- retCh) { + stdout!("Creation succeeded!") + } +} \ No newline at end of file From bfae4e5425dfc9d67d56e1b9369c42e3401e9e4e Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 14:31:12 -0700 Subject: [PATCH 10/24] Utilities for transforming collection metadata into Rholang par --- .../src/rust/interpreter/chromadb_service.rs | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index d6292ab48..ecc200e00 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -5,7 +5,7 @@ use futures::TryFutureExt; use models::rhoapi::Par; use serde_json; -use crate::rust::interpreter::rho_type::{Extractor, RhoNumber, RhoString}; +use crate::rust::interpreter::rho_type::{Extractor, RhoMap, RhoNil, RhoNumber, RhoString}; use super::errors::InterpreterError; @@ -17,13 +17,26 @@ pub enum MetadataValue { // TODO (chase): Support floating point numbers once Rholang does? 
} +impl Into for MetadataValue { + fn into(self) -> Par { + match self { + Self::StringMeta(s) => RhoString::create_par(s), + Self::NumberMeta(n) => RhoNumber::create_par(n), + Self::NullMeta => RhoNil::create_par(), + } + } +} + impl Extractor for MetadataValue { type RustType = MetadataValue; fn unapply(p: &Par) -> Option { + if p.is_nil() { + return Some(Self::NullMeta); + } RhoNumber::unapply(p) - .map(MetadataValue::NumberMeta) - .or_else(|| RhoString::unapply(p).map(MetadataValue::StringMeta)) + .map(Self::NumberMeta) + .or_else(|| RhoString::unapply(p).map(Self::StringMeta)) } } @@ -77,6 +90,17 @@ impl Into> for CollectionMetadata { } } +impl Into for CollectionMetadata { + fn into(self) -> Par { + RhoMap::create_par( + self.0 + .into_iter() + .map(|(key, val)| (RhoString::create_par(key), val.into())) + .collect(), + ) + } +} + impl Extractor for CollectionMetadata { type RustType = CollectionMetadata; From 0f9b3ec099c88048663537929533c52ee3f578ce Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 14:31:54 -0700 Subject: [PATCH 11/24] Add ChromaDB get collection metadata service method --- rholang/src/rust/interpreter/rho_runtime.rs | 27 +++++- .../src/rust/interpreter/system_processes.rs | 93 ++++++++++++++----- 2 files changed, 93 insertions(+), 27 deletions(-) diff --git a/rholang/src/rust/interpreter/rho_runtime.rs b/rholang/src/rust/interpreter/rho_runtime.rs index da8df3a82..be1ab26d8 100644 --- a/rholang/src/rust/interpreter/rho_runtime.rs +++ b/rholang/src/rust/interpreter/rho_runtime.rs @@ -828,9 +828,30 @@ fn std_rho_chroma_processes() -> Vec { handler: Box::new(|ctx| { Box::new(move |args| { let ctx = ctx.clone(); - Box::pin( - async move { ctx.system_processes.clone().chroma_create_collection(args).await }, - ) + Box::pin(async move { + ctx.system_processes + .clone() + .chroma_create_collection(args) + .await + }) + }) + }), + remainder: None, + }, + Definition { + urn: 
"rho:chroma:collection:meta".to_string(), + fixed_channel: FixedChannels::chroma_get_collection_meta(), + arity: 2, + body_ref: BodyRefs::CHROMA_GET_COLLECTION_META, + handler: Box::new(|ctx| { + Box::new(move |args| { + let ctx = ctx.clone(); + Box::pin(async move { + ctx.system_processes + .clone() + .chroma_get_collection_meta(args) + .await + }) }) }), remainder: None, diff --git a/rholang/src/rust/interpreter/system_processes.rs b/rholang/src/rust/interpreter/system_processes.rs index 119125ad6..ba18d8493 100644 --- a/rholang/src/rust/interpreter/system_processes.rs +++ b/rholang/src/rust/interpreter/system_processes.rs @@ -1,5 +1,5 @@ use crate::rust::interpreter::chromadb_service::{ChromaDBService, CollectionMetadata}; -use crate::rust::interpreter::rho_type::Extractor; +use crate::rust::interpreter::rho_type::{Extractor, RhoNil}; use super::contract_call::ContractCall; use super::dispatch::RhoDispatch; @@ -183,6 +183,10 @@ impl FixedChannels { byte_name(25) } + pub fn chroma_get_collection_meta() -> Par { + byte_name(26) + } + // ChromaDB section end } @@ -211,6 +215,7 @@ impl BodyRefs { pub const GRPC_TELL: i64 = 21; pub const DEV_NULL: i64 = 22; pub const CHROMA_CREATE_COLLECTION: i64 = 25; + pub const CHROMA_GET_COLLECTION_META: i64 = 26; } pub fn non_deterministic_ops() -> HashSet { @@ -220,6 +225,7 @@ pub fn non_deterministic_ops() -> HashSet { BodyRefs::TEXT_TO_AUDIO, BodyRefs::RANDOM, BodyRefs::CHROMA_CREATE_COLLECTION, + BodyRefs::CHROMA_GET_COLLECTION_META, ]) } @@ -1321,29 +1327,24 @@ impl SystemProcesses { return Err(illegal_argument_error("chroma_create_collection")); }; - let (collection_name, ignore_or_update_if_exists, metadata, ack) = match args.as_slice() { - [collection_name_par, update_if_exists_par, metadata_par, ack] => { - let (Some(collection_name), Some(update_if_exists), Some(metadata)) = ( - RhoString::unapply(collection_name_par), - RhoBoolean::unapply(update_if_exists_par), - ::unapply(metadata_par), - ) else { - return 
Err(illegal_argument_error("chroma_create_collection")); - }; - Ok((collection_name, update_if_exists, Some(metadata), ack)) - } - // TODO (chase): If overloading is supported for system processes - support the below method as well. - // [collection_name_par, ignore_if_exists_par, ack] => { - // let (Some(collection_name), Some(ignore_if_exists)) = ( - // RhoString::unapply(collection_name_par), - // RhoBoolean::unapply(ignore_if_exists_par), - // ) else { - // return Err(illegal_argument_error("chroma_create_collection")); - // }; - // Ok((collection_name, ignore_if_exists, None, ack)) - // } - _ => Err(illegal_argument_error("chroma_create_collection")), - }?; + let [collection_name_par, ignore_or_update_if_exists_par, metadata_par, ack] = + args.as_slice() + else { + return Err(illegal_argument_error("chroma_create_collection")); + }; + + let (Some(collection_name), Some(ignore_or_update_if_exists), Some(metadata)) = ( + RhoString::unapply(collection_name_par), + RhoBoolean::unapply(ignore_or_update_if_exists_par), + // It can either be nil, or a metadata map. + if metadata_par.is_nil() { + Some(None) + } else { + ::unapply(metadata_par).map(Some) + }, + ) else { + return Err(illegal_argument_error("chroma_create_collection")); + }; // Common piece of code. if is_replay { @@ -1366,6 +1367,50 @@ impl SystemProcesses { } } + pub async fn chroma_get_collection_meta( + &self, + contract_args: (Vec, bool, Vec), + ) -> Result, InterpreterError> { + let Some((produce, is_replay, previous_output, args)) = + self.is_contract_call().unapply(contract_args) + else { + return Err(illegal_argument_error("chroma_get_collection_meta")); + }; + + let [collection_name_par, ack] = args.as_slice() else { + return Err(illegal_argument_error("chroma_get_collection_meta")); + }; + let Some(collection_name) = RhoString::unapply(collection_name_par) else { + return Err(illegal_argument_error("chroma_get_collection_meta")); + }; + + // Common piece of code. 
+ if is_replay { + produce(&previous_output, ack).await?; + return Ok(previous_output); + } + + let chromadb_service = self.chromadb_service.lock().await; + match chromadb_service.get_collection_meta(&collection_name).await { + Ok(meta) => { + let result_par = match meta { + None => RhoNil::create_par(), + Some(inner) => inner.into(), + }; + + let output = vec![result_par]; + produce(&output, &ack).await?; + Ok(output) + } + Err(e) => { + // TODO (chase): Is this right? It seems like other service methods do something similar. + let p = RhoString::create_par(collection_name); + produce(&[p], ack).await?; + return Err(e); + } + } + } + // ChromaDB section end } From c3c02192532c73c6e4b0d49d28ce0b8f6f034690 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 14:32:00 -0700 Subject: [PATCH 12/24] Update example to use nil --- .../examples/system-contract/chroma-db/01-create-collection.rho | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rholang/examples/system-contract/chroma-db/01-create-collection.rho b/rholang/examples/system-contract/chroma-db/01-create-collection.rho index 81781f1b9..4a6ff8be4 100644 --- a/rholang/examples/system-contract/chroma-db/01-create-collection.rho +++ b/rholang/examples/system-contract/chroma-db/01-create-collection.rho @@ -1,5 +1,5 @@ new createCollection(`rho:chroma:collection:new`), stdout(`rho:io:stdout`), retCh in { - createCollection!("foo", true, {}, *retCh) | + createCollection!("foo", true, Nil, *retCh) | for(@res <- retCh) { stdout!("Creation succeeded!") } From 4c7a39c9a92ea951ca4ae3c37a08ce9c66907ecb Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 15:55:06 -0700 Subject: [PATCH 13/24] Add upsert collection entries service method --- rholang/Cargo.toml | 2 +- .../src/rust/interpreter/chromadb_service.rs | 92 +++++++++++++++---- 
.../src/rust/interpreter/system_processes.rs | 4 +- 3 files changed, 79 insertions(+), 19 deletions(-) diff --git a/rholang/Cargo.toml b/rholang/Cargo.toml index c263e3049..8fd148924 100644 --- a/rholang/Cargo.toml +++ b/rholang/Cargo.toml @@ -45,7 +45,7 @@ tempfile = "3.19.1" clap = { version = "4.5", features = ["derive"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } -chromadb = "2.3.0" +chromadb = { version = "2.3.0", features = ["openai"] } serde_json = { workspace = true } [build-dependencies] diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index ecc200e00..0358aa0c4 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -1,6 +1,9 @@ use std::collections::HashMap; -use chromadb::{client::ChromaClientOptions, ChromaClient, ChromaCollection}; +use chromadb::{ + client::ChromaClientOptions, collection::CollectionEntries as ChromaCollectionEntries, + embeddings::openai::OpenAIEmbeddings, ChromaClient, ChromaCollection, +}; use futures::TryFutureExt; use models::rhoapi::Par; use serde_json; @@ -79,9 +82,9 @@ impl Into for MetadataValue { } #[derive(Clone, PartialEq, Eq, Debug)] -pub struct CollectionMetadata(HashMap); +pub struct Metadata(HashMap); -impl Into> for CollectionMetadata { +impl Into> for Metadata { fn into(self) -> serde_json::Map { self.0 .into_iter() @@ -90,7 +93,7 @@ impl Into> for CollectionMetadata { } } -impl Into for CollectionMetadata { +impl Into for Metadata { fn into(self) -> Par { RhoMap::create_par( self.0 @@ -101,14 +104,24 @@ impl Into for CollectionMetadata { } } -impl Extractor for CollectionMetadata { - type RustType = CollectionMetadata; +impl Extractor for Metadata { + type RustType = Metadata; fn unapply(p: &Par) -> Option { - as Extractor>::unapply(p).map(CollectionMetadata) + as Extractor>::unapply(p).map(Metadata) } } +/// An entry in a collection. 
+/// At the moment, the embeddings are calculated using the OpenAI embedding function. +pub struct CollectionEntry<'a> { + document: &'a str, + metadata: Option, +} + +/// A mapping from a collection entry ID to the entry itself. +pub struct CollectionEntries<'a>(HashMap<&'a str, CollectionEntry<'a>>); + pub struct ChromaDBService { client: ChromaClient, } @@ -140,13 +153,13 @@ impl ChromaDBService { &self, name: &str, ignore_or_update_if_exists: bool, - smart_metadata: Option, + metadata: Option, ) -> Result<(), InterpreterError> { - let metadata: Option> = - smart_metadata.and_then(|x| if x.0.is_empty() { None } else { Some(x.into()) }); - let metadata_ref = metadata.as_ref(); + let dumb_metadata: Option> = + metadata.and_then(|x| if x.0.is_empty() { None } else { Some(x.into()) }); + let dumb_metadata_ref = dumb_metadata.as_ref(); self.client - .create_collection(name, metadata.clone(), ignore_or_update_if_exists) + .create_collection(name, dumb_metadata.clone(), ignore_or_update_if_exists) .and_then(async move |collection| { /* Ideally there ought to be a way to check whether the returned collection from create_collection already existed or not (without extra API calls). @@ -157,9 +170,9 @@ impl ChromaDBService { If not, clearly this collection already existed (with a different metadata), and we must update it. */ - if ignore_or_update_if_exists && collection.metadata() != metadata_ref { + if ignore_or_update_if_exists && collection.metadata() != dumb_metadata_ref { // Update the collection metadata if required. 
- return collection.modify(None, metadata_ref).await; + return collection.modify(None, dumb_metadata_ref).await; } Ok(()) }) @@ -173,7 +186,7 @@ impl ChromaDBService { pub async fn get_collection_meta( &self, name: &str, - ) -> Result, InterpreterError> { + ) -> Result, InterpreterError> { let metadata = self .get_collection(name) .map_ok(|collection| collection.metadata().cloned()) @@ -186,12 +199,59 @@ impl ChromaDBService { MetadataValue::from_value(val).map(move |res| (key.clone(), res)) }) .collect::, _>>()?; - Ok(Some(CollectionMetadata(res))) + Ok(Some(Metadata(res))) } None => Ok(None), } } + /// Upserts the given entries into the identified collection. See [`ChromaCollection::upsert`] + /// + /// # Arguments + /// + /// * `collection_name` - The name of the collection to create + /// * `entries` - A mapping of entry ID to entry. + /// + /// The embeddings are auto generated using OpenAI embedding function. + pub async fn upsert_entries<'a>( + &self, + collection_name: &str, + entries: CollectionEntries<'a>, + ) -> Result<(), InterpreterError> { + // Obtain the collection. + let collection = self.get_collection(collection_name).await?; + + // Transform the input into the version that the API expects. + let mut ids_vec: Vec<&'a str> = Vec::with_capacity(entries.0.len()); + let mut documents_vec = Vec::with_capacity(entries.0.len()); + let mut metadatas_vec = Vec::with_capacity(entries.0.len()); + for (entry_id, entry) in entries.0.into_iter() { + ids_vec.push(entry_id); + documents_vec.push(entry.document); + metadatas_vec.push(entry.metadata.unwrap_or(Metadata(HashMap::new())).into()); + } + let dumb_entries = ChromaCollectionEntries { + ids: ids_vec, + documents: Some(documents_vec), + metadatas: Some(metadatas_vec), + // The embedding are currently auto-filled by a pre-chosen embedding function. + embeddings: None, + }; + + // We'll use OpenAI to generate embeddings. 
+ let embeddingsf = OpenAIEmbeddings::new(Default::default()); + collection + .upsert(dumb_entries, Some(Box::new(embeddingsf))) + .await + .map_err(|err| { + InterpreterError::ChromaDBError(format!( + "Failed to upsert entries in collection {collection_name}: {}", + err + )) + })?; + Ok(()) + } + /* TODO (chase): Other potential collection related methods: - rename collection (not that necessary?) - list collections (bad idea probably) diff --git a/rholang/src/rust/interpreter/system_processes.rs b/rholang/src/rust/interpreter/system_processes.rs index ba18d8493..ac4e1342c 100644 --- a/rholang/src/rust/interpreter/system_processes.rs +++ b/rholang/src/rust/interpreter/system_processes.rs @@ -1,4 +1,4 @@ -use crate::rust::interpreter::chromadb_service::{ChromaDBService, CollectionMetadata}; +use crate::rust::interpreter::chromadb_service::{ChromaDBService, Metadata}; use crate::rust::interpreter::rho_type::{Extractor, RhoNil}; use super::contract_call::ContractCall; @@ -1340,7 +1340,7 @@ impl SystemProcesses { if metadata_par.is_nil() { Some(None) } else { - ::unapply(metadata_par).map(Some) + ::unapply(metadata_par).map(Some) }, ) else { return Err(illegal_argument_error("chroma_create_collection")); From 7b4724094435f0f4a0d08e4a784e03c0b47436c5 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 19:56:23 -0700 Subject: [PATCH 14/24] Fix rho type utilities taking direct value instead of reference --- rholang/src/rust/interpreter/reduce.rs | 18 +++++++++--------- rholang/src/rust/interpreter/rho_type.rs | 12 ++++++------ 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/rholang/src/rust/interpreter/reduce.rs b/rholang/src/rust/interpreter/reduce.rs index 442bdfb8b..83be073a3 100644 --- a/rholang/src/rust/interpreter/reduce.rs +++ b/rholang/src/rust/interpreter/reduce.rs @@ -352,7 +352,7 @@ impl DebruijnInterpreter { let data_clone = data.clone(); let persistent_flag = persistent; let 
is_replay_flag = is_replay; - + let mut futures: Vec< Pin< Box< @@ -362,10 +362,10 @@ impl DebruijnInterpreter { >, >, > = vec![]; - + let dispatch_fut = self_clone1.dispatch(continuation_clone, data_list_clone, is_replay_flag, previous_output_clone); futures.push(Box::pin(dispatch_fut) as Pin> + std::marker::Send>>); - + let produce_fut = self_clone2.produce(chan_clone, data_clone, persistent_flag); futures.push(Box::pin(produce_fut) as Pin> + std::marker::Send>>); @@ -384,7 +384,7 @@ impl DebruijnInterpreter { let continuation_clone = continuation.clone(); let data_list_clone = data_list.clone(); let previous_output_clone = previous_output_as_par.clone(); - + let mut futures: Vec< Pin< Box< @@ -449,7 +449,7 @@ impl DebruijnInterpreter { let persistent_flag = persistent; let peek_flag = peek; let is_replay_flag = is_replay; - + let mut futures: Vec< Pin< Box< @@ -459,10 +459,10 @@ impl DebruijnInterpreter { >, >, > = vec![]; - + let dispatch_fut = self_clone1.dispatch(continuation_clone, data_list_clone, is_replay_flag, previous_output_clone); futures.push(Box::pin(dispatch_fut) as Pin> + std::marker::Send>>); - + let consume_fut = self_clone2.consume(binds_clone, body_clone, persistent_flag, peek_flag); futures.push(Box::pin(consume_fut) as Pin> + std::marker::Send>>); @@ -481,7 +481,7 @@ impl DebruijnInterpreter { let continuation_clone = continuation.clone(); let data_list_clone = data_list.clone(); let previous_output_clone = previous_output_as_par.clone(); - + let mut futures: Vec< Pin< Box< @@ -3150,7 +3150,7 @@ impl DebruijnInterpreter { remainder: Option, ) -> Result { let key_pairs: Vec> = - ps.into_iter().map(|p| RhoTuple2::unapply(p)).collect(); + ps.into_iter().map(|p| RhoTuple2::unapply(&p)).collect(); if key_pairs.iter().any(|pair| !pair.is_some()) { Err(InterpreterError::MethodNotDefined { diff --git a/rholang/src/rust/interpreter/rho_type.rs b/rholang/src/rust/interpreter/rho_type.rs index 8ea1affb4..6cfd1e18e 100644 --- 
a/rholang/src/rust/interpreter/rho_type.rs +++ b/rholang/src/rust/interpreter/rho_type.rs @@ -133,7 +133,7 @@ impl RhoTuple2 { }]) } - pub fn unapply(p: Par) -> Option<(Par, Par)> { + pub fn unapply(p: &Par) -> Option<(Par, Par)> { if let Some(expr) = single_expr(&p) { if let Expr { expr_instance: Some(ExprInstance::ETupleBody(ETuple { ps, .. })), @@ -164,7 +164,7 @@ impl RhoList { }]) } - pub fn unapply(p: Par) -> Option> { + pub fn unapply(p: &Par) -> Option> { if let Some(expr) = single_expr(&p) { if let Expr { expr_instance: Some(ExprInstance::EListBody(EList { ps, .. })), @@ -188,7 +188,7 @@ impl RhoMap { }]) } - pub fn unapply(p: Par) -> Option> { + pub fn unapply(p: &Par) -> Option> { if let Some(expr) = single_expr(&p) { if let Expr { expr_instance: Some(ExprInstance::EMapBody(emap)), @@ -421,7 +421,7 @@ where type RustType = (A::RustType, B::RustType); fn unapply(p: &Par) -> Option { - if let Some((p1, p2)) = RhoTuple2::unapply(p.clone()) { + if let Some((p1, p2)) = RhoTuple2::unapply(p) { if let (Some(a), Some(b)) = (A::unapply(&p1), B::unapply(&p2)) { return Some((a, b)); } @@ -437,7 +437,7 @@ where type RustType = Vec; fn unapply(p: &Par) -> Option { - if let Some(plist) = RhoList::unapply(p.clone()) { + if let Some(plist) = RhoList::unapply(p) { return plist.into_iter().map(|par| A::unapply(&par)).collect(); } None @@ -453,7 +453,7 @@ where type RustType = HashMap; fn unapply(p: &Par) -> Option { - if let Some(pmap) = RhoMap::unapply(p.clone()) { + if let Some(pmap) = RhoMap::unapply(p) { return pmap .into_iter() .map( From 7ebb40759741ebda38a305df3c67c3410592d115 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 7 Dec 2025 19:56:32 -0700 Subject: [PATCH 15/24] Hook up upsert entries --- .../src/rust/interpreter/chromadb_service.rs | 46 +++++++++++++---- rholang/src/rust/interpreter/rho_runtime.rs | 18 +++++++ .../src/rust/interpreter/system_processes.rs | 49 ++++++++++++++++++- 3 files 
changed, 103 insertions(+), 10 deletions(-) diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index 0358aa0c4..3b15649d2 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -8,7 +8,9 @@ use futures::TryFutureExt; use models::rhoapi::Par; use serde_json; -use crate::rust::interpreter::rho_type::{Extractor, RhoMap, RhoNil, RhoNumber, RhoString}; +use crate::rust::interpreter::rho_type::{ + Extractor, RhoMap, RhoNil, RhoNumber, RhoString, RhoTuple2, +}; use super::errors::InterpreterError; @@ -114,13 +116,39 @@ impl Extractor for Metadata { /// An entry in a collection. /// At the moment, the embeddings are calculated using the OpenAI embedding function. -pub struct CollectionEntry<'a> { - document: &'a str, +pub struct CollectionEntry { + document: String, metadata: Option, } +impl<'a> Extractor for CollectionEntry { + type RustType = CollectionEntry; + + fn unapply(p: &Par) -> Option { + let (document_par, metadata_par) = RhoTuple2::unapply(p)?; + let document = RhoString::unapply(&document_par)?; + let metadata = if metadata_par.is_nil() { + Some(None) + } else { + ::unapply(&metadata_par).map(Some) + }?; + Some(CollectionEntry { + document: document, + metadata, + }) + } +} + /// A mapping from a collection entry ID to the entry itself. -pub struct CollectionEntries<'a>(HashMap<&'a str, CollectionEntry<'a>>); +pub struct CollectionEntries(HashMap); + +impl Extractor for CollectionEntries { + type RustType = CollectionEntries; + + fn unapply(p: &Par) -> Option { + as Extractor>::unapply(p).map(CollectionEntries) + } +} pub struct ChromaDBService { client: ChromaClient, @@ -213,16 +241,16 @@ impl ChromaDBService { /// * `entries` - A mapping of entry ID to entry. /// /// The embeddings are auto generated using OpenAI embedding function. 
- pub async fn upsert_entries<'a>( + pub async fn upsert_entries( &self, collection_name: &str, - entries: CollectionEntries<'a>, + entries: CollectionEntries, ) -> Result<(), InterpreterError> { // Obtain the collection. let collection = self.get_collection(collection_name).await?; // Transform the input into the version that the API expects. - let mut ids_vec: Vec<&'a str> = Vec::with_capacity(entries.0.len()); + let mut ids_vec = Vec::with_capacity(entries.0.len()); let mut documents_vec = Vec::with_capacity(entries.0.len()); let mut metadatas_vec = Vec::with_capacity(entries.0.len()); for (entry_id, entry) in entries.0.into_iter() { @@ -231,8 +259,8 @@ impl ChromaDBService { metadatas_vec.push(entry.metadata.unwrap_or(Metadata(HashMap::new())).into()); } let dumb_entries = ChromaCollectionEntries { - ids: ids_vec, - documents: Some(documents_vec), + ids: ids_vec.iter().map(|x| x.as_str()).collect(), + documents: Some(documents_vec.iter().map(|x| x.as_str()).collect()), metadatas: Some(metadatas_vec), // The embedding are currently auto-filled by a pre-chosen embedding function. 
embeddings: None, diff --git a/rholang/src/rust/interpreter/rho_runtime.rs b/rholang/src/rust/interpreter/rho_runtime.rs index be1ab26d8..763b14109 100644 --- a/rholang/src/rust/interpreter/rho_runtime.rs +++ b/rholang/src/rust/interpreter/rho_runtime.rs @@ -856,6 +856,24 @@ fn std_rho_chroma_processes() -> Vec { }), remainder: None, }, + Definition { + urn: "rho:chroma:collection:entries:new".to_string(), + fixed_channel: FixedChannels::chroma_upsert_entries(), + arity: 3, + body_ref: BodyRefs::CHOMRA_UPSERT_ENTRIES, + handler: Box::new(|ctx| { + Box::new(move |args| { + let ctx = ctx.clone(); + Box::pin(async move { + ctx.system_processes + .clone() + .chroma_upsert_entries(args) + .await + }) + }) + }), + remainder: None, + } ] } diff --git a/rholang/src/rust/interpreter/system_processes.rs b/rholang/src/rust/interpreter/system_processes.rs index ac4e1342c..acdd64b5c 100644 --- a/rholang/src/rust/interpreter/system_processes.rs +++ b/rholang/src/rust/interpreter/system_processes.rs @@ -1,4 +1,4 @@ -use crate::rust::interpreter::chromadb_service::{ChromaDBService, Metadata}; +use crate::rust::interpreter::chromadb_service::{ChromaDBService, CollectionEntries, Metadata}; use crate::rust::interpreter::rho_type::{Extractor, RhoNil}; use super::contract_call::ContractCall; @@ -187,6 +187,10 @@ impl FixedChannels { byte_name(26) } + pub fn chroma_upsert_entries() -> Par { + byte_name(27) + } + // ChromaDB section end } @@ -216,6 +220,7 @@ impl BodyRefs { pub const DEV_NULL: i64 = 22; pub const CHROMA_CREATE_COLLECTION: i64 = 25; pub const CHROMA_GET_COLLECTION_META: i64 = 26; + pub const CHOMRA_UPSERT_ENTRIES: i64 = 27; } pub fn non_deterministic_ops() -> HashSet { @@ -226,6 +231,7 @@ pub fn non_deterministic_ops() -> HashSet { BodyRefs::RANDOM, BodyRefs::CHROMA_CREATE_COLLECTION, BodyRefs::CHROMA_GET_COLLECTION_META, + BodyRefs::CHOMRA_UPSERT_ENTRIES, ]) } @@ -1411,6 +1417,47 @@ impl SystemProcesses { } } + pub async fn chroma_upsert_entries( + &self, + 
contract_args: (Vec, bool, Vec), + ) -> Result, InterpreterError> { + let Some((produce, is_replay, previous_output, args)) = + self.is_contract_call().unapply(contract_args) + else { + return Err(illegal_argument_error("chroma_upsert_entries")); + }; + + let [collection_name_par, entries_par, ack] = args.as_slice() else { + return Err(illegal_argument_error("chroma_upsert_entries")); + }; + let (Some(collection_name), Some(entries)) = ( + RhoString::unapply(collection_name_par), + ::unapply(entries_par), + ) else { + return Err(illegal_argument_error("chroma_upsert_entries")); + }; + + // Common piece of code. + if is_replay { + produce(&previous_output, ack).await?; + return Ok(previous_output); + } + + let chromadb_service = self.chromadb_service.lock().await; + match chromadb_service + .upsert_entries(&collection_name, entries) + .await + { + Ok(_) => Ok(vec![]), + Err(e) => { + // TODO (chase): Is this right? It seems like other service methods do something similar. + let p = RhoString::create_par(collection_name); + produce(&[p], ack).await?; + return Err(e); + } + } + } + // ChromaDB section end } From 6f2f0dc6688924336baa37fd0e969b8c0503ca42 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 14 Dec 2025 16:56:21 -0700 Subject: [PATCH 16/24] Add helper for working with SBERT embeddings --- Cargo.lock | 514 +++++++++++++++++- rholang/Cargo.toml | 4 + rholang/src/rust/interpreter/util/mod.rs | 1 + .../rust/interpreter/util/sbert_embeddings.rs | 24 + 4 files changed, 540 insertions(+), 3 deletions(-) create mode 100644 rholang/src/rust/interpreter/util/sbert_embeddings.rs diff --git a/Cargo.lock b/Cargo.lock index c73e899f1..9cd7fb805 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aes" +version = "0.8.4" +source 
= "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.7.8" @@ -49,6 +60,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -471,7 +488,7 @@ dependencies = [ "arrayvec", "cc", "cfg-if", - "constant_time_eq", + "constant_time_eq 0.3.1", ] [[package]] @@ -592,6 +609,48 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "bzip2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + +[[package]] +name = "cached-path" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa547071d682c054b998bbd527565da1704728e7c45e0243e6643f36d0cbe551" +dependencies = [ + "flate2", + "fs2", + "glob", + "indicatif", + "log", + "rand 0.8.5", + "reqwest 0.11.27", + "serde", + "serde_json", + "sha2", + "tar", + "tempfile", + "thiserror 1.0.69", + "zip 0.6.6", +] + [[package]] name = "casper" version = "0.1.0" @@ -697,6 +756,16 @@ dependencies = [ "windows-link 0.2.0", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -842,12 +911,31 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb" +[[package]] +name = "console" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b430743a6eb14e9764d4260d4c0d8123087d504eeb9c48f2b2a5e810dd369df4" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "unicode-width 0.2.2", + "windows-sys 0.61.1", +] + [[package]] name = "const-oid" version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + [[package]] name = "constant_time_eq" version = "0.3.1" @@ -1017,6 +1105,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -1127,6 +1236,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -1215,6 +1345,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "encoding" version = "0.2.33" @@ -1383,6 +1519,18 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "filetime" +version = "0.2.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc0505cd1b6fa6580283f6bdf70a73fcf4aba1184038c90902b92b3dd0df63ed" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.60.2", +] + [[package]] name = "find-msvc-tools" version = "0.1.3" @@ -1448,6 +1596,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -1673,6 +1831,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if", + "crunchy", + "zerocopy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1698,6 +1867,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash 0.8.12", + "allocator-api2", ] [[package]] @@ -2202,6 +2372,27 @@ dependencies = [ "serde_core", ] +[[package]] +name = "indicatif" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d207dc617c7a380ab07ff572a6e52fa202a2a8f355860ac9c38e23f8196be1b" +dependencies = [ + "console", + "lazy_static", + "number_prefix", + "regex", +] + +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "generic-array", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -2224,6 +2415,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -2348,6 +2548,17 @@ dependencies = [ "windows-targets 0.53.4", ] +[[package]] +name = "libredox" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +dependencies = [ + "bitflags 2.9.4", + "libc", + "redox_syscall", +] + [[package]] name = "libz-rs-sys" version = "0.5.2" @@ -2468,6 +2679,16 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "matrixmultiply" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a06de3016e9fae57a36fd14dba131fccf49f74b40b7fbdb472f96e361ec71a08" +dependencies = [ + "autocfg", + "rawpointer", +] + [[package]] name = "memchr" version = "2.7.6" @@ -2709,6 +2930,19 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ndarray" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb12d4e967ec485a5f71c6311fe28158e9d6f4bc4a447b474184d0f91a8fa32" +dependencies = [ + "matrixmultiply", + "num-complex", + "num-integer", + "num-traits", + "rawpointer", +] + [[package]] name = "neli" version = "0.6.5" @@ -2885,6 +3119,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -2919,6 +3162,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "oid-registry" version = "0.7.1" @@ -3136,6 +3385,12 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "ordered-float" version = "3.9.2" @@ -3199,12 +3454,35 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "password-hash" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700" +dependencies = [ + "base64ct", + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "paste" version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pbkdf2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917" +dependencies = [ + "digest", + "hmac", + "password-hash", + "sha2", +] + [[package]] name = "pem" version = "3.0.5" @@ -3482,6 +3760,12 @@ dependencies = [ "prost 0.14.1", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "ptr_meta" version = "0.1.4" @@ -3731,6 +4015,12 @@ dependencies = [ "bitflags 2.9.4", ] +[[package]] +name = "rawpointer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" + [[package]] name = "rayon" version = "1.11.0" @@ -3773,6 +4063,17 @@ dependencies = [ "bitflags 2.9.4", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom 0.2.16", + "libredox", + "thiserror 1.0.69", +] + [[package]] name = "regex" version = "1.12.2" @@ -3924,10 +4225,13 @@ dependencies = [ name = "rholang" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "bincode", "cc", "chromadb", "clap", + "console", "crypto", "dotenv", "futures", @@ -3945,6 +4249,7 @@ dependencies = [ "rayon", "regex", "rspace_plus_plus", + "rust-bert", "serde", "serde_json", "shared", @@ -4111,6 +4416,26 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rust-bert" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d42c5e4175577f25c58a4be357f09fc2aeb701093e861c41b7f60d1cbf7e61a3" +dependencies = [ + 
"cached-path", + "dirs", + "half", + "lazy_static", + "ordered-float 4.6.0", + "regex", + "rust_tokenizers", + "serde", + "serde_json", + "tch", + "thiserror 1.0.69", + "uuid", +] + [[package]] name = "rust-embed" version = "8.8.0" @@ -4161,6 +4486,26 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rust_tokenizers" +version = "8.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19599f60a688b5160247ee9c37a6af8b0c742ee8b160c5b44acc0f0eb265a59f" +dependencies = [ + "csv", + "hashbrown 0.14.5", + "itertools 0.11.0", + "lazy_static", + "protobuf", + "rayon", + "regex", + "serde", + "serde_json", + "thiserror 1.0.69", + "unicode-normalization", + "unicode-normalization-alignments", +] + [[package]] name = "rustc-hash" version = "2.1.1" @@ -4346,7 +4691,7 @@ dependencies = [ "nix", "radix_trie", "unicode-segmentation", - "unicode-width", + "unicode-width 0.1.14", "utf8parse", "winapi", ] @@ -4357,6 +4702,16 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "safetensors" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93279b86b3de76f820a8854dd06cbc33cfa57a417b19c47f6a25280112fb1df" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "same-file" version = "1.0.6" @@ -4862,6 +5217,34 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" +dependencies = [ + "filetime", + "libc", + "xattr", +] + +[[package]] +name = "tch" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"3585f5bbf1ddf2498d7586bf870c7bb785a0bf1be09c54d0f93fce51d5f3c7fc" +dependencies = [ + "half", + "lazy_static", + "libc", + "ndarray", + "rand 0.8.5", + "safetensors", + "thiserror 1.0.69", + "torch-sys", + "zip 0.6.6", +] + [[package]] name = "tempfile" version = "3.23.0" @@ -5246,6 +5629,21 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "torch-sys" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef116d446d79bb2447748550baee86850d2d32d366cc9bdd4b217bdbe10cac63" +dependencies = [ + "anyhow", + "cc", + "libc", + "serde", + "serde_json", + "ureq", + "zip 0.6.6", +] + [[package]] name = "tower" version = "0.5.2" @@ -5518,6 +5916,24 @@ version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +[[package]] +name = "unicode-normalization" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-normalization-alignments" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43f613e4fa046e69818dd287fdc4bc78175ff20331479dab6e1b0f98d57062de" +dependencies = [ + "smallvec", +] + [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -5530,12 +5946,36 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "untrusted" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "ureq" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d" +dependencies = [ + "base64 0.22.1", + "flate2", + "log", + "once_cell", + "rustls 0.23.32", + "rustls-pki-types", + "serde", + "serde_json", + "url", + "webpki-roots 0.26.11", +] + [[package]] name = "url" version = "2.5.7" @@ -5617,7 +6057,7 @@ dependencies = [ "serde_json", "url", "utoipa", - "zip", + "zip 3.0.0", ] [[package]] @@ -5828,6 +6268,15 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.2", +] + [[package]] name = "webpki-roots" version = "1.0.2" @@ -6278,6 +6727,16 @@ dependencies = [ "time", ] +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "xml-rs" version = "0.8.28" @@ -6433,6 +6892,26 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "zip" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" +dependencies = [ + "aes", + "byteorder", + "bzip2", + "constant_time_eq 0.1.5", + "crc32fast", + "crossbeam-utils", + "flate2", + "hmac", + "pbkdf2", + "sha1", + "time", + "zstd", +] + [[package]] name = "zip" version = "3.0.0" @@ -6464,3 +6943,32 @@ dependencies = [ "log", "simd-adler32", ] + +[[package]] +name = "zstd" +version = 
"0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/rholang/Cargo.toml b/rholang/Cargo.toml index 8fd148924..34939fb85 100644 --- a/rholang/Cargo.toml +++ b/rholang/Cargo.toml @@ -47,6 +47,10 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } chromadb = { version = "2.3.0", features = ["openai"] } serde_json = { workspace = true } +rust-bert = "0.23.0" +console = "0.16.0" +async-trait.workspace = true +anyhow = "1.0.100" [build-dependencies] cc = "1.0" diff --git a/rholang/src/rust/interpreter/util/mod.rs b/rholang/src/rust/interpreter/util/mod.rs index 8737734ac..a377320a4 100644 --- a/rholang/src/rust/interpreter/util/mod.rs +++ b/rholang/src/rust/interpreter/util/mod.rs @@ -8,6 +8,7 @@ use super::matcher::has_locally_free::HasLocallyFree; pub mod address_tools; pub mod base58; pub mod rev_address; +pub mod sbert_embeddings; // Helper enum. 
This is 'GeneratedMessage' in Scala #[derive(Clone, Debug)] diff --git a/rholang/src/rust/interpreter/util/sbert_embeddings.rs b/rholang/src/rust/interpreter/util/sbert_embeddings.rs new file mode 100644 index 000000000..438a084bd --- /dev/null +++ b/rholang/src/rust/interpreter/util/sbert_embeddings.rs @@ -0,0 +1,24 @@ +use anyhow; +use async_trait::async_trait; +use chromadb::embeddings::EmbeddingFunction; +use rust_bert::pipelines::sentence_embeddings::{ + SentenceEmbeddingsBuilder, SentenceEmbeddingsModelType, +}; + +// Helper SBERT embedding function to be used in ChromaDB. +pub struct SBERTEmbeddings {} + +#[async_trait] +impl EmbeddingFunction for SBERTEmbeddings { + async fn embed(&self, docs: &[&str]) -> anyhow::Result>> { + // TODO (chase): The embedding model shouldn't be created each time but stored inside ChromaDBService. + // However, the model cannot be easily shared between threads. + // See: https://github.com/guillaume-be/rust-bert/issues/389 + // TODO (chase): Are we supposed to be using a local model instead? 
+ let sbert_embeddings = + SentenceEmbeddingsBuilder::remote(SentenceEmbeddingsModelType::AllMiniLmL6V2) + .create_model()?; + let res = sbert_embeddings.encode(docs)?; + Ok(res) + } +} From aa33b3d98b09bd576c167c364fef022712ef2596 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 14 Dec 2025 16:56:39 -0700 Subject: [PATCH 17/24] Add query method as well as support for SBERT --- .../src/rust/interpreter/chromadb_service.rs | 156 ++++++++++++++++-- rholang/src/rust/interpreter/rho_runtime.rs | 24 ++- .../src/rust/interpreter/system_processes.rs | 75 ++++++++- 3 files changed, 229 insertions(+), 26 deletions(-) diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index 3b15649d2..d399302ba 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -1,15 +1,19 @@ use std::collections::HashMap; use chromadb::{ - client::ChromaClientOptions, collection::CollectionEntries as ChromaCollectionEntries, - embeddings::openai::OpenAIEmbeddings, ChromaClient, ChromaCollection, + client::ChromaClientOptions, + collection::{CollectionEntries as ChromaCollectionEntries, QueryOptions}, + embeddings::{openai::OpenAIEmbeddings, EmbeddingFunction}, + ChromaClient, ChromaCollection, }; use futures::TryFutureExt; +use itertools::izip; use models::rhoapi::Par; use serde_json; -use crate::rust::interpreter::rho_type::{ - Extractor, RhoMap, RhoNil, RhoNumber, RhoString, RhoTuple2, +use crate::rust::interpreter::{ + rho_type::{Extractor, RhoMap, RhoNil, RhoNumber, RhoString, RhoTuple2}, + util::sbert_embeddings::SBERTEmbeddings, }; use super::errors::InterpreterError; @@ -86,6 +90,18 @@ impl Into for MetadataValue { #[derive(Clone, PartialEq, Eq, Debug)] pub struct Metadata(HashMap); +impl Metadata { + fn from_json_map( + json_map: serde_json::Map, + ) -> Result { + json_map + .into_iter() + .map(|(key, val)| 
MetadataValue::from_value(val).map(move |res| (key.clone(), res))) + .collect::, _>>() + .map(Metadata) + } +} + impl Into> for Metadata { fn into(self) -> serde_json::Map { self.0 @@ -139,6 +155,21 @@ impl<'a> Extractor for CollectionEntry { } } +impl Into for CollectionEntry { + fn into(self) -> Par { + RhoMap::create_par(HashMap::from([ + ( + RhoString::create_par("document".to_string()), + RhoString::create_par(self.document), + ), + ( + RhoString::create_par("metadata".to_string()), + self.metadata.map_or(RhoNil::create_par(), Into::into), + ), + ])) + } +} + /// A mapping from a collection entry ID to the entry itself. pub struct CollectionEntries(HashMap); @@ -150,6 +181,17 @@ impl Extractor for CollectionEntries { } } +impl Into for CollectionEntries { + fn into(self) -> Par { + RhoMap::create_par( + self.0 + .into_iter() + .map(|(key, val)| (RhoString::create_par(key), val.into())) + .collect(), + ) + } +} + pub struct ChromaDBService { client: ChromaClient, } @@ -220,15 +262,7 @@ impl ChromaDBService { .map_ok(|collection| collection.metadata().cloned()) .await?; match metadata { - Some(meta) => { - let res = meta - .into_iter() - .map(|(key, val)| { - MetadataValue::from_value(val).map(move |res| (key.clone(), res)) - }) - .collect::, _>>()?; - Ok(Some(Metadata(res))) - } + Some(meta) => Ok(Some(Metadata::from_json_map(meta)?)), None => Ok(None), } } @@ -239,12 +273,14 @@ impl ChromaDBService { /// /// * `collection_name` - The name of the collection to create /// * `entries` - A mapping of entry ID to entry. + /// * `use_openai_embeddings` - Set to true if the embeddings should be generated via OpenAI instead of SBERT. /// - /// The embeddings are auto generated using OpenAI embedding function. + /// The embeddings are auto generated using SBERT (default) or OpenAI (if specified). 
pub async fn upsert_entries( &self, collection_name: &str, entries: CollectionEntries, + use_openai_embeddings: bool, ) -> Result<(), InterpreterError> { // Obtain the collection. let collection = self.get_collection(collection_name).await?; @@ -266,10 +302,13 @@ impl ChromaDBService { embeddings: None, }; - // We'll use OpenAI to generate embeddings. - let embeddingsf = OpenAIEmbeddings::new(Default::default()); + let embeddingsf: Box = if use_openai_embeddings { + Box::new(OpenAIEmbeddings::new(Default::default())) + } else { + Box::new(SBERTEmbeddings {}) + }; collection - .upsert(dumb_entries, Some(Box::new(embeddingsf))) + .upsert(dumb_entries, Some(embeddingsf)) .await .map_err(|err| { InterpreterError::ChromaDBError(format!( @@ -280,6 +319,89 @@ impl ChromaDBService { Ok(()) } + /// Queries the identified collection for the nearest neighbors of the given document texts. See [`ChromaCollection::query`] + /// + /// # Arguments + /// + /// * `collection_name` - The name of the collection to query + /// * `doc_texts` - The document texts to get the closest neighbors of. + /// * `use_openai_embeddings` - Set to true if the embeddings should be generated via OpenAI instead of SBERT. + /// + /// The embeddings are auto generated using SBERT (default) or OpenAI (if specified). + /// NOTE: If there are any matching documents with metadata that could not be deserialized (i.e. contains floats), + /// the metadata will be `None`. + pub async fn query( + &self, + collection_name: &str, + doc_texts: Vec<&str>, + use_openai_embeddings: bool, + ) -> Result, InterpreterError> { + // Obtain the collection. + let collection = self.get_collection(collection_name).await?; + + let query_options = QueryOptions { + query_texts: Some(doc_texts), + query_embeddings: None, + n_results: None, + where_metadata: None, + where_document: None, + // We don't need the "distances".
+ include: Some(vec!["documents", "metadatas"]), + }; + + let embeddingsf: Box = if use_openai_embeddings { + Box::new(OpenAIEmbeddings::new(Default::default())) + } else { + Box::new(SBERTEmbeddings {}) + }; + + let raw_res = collection + .query(query_options, Some(embeddingsf)) + .await + .map_err(|err| { + InterpreterError::ChromaDBError(format!( + "Failed to upsert entries in collection {collection_name}: {}", + err + )) + })?; + let doc_ids_per_text = raw_res.ids; + let docs_per_text = raw_res + .documents + .ok_or(InterpreterError::ChromaDBError(format!( + "Expected field documents in query result; for collection {collection_name}" + )))?; + let metadatas_per_text = + raw_res + .metadatas + .ok_or(InterpreterError::ChromaDBError(format!( + "Expected field metadatas in query result; for collection {collection_name}" + )))?; + let entries_per_text = izip!(doc_ids_per_text, docs_per_text, metadatas_per_text) + .map( + |(doc_ids, docs, metadatas)| -> HashMap { + izip!(doc_ids, docs, metadatas) + .map(|(id, document, metadata)| -> (String, CollectionEntry) { + ( + id, + CollectionEntry { + document, + // A metadata deserialization failure causes the metadata to not be returned. + // Silent errors are terrible but there's no good way to do this. We don't want + // to drop the entire query result because of one metadata, but Rholang doesn't + // have rich error types. So we also can't have a Result<> for each metadata field. + metadata: metadata + .and_then(|meta| Metadata::from_json_map(meta).ok()), + }, + ) + }) + .collect() + }, + ) + .map(CollectionEntries) + .collect(); + Ok(entries_per_text) + } + /* TODO (chase): Other potential collection related methods: - rename collection (not that necessary?)
- list collections (bad idea probably) diff --git a/rholang/src/rust/interpreter/rho_runtime.rs b/rholang/src/rust/interpreter/rho_runtime.rs index 763b14109..098bfdd6e 100644 --- a/rholang/src/rust/interpreter/rho_runtime.rs +++ b/rholang/src/rust/interpreter/rho_runtime.rs @@ -859,8 +859,8 @@ fn std_rho_chroma_processes() -> Vec { Definition { urn: "rho:chroma:collection:entries:new".to_string(), fixed_channel: FixedChannels::chroma_upsert_entries(), - arity: 3, - body_ref: BodyRefs::CHOMRA_UPSERT_ENTRIES, + arity: 4, + body_ref: BodyRefs::CHROMA_UPSERT_ENTRIES, handler: Box::new(|ctx| { Box::new(move |args| { let ctx = ctx.clone(); @@ -873,7 +873,25 @@ fn std_rho_chroma_processes() -> Vec { }) }), remainder: None, - } + }, + Definition { + urn: "rho:chroma:collection:query".to_string(), + fixed_channel: FixedChannels::chroma_query(), + arity: 4, + body_ref: BodyRefs::CHROMA_QUERY, + handler: Box::new(|ctx| { + Box::new(move |args| { + let ctx = ctx.clone(); + Box::pin(async move { + ctx.system_processes + .clone() + .chroma_query(args) + .await + }) + }) + }), + remainder: None, + }, ] } diff --git a/rholang/src/rust/interpreter/system_processes.rs b/rholang/src/rust/interpreter/system_processes.rs index acdd64b5c..a18c99de0 100644 --- a/rholang/src/rust/interpreter/system_processes.rs +++ b/rholang/src/rust/interpreter/system_processes.rs @@ -1,5 +1,5 @@ use crate::rust::interpreter::chromadb_service::{ChromaDBService, CollectionEntries, Metadata}; -use crate::rust::interpreter::rho_type::{Extractor, RhoNil}; +use crate::rust::interpreter::rho_type::{Extractor, RhoList, RhoNil}; use super::contract_call::ContractCall; use super::dispatch::RhoDispatch; @@ -191,6 +191,10 @@ impl FixedChannels { byte_name(27) } + pub fn chroma_query() -> Par { + byte_name(28) + } + // ChromaDB section end } @@ -220,7 +224,8 @@ impl BodyRefs { pub const DEV_NULL: i64 = 22; pub const CHROMA_CREATE_COLLECTION: i64 = 25; pub const CHROMA_GET_COLLECTION_META: i64 = 26; - pub const 
CHOMRA_UPSERT_ENTRIES: i64 = 27; + pub const CHROMA_UPSERT_ENTRIES: i64 = 27; + pub const CHROMA_QUERY: i64 = 28; } pub fn non_deterministic_ops() -> HashSet { @@ -231,7 +236,8 @@ pub fn non_deterministic_ops() -> HashSet { BodyRefs::RANDOM, BodyRefs::CHROMA_CREATE_COLLECTION, BodyRefs::CHROMA_GET_COLLECTION_META, - BodyRefs::CHOMRA_UPSERT_ENTRIES, + BodyRefs::CHROMA_UPSERT_ENTRIES, + BodyRefs::CHROMA_QUERY, ]) } @@ -1427,12 +1433,13 @@ impl SystemProcesses { return Err(illegal_argument_error("chroma_upsert_entries")); }; - let [collection_name_par, entries_par, ack] = args.as_slice() else { + let [collection_name_par, entries_par, use_openai_par, ack] = args.as_slice() else { return Err(illegal_argument_error("chroma_upsert_entries")); }; - let (Some(collection_name), Some(entries)) = ( + let (Some(collection_name), Some(entries), Some(use_openai_embeddings)) = ( RhoString::unapply(collection_name_par), ::unapply(entries_par), + RhoBoolean::unapply(use_openai_par), ) else { return Err(illegal_argument_error("chroma_upsert_entries")); }; @@ -1445,7 +1452,7 @@ impl SystemProcesses { let chromadb_service = self.chromadb_service.lock().await; match chromadb_service - .upsert_entries(&collection_name, entries) + .upsert_entries(&collection_name, entries, use_openai_embeddings) .await { Ok(_) => Ok(vec![]), @@ -1458,6 +1465,62 @@ impl SystemProcesses { } } + pub async fn chroma_query( + &self, + contract_args: (Vec, bool, Vec), + ) -> Result, InterpreterError> { + let Some((produce, is_replay, previous_output, args)) = + self.is_contract_call().unapply(contract_args) + else { + return Err(illegal_argument_error("chroma_query")); + }; + + let [collection_name_par, doc_texts_par, use_openai_par, ack] = args.as_slice() else { + return Err(illegal_argument_error("chroma_query")); + }; + let (Some(collection_name), Some(doc_texts), Some(use_openai_embeddings)) = ( + RhoString::unapply(collection_name_par), + as Extractor>::unapply(doc_texts_par), + 
RhoBoolean::unapply(use_openai_par), + ) else { + return Err(illegal_argument_error("chroma_query")); + }; + + // Common piece of code. + if is_replay { + produce(&previous_output, ack).await?; + return Ok(previous_output); + } + + let chromadb_service = self.chromadb_service.lock().await; + match chromadb_service + .query( + &collection_name, + doc_texts.iter().map(|s| s.as_ref()).collect(), + use_openai_embeddings, + ) + .await + { + Ok(res) => { + let result_par_vec: Vec = res + .into_iter() + .map(Into::into) + .collect(); + let result_par = RhoList::create_par(result_par_vec); + + let output = vec![result_par]; + produce(&output, &ack).await?; + Ok(output) + } + Err(e) => { + // TODO (chase): Is this right? It seems like other service methods do something similar. + let p = RhoString::create_par(collection_name); + produce(&[p], ack).await?; + return Err(e); + } + } + } + // ChromaDB section end } From ffa9feee4ffcf1ec9f6443b6a12ea78aae5f48d0 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 14 Dec 2025 18:20:56 -0700 Subject: [PATCH 18/24] Fix CollectionEntry to Rho Par conversion --- rholang/src/rust/interpreter/chromadb_service.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index d399302ba..2abc70156 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -157,16 +157,10 @@ impl<'a> Extractor for CollectionEntry { impl Into for CollectionEntry { fn into(self) -> Par { - RhoMap::create_par(HashMap::from([ - ( - RhoString::create_par("document".to_string()), - RhoString::create_par(self.document), - ), - ( - RhoString::create_par("metadata".to_string()), - self.metadata.map_or(RhoNil::create_par(), Into::into), - ), - ])) + RhoTuple2::create_par(( + RhoString::create_par(self.document), + 
self.metadata.map_or(RhoNil::create_par(), Into::into), + )) } } From 964c75585c82efd39acbf188e5049bf5f2420d32 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Tue, 16 Dec 2025 22:14:51 -0700 Subject: [PATCH 19/24] Minor fixes and updates --- rholang/src/rust/interpreter/rho_runtime.rs | 9 ++---- .../src/rust/interpreter/system_processes.rs | 31 ++++++++----------- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/rholang/src/rust/interpreter/rho_runtime.rs b/rholang/src/rust/interpreter/rho_runtime.rs index 098bfdd6e..600121174 100644 --- a/rholang/src/rust/interpreter/rho_runtime.rs +++ b/rholang/src/rust/interpreter/rho_runtime.rs @@ -875,19 +875,14 @@ fn std_rho_chroma_processes() -> Vec { remainder: None, }, Definition { - urn: "rho:chroma:collection:query".to_string(), + urn: "rho:chroma:collection:entries:query".to_string(), fixed_channel: FixedChannels::chroma_query(), arity: 4, body_ref: BodyRefs::CHROMA_QUERY, handler: Box::new(|ctx| { Box::new(move |args| { let ctx = ctx.clone(); - Box::pin(async move { - ctx.system_processes - .clone() - .chroma_query(args) - .await - }) + Box::pin(async move { ctx.system_processes.clone().chroma_query(args).await }) }) }), remainder: None, diff --git a/rholang/src/rust/interpreter/system_processes.rs b/rholang/src/rust/interpreter/system_processes.rs index a18c99de0..23c69ca46 100644 --- a/rholang/src/rust/interpreter/system_processes.rs +++ b/rholang/src/rust/interpreter/system_processes.rs @@ -1369,14 +1369,17 @@ impl SystemProcesses { .create_collection(&collection_name, ignore_or_update_if_exists, metadata) .await { - Ok(_) => Ok(vec![]), + Ok(_) => (), Err(e) => { - // TODO (chase): Is this right? It seems like other service methods do something similar. 
let p = RhoString::create_par(collection_name); produce(&[p], ack).await?; return Err(e); } - } + }; + + let output = vec![Par::default()]; + produce(&output, ack).await?; + Ok(output) } pub async fn chroma_get_collection_meta( @@ -1451,18 +1454,13 @@ impl SystemProcesses { } let chromadb_service = self.chromadb_service.lock().await; - match chromadb_service + chromadb_service .upsert_entries(&collection_name, entries, use_openai_embeddings) - .await - { - Ok(_) => Ok(vec![]), - Err(e) => { - // TODO (chase): Is this right? It seems like other service methods do something similar. - let p = RhoString::create_par(collection_name); - produce(&[p], ack).await?; - return Err(e); - } - } + .await?; + // TODO (chase): Is this right? It seems like other service methods do something similar. + let p = RhoString::create_par(collection_name); + produce(&[p], ack).await?; + Ok(vec![]) } pub async fn chroma_query( @@ -1502,10 +1500,7 @@ impl SystemProcesses { .await { Ok(res) => { - let result_par_vec: Vec = res - .into_iter() - .map(Into::into) - .collect(); + let result_par_vec: Vec = res.into_iter().map(Into::into).collect(); let result_par = RhoList::create_par(result_par_vec); let output = vec![result_par]; From a6cac4eeeb465da8900aba5bfd22a02605bf8335 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Tue, 16 Dec 2025 22:15:05 -0700 Subject: [PATCH 20/24] Runnable Examples --- .../chroma-db/01-create-collection.rho | 6 +++--- .../chroma-db/02-create-collection-meta.rho | 2 +- .../chroma-db/03-get-collection-meta.rho | 6 ++++++ .../chroma-db/04-upsert-entries.rho | 16 ++++++++++++++++ .../chroma-db/05-query-entries.rho | 6 ++++++ 5 files changed, 32 insertions(+), 4 deletions(-) create mode 100644 rholang/examples/system-contract/chroma-db/03-get-collection-meta.rho create mode 100644 rholang/examples/system-contract/chroma-db/04-upsert-entries.rho create mode 100644 
rholang/examples/system-contract/chroma-db/05-query-entries.rho diff --git a/rholang/examples/system-contract/chroma-db/01-create-collection.rho b/rholang/examples/system-contract/chroma-db/01-create-collection.rho index 4a6ff8be4..ea71bbb53 100644 --- a/rholang/examples/system-contract/chroma-db/01-create-collection.rho +++ b/rholang/examples/system-contract/chroma-db/01-create-collection.rho @@ -1,6 +1,6 @@ new createCollection(`rho:chroma:collection:new`), stdout(`rho:io:stdout`), retCh in { - createCollection!("foo", true, Nil, *retCh) | - for(@res <- retCh) { - stdout!("Creation succeeded!") + createCollection!("bar", true, Nil, *retCh) | + for(@ok <- retCh) { + stdout!(ok) } } \ No newline at end of file diff --git a/rholang/examples/system-contract/chroma-db/02-create-collection-meta.rho b/rholang/examples/system-contract/chroma-db/02-create-collection-meta.rho index e654244da..47ee59a0c 100644 --- a/rholang/examples/system-contract/chroma-db/02-create-collection-meta.rho +++ b/rholang/examples/system-contract/chroma-db/02-create-collection-meta.rho @@ -1,6 +1,6 @@ new createCollection(`rho:chroma:collection:new`), stdout(`rho:io:stdout`), retCh in { createCollection!("foo", true, {"meta1" : 1, "two" : "42", "three" : 42, "meta2": "bar"}, *retCh) | for(@res <- retCh) { - stdout!("Creation succeeded!") + stdout!(res) } } \ No newline at end of file diff --git a/rholang/examples/system-contract/chroma-db/03-get-collection-meta.rho b/rholang/examples/system-contract/chroma-db/03-get-collection-meta.rho new file mode 100644 index 000000000..137ec9049 --- /dev/null +++ b/rholang/examples/system-contract/chroma-db/03-get-collection-meta.rho @@ -0,0 +1,6 @@ +new getCollectionMeta(`rho:chroma:collection:meta`), stdout(`rho:io:stdout`), retCh in { + getCollectionMeta!("foo", *retCh) | + for(@res <- retCh) { + stdout!(res) + } +} \ No newline at end of file diff --git a/rholang/examples/system-contract/chroma-db/04-upsert-entries.rho 
b/rholang/examples/system-contract/chroma-db/04-upsert-entries.rho new file mode 100644 index 000000000..532ec911b --- /dev/null +++ b/rholang/examples/system-contract/chroma-db/04-upsert-entries.rho @@ -0,0 +1,16 @@ +new upsertEntries(`rho:chroma:collection:entries:new`), stdout(`rho:io:stdout`), retCh in { + upsertEntries!( + "foo", + { "doc1": ("Hello world!", Nil), + "doc2": ( + "Hello world again!", + { "meta1": "42" } + ) + }, + true, + *retCh + ) | + for(@res <- retCh) { + stdout!(res) + } +} \ No newline at end of file diff --git a/rholang/examples/system-contract/chroma-db/05-query-entries.rho b/rholang/examples/system-contract/chroma-db/05-query-entries.rho new file mode 100644 index 000000000..90ca1390d --- /dev/null +++ b/rholang/examples/system-contract/chroma-db/05-query-entries.rho @@ -0,0 +1,6 @@ +new queryEntries(`rho:chroma:collection:entries:query`), stdout(`rho:io:stdout`), retCh in { + queryEntries!("foo", [ "Hello world" ], true, *retCh) | + for(@res <- retCh) { + stdout!(res) + } +} \ No newline at end of file From 02b5bbd06078a64e1f5ed14c30f590151fbe8eec Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 21 Dec 2025 23:16:41 -0700 Subject: [PATCH 21/24] Add tests for ChromaDB --- .../src/rust/interpreter/chromadb_service.rs | 10 +- .../tests/external_services/chroma_db_spec.rs | 199 ++++++++++++++++++ rholang/tests/external_services/mod.rs | 1 + rholang/tests/mod.rs | 1 + 4 files changed, 209 insertions(+), 2 deletions(-) create mode 100644 rholang/tests/external_services/chroma_db_spec.rs create mode 100644 rholang/tests/external_services/mod.rs diff --git a/rholang/src/rust/interpreter/chromadb_service.rs b/rholang/src/rust/interpreter/chromadb_service.rs index 2abc70156..01d8a4234 100644 --- a/rholang/src/rust/interpreter/chromadb_service.rs +++ b/rholang/src/rust/interpreter/chromadb_service.rs @@ -90,6 +90,12 @@ impl Into for MetadataValue { #[derive(Clone, PartialEq, Eq, 
Debug)] pub struct Metadata(HashMap); +impl From<[(String, MetadataValue); N]> for Metadata { + fn from(x: [(String, MetadataValue); N]) -> Self { + Self(HashMap::from(x)) + } +} + impl Metadata { fn from_json_map( json_map: serde_json::Map, @@ -133,8 +139,8 @@ impl Extractor for Metadata { /// An entry in a collection. /// At the moment, the embeddings are calculated using the OpenAI embedding function. pub struct CollectionEntry { - document: String, - metadata: Option, + pub document: String, + pub metadata: Option, } impl<'a> Extractor for CollectionEntry { diff --git a/rholang/tests/external_services/chroma_db_spec.rs b/rholang/tests/external_services/chroma_db_spec.rs new file mode 100644 index 000000000..3e31a0e0f --- /dev/null +++ b/rholang/tests/external_services/chroma_db_spec.rs @@ -0,0 +1,199 @@ +use models::rhoapi::{expr, Expr, Par}; +use rholang::rust::interpreter::accounting::costs::{parsing_cost, subtraction_cost_with_value}; +use rholang::rust::interpreter::chromadb_service::{self, CollectionEntry, MetadataValue}; +use rholang::rust::interpreter::rho_type::{RhoList, RhoNil, RhoNumber}; +use rholang::rust::interpreter::{ + errors::InterpreterError, + interpreter::EvaluateResult, + rho_runtime::{RhoRuntime, RhoRuntimeImpl}, + storage::storage_printer, + test_utils::resources::with_runtime, +}; +use std::collections::HashSet; + +fn storage_contents(runtime: &RhoRuntimeImpl) -> String { + storage_printer::pretty_print(runtime) +} + +async fn success(runtime: &mut RhoRuntimeImpl, term: &str) -> Result<(), InterpreterError> { + execute(runtime, term).await.map(|res| { + assert!( + res.errors.is_empty(), + "{}", + format!("Execution failed for: {}. 
Cause: {:?}", term, res.errors) + ) + }) +} + +async fn failure(runtime: &mut RhoRuntimeImpl, term: &str) -> Result<(), InterpreterError> { + execute(runtime, term).await.map(|res| { + assert!( + !res.errors.is_empty(), + "Expected {} to fail - it didn't.", + term + ) + }) +} + +async fn execute( + runtime: &mut RhoRuntimeImpl, + term: &str, +) -> Result { + runtime.evaluate_with_term(term).await +} + +#[tokio::test] +async fn collection_should_yield_correct_meta_after_creation() { + let meta_contract = r#" + new createCollection(`rho:chroma:collection:new`), + getCollectionMeta(`rho:chroma:collection:meta`), + stdout(`rho:io:stdout`), createRet, metaRet in { + createCollection!("test-collection", true, {"meta1" : 1, "two" : "42", "three" : 42, "meta2": "bar"}, *createRet) | + for(@res <- createRet) { + getCollectionMeta!("test-collection", *metaRet) | + for(@res <- metaRet) { + @0!(res) + } + } + } + "#; + + test_runtime( + meta_contract, + Some(chromadb_service::Metadata::from([ + ("meta1".to_string(), MetadataValue::NumberMeta(1)), + ("two".to_string(), MetadataValue::StringMeta("42".to_string())), + ("three".to_string(), MetadataValue::NumberMeta(42)), + ("meta2".to_string(), MetadataValue::StringMeta("bar".to_string())), + ]) + .into()), + ).await +} + +#[tokio::test] +async fn collection_should_yield_correct_meta_after_creation_empty() { + let meta_contract = r#" + new createCollection(`rho:chroma:collection:new`), + getCollectionMeta(`rho:chroma:collection:meta`), + createRet, metaRet in { + createCollection!("test-collection-nil-meta", true, Nil, *createRet) | + for(@res <- createRet) { + getCollectionMeta!("test-collection-nil-meta", *metaRet) | + for(@res <- metaRet) { + @0!(res) + } + } + } + "#; + + test_runtime(meta_contract, Some(RhoNil::create_par())).await +} + +#[tokio::test] +async fn entry_should_be_queried() { + let meta_contract = r#" + new createCollection(`rho:chroma:collection:new`), + upsertEntries(`rho:chroma:collection:entries:new`), + 
queryEntries(`rho:chroma:collection:entries:query`), + createRet, upsertRet, queryRet in { + createCollection!("test-collection-entries", true, Nil, *createRet) | + for(@x <- createRet) { + upsertEntries!( + "foo", + { "doc1": ("Hello world!", Nil), + "doc2": ( + "Hello world again!", + { "meta1": "42" } + ) + }, + true, + *upsertRet + ) + } | + for(@y <- upsertRet) { + queryEntries!("test-collection-entries", [ "Hello world" ], true, *queryRet) + } | + for(@res <- queryRet) { + @0!(res) + } + } + "#; + + test_runtime( + meta_contract, + Some(RhoList::create_par(vec![ + CollectionEntry { + document: "Hello world!".to_string(), + metadata: None, + } + .into(), + CollectionEntry { + document: "Hello world again!".to_string(), + metadata: Some(chromadb_service::Metadata::from([( + "meta2".to_string(), + MetadataValue::StringMeta("42".to_string()), + )])), + }.into(), + ])), + ).await +} + +#[tokio::test] +async fn query_should_return_empty() { + let meta_contract = r#" + new createCollection(`rho:chroma:collection:new`), + upsertEntries(`rho:chroma:collection:entries:new`), + queryEntries(`rho:chroma:collection:entries:query`), + createRet, upsertRet, queryRet in { + createCollection!("test-collection-entries-empty", true, Nil, *createRet) | + for(@x <- createRet) { + upsertEntries!( + "foo", + { "doc1": ("Hello world!", Nil), + "doc2": ( + "Hello world again!", + { "meta1": "42" } + ) + }, + true, + *upsertRet + ) + } | + for(@y <- upsertRet) { + queryEntries!("test-collection-entries-empty", [ "None" ], true, *queryRet) + } | + for(@res <- queryRet) { + @0!(res) + } + } + "#; + + test_runtime( + meta_contract, + Some(RhoList::create_par(vec![])), + ).await +} + +async fn test_runtime(contract: &str, expected: Option) { + with_runtime("interpreter-spec-", |mut runtime| async move { + success(&mut runtime, contract).await.unwrap(); + + let tuple_space = runtime.get_hot_changes(); + + fn rho_int(n: i64) -> Vec { + vec![RhoNumber::create_par(n)] + } + + let ch_zero = 
rho_int(0); + println!("ch_zero: {:?}", ch_zero); + + let tuple_space_data = tuple_space.get(&ch_zero); + println!("tuple_space_data: {:?}", tuple_space_data); + + let results = tuple_space_data + .map(|row| row.data[0].a.pars[0].clone()); + + assert_eq!(results, expected); + }) + .await +} diff --git a/rholang/tests/external_services/mod.rs b/rholang/tests/external_services/mod.rs new file mode 100644 index 000000000..62558d4db --- /dev/null +++ b/rholang/tests/external_services/mod.rs @@ -0,0 +1 @@ +mod chroma_db_spec; diff --git a/rholang/tests/mod.rs b/rholang/tests/mod.rs index 1fb4711d9..09ae19aa3 100644 --- a/rholang/tests/mod.rs +++ b/rholang/tests/mod.rs @@ -1,5 +1,6 @@ mod accounting; mod crypto_channels_spec; +mod external_services; mod interpreter_spec; mod matcher; mod parser_test; From 3bdeeeede5e6a87a9ae30c0648a67ddd1693a387 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 21 Dec 2025 23:16:52 -0700 Subject: [PATCH 22/24] Add SBERT downloader --- sbert-downloader/Dockerfile | 10 ++++++++++ sbert-downloader/README.md | 7 +++++++ sbert-downloader/downloader.py | 21 +++++++++++++++++++++ 3 files changed, 38 insertions(+) create mode 100644 sbert-downloader/Dockerfile create mode 100644 sbert-downloader/README.md create mode 100644 sbert-downloader/downloader.py diff --git a/sbert-downloader/Dockerfile b/sbert-downloader/Dockerfile new file mode 100644 index 000000000..f58121d74 --- /dev/null +++ b/sbert-downloader/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.11-slim + +RUN pip install --no-cache-dir \ + transformers \ + torch --extra-index-url https://download.pytorch.org/whl/cpu + +WORKDIR /sbert-downloader +COPY downloader.py . 
+ +CMD ["python", "downloader.py"] \ No newline at end of file diff --git a/sbert-downloader/README.md b/sbert-downloader/README.md new file mode 100644 index 000000000..98e99aa7f --- /dev/null +++ b/sbert-downloader/README.md @@ -0,0 +1,7 @@ +# SBERT-downloader + +This sets up a helper script to download an SBERT model (see [`downloader.py`](./downloader.py)) and save it locally. The script is meant to be run with the provided [Dockerfile](./Dockerfile), which should be composed as a service (see: [`shard-with-autopropose.yml`](../docker/shard-with-autopropose.yml)). In this way, docker-compose would be able to run the script in a container to download the model and share it via a volume. + +See the docker-compose file linked above to see where the model is stored. This path will be needed to set the ENV var `SBERT_PATH` for use by the Rholang interpreter. + +NOTE: PyTorch is installed from the CPU-only index URL to avoid also downloading the CUDA libraries (which take up a lot of space).
diff --git a/sbert-downloader/downloader.py b/sbert-downloader/downloader.py new file mode 100644 index 000000000..e043e3fdd --- /dev/null +++ b/sbert-downloader/downloader.py @@ -0,0 +1,21 @@ +from transformers import AutoTokenizer, AutoModel +from pathlib import Path +import sys + +MODEL_NAME = "google-bert/bert-base-uncased" +OUT_DIR = Path("/models/bert-base-uncased") + +# Simple existence check +if (OUT_DIR / "config.json").exists(): + print("BERT model already present, skipping download.") + sys.exit(0) + +OUT_DIR.mkdir(parents=True, exist_ok=True) + +tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) +model = AutoModel.from_pretrained(MODEL_NAME) + +tokenizer.save_pretrained(OUT_DIR) +model.save_pretrained(OUT_DIR) + +print("BERT model downloaded.") From 35c0f754c335562885c979bab1e27b6c078fd58e Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 21 Dec 2025 23:20:41 -0700 Subject: [PATCH 23/24] Add chromadb and SBERT downloader to docker compose --- .gitignore | 8 ++++ docker/shard-with-autopropose.yml | 19 ++++++++- .../tests/external_services/chroma_db_spec.rs | 39 +++++++++++-------- 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index ac422d001..1af2f784b 100644 --- a/.gitignore +++ b/.gitignore @@ -51,6 +51,14 @@ boot/ rholang/bnfc/ rholang/src/main/java/ +# SBERT +######### +models/ + +# ChromaDB +############# +chroma_data/ + # Secp256k1 ################### crypto/secp256k1/ diff --git a/docker/shard-with-autopropose.yml b/docker/shard-with-autopropose.yml index 06e0f177f..b7da389d9 100644 --- a/docker/shard-with-autopropose.yml +++ b/docker/shard-with-autopropose.yml @@ -33,6 +33,23 @@ services: - ./certs/bootstrap/node.certificate.pem:/var/lib/rnode/node.certificate.pem:ro - ./certs/bootstrap/node.key.pem:/var/lib/rnode/node.key.pem:ro + chroma-db: + <<: *default-rnode + image: chromadb/chroma + volumes: + - ./chroma_data:/data + ports: + - 
"8000:8000" + + # Download and cache the SBERT model to be used + sbert-init: + <<: *default-rnode + build: + context: ../sbert-downloader + dockerfile: Dockerfile + volumes: + - ./models:/models + validator1: <<: *default-rnode container_name: $VALIDATOR1_HOST @@ -183,4 +200,4 @@ services: networks: f1r3fly: - driver: bridge \ No newline at end of file + driver: bridge \ No newline at end of file diff --git a/rholang/tests/external_services/chroma_db_spec.rs b/rholang/tests/external_services/chroma_db_spec.rs index 3e31a0e0f..42d4553b2 100644 --- a/rholang/tests/external_services/chroma_db_spec.rs +++ b/rholang/tests/external_services/chroma_db_spec.rs @@ -60,14 +60,23 @@ async fn collection_should_yield_correct_meta_after_creation() { test_runtime( meta_contract, - Some(chromadb_service::Metadata::from([ - ("meta1".to_string(), MetadataValue::NumberMeta(1)), - ("two".to_string(), MetadataValue::StringMeta("42".to_string())), - ("three".to_string(), MetadataValue::NumberMeta(42)), - ("meta2".to_string(), MetadataValue::StringMeta("bar".to_string())), - ]) - .into()), - ).await + Some( + chromadb_service::Metadata::from([ + ("meta1".to_string(), MetadataValue::NumberMeta(1)), + ( + "two".to_string(), + MetadataValue::StringMeta("42".to_string()), + ), + ("three".to_string(), MetadataValue::NumberMeta(42)), + ( + "meta2".to_string(), + MetadataValue::StringMeta("bar".to_string()), + ), + ]) + .into(), + ), + ) + .await } #[tokio::test] @@ -133,9 +142,11 @@ async fn entry_should_be_queried() { "meta2".to_string(), MetadataValue::StringMeta("42".to_string()), )])), - }.into(), + } + .into(), ])), - ).await + ) + .await } #[tokio::test] @@ -168,10 +179,7 @@ async fn query_should_return_empty() { } "#; - test_runtime( - meta_contract, - Some(RhoList::create_par(vec![])), - ).await + test_runtime(meta_contract, Some(RhoList::create_par(vec![]))).await } async fn test_runtime(contract: &str, expected: Option) { @@ -190,8 +198,7 @@ async fn test_runtime(contract: &str, 
expected: Option) { let tuple_space_data = tuple_space.get(&ch_zero); println!("tuple_space_data: {:?}", tuple_space_data); - let results = tuple_space_data - .map(|row| row.data[0].a.pars[0].clone()); + let results = tuple_space_data.map(|row| row.data[0].a.pars[0].clone()); assert_eq!(results, expected); }) From c095011bda62fbf28779aebf5105f22a697bac12 Mon Sep 17 00:00:00 2001 From: TotallyNotChase <44284917+TotallyNotChase@users.noreply.github.com> Date: Sun, 21 Dec 2025 23:43:19 -0700 Subject: [PATCH 24/24] Use local SBERT model --- .../src/rust/interpreter/util/sbert_embeddings.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/rholang/src/rust/interpreter/util/sbert_embeddings.rs b/rholang/src/rust/interpreter/util/sbert_embeddings.rs index 438a084bd..17156bfe1 100644 --- a/rholang/src/rust/interpreter/util/sbert_embeddings.rs +++ b/rholang/src/rust/interpreter/util/sbert_embeddings.rs @@ -1,9 +1,9 @@ +use std::env; + use anyhow; use async_trait::async_trait; use chromadb::embeddings::EmbeddingFunction; -use rust_bert::pipelines::sentence_embeddings::{ - SentenceEmbeddingsBuilder, SentenceEmbeddingsModelType, -}; +use rust_bert::pipelines::sentence_embeddings::SentenceEmbeddingsBuilder; // Helper SBERT embedding function to be used in ChromaDB. pub struct SBERTEmbeddings {} @@ -14,10 +14,9 @@ impl EmbeddingFunction for SBERTEmbeddings { // TODO (chase): The embedding model shouldn't be created each time but stored inside ChromaDBService. // However, the model cannot be easily shared between threads. // See: https://github.com/guillaume-be/rust-bert/issues/389 - // TODO (chase): Are we supposed to be using a local model instead? 
- let sbert_embeddings = - SentenceEmbeddingsBuilder::remote(SentenceEmbeddingsModelType::AllMiniLmL6V2) - .create_model()?; + let model_path = + env::var("SBERT_PATH").expect("Failed to load SBERT_PATH environment variable"); + let sbert_embeddings = SentenceEmbeddingsBuilder::local(model_path).create_model()?; let res = sbert_embeddings.encode(docs)?; Ok(res) }