-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathdefault.go
More file actions
157 lines (120 loc) · 2.99 KB
/
default.go
File metadata and controls
157 lines (120 loc) · 2.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package stream
import "errors"
/*
DefaultClosable is a default implementation of the closing mechanism.
It is a simple closed flag which can be set to true or checked.
To use it, just include the stuct as promoted field.
Implements:
- Closable
*/
type DefaultClosable struct {
closed bool
}
func (ego *DefaultClosable) Closed() bool {
return ego.closed
}
func (ego *DefaultClosable) Close() {
ego.closed = true
}
/*
DefaultConsumer is a default partial implementation of the Consumer.
Implements:
- Consumer
*/
type DefaultConsumer[T any] struct {
source Producer[T]
}
func (ego *DefaultConsumer[T]) Consume() (value T, valid bool, err error) {
if ego.source == nil {
return *new(T), false, errors.New("no source to consume from")
}
return ego.source.Get()
}
func (ego *DefaultConsumer[T]) SetSource(s Producer[T]) error {
if !ego.CanSetSource() {
return errors.New("the source has already been set")
}
ego.source = s
return nil
}
func (ego *DefaultConsumer[T]) CanSetSource() bool {
return ego.source == nil
}
/*
DefaultProducer is a default partial implementation of the Producer.
It does not itself implement Producer interface, the Get method has to be defined by the full implementation.
To use it, include the struct as promoted field and initialize it with the provided constructor.
All methods work with a pointer to the embedding struct.
Implements:
- Producer (partially)
*/
type DefaultProducer[T any] struct {
producer Producer[T]
piped bool
}
/*
NewDefaultProducer is a constructor of the DefaultProducer.
Type parameters:
- T - type of the produced values.
Parameters:
- p - full implementation of the Producer (embedding this struct).
Returns:
- pointer to the new DefaultProducer.
*/
func NewDefaultProducer[T any](p Producer[T]) *DefaultProducer[T] {
return &DefaultProducer[T]{producer: p}
}
func (ego *DefaultProducer[T]) Pipe(c Consumer[T]) Consumer[T] {
if !c.CanSetSource() {
panic("the consumer does not accept new sources")
}
if err := c.SetSource(ego.producer); err != nil {
panic(err)
}
ego.piped = true
return c
}
func (ego *DefaultProducer[T]) Read(dst []T) (int, error) {
if ego.piped {
return 0, errors.New("the stream is piped")
}
if dst == nil {
return 0, errors.New("the input slice is not initialized")
}
n := len(dst)
for i := 0; i < n; i++ {
value, valid, err := ego.producer.Get()
if err != nil || !valid {
return i, err
}
dst[i] = value
}
return n, nil
}
func (ego *DefaultProducer[T]) Collect() ([]T, error) {
if ego.piped {
return nil, errors.New("the stream is piped")
}
output := make([]T, 0)
for {
value, valid, err := ego.producer.Get()
if err != nil || !valid {
return output, err
}
output = append(output, value)
}
}
func (ego *DefaultProducer[T]) ForEach(fn func(T) error) error {
if ego.piped {
return errors.New("the stream is piped")
}
for {
value, valid, err := ego.producer.Get()
if err != nil || !valid {
return err
}
if err := fn(value); err != nil {
return err
}
}
}