Skip to content

Commit 39113d5

Browse files
authored
Merge pull request #3 from beamline/v2
Fixed integration page
2 parents 7bde75f + 35fe871 commit 39113d5

File tree

1 file changed

+42
-21
lines changed

1 file changed

+42
-21
lines changed

docs/pybeamline/integration.md

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,44 +22,65 @@ In this case, we built two logs (`log_original` and `log_after_drift`) which inc
2222
After that we can use the capabilities of pyBeamline and reactivex to construct a pipeline that produce a sequence of frequencies corresponding to the frequency of directly follows relation `BC` in window with length 40 (which is chosen as all our traces have length 4). Also note that we leverage the fact that in all our events when `B` and `C` appear they are always in the same trace (because of how `log_source` generates the observable). We will later define a function `check_for_drift`:
2323

2424
```python
25-
import reactivex
2625
from reactivex import operators as ops
2726

2827
log_with_drift.pipe(
29-
ops.buffer_with_count(40),
30-
ops.flat_map(lambda events: reactivex.from_iterable(events).pipe(
28+
RxOperator(ops.buffer_with_count(40)),
29+
RxOperator(ops.flat_map(lambda events: reactivex.from_iterable(events).pipe(
3130
ops.pairwise(),
3231
ops.filter(lambda x: x[0].get_trace_name() == x[1].get_trace_name() and x[0].get_event_name() == "B" and x[1].get_event_name() == "C"),
3332
ops.count()
3433
)
35-
)
36-
).subscribe(lambda x: print(x))
34+
)),
35+
).sink(print_sink())
3736
```
37+
3838
After this we can define our function for drift detection and collection of points and drift indexes using:
3939
```python
40-
from reactivex import operators as ops
40+
import reactivex
41+
42+
from pybeamline.stream.rx_operator import RxOperator
43+
from pybeamline.stream.base_sink import BaseSink
44+
from typing import Optional, List
45+
from pybeamline.stream.base_map import BaseMap
4146
from river import drift
47+
from reactivex import operators as ops
4248

43-
drift_detector = drift.ADWIN()
44-
data = []
45-
drifts = []
49+
class CheckForDrift(BaseMap[int, int]):
50+
def __init__(self):
51+
self.drift_detector = drift.ADWIN()
52+
self.drifts = []
53+
self.index = 0
4654

47-
def check_for_drift():
48-
index = 0
55+
def transform(self, value: int) -> Optional[List[int]]:
56+
self.drift_detector.update(value)
57+
self.index += 1
58+
if self.drift_detector.drift_detected:
59+
self.drifts.append(self.index)
60+
return [value]
4961

50-
def _process(x):
51-
nonlocal index
52-
drift_detector.update(x)
53-
index = index + 1
54-
if drift_detector.drift_detected:
55-
drifts.append(index)
62+
class CollectSink(BaseSink[int]):
63+
def __init__(self):
64+
self.data = []
5665

57-
def _check_for_drift(obs):
58-
return obs.pipe(ops.do_action(lambda value: _process(value)))
66+
def consume(self, item: int) -> None:
67+
self.data.append(item)
5968

60-
return _check_for_drift
69+
70+
drift_detector = CheckForDrift()
71+
collector = CollectSink()
72+
73+
log_with_drift.pipe(RxOperator(ops.buffer_with_count(40)),
74+
RxOperator(ops.flat_map(lambda events: reactivex.from_iterable(events).pipe(
75+
ops.pairwise(),
76+
ops.filter(lambda x: x[0].get_trace_name() == x[1].get_trace_name() and x[0].get_event_name() == "B" and x[1].get_event_name() == "C"),
77+
ops.count()
78+
)
79+
)),
80+
drift_detector
81+
).sink(collector)
6182
```
62-
With this function available, `check_for_drift` can now be piped to the previous computation. Plotting the frequencies and the concept drifts will result in the following:
83+
With this class available, `CheckForDrift` can now be piped to the previous computation. Plotting the frequencies and the concept drifts will result in the following:
6384

6485
![](https://github.com/beamline/docs/blob/main/site/img/drifts.png?raw=true)
6586

0 commit comments

Comments
 (0)