Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(drain_filter)]
//! Reactive extensions library for Rust: a library for
//! [Reactive Programming](http://reactivex.io/) using
//! [Observable](crate::observable::Observable), to make
Expand Down
92 changes: 92 additions & 0 deletions src/observable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::ops::default_if_empty::DefaultIfEmptyOp;
use crate::ops::distinct::{DistinctKeyOp, DistinctUntilKeyChangedOp};
use crate::ops::on_error_map::OnErrorMapOp;
use crate::ops::pairwise::PairwiseOp;
use crate::ops::sliding_window::SlidingWindowWithTimeFunctionOperation;
use crate::ops::tap::TapOp;
use crate::scheduler::Instant;
use ops::{
Expand Down Expand Up @@ -1604,6 +1605,97 @@ pub trait Observable: Sized {
func: f,
}
}

/// The function `sliding_window(window_size, scanning_interval, time_function, scheduler)`
/// emits each `scanning_interval` a Vec<T> which contains all elements having a timestamp
/// `time_function(T)` larger than `now() - window_size`.
/// This means that each item can occur in multiple vectors if `window_size > scanning_interval`.
///
/// On complete, if the buffer is not empty,
/// it will be emitted.
/// On error, the buffer will be discarded.
///
/// The operator never returns an empty buffer.
///
/// #Example
/// ```
/// use std::sync::{Arc, Mutex};
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::time::Duration;
/// use futures::executor::ThreadPool;
/// use rxrust::ops::sliding_window::get_now_duration;
/// use rxrust::prelude::*;
///
/// let pool = ThreadPool::new().unwrap();
///
/// let expected = vec![
/// vec![0],
/// vec![0, 1],
/// vec![0, 1],
/// vec![1, 2],
/// vec![1, 2],
/// vec![2, 3],
/// vec![2, 3],
/// vec![2, 3],
/// ];
/// let actual = Arc::new(Mutex::new(vec![]));
/// let actual_c = actual.clone();
///
/// let is_completed = Arc::new(AtomicBool::new(false));
/// let is_completed_c = is_completed.clone();
///
/// observable::create(|subscriber| {
/// let sleep = Duration::from_millis(100);
/// subscriber.next(0);
/// std::thread::sleep(sleep);
/// subscriber.next(1);
/// std::thread::sleep(sleep);
/// subscriber.next(2);
/// std::thread::sleep(sleep);
/// subscriber.next(3);
/// std::thread::sleep(sleep);
/// subscriber.complete();
/// })
/// .map(|i| (i, get_now_duration()))
/// .sliding_window(
/// Duration::from_millis(210),
/// Duration::from_millis(53),
/// |(_, duration)| duration,
/// pool,
/// )
/// .map(|window| window.iter().map(|(i, _)| *i).collect::<Vec<i32>>())
/// .into_shared()
/// .subscribe_all(
/// move |vec| {
/// let mut a = actual_c.lock().unwrap();
/// (*a).push(vec);
/// },
/// |()| {},
/// move || is_completed_c.store(true, Ordering::Relaxed),
/// );
///
/// std::thread::sleep(Duration::from_millis(450));
/// assert_eq!(expected, *actual.lock().unwrap());
/// assert!(is_completed.load(Ordering::Relaxed));
/// ```
fn sliding_window<TimeFunction, Scheduler>(
self,
window_size: Duration,
scanning_interval: Duration,
time_function: TimeFunction,
scheduler: Scheduler,
) -> SlidingWindowWithTimeFunctionOperation<Self, TimeFunction, Scheduler>
where
TimeFunction: Fn(Self::Item) -> Duration,
{
SlidingWindowWithTimeFunctionOperation {
source: self,
window_size,
scanning_interval,
time_function,
scheduler,
}
}
}

pub trait LocalObservable<'a>: Observable {
Expand Down
1 change: 1 addition & 0 deletions src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod skip;
pub mod skip_last;
pub mod skip_until;
pub mod skip_while;
pub mod sliding_window;
pub mod start_with;
pub mod subscribe_on;
pub mod take;
Expand Down
Loading