Skip to content

Commit b9a1b02

Browse files
Hook up create_collection as a system process
1 parent 18c910c commit b9a1b02

File tree

2 files changed

+157
-24
lines changed

2 files changed

+157
-24
lines changed

rholang/src/rust/interpreter/rho_runtime.rs

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use rspace_plus_plus::rspace::tuplespace_interface::Tuplespace;
2525
use std::collections::{HashMap, HashSet};
2626
use std::sync::Arc;
2727

28+
use crate::rust::interpreter::chromadb_service::ChromaDBService;
2829
use crate::rust::interpreter::openai_service::OpenAIService;
2930
use crate::rust::interpreter::system_processes::{BodyRefs, FixedChannels};
3031

@@ -815,20 +816,44 @@ fn std_rho_ai_processes() -> Vec<Definition> {
815816
]
816817
}
817818

819+
fn std_rho_chroma_processes() -> Vec<Definition> {
820+
vec![
821+
Definition {
822+
urn: "rho:chroma:collection:new".to_string(),
823+
fixed_channel: FixedChannels::chroma_create_collection(),
824+
// TODO (chase): How to define overloads?
825+
// This function can support 3 or 2 arguments (last one is optional).
826+
arity: 3,
827+
body_ref: BodyRefs::CHROMA_CREATE_COLLECTION,
828+
handler: Box::new(|ctx| {
829+
Box::new(move |args| {
830+
let ctx = ctx.clone();
831+
Box::pin(
832+
async move { ctx.system_processes.clone().chroma_create_collection(args).await },
833+
)
834+
})
835+
}),
836+
remainder: None,
837+
},
838+
]
839+
}
840+
818841
fn dispatch_table_creator(
819842
space: RhoISpace,
820843
dispatcher: RhoDispatch,
821844
block_data: Arc<tokio::sync::RwLock<BlockData>>,
822845
invalid_blocks: InvalidBlocks,
823846
extra_system_processes: &mut Vec<Definition>,
824847
openai_service: Arc<tokio::sync::Mutex<OpenAIService>>,
848+
chromadb_service: Arc<tokio::sync::Mutex<ChromaDBService>>,
825849
) -> RhoDispatchMap {
826850
let mut dispatch_table = HashMap::new();
827851

828852
for def in std_system_processes().iter_mut().chain(
829853
std_rho_crypto_processes()
830854
.iter_mut()
831855
.chain(std_rho_ai_processes().iter_mut())
856+
.chain(std_rho_chroma_processes().iter_mut())
832857
.chain(extra_system_processes.iter_mut()),
833858
) {
834859
// TODO: Remove cloning every time
@@ -838,6 +863,7 @@ fn dispatch_table_creator(
838863
block_data.clone(),
839864
invalid_blocks.clone(),
840865
openai_service.clone(),
866+
chromadb_service.clone(),
841867
));
842868

843869
dispatch_table.insert(tuple.0, tuple.1);
@@ -888,6 +914,7 @@ async fn setup_reducer(
888914
merge_chs: Arc<std::sync::RwLock<HashSet<Par>>>,
889915
mergeable_tag_name: Par,
890916
openai_service: Arc<tokio::sync::Mutex<OpenAIService>>,
917+
chromadb_service: Arc<tokio::sync::Mutex<ChromaDBService>>,
891918
cost: _cost,
892919
) -> DebruijnInterpreter {
893920
// println!("\nsetup_reducer");
@@ -906,6 +933,7 @@ async fn setup_reducer(
906933
invalid_blocks,
907934
extra_system_processes,
908935
openai_service,
936+
chromadb_service,
909937
);
910938

911939
let dispatcher = Arc::new(RholangAndScalaDispatcher {
@@ -941,10 +969,12 @@ fn setup_maps_and_refs(
941969
let system_binding = std_system_processes();
942970
let rho_crypto_binding = std_rho_crypto_processes();
943971
let rho_ai_binding = std_rho_ai_processes();
972+
let rho_chroma_binding = std_rho_chroma_processes();
944973
let combined_processes = system_binding
945974
.iter()
946975
.chain(rho_crypto_binding.iter())
947976
.chain(rho_ai_binding.iter())
977+
.chain(rho_chroma_binding.iter())
948978
.chain(extra_system_processes.iter())
949979
.collect::<Vec<&Definition>>();
950980

@@ -996,6 +1026,7 @@ where
9961026
)));
9971027

9981028
let openai_service = Arc::new(tokio::sync::Mutex::new(OpenAIService::new()));
1029+
let chromadb_service = Arc::new(tokio::sync::Mutex::new(ChromaDBService::new().await));
9991030
let reducer = setup_reducer(
10001031
charging_rspace,
10011032
block_data_ref.clone(),
@@ -1005,6 +1036,7 @@ where
10051036
merge_chs,
10061037
mergeable_tag_name,
10071038
openai_service,
1039+
chromadb_service,
10081040
cost,
10091041
)
10101042
.await;
@@ -1101,7 +1133,11 @@ where
11011133
/// # Returns
11021134
///
11031135
/// A configured `RhoRuntimeImpl` instance ready for executing Rholang code.
1104-
#[tracing::instrument(name = "create-play-runtime", target = "f1r3fly.rholang.runtime", skip_all)]
1136+
#[tracing::instrument(
1137+
name = "create-play-runtime",
1138+
target = "f1r3fly.rholang.runtime",
1139+
skip_all
1140+
)]
11051141
pub async fn create_rho_runtime<T>(
11061142
rspace: T,
11071143
mergeable_tag_name: Par,
@@ -1136,7 +1172,11 @@ where
11361172
/// # Returns
11371173
///
11381174
/// A configured `RhoRuntimeImpl` instance with replay capabilities.
1139-
#[tracing::instrument(name = "create-replay-runtime", target = "f1r3fly.rholang.runtime", skip_all)]
1175+
#[tracing::instrument(
1176+
name = "create-replay-runtime",
1177+
target = "f1r3fly.rholang.runtime",
1178+
skip_all
1179+
)]
11401180
pub async fn create_replay_rho_runtime<T>(
11411181
rspace: T,
11421182
mergeable_tag_name: Par,
@@ -1197,15 +1237,18 @@ where
11971237
(rho_runtime, replay_rho_runtime)
11981238
}
11991239

