@@ -98,15 +98,15 @@ fn embedded_replica() {
9898 )
9999 . await ?;
100100
101- let n = db. sync ( ) . await ?;
101+ let n = db. sync ( ) . await ?. frame_no ( ) ;
102102 assert_eq ! ( n, None ) ;
103103
104104 let conn = db. connect ( ) ?;
105105
106106 conn. execute ( "CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)" , ( ) )
107107 . await ?;
108108
109- let n = db. sync ( ) . await ?;
109+ let n = db. sync ( ) . await ?. frame_no ( ) ;
110110 assert_eq ! ( n, Some ( 1 ) ) ;
111111
112112 let err = conn
@@ -171,15 +171,15 @@ fn execute_batch() {
171171 )
172172 . await ?;
173173
174- let n = db. sync ( ) . await ?;
174+ let n = db. sync ( ) . await ?. frame_no ( ) ;
175175 assert_eq ! ( n, None ) ;
176176
177177 let conn = db. connect ( ) ?;
178178
179179 conn. execute ( "CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)" , ( ) )
180180 . await ?;
181181
182- let n = db. sync ( ) . await ?;
182+ let n = db. sync ( ) . await ?. frame_no ( ) ;
183183 assert_eq ! ( n, Some ( 1 ) ) ;
184184
185185 conn. execute_batch (
@@ -224,15 +224,15 @@ fn stream() {
224224 )
225225 . await ?;
226226
227- let n = db. sync ( ) . await ?;
227+ let n = db. sync ( ) . await ?. frame_no ( ) ;
228228 assert_eq ! ( n, None ) ;
229229
230230 let conn = db. connect ( ) ?;
231231
232232 conn. execute ( "CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)" , ( ) )
233233 . await ?;
234234
235- let n = db. sync ( ) . await ?;
235+ let n = db. sync ( ) . await ?. frame_no ( ) ;
236236 assert_eq ! ( n, Some ( 1 ) ) ;
237237
238238 conn. execute_batch (
@@ -299,15 +299,15 @@ fn embedded_replica_with_encryption() {
299299 )
300300 . await ?;
301301
302- let n = db. sync ( ) . await ?;
302+ let n = db. sync ( ) . await ?. frame_no ( ) ;
303303 assert_eq ! ( n, None ) ;
304304
305305 let conn = db. connect ( ) ?;
306306
307307 conn. execute ( "CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)" , ( ) )
308308 . await ?;
309309
310- let n = db. sync ( ) . await ?;
310+ let n = db. sync ( ) . await ?. frame_no ( ) ;
311311 assert_eq ! ( n, Some ( 1 ) ) ;
312312
313313 let err = conn
@@ -461,7 +461,7 @@ fn replica_primary_reset() {
461461 )
462462 . await
463463 . unwrap ( ) ;
464- let replica_index = replica. sync ( ) . await . unwrap ( ) . unwrap ( ) ;
464+ let replica_index = replica. sync ( ) . await . unwrap ( ) . frame_no ( ) . unwrap ( ) ;
465465 let primary_index = Client :: new ( )
466466 . get ( "http://primary:9090/v1/namespaces/default/stats" )
467467 . await
@@ -520,7 +520,7 @@ fn replica_primary_reset() {
520520 )
521521 . await
522522 . unwrap ( ) ;
523- let replica_index = replica. sync ( ) . await . unwrap ( ) . unwrap ( ) ;
523+ let replica_index = replica. sync ( ) . await . unwrap ( ) . frame_no ( ) . unwrap ( ) ;
524524 let primary_index = Client :: new ( )
525525 . get ( "http://primary:9090/v1/namespaces/default/stats" )
526526 . await
@@ -625,7 +625,7 @@ fn replica_no_resync_on_restart() {
625625 )
626626 . await
627627 . unwrap ( ) ;
628- db. sync ( ) . await . unwrap ( ) . unwrap ( )
628+ db. sync ( ) . await . unwrap ( ) . frame_no ( ) . unwrap ( )
629629 } ;
630630 let first_sync = before. elapsed ( ) ;
631631
@@ -641,7 +641,7 @@ fn replica_no_resync_on_restart() {
641641 )
642642 . await
643643 . unwrap ( ) ;
644- db. sync ( ) . await . unwrap ( ) . unwrap ( )
644+ db. sync ( ) . await . unwrap ( ) . frame_no ( ) . unwrap ( )
645645 } ;
646646 let second_sync = before. elapsed ( ) ;
647647
@@ -725,7 +725,8 @@ fn replicate_with_snapshots() {
725725 . await
726726 . unwrap ( ) ;
727727
728- db. sync ( ) . await . unwrap ( ) ;
728+ let rep = db. sync ( ) . await . unwrap ( ) ;
729+ assert_eq ! ( rep. frames_synced( ) , 427 ) ;
729730
730731 let conn = db. connect ( ) . unwrap ( ) ;
731732
@@ -757,7 +758,8 @@ fn replicate_with_snapshots() {
757758
758759 assert_eq ! ( stat, 427 ) ;
759760
760- db. sync ( ) . await . unwrap ( ) ;
761+ let rep = db. sync ( ) . await . unwrap ( ) ;
762+ assert_eq ! ( rep. frames_synced( ) , 0 ) ;
761763
762764 let conn = db. connect ( ) . unwrap ( ) ;
763765
@@ -1226,3 +1228,132 @@ fn txn_bug_issue_1283() {
12261228
12271229 sim. run ( ) . unwrap ( ) ;
12281230}
1231+
1232+ #[ test]
1233+ fn replicated_return ( ) {
1234+ let tmp_embedded = tempdir ( ) . unwrap ( ) ;
1235+ let tmp_embedded_path = tmp_embedded. path ( ) . to_owned ( ) ;
1236+
1237+ let mut sim = Builder :: new ( )
1238+ . simulation_duration ( Duration :: from_secs ( 1000 ) )
1239+ . build ( ) ;
1240+ let tmp = tempdir ( ) . unwrap ( ) ;
1241+
1242+ let notify = Arc :: new ( Notify :: new ( ) ) ;
1243+ let notify_clone = notify. clone ( ) ;
1244+
1245+ init_tracing ( ) ;
1246+ sim. host ( "primary" , move || {
1247+ let notify = notify_clone. clone ( ) ;
1248+ let path = tmp. path ( ) . to_path_buf ( ) ;
1249+ async move {
1250+ let make_server = || async {
1251+ TestServer {
1252+ path : path. clone ( ) . into ( ) ,
1253+ user_api_config : UserApiConfig {
1254+ ..Default :: default ( )
1255+ } ,
1256+ admin_api_config : Some ( AdminApiConfig {
1257+ acceptor : TurmoilAcceptor :: bind ( ( [ 0 , 0 , 0 , 0 ] , 9090 ) ) . await . unwrap ( ) ,
1258+ connector : TurmoilConnector ,
1259+ disable_metrics : true ,
1260+ } ) ,
1261+ rpc_server_config : Some ( RpcServerConfig {
1262+ acceptor : TurmoilAcceptor :: bind ( ( [ 0 , 0 , 0 , 0 ] , 4567 ) ) . await . unwrap ( ) ,
1263+ tls_config : None ,
1264+ } ) ,
1265+ ..Default :: default ( )
1266+ }
1267+ } ;
1268+ let server = make_server ( ) . await ;
1269+ let shutdown = server. shutdown . clone ( ) ;
1270+
1271+ let fut = async move { server. start_sim ( 8080 ) . await } ;
1272+
1273+ tokio:: pin!( fut) ;
1274+
1275+ loop {
1276+ tokio:: select! {
1277+ res = & mut fut => {
1278+ res. unwrap( ) ;
1279+ break
1280+ }
1281+ _ = notify. notified( ) => {
1282+ shutdown. notify_waiters( ) ;
1283+ } ,
1284+ }
1285+ }
1286+
1287+ drop ( fut) ;
1288+
1289+ tokio:: fs:: File :: create ( path. join ( "dbs" ) . join ( "default" ) . join ( ".sentinel" ) )
1290+ . await
1291+ . unwrap ( ) ;
1292+
1293+ notify. notify_waiters ( ) ;
1294+ let server = make_server ( ) . await ;
1295+ server. start_sim ( 8080 ) . await . unwrap ( ) ;
1296+
1297+ Ok ( ( ) )
1298+ }
1299+ } ) ;
1300+
1301+ sim. client ( "client" , async move {
1302+ let path = tmp_embedded_path. join ( "embedded" ) ;
1303+ let db = Database :: open_with_remote_sync_connector (
1304+ path. to_str ( ) . unwrap ( ) ,
1305+ "http://primary:8080" ,
1306+ "" ,
1307+ TurmoilConnector ,
1308+ false ,
1309+ None ,
1310+ )
1311+ . await ?;
1312+
1313+ let rep = db. sync ( ) . await . unwrap ( ) ;
1314+ assert_eq ! ( rep. frame_no( ) , None ) ;
1315+ assert_eq ! ( rep. frames_synced( ) , 0 ) ;
1316+
1317+ let conn = db. connect ( ) ?;
1318+
1319+ conn. execute ( "CREATE TABLE user (id INTEGER)" , ( ) )
1320+ . await
1321+ . unwrap ( ) ;
1322+
1323+ let rep = db. sync ( ) . await . unwrap ( ) ;
1324+ assert_eq ! ( rep. frame_no( ) , Some ( 1 ) ) ;
1325+ assert_eq ! ( rep. frames_synced( ) , 2 ) ;
1326+
1327+ conn. execute_batch (
1328+ "
1329+ INSERT into user(id) values (randomblob(4096));
1330+ INSERT into user(id) values (randomblob(4096));
1331+ INSERT into user(id) values (randomblob(4096));
1332+ " ,
1333+ )
1334+ . await
1335+ . unwrap ( ) ;
1336+
1337+ let rep = db. sync ( ) . await . unwrap ( ) ;
1338+ assert_eq ! ( rep. frame_no( ) , Some ( 10 ) ) ;
1339+ assert_eq ! ( rep. frames_synced( ) , 9 ) ;
1340+
1341+ // Regenerate log
1342+ notify. notify_waiters ( ) ;
1343+ notify. notified ( ) . await ;
1344+
1345+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) . await ;
1346+
1347+ let rep = db. sync ( ) . await . unwrap ( ) ;
1348+ assert_eq ! ( rep. frame_no( ) , Some ( 4 ) ) ;
1349+ assert_eq ! ( rep. frames_synced( ) , 3 ) ;
1350+
1351+ let mut row = conn. query ( "select count(*) from user" , ( ) ) . await . unwrap ( ) ;
1352+ let count = row. next ( ) . await . unwrap ( ) . unwrap ( ) . get :: < u64 > ( 0 ) . unwrap ( ) ;
1353+ assert_eq ! ( count, 3 ) ;
1354+
1355+ Ok ( ( ) )
1356+ } ) ;
1357+
1358+ sim. run ( ) . unwrap ( ) ;
1359+ }
0 commit comments