Skip to content

Commit 3b671d8

Browse files
committed
wip
1 parent 67e26f9 commit 3b671d8

10 files changed

Lines changed: 550 additions & 293 deletions

File tree

Cargo.lock

Lines changed: 7 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ repository = "https://github.com/rpcpool/rust-etcd-utils"
1717
async-trait = "^0.1.83"
1818
etcd-client = "^0.17.0"
1919
futures = "^0.3.31"
20+
pin-project = "1.1.11"
2021
rand = "^0.8.5"
2122
retry = "2"
2223
serde = { version = "1", features = ["derive"] }
@@ -28,4 +29,4 @@ tonic = "^0.14.2"
2829
tracing = "^0.1.40"
2930

3031
[dev-dependencies]
31-
tracing-subscriber = { version = "^0.3.1", features = ["ansi", "env-filter"] }
32+
tracing-subscriber = { version = "^0.3.1", features = ["ansi", "env-filter"] }

compose.yaml

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,25 @@ version: '3'
22

33
services:
44
etcd:
5-
image: bitnamilegacy/etcd:3.5
5+
# image: bitnamilegacy/etcd:3.5
6+
# environment:
7+
# - ALLOW_NONE_AUTHENTICATION=yes
8+
# - ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
9+
# ports:
10+
# - "2379:2379"
11+
# - "2380:2380"
12+
image: quay.io/coreos/etcd:v3.5.0
13+
container_name: etcd
614
environment:
7-
- ALLOW_NONE_AUTHENTICATION=yes
15+
- ETCD_NAME=etcd1
16+
- ETCD_DATA_DIR=/etcd-data
17+
- ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
818
- ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
19+
- ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
20+
- ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd:2380
21+
- ETCD_INITIAL_CLUSTER=etcd1=http://etcd:2380
22+
- ETCD_INITIAL_CLUSTER_TOKEN=etcd-single
23+
- ETCD_INITIAL_CLUSTER_STATE=new
924
ports:
1025
- "2379:2379"
1126
- "2380:2380"

