Skip to content

Commit d884f26

Browse files
authored
feat(lazer): add state endpoint support to history client (#3175)
* feat(lazer): add state endpoint support to history client * chore: bump pyth-lazer-client version to 9.0.0
1 parent a52f61e commit d884f26

File tree

8 files changed

+521
-221
lines changed

8 files changed

+521
-221
lines changed

Cargo.lock

Lines changed: 81 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lazer/sdk/rust/client/Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
[package]
22
name = "pyth-lazer-client"
3-
version = "8.6.1"
3+
version = "9.0.0"
44
edition = "2021"
55
description = "A Rust client for Pyth Lazer"
66
license = "Apache-2.0"
77

88
[dependencies]
99
pyth-lazer-protocol = { path = "../protocol", version = "0.20.1" }
10+
pyth-lazer-publisher-sdk = { path = "../../../publisher_sdk/rust", version = "0.20.1" }
11+
1012
tokio = { version = "1", features = ["full"] }
13+
tokio-stream = "0.1.17"
1114
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
1215
futures-util = "0.3"
1316
serde = { version = "1.0", features = ["derive"] }
@@ -25,7 +28,9 @@ futures = "0.3.31"
2528
humantime-serde = "1.1.1"
2629
fs-err = "3.1.1"
2730
atomicwrites = "0.4.4"
28-
31+
protobuf-json-mapping = "3.7.2"
32+
serde_with = "3.15.1"
33+
async-trait = "0.1.89"
2934

3035
[dev-dependencies]
3136
bincode = "1.3.3"
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use {
2+
pyth_lazer_client::arc_swap::StreamIntoAutoUpdatedHandle,
3+
pyth_lazer_client::history_client::GetStateParams,
4+
pyth_lazer_client::history_client::{PythLazerHistoryClient, PythLazerHistoryClientConfig},
5+
std::{env, time::Duration},
6+
tokio::time::sleep,
7+
url::Url,
8+
};
9+
10+
#[tokio::main]
11+
async fn main() -> anyhow::Result<()> {
12+
tracing_subscriber::fmt::init();
13+
let urls = std::env::args()
14+
.skip(1)
15+
.map(|s| Url::parse(&s))
16+
.collect::<Result<Vec<_>, _>>()?;
17+
18+
let client = PythLazerHistoryClient::new(PythLazerHistoryClientConfig {
19+
urls,
20+
update_interval: Duration::from_secs(5),
21+
access_token: Some(env::var("ACCESS_TOKEN")?),
22+
..Default::default()
23+
});
24+
let state = client
25+
.state_stream(GetStateParams {
26+
publishers: true,
27+
..Default::default()
28+
})
29+
.await?
30+
.into_auto_updated_handle()
31+
.await?;
32+
33+
loop {
34+
println!("publishers len: {}", state.load().publishers.len());
35+
println!(
36+
"publisher 1: {:?}",
37+
state
38+
.load()
39+
.publishers
40+
.iter()
41+
.find(|p| p.publisher_id == Some(1))
42+
);
43+
sleep(Duration::from_secs(15)).await;
44+
}
45+
}

lazer/sdk/rust/client/examples/symbols.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Duration;
1+
use {pyth_lazer_client::arc_swap::StreamIntoAutoUpdatedHandle, std::time::Duration};
22

33
use pyth_lazer_client::history_client::{PythLazerHistoryClient, PythLazerHistoryClientConfig};
44
use pyth_lazer_protocol::PriceFeedId;
@@ -18,11 +18,21 @@ async fn main() -> anyhow::Result<()> {
1818
update_interval: Duration::from_secs(5),
1919
..Default::default()
2020
});
21-
let symbols = client.all_symbols_metadata_handle().await?;
21+
let symbols = client
22+
.all_symbols_metadata_stream()
23+
.await?
24+
.into_auto_updated_handle()
25+
.await?;
2226

2327
loop {
24-
println!("symbols len: {}", symbols.symbols().len());
25-
println!("symbol 1: {:?}", symbols.symbols().get(&PriceFeedId(1)));
28+
println!("symbols len: {}", symbols.load().len());
29+
println!(
30+
"symbol 1: {:?}",
31+
symbols
32+
.load()
33+
.iter()
34+
.find(|feed| feed.pyth_lazer_id == PriceFeedId(1))
35+
);
2636
sleep(Duration::from_secs(15)).await;
2737
}
2838
}

lazer/sdk/rust/client/examples/symbols_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Duration;
1+
use {futures::StreamExt, std::time::Duration};
22

33
use pyth_lazer_client::history_client::{PythLazerHistoryClient, PythLazerHistoryClientConfig};
44
use pyth_lazer_protocol::PriceFeedId;
@@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
1919
});
2020
let mut symbols_stream = client.all_symbols_metadata_stream().await?;
2121

22-
while let Some(symbols) = symbols_stream.recv().await {
22+
while let Some(symbols) = symbols_stream.next().await {
2323
println!("symbols len: {}", symbols.len());
2424
println!(
2525
"symbol 1: {:?}",
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use {
2+
anyhow::Context as _,
3+
arc_swap::ArcSwap,
4+
futures::Stream,
5+
futures_util::StreamExt as _,
6+
std::sync::Arc,
7+
tracing::{info, Instrument as _},
8+
};
9+
10+
#[async_trait::async_trait]
11+
pub trait StreamIntoAutoUpdatedHandle: Stream + Unpin + Sized + 'static
12+
where
13+
Self::Item: Send + Sync,
14+
{
15+
/// Create an `ArcSwap` that provides access to the most recent value produced by the stream.
16+
async fn into_auto_updated_handle(mut self) -> anyhow::Result<Arc<ArcSwap<Self::Item>>> {
17+
let first_value = self
18+
.next()
19+
.await
20+
.context("cannot create auto updated handle from empty stream")?;
21+
let handle = Arc::new(ArcSwap::new(Arc::new(first_value)));
22+
let weak_handle = Arc::downgrade(&handle);
23+
tokio::spawn(
24+
async move {
25+
while let Some(value) = self.next().await {
26+
let Some(handle) = weak_handle.upgrade() else {
27+
info!("handle dropped, stopping auto handle update task");
28+
return;
29+
};
30+
handle.store(Arc::new(value));
31+
}
32+
}
33+
.in_current_span(),
34+
);
35+
Ok(handle)
36+
}
37+
}
38+
39+
impl<T: Stream + Unpin + 'static> StreamIntoAutoUpdatedHandle for T where T::Item: Send + Sync {}

0 commit comments

Comments
 (0)