-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDataStream.h
More file actions
100 lines (80 loc) · 3.1 KB
/
DataStream.h
File metadata and controls
100 lines (80 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#pragma once
#include <cassert>
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
#include <Select.h>
#include <Flatten.h>
#include <Where.h>
#include <Take.h>
#include <GroupBy.h>
#include <OrderBy.h>
/// A chainable stream over the values produced by an Emitter<DataT>.
///
/// A stream holds its emitter in one of two ways:
///  - `emitter`          : a private copy owned by the stream, or
///  - `externalEmitter`  : a pointer to an emitter owned by the caller.
/// GetEmitter() returns the owned copy when there is one and the borrowed
/// pointer otherwise.
///
/// The transformation methods (Select, Flatten, Where, Take, GroupBy, OrderBy)
/// all follow the same pattern: they build a temporary wrapper emitter around
/// the current one and return its raw pointer. The return statement
/// copy-initializes the resulting DataStream, which selects the implicit
/// `const Emitter*` constructor (the explicit one is not a candidate), and
/// that constructor takes a Copy() of the wrapper before the temporary is
/// destroyed at the end of the return statement.
/// Note that the wrapper is constructed from the raw pointer returned by
/// GetEmitter(); whether it keeps that pointer or copies the emitter is decided
/// by the wrapper classes, which are not defined in this header.
template<typename DataT>
class DataStream {
public:
    using iterator = EmitterIterator<DataT>;

    /// Wraps an emitter owned by the caller. Only the pointer is stored, so the
    /// emitter has to stay alive for as long as this stream (or any copy of it)
    /// is used.
    explicit DataStream(Emitter<DataT>* emitter):
        externalEmitter(emitter)
    {}

    /// Creates a stream that owns a Copy() of `*emitter`.
    /// Intentionally implicit: the transformation methods rely on this
    /// conversion in their return statements. A null pointer produces a stream
    /// without any emitter instead of dereferencing null.
    DataStream(const Emitter<DataT>* emitter):
        emitter(emitter != nullptr ? emitter->Copy() : nullptr),
        externalEmitter(nullptr)
    {}

    /// Copies the owned emitter through Copy() (if there is one) and shares the
    /// borrowed pointer with `other`.
    DataStream(const DataStream& other):
        emitter(other.emitter != nullptr ? other.emitter->Copy() : nullptr),
        externalEmitter(other.externalEmitter)
    {}

    /// Takes over the emitter owned by `other`.
    /// Debug builds assert that `other` does not borrow an external emitter.
    /// NOTE(review): this constructor is left without `noexcept` on purpose in
    /// this revision. Marking it noexcept would make std::vector move elements
    /// on reallocation instead of copying them, which would trip the assertion
    /// below for streams that borrow an external emitter.
    DataStream(DataStream&& other):
        emitter(std::move(other.emitter)),
        externalEmitter(other.externalEmitter)
    {
        assert(other.externalEmitter == nullptr);
    }

    /// Returns a stream backed by a SelectEmitter<DataT, OutDataT> built from
    /// the current emitter and `filter`.
    template<typename OutDataT>
    DataStream<OutDataT> Select(std::function<OutDataT( const DataT )> filter) const {
        // The temporary lives until the end of the return statement; the
        // implicit constructor copies it before that.
        return std::make_unique<SelectEmitter<DataT, OutDataT>>(GetEmitter(), filter).get();
    }

    /// Returns a stream backed by a FlattenEmitter<InDataT, OutDataT> built
    /// from the current emitter and `filter`.
    /// Only participates in overload resolution when DataT is
    /// DataStream<InDataT>.
    template<typename InDataT, typename OutDataT, typename std::enable_if_t<std::is_same< DataT, DataStream<InDataT> >::value, int> = 0>
    DataStream<OutDataT> Flatten(std::function<OutDataT( const InDataT )> filter) const {
        return std::make_unique<FlattenEmitter<InDataT, OutDataT>>(GetEmitter(), filter).get();
    }

    /// Returns a stream backed by a WhereEmitter<DataT> built from the current
    /// emitter and the predicate `filter`.
    DataStream<DataT> Where(std::function<bool( const DataT )> filter) const {
        return std::make_unique<WhereEmitter<DataT>>(GetEmitter(), filter).get();
    }

    /// Returns a stream backed by a TakeEmitter<DataT> built from the current
    /// emitter and `toTake`.
    DataStream<DataT> Take(int toTake) const {
        return std::make_unique<TakeEmitter<DataT>>(GetEmitter(), toTake).get();
    }

    /// Returns a stream of (key, sub-stream) pairs backed by a
    /// GroupByEmitter<DataT, KeyT> built from the current emitter and `filter`.
    /// Const like the other transformations: it only reads the current emitter
    /// pointer.
    template<typename KeyT>
    DataStream<std::pair<KeyT, DataStream<DataT> > > GroupBy(std::function<const KeyT( const DataT )> filter) const {
        return std::make_unique<GroupByEmitter<DataT, KeyT>>(GetEmitter(), filter).get();
    }

    /// Returns a stream backed by an OrderByEmitter<DataT, KeyT> built from the
    /// current emitter and the key function `filter`.
    /// Const like the other transformations: it only reads the current emitter
    /// pointer.
    template<typename KeyT>
    DataStream<DataT> OrderBy(std::function<const KeyT( const DataT )> filter) const {
        return std::make_unique<OrderByEmitter<DataT, KeyT>>(GetEmitter(), filter).get();
    }

    /// Collects every remaining value of the stream into a vector by calling
    /// EmitNext() until Ended() reports true.
    /// A stream without any emitter yields an empty vector.
    std::vector<DataT> ToVector() {
        std::vector<DataT> result;
        Emitter<DataT>* const source = GetEmitter();
        if (source == nullptr) {
            return result;
        }
        // NOTE(review): EmitterGuard presumably brackets the iteration
        // (set-up / tear-down of the emitter) - confirm in its definition.
        EmitterGuard<DataT> guard(*source);
        while (!source->Ended()) {
            result.push_back(source->EmitNext());
        }
        return result;
    }

    /// Iterator positioned on the stream's current emitter.
    iterator begin() const { return iterator(GetEmitter()); }

    /// Past-the-end iterator, built on an EmptyEmitter<DataT>.
    /// The sentinel is a function-local static so that the pointer handed to
    /// the iterator stays valid for the iterator's whole lifetime; a temporary
    /// would already be destroyed when this function returns.
    iterator end() const {
        static EmptyEmitter<DataT> sentinel;
        return iterator(&sentinel);
    }

    /// Copy assignment: replaces the owned emitter with a Copy() of the one
    /// owned by `other` (or drops it when `other` owns none) and shares the
    /// borrowed pointer. Self-assignment is a no-op.
    DataStream<DataT>& operator=(const DataStream<DataT>& other) {
        if (this == &other) {
            return *this;
        }
        if (other.emitter != nullptr) {
            emitter = other.emitter->Copy();
        } else {
            emitter.reset();
        }
        externalEmitter = other.externalEmitter;
        return *this;
    }

protected:
    /// The emitter this stream currently reads from: the owned copy when
    /// present, otherwise the borrowed pointer (which may be null).
    Emitter<DataT>* GetEmitter() const {
        return emitter != nullptr ? emitter.get() : externalEmitter;
    }

private:
    std::unique_ptr< Emitter<DataT> > emitter;   // owned copy, may be null
    Emitter<DataT>* externalEmitter;             // borrowed, never deleted here
};