src/lock.rs

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@
3434
/// ```
3535
use {
3636
super::{
37+
Revision,
3738
lease::{ManagedLease, ManagedLeaseFactory},
3839
retry::retry_etcd_legacy,
39-
Revision,
4040
},
4141
crate::{
4242
lease::{LeaseExpiredNotify, ManagedLeaseWeak},
@@ -45,7 +45,10 @@ use {
4545
},
4646
core::fmt,
4747
etcd_client::{Compare, CompareOp, GetOptions, LockOptions, Txn, TxnOp, TxnResponse},
48-
futures::{future::join_all, FutureExt},
48+
futures::{
49+
FutureExt, StreamExt,
50+
future::{BoxFuture, Shared, join_all},
51+
},
4952
retry::delay::Fixed,
5053
std::{
5154
future::Future,
@@ -55,7 +58,7 @@ use {
5558
},
5659
thiserror::Error,
5760
tokio::{
58-
sync::{broadcast, mpsc},
61+
sync::mpsc,
5962
task::{JoinError, JoinHandle},
6063
},
6164
tonic::Code,
@@ -142,14 +145,14 @@ impl Future for LockManagerHandle {
142145
/// ```
143146
///
144147
pub struct ManagedLockRevokeNotify {
145-
watch_lock_delete: broadcast::Receiver<Revision>,
148+
watch_lock_delete: Shared<BoxFuture<'static, ()>>,
146149
lease_expired_notify: LeaseExpiredNotify,
147150
}
148151

149152
impl Clone for ManagedLockRevokeNotify {
150153
fn clone(&self) -> Self {
151154
Self {
152-
watch_lock_delete: self.watch_lock_delete.resubscribe(),
155+
watch_lock_delete: self.watch_lock_delete.clone(),
153156
lease_expired_notify: self.lease_expired_notify.clone(),
154157
}
155158
}
@@ -159,14 +162,30 @@ impl ManagedLockRevokeNotify {
159162
///
160163
/// Wait for the lock to be revoked.
161164
///
162-
pub async fn wait_for_revoke(mut self) {
165+
pub async fn wait_for_revoke(self) {
166+
let watch_lock_delete = self.watch_lock_delete;
163167
tokio::select! {
164168
_ = self.lease_expired_notify.recv() => {}
165-
_ = self.watch_lock_delete.recv() => {}
169+
_ = watch_lock_delete => {}
166170
}
167171
}
168172
}
169173

174+
fn make_revoke_callback(
175+
etcd: etcd_client::Client,
176+
lock_key: Vec<u8>,
177+
revision: Revision,
178+
) -> Shared<BoxFuture<'static, ()>> {
179+
let mut watch_stream = etcd
180+
.watch_client()
181+
.watch_lock_key_change_stream(lock_key, revision);
182+
async move {
183+
let _ = watch_stream.next().await;
184+
}
185+
.boxed()
186+
.shared()
187+
}
188+
170189
///
171190
/// Creates a lock manager to create "managed" locks.
172191
///
@@ -402,17 +421,14 @@ impl LockManager {
402421
}
403422
};
404423

405-
let watch_lock_delete = self
406-
.etcd
407-
.watch_client()
408-
.watch_lock_key_change(lock_key.clone(), revision);
424+
let revoke_callback = make_revoke_callback(self.etcd.clone(), lock_key.clone(), revision);
409425
Ok(ManagedLock {
410426
lock_key,
411427
managed_lease,
412428
etcd: self.etcd.clone(),
413429
created_at_revision: revision,
414430
delete_signal_tx: self.delete_queue_tx.clone(),
415-
revoke_callback_rx: watch_lock_delete.subscribe(),
431+
revoke_callback,
416432
})
417433
}
418434

@@ -487,17 +503,14 @@ impl LockManager {
487503
}
488504
};
489505

490-
let watch_lock_delete = self
491-
.etcd
492-
.watch_client()
493-
.watch_lock_key_change(lock_key.clone(), revision);
506+
let revoke_callback = make_revoke_callback(self.etcd.clone(), lock_key.clone(), revision);
494507
Ok(ManagedLock {
495508
lock_key,
496509
managed_lease,
497510
etcd: self.etcd.clone(),
498511
created_at_revision: revision,
499512
delete_signal_tx: self.delete_queue_tx.clone(),
500-
revoke_callback_rx: watch_lock_delete.subscribe(),
513+
revoke_callback,
501514
})
502515
}
503516

@@ -562,18 +575,15 @@ impl LockManager {
562575
lock_response.key().to_vec(),
563576
);
564577

565-
let watch_lock_delete = self
566-
.etcd
567-
.watch_client()
568-
.watch_lock_key_change(lock_key.clone(), revision);
578+
let revoke_callback = make_revoke_callback(self.etcd.clone(), lock_key.clone(), revision);
569579

570580
let managed_lock = ManagedLock {
571581
lock_key,
572582
managed_lease,
573583
etcd: self.etcd.clone(),
574584
created_at_revision: revision,
575585
delete_signal_tx: self.delete_queue_tx.clone(),
576-
revoke_callback_rx: watch_lock_delete.subscribe(),
586+
revoke_callback,
577587
};
578588

579589
Ok(managed_lock)
@@ -589,7 +599,7 @@ pub struct ManagedLock {
589599
pub created_at_revision: Revision,
590600
pub(crate) etcd: etcd_client::Client,
591601
delete_signal_tx: tokio::sync::mpsc::UnboundedSender<DeleteQueueCommand>,
592-
revoke_callback_rx: broadcast::Receiver<Revision>,
602+
revoke_callback: Shared<BoxFuture<'static, ()>>,
593603
}
594604

595605
impl fmt::Debug for ManagedLock {
@@ -675,7 +685,7 @@ impl ManagedLock {
675685
///
676686
pub fn get_revoke_notify(&self) -> ManagedLockRevokeNotify {
677687
ManagedLockRevokeNotify {
678-
watch_lock_delete: self.revoke_callback_rx.resubscribe(),
688+
watch_lock_delete: self.revoke_callback.clone(),
679689
lease_expired_notify: self.managed_lease.get_lease_expire_notify(),
680690
}
681691
}
@@ -744,22 +754,10 @@ impl ManagedLock {
744754
F: FnOnce(ManagedLockGuard<'a>) -> Fut,
745755
Fut: Future<Output = T> + Send + 'a,
746756
{
747-
let mut rx = self.revoke_callback_rx.resubscribe();
748-
749-
match rx.try_recv() {
750-
Ok(_) => {
751-
tracing::trace!("Lock revoked");
752-
return Err(LockError::LockRevoked);
753-
}
754-
Err(broadcast::error::TryRecvError::Closed) => {
755-
tracing::trace!("Lock revoked");
756-
return Err(LockError::LockRevoked);
757-
}
758-
_ => {}
759-
}
757+
let revoke_callback = self.revoke_callback.clone();
760758
tokio::select! {
761759
result = func(ManagedLockGuard { managed_lock: self }) => Ok(result),
762-
_ = rx.recv() => Err(LockError::LockRevoked),
760+
_ = revoke_callback => Err(LockError::LockRevoked),
763761
}
764762
}
765763

src/log.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
use etcd_client::{Compare, CompareOp, Txn, TxnOp, WatchOptions};
2-
use serde::{de::DeserializeOwned, Serialize};
2+
use futures::StreamExt;
3+
use serde::{Serialize, de::DeserializeOwned};
34

4-
use crate::{lock::ManagedLockGuard, retry::retry_etcd_txn, sync::watch, watcher::WatchClientExt};
5+
use crate::{
6+
lock::ManagedLockGuard,
7+
retry::retry_etcd_txn,
8+
watcher::{EtcdJsonPutWatchStream, WatchClientExt},
9+
};
510

611
pub struct LogWatcher<T> {
7-
rx: watch::Receiver<T>,
12+
stream: EtcdJsonPutWatchStream<T>,
813
}
914

1015
pub struct ExclusiveLogUpdater<'a, T> {
@@ -112,26 +117,17 @@ where
112117
.max()
113118
.map(|max_mod_rev| WatchOptions::new().with_start_revision(max_mod_rev));
114119

115-
let mut rx = etcd
120+
let stream = etcd
116121
.watch_client()
117-
.json_put_watch_channel::<T>(log_name.as_ref(), maybe_watch_opts);
122+
.json_put_watch_stream::<T>(log_name.as_ref(), maybe_watch_opts);
118123

119-
let (mut wtx, wrx) = watch::watch::<T>();
120-
121-
let _channel_handle = tokio::spawn(async move {
122-
loop {
123-
let (_revision, val) = rx.recv().await.expect("watch channel closed");
124-
let _ = wtx.update(val).await;
125-
}
126-
});
127-
128-
Ok(Self { rx: wrx })
124+
Ok(Self { stream })
129125
}
130126

131127
///
132128
/// Observes the log for new entries.
133129
///
134130
pub async fn observe(&mut self) -> Option<T> {
135-
self.rx.recv().await
131+
self.stream.next().await.map(|(_revision, val)| val)
136132
}
137133
}

src/sync/watch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::{atomic::AtomicBool, Arc};
1+
use std::sync::{Arc, atomic::AtomicBool};
22
use tokio::sync::{Mutex, Notify};
33

44
struct Inner<T> {

0 commit comments

Comments
 (0)