1200-
#[tracing::instrument(name = "create-play-runtime", target = "f1r3fly.rholang.runtime.create-play", skip_all)]
1240+
#[tracing::instrument(
1241+
name = "create-play-runtime",
1242+
target = "f1r3fly.rholang.runtime.create-play",
1243+
skip_all
1244+
)]
12011245
pub async fn create_runtime_from_kv_store(
12021246
stores: RSpaceStore,
12031247
mergeable_tag_name: Par,
12041248
init_registry: bool,
12051249
additional_system_processes: &mut Vec<Definition>,
12061250
matcher: Arc<Box<dyn Match<BindPattern, ListParWithRandom>>>,
12071251
) -> RhoRuntimeImpl {
1208-
12091252
let space: RSpace<Par, BindPattern, ListParWithRandom, TaggedContinuation> =
12101253
RSpace::create(stores, matcher).unwrap();
12111254

rholang/src/rust/interpreter/system_processes.rs

Lines changed: 110 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
use crate::rust::interpreter::chromadb_service::{ChromaDBService, CollectionMetadata};
2+
use crate::rust::interpreter::rho_type::Extractor;
3+
14
use super::contract_call::ContractCall;
25
use super::dispatch::RhoDispatch;
36
use super::errors::{illegal_argument_error, InterpreterError};
@@ -37,8 +40,10 @@ use std::sync::Arc;
3740
// NOTE: Not implementing Logger
3841
pub type RhoSysFunction = Box<
3942
dyn Fn(
40-
(Vec<ListParWithRandom>, bool, Vec<Par>),
41-
) -> Pin<Box<dyn Future<Output = Result<Vec<Par>, InterpreterError>> + Send>> + Send + Sync,
43+
(Vec<ListParWithRandom>, bool, Vec<Par>),
44+
) -> Pin<Box<dyn Future<Output = Result<Vec<Par>, InterpreterError>> + Send>>
45+
+ Send
46+
+ Sync,
4247
>;
4348
pub type RhoDispatchMap = Arc<tokio::sync::RwLock<HashMap<i64, RhoSysFunction>>>;
4449
pub type Name = Par;
@@ -170,6 +175,15 @@ impl FixedChannels {
170175
pub fn dev_null() -> Par {
171176
byte_name(24)
172177
}
178+
179+
// ChromaDB section start
180+
181+
// these bytes may need to change during finalization.
182+
pub fn chroma_create_collection() -> Par {
183+
byte_name(25)
184+
}
185+
186+
// ChromaDB section end
173187
}
174188

175189
pub struct BodyRefs;
@@ -196,6 +210,7 @@ impl BodyRefs {
196210
pub const RANDOM: i64 = 20;
197211
pub const GRPC_TELL: i64 = 21;
198212
pub const DEV_NULL: i64 = 22;
213+
pub const CHROMA_CREATE_COLLECTION: i64 = 25;
199214
}
200215

201216
pub fn non_deterministic_ops() -> HashSet<i64> {
@@ -204,6 +219,7 @@ pub fn non_deterministic_ops() -> HashSet<i64> {
204219
BodyRefs::DALLE3,
205220
BodyRefs::TEXT_TO_AUDIO,
206221
BodyRefs::RANDOM,
222+
BodyRefs::CHROMA_CREATE_COLLECTION,
207223
])
208224
}
209225

@@ -223,6 +239,7 @@ impl ProcessContext {
223239
block_data: Arc<tokio::sync::RwLock<BlockData>>,
224240
invalid_blocks: InvalidBlocks,
225241
openai_service: Arc<tokio::sync::Mutex<OpenAIService>>,
242+
chromadb_service: Arc<tokio::sync::Mutex<ChromaDBService>>,
226243
) -> Self {
227244
ProcessContext {
228245
space: space.clone(),
@@ -234,6 +251,7 @@ impl ProcessContext {
234251
space,
235252
block_data,
236253
openai_service,
254+
chromadb_service,
237255
),
238256
}
239257
}
@@ -246,13 +264,15 @@ pub struct Definition {
246264
pub body_ref: BodyRef,
247265
pub handler: Box<
248266
dyn FnMut(
249-
ProcessContext,
250-
) -> Box<
251-
dyn Fn(
252-
(Vec<ListParWithRandom>, bool, Vec<Par>),
253-
)
254-
-> Pin<Box<dyn Future<Output = Result<Vec<Par>, InterpreterError>> + Send>> + Send + Sync,
255-
> + Send,
267+
ProcessContext,
268+
) -> Box<
269+
dyn Fn(
270+
(Vec<ListParWithRandom>, bool, Vec<Par>),
271+
)
272+
-> Pin<Box<dyn Future<Output = Result<Vec<Par>, InterpreterError>> + Send>>
273+
+ Send
274+
+ Sync,
275+
> + Send,
256276
>,
257277
pub remainder: Remainder,
258278
}
@@ -265,13 +285,15 @@ impl Definition {
265285
body_ref: BodyRef,
266286
handler: Box<
267287
dyn FnMut(
268-
ProcessContext,
269-
) -> Box<
270-
dyn Fn(
271-
(Vec<ListParWithRandom>, bool, Vec<Par>),
272-
)
273-
-> Pin<Box<dyn Future<Output = Result<Vec<Par>, InterpreterError>> + Send>> + Send + Sync,
274-
> + Send,
288+
ProcessContext,
289+
) -> Box<
290+
dyn Fn(
291+
(Vec<ListParWithRandom>, bool, Vec<Par>),
292+
) -> Pin<
293+
Box<dyn Future<Output = Result<Vec<Par>, InterpreterError>> + Send>,
294+
> + Send
295+
+ Sync,
296+
> + Send,
275297
>,
276298
remainder: Remainder,
277299
) -> Self {
@@ -292,9 +314,11 @@ impl Definition {
292314
BodyRef,
293315
Box<
294316
dyn Fn(
295-
(Vec<ListParWithRandom>, bool, Vec<Par>),
296-
)
297-
-> Pin<Box<dyn Future<Output = Result<Vec<Par>, InterpreterError>> + Send>> + Send + Sync,
317+
(Vec<ListParWithRandom>, bool, Vec<Par>),
318+
)
319+
-> Pin<Box<dyn Future<Output = Result<Vec<Par>, InterpreterError>> + Send>>
320+
+ Send
321+
+ Sync,
298322
>,
299323
) {
300324
(self.body_ref, (self.handler)(context))
@@ -355,6 +379,7 @@ pub struct SystemProcesses {
355379
pub space: RhoISpace,
356380
pub block_data: Arc<tokio::sync::RwLock<BlockData>>,
357381
openai_service: Arc<tokio::sync::Mutex<OpenAIService>>,
382+
chromadb_service: Arc<tokio::sync::Mutex<ChromaDBService>>,
358383
pretty_printer: PrettyPrinter,
359384
}
360385

@@ -364,12 +389,14 @@ impl SystemProcesses {
364389
space: RhoISpace,
365390
block_data: Arc<tokio::sync::RwLock<BlockData>>,
366391
openai_service: Arc<tokio::sync::Mutex<OpenAIService>>,
392+
chromadb_service: Arc<tokio::sync::Mutex<ChromaDBService>>,
367393
) -> Self {
368394
SystemProcesses {
369395
dispatcher,
370396
space,
371397
block_data,
372398
openai_service,
399+
chromadb_service,
373400
pretty_printer: PrettyPrinter::new(),
374401
}
375402
}
@@ -1281,6 +1308,67 @@ impl SystemProcesses {
12811308
Err(illegal_argument_error("casper_invalid_blocks_set"))
12821309
}
12831310
}
1311+
1312+
// ChromaDB section start
1313+
1314+
/// This supports two overloads:
1315+
/// - (collection_name: &str, ignore_if_exists: bool)
1316+
/// - (collection_name: &str, update_if_exists: bool, metadata: CollectionMetadata)
1317+
pub async fn chroma_create_collection(
1318+
&self,
1319+
contract_args: (Vec<ListParWithRandom>, bool, Vec<Par>),
1320+
) -> Result<Vec<Par>, InterpreterError> {
1321+
let Some((produce, is_replay, previous_output, args)) =
1322+
self.is_contract_call().unapply(contract_args)
1323+
else {
1324+
return Err(illegal_argument_error("chroma_create_collection"));
1325+
};
1326+
1327+
let (collection_name, ignore_or_update_if_exists, metadata, ack) = match args.as_slice() {
1328+
[collection_name_par, update_if_exists_par, metadata_par, ack] => {
1329+
let (Some(collection_name), Some(update_if_exists), Some(metadata)) = (
1330+
RhoString::unapply(collection_name_par),
1331+
RhoBoolean::unapply(update_if_exists_par),
1332+
<CollectionMetadata as Extractor>::unapply(metadata_par),
1333+
) else {
1334+
return Err(illegal_argument_error("chroma_create_collection"));
1335+
};
1336+
Ok((collection_name, update_if_exists, Some(metadata), ack))
1337+
}
1338+
[collection_name_par, ignore_if_exists_par, ack] => {
1339+
let (Some(collection_name), Some(ignore_if_exists)) = (
1340+
RhoString::unapply(collection_name_par),
1341+
RhoBoolean::unapply(ignore_if_exists_par),
1342+
) else {
1343+
return Err(illegal_argument_error("chroma_create_collection"));
1344+
};
1345+
Ok((collection_name, ignore_if_exists, None, ack))
1346+
}
1347+
_ => Err(illegal_argument_error("chroma_create_collection")),
1348+
}?;
1349+
1350+
// Common piece of code.
1351+
if is_replay {
1352+
produce(&previous_output, ack).await?;
1353+
return Ok(previous_output);
1354+
}
1355+
1356+
let chromadb_service = self.chromadb_service.lock().await;
1357+
match chromadb_service
1358+
.create_collection(&collection_name, ignore_or_update_if_exists, metadata)
1359+
.await
1360+
{
1361+
Ok(_) => Ok(vec![]),
1362+
Err(e) => {
1363+
// TODO (chase): Is this right? It seems like other service methods do something similar.
1364+
let p = RhoString::create_par(collection_name);
1365+
produce(&[p], ack).await?;
1366+
return Err(e);
1367+
}
1368+
}
1369+
}
1370+
1371+
// ChromaDB section end
12841372
}
12851373

12861374
// See casper/src/test/scala/coop/rchain/casper/helper/RhoSpec.scala
@@ -1398,7 +1486,9 @@ pub fn test_framework_contracts() -> Vec<Definition> {
13981486
Box::new(move |args| {
13991487
let sp = sp.clone();
14001488
let invalid_blocks = invalid_blocks.clone();
1401-
Box::pin(async move { sp.casper_invalid_blocks_set(args, &invalid_blocks).await })
1489+
Box::pin(
1490+
async move { sp.casper_invalid_blocks_set(args, &invalid_blocks).await },
1491+
)
14021492
})
14031493
}),
14041494
remainder: None,

0 commit comments

Comments
 (0)