Skip to content
599 changes: 532 additions & 67 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ conjure-runtime = "6"
nominal-api = "0.917.0"
prost = "0.13"
snap = "1"
tokio = { version = "1", features = ["full", "tracing"] }
futures = "0.3"
tokio = { version = "1", features = ["full", "tracing", "io-util"] }
url = "2.5.4"
parking_lot = "0.12"
tracing = "0.1"
Expand All @@ -23,6 +24,11 @@ serde_json = "1.0.140"
chrono = "0.4.41"
thiserror = "2"
crossbeam-channel = "0.5.15"
reqwest = "0.12.22"
log = "0.4.27"
async-channel = "2.5.0"
rand = "0.8.5"
bytes = "1.10.1"

[profile.release]
debug = true
Expand Down
103 changes: 79 additions & 24 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::Write;
use std::sync::LazyLock;

use conjure_error::Error;
use conjure_http::client::AsyncClient;
use conjure_http::client::AsyncRequestBody;
use conjure_http::client::AsyncService;
use conjure_http::private::header::CONTENT_ENCODING;
use conjure_http::private::header::CONTENT_TYPE;
use conjure_http::private::Request;
Expand All @@ -18,8 +18,9 @@ use conjure_runtime::Client;
use conjure_runtime::Idempotency;
use conjure_runtime::ResponseBody;
use conjure_runtime::UserAgent;
use derive_more::From;
use nominal_api::api::rids::NominalDataSourceOrDatasetRid;
use nominal_api::ingest::api::IngestServiceAsyncClient;
use nominal_api::upload::api::UploadServiceAsyncClient;
use snap::write::FrameEncoder;
use url::Url;

Expand All @@ -32,27 +33,86 @@ pub mod conjure {
pub use conjure_runtime as runtime;
}

const PRODUCTION_API_URL: &str = "https://api.gov.nominal.io/api";
const STAGING_API_URL: &str = "https://api-staging.gov.nominal.io/api";
const USER_AGENT: &str = "nominal-streaming";

impl AuthProvider for BearerToken {
fn token(&self) -> Option<BearerToken> {
Some(self.clone())
}
}

pub static PRODUCTION_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
async_conjure_streaming_client("https://api.gov.nominal.io/api".try_into().unwrap())
.expect("Failed to create client")
#[derive(Debug, Clone)]
pub struct TokenAndWorkspaceRid {
pub token: BearerToken,
pub workspace_rid: Option<ResourceIdentifier>,
}

impl AuthProvider for TokenAndWorkspaceRid {
fn token(&self) -> Option<BearerToken> {
Some(self.token.clone())
}

fn workspace_rid(&self) -> Option<ResourceIdentifier> {
self.workspace_rid.clone()
}
}

#[derive(Clone)]
pub struct NominalApiClients {
pub streaming: Client,
pub upload: UploadServiceAsyncClient<Client>,
pub ingest: IngestServiceAsyncClient<Client>,
}

impl Debug for NominalApiClients {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NominalApiClients")
.field("streaming", &"Client")
.field("upload", &"UploadServiceAsyncClient<Client>")
.field("ingest", &"IngestServiceAsyncClient<Client>")
.finish()
}
}

impl NominalApiClients {
pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
self.streaming.send(req).await
}
}

pub static PRODUCTION_CLIENTS: LazyLock<NominalApiClients> = LazyLock::new(|| NominalApiClients {
streaming: async_conjure_streaming_client(PRODUCTION_API_URL.try_into().unwrap())
.expect("Failed to create streaming client"),
upload: UploadServiceAsyncClient::new(
async_conjure_client("upload", PRODUCTION_API_URL.try_into().unwrap())
.expect("Failed to create upload client"),
),
ingest: IngestServiceAsyncClient::new(
async_conjure_client("ingest", PRODUCTION_API_URL.try_into().unwrap())
.expect("Failed to create ingest client"),
),
});

pub static STAGING_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
async_conjure_streaming_client("https://api-staging.gov.nominal.io/api".try_into().unwrap())
.expect("Failed to create client")
pub static STAGING_CLIENTS: LazyLock<NominalApiClients> = LazyLock::new(|| NominalApiClients {
streaming: async_conjure_streaming_client(STAGING_API_URL.try_into().unwrap())
.expect("Failed to create streaming client"),
upload: UploadServiceAsyncClient::new(
async_conjure_client("upload", STAGING_API_URL.try_into().unwrap())
.expect("Failed to create upload client"),
),
ingest: IngestServiceAsyncClient::new(
async_conjure_client("ingest", STAGING_API_URL.try_into().unwrap())
.expect("Failed to create ingest client"),
),
});

fn async_conjure_streaming_client(uri: Url) -> Result<StreamingClient, Error> {
fn async_conjure_streaming_client(uri: Url) -> Result<Client, Error> {
Client::builder()
.service("core-streaming-rs")
.user_agent(UserAgent::new(Agent::new(
"core-streaming-rs",
USER_AGENT,
env!("CARGO_PKG_VERSION"),
)))
.uri(uri)
Expand All @@ -64,22 +124,17 @@ fn async_conjure_streaming_client(uri: Url) -> Result<StreamingClient, Error> {
// enables retries for POST endpoints like the streaming ingest one
.idempotency(Idempotency::Always)
.build()
.map(|client| client.into())
}

#[derive(From, Clone)]
pub struct StreamingClient(Client);

impl Debug for StreamingClient {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "StreamingClient")
}
}

impl StreamingClient {
pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
self.0.send(req).await
}
fn async_conjure_client(service: &'static str, uri: Url) -> Result<Client, Error> {
Client::builder()
.service(service)
.user_agent(UserAgent::new(Agent::new(
USER_AGENT,
env!("CARGO_PKG_VERSION"),
)))
.uri(uri)
.build()
}

pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
Expand Down
Loading