Skip to content

Commit 394190d

Browse files
committed
libsql: Initial pass on offline writes
1 parent f959bc6 commit 394190d

File tree

8 files changed

+301
-11
lines changed

8 files changed

+301
-11
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsql/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ thiserror = "1.0.40"
1313
futures = { version = "0.3.28", optional = true }
1414
libsql-sys = { version = "0.6", path = "../libsql-sys", optional = true }
1515
libsql-hrana = { version = "0.2", path = "../libsql-hrana", optional = true }
16+
libsql_sync = { version = "0.4", path = "../libsql-sync", optional = true }
1617
tokio = { version = "1.29.1", features = ["sync"], optional = true }
1718
tokio-util = { version = "0.7", features = ["io-util", "codec"], optional = true }
1819
parking_lot = { version = "0.12.1", optional = true }
@@ -91,6 +92,7 @@ replication = [
9192
"dep:hyper-rustls",
9293
"dep:futures",
9394
"dep:libsql_replication",
95+
"dep:libsql_sync",
9496
]
9597
hrana = [
9698
"parser",

libsql/examples/offline_writes.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Example of using a offline writes with libSQL.
2+
3+
use libsql::{params, Builder};
4+
5+
#[tokio::main]
6+
async fn main() {
7+
tracing_subscriber::fmt::init();
8+
9+
// The local database path where the data will be stored.
10+
let db_path = std::env::var("LIBSQL_DB_PATH")
11+
.map_err(|_| {
12+
eprintln!(
13+
"Please set the LIBSQL_DB_PATH environment variable to set to local database path."
14+
)
15+
})
16+
.unwrap();
17+
18+
// The remote sync URL to use.
19+
let sync_url = std::env::var("LIBSQL_SYNC_URL")
20+
.map_err(|_| {
21+
eprintln!(
22+
"Please set the LIBSQL_SYNC_URL environment variable to set to remote sync URL."
23+
)
24+
})
25+
.unwrap();
26+
27+
let namespace = std::env::var("LIBSQL_NAMESPACE").ok();
28+
29+
// The authentication token to use.
30+
let auth_token = std::env::var("LIBSQL_AUTH_TOKEN").unwrap_or("".to_string());
31+
32+
let db_builder = if let Some(ns) = namespace {
33+
Builder::new_offline_replica(db_path, sync_url, auth_token).namespace(&ns)
34+
} else {
35+
Builder::new_offline_replica(db_path, sync_url, auth_token)
36+
};
37+
38+
let db = match db_builder.build().await {
39+
Ok(db) => db,
40+
Err(error) => {
41+
eprintln!("Error connecting to remote sync server: {}", error);
42+
return;
43+
}
44+
};
45+
46+
let conn = db.connect().unwrap();
47+
48+
conn.execute(
49+
r#"
50+
CREATE TABLE IF NOT EXISTS guest_book_entries (
51+
text TEXT
52+
)"#,
53+
(),
54+
)
55+
.await
56+
.unwrap();
57+
58+
let mut input = String::new();
59+
println!("Please write your entry to the guestbook:");
60+
match std::io::stdin().read_line(&mut input) {
61+
Ok(_) => {
62+
println!("You entered: {}", input);
63+
let params = params![input.as_str()];
64+
conn.execute("INSERT INTO guest_book_entries (text) VALUES (?)", params)
65+
.await
66+
.unwrap();
67+
}
68+
Err(error) => {
69+
eprintln!("Error reading input: {}", error);
70+
}
71+
}
72+
let mut results = conn
73+
.query("SELECT * FROM guest_book_entries", ())
74+
.await
75+
.unwrap();
76+
println!("Guest book entries:");
77+
while let Some(row) = results.next().await.unwrap() {
78+
let text: String = row.get(0).unwrap();
79+
println!(" {}", text);
80+
}
81+
82+
print!("Syncing database to remote...");
83+
db.sync().await.unwrap();
84+
println!(" done");
85+
}

libsql/src/database.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ enum DbType {
4646
db: crate::local::Database,
4747
encryption_config: Option<EncryptionConfig>,
4848
},
49+
#[cfg(feature = "replication")]
50+
Offline { db: crate::local::Database },
4951
#[cfg(feature = "remote")]
5052
Remote {
5153
url: String,
@@ -65,6 +67,8 @@ impl fmt::Debug for DbType {
6567
Self::File { .. } => write!(f, "File"),
6668
#[cfg(feature = "replication")]
6769
Self::Sync { .. } => write!(f, "Sync"),
70+
#[cfg(feature = "replication")]
71+
Self::Offline { .. } => write!(f, "Offline"),
6872
#[cfg(feature = "remote")]
6973
Self::Remote { .. } => write!(f, "Remote"),
7074
_ => write!(f, "no database type set"),
@@ -324,10 +328,10 @@ cfg_replication! {
324328
/// Sync database from remote, and returns the committed frame_no after syncing, if
325329
/// applicable.
326330
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
327-
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
328-
db.sync().await
329-
} else {
330-
Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
331+
match &self.db_type {
332+
DbType::Sync { db, encryption_config: _ } => db.sync().await,
333+
DbType::Offline { db } => db.push().await,
334+
_ => Err(Error::SyncNotSupported(format!("{:?}", self.db_type))),
331335
}
332336
}
333337

@@ -558,6 +562,17 @@ impl Database {
558562
Ok(Connection { conn })
559563
}
560564

565+
#[cfg(feature = "replication")]
566+
DbType::Offline { db } => {
567+
use crate::local::impls::LibsqlConnection;
568+
569+
let conn = db.connect()?;
570+
571+
let conn = std::sync::Arc::new(LibsqlConnection { conn });
572+
573+
Ok(Connection { conn })
574+
}
575+
561576
#[cfg(feature = "remote")]
562577
DbType::Remote {
563578
url,

libsql/src/database/builder.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use super::DbType;
1212
/// it does no networking and does not connect to any remote database.
1313
/// - `new_remote_replica`/`RemoteReplica` creates an embedded replica database that will be able
1414
/// to sync from the remote url and delegate writes to the remote primary.
15+
/// - `new_offline_replica`/`OfflineReplica` creates an embedded replica database that supports
16+
/// offline writes.
1517
/// - `new_local_replica`/`LocalReplica` creates an embedded replica similar to the remote version
1618
/// except you must use `Database::sync_frames` to sync with the remote. This version also
1719
/// includes the ability to delegate writes to a remote primary.
@@ -66,6 +68,30 @@ impl Builder<()> {
6668
}
6769
}
6870

71+
cfg_replication! {
72+
/// Create a new offline embedded replica.
73+
pub fn new_offline_replica(
74+
path: impl AsRef<std::path::Path>,
75+
url: String,
76+
auth_token: String,
77+
) -> Builder<OfflineReplica> {
78+
Builder {
79+
inner: OfflineReplica {
80+
path: path.as_ref().to_path_buf(),
81+
flags: crate::OpenFlags::default(),
82+
remote: Remote {
83+
url,
84+
auth_token,
85+
connector: None,
86+
version: None,
87+
},
88+
http_request_callback: None,
89+
namespace: None
90+
},
91+
}
92+
}
93+
}
94+
6995
/// Create a new local replica.
7096
pub fn new_local_replica(path: impl AsRef<std::path::Path>) -> Builder<LocalReplica> {
7197
Builder {
@@ -170,6 +196,15 @@ cfg_replication! {
170196
namespace: Option<String>,
171197
}
172198

199+
/// Remote replica configuration type in [`Builder`].
200+
pub struct OfflineReplica {
201+
path: std::path::PathBuf,
202+
flags: crate::OpenFlags,
203+
remote: Remote,
204+
http_request_callback: Option<crate::util::HttpRequestCallback>,
205+
namespace: Option<String>,
206+
}
207+
173208
/// Local replica configuration type in [`Builder`].
174209
pub struct LocalReplica {
175210
path: std::path::PathBuf,
@@ -295,6 +330,90 @@ cfg_replication! {
295330
}
296331
}
297332

333+
impl Builder<OfflineReplica> {
334+
/// Provide a custom http connector that will be used to create http connections.
335+
pub fn connector<C>(mut self, connector: C) -> Builder<OfflineReplica>
336+
where
337+
C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
338+
C::Response: crate::util::Socket,
339+
C::Future: Send + 'static,
340+
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
341+
{
342+
self.inner.remote = self.inner.remote.connector(connector);
343+
self
344+
}
345+
346+
pub fn http_request_callback<F>(mut self, f: F) -> Builder<OfflineReplica>
347+
where
348+
F: Fn(&mut http::Request<()>) + Send + Sync + 'static
349+
{
350+
self.inner.http_request_callback = Some(std::sync::Arc::new(f));
351+
self
352+
353+
}
354+
355+
/// Set the namespace that will be communicated to remote replica in the http header.
356+
pub fn namespace(mut self, namespace: impl Into<String>) -> Builder<OfflineReplica>
357+
{
358+
self.inner.namespace = Some(namespace.into());
359+
self
360+
}
361+
362+
#[doc(hidden)]
363+
pub fn version(mut self, version: String) -> Builder<OfflineReplica> {
364+
self.inner.remote = self.inner.remote.version(version);
365+
self
366+
}
367+
368+
/// Build the remote embedded replica database.
369+
pub async fn build(self) -> Result<Database> {
370+
let OfflineReplica {
371+
path,
372+
flags,
373+
remote:
374+
Remote {
375+
url,
376+
auth_token,
377+
connector,
378+
version,
379+
},
380+
http_request_callback,
381+
namespace
382+
} = self.inner;
383+
384+
let connector = if let Some(connector) = connector {
385+
connector
386+
} else {
387+
let https = super::connector()?;
388+
use tower::ServiceExt;
389+
390+
let svc = https
391+
.map_err(|e| e.into())
392+
.map_response(|s| Box::new(s) as Box<dyn crate::util::Socket>);
393+
394+
crate::util::ConnectorService::new(svc)
395+
};
396+
397+
let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned();
398+
399+
let db = crate::local::Database::open_local_with_offline_writes(
400+
connector,
401+
path,
402+
flags,
403+
url,
404+
auth_token,
405+
version,
406+
http_request_callback,
407+
namespace,
408+
)
409+
.await?;
410+
411+
Ok(Database {
412+
db_type: DbType::Offline { db },
413+
})
414+
}
415+
}
416+
298417
impl Builder<LocalReplica> {
299418
/// Set [`OpenFlags`] for this database.
300419
pub fn flags(mut self, flags: crate::OpenFlags) -> Builder<LocalReplica> {

libsql/src/local/connection.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};
88

99
use crate::TransactionBehavior;
1010

11-
use libsql_sys::ffi;
11+
use libsql_sys::{ffi, wal};
1212
use std::{ffi::c_int, fmt, path::Path, sync::Arc};
1313

1414
/// A connection to a libSQL database.
@@ -57,13 +57,20 @@ impl Connection {
5757
)));
5858
}
5959
}
60-
61-
Ok(Connection {
60+
let conn = Connection {
6261
raw,
6362
drop_ref: Arc::new(()),
6463
#[cfg(feature = "replication")]
6564
writer: db.writer()?,
66-
})
65+
};
66+
if let Some(_) = db.sync_ctx {
67+
// We need to make sure database is in WAL mode with checkpointing
68+
// disabled so that we can sync our changes back to a remote
69+
// server.
70+
conn.query("PRAGMA journal_mode = WAL", Params::None)?;
71+
conn.query("PRAGMA wal_autocheckpoint = 0", Params::None)?;
72+
}
73+
Ok(conn)
6774
}
6875

6976
/// Get a raw handle to the underlying libSQL connection

0 commit comments

Comments
 (0)