-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathchanneled.go
More file actions
84 lines (68 loc) · 1.44 KB
/
channeled.go
File metadata and controls
84 lines (68 loc) · 1.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package stream
import (
"errors"
"fmt"
"sync"
)
/*
channeledInput is a producer whose values are delivered through a Go channel.

Implements:
- ChanneledInput
*/
type channeledInput[T any] struct {
// closed is set to true by Close once the channel has been closed.
closed bool
// DefaultProducer is embedded; NewChanneledInput initialises it from NewDefaultProducer.
DefaultProducer[T]
// channel carries the values; NewChanneledInput creates it with the requested buffer capacity.
channel chan T
// closingLock is held by Close while it closes the channel and flips the closed flag.
closingLock sync.Mutex
}
/*
NewChanneledInput builds a channeled input backed by a buffered channel.

Type parameters:
  - T - type of the values carried by the channel.

Parameters:
  - capacity - buffer size passed to make for the underlying channel.

Returns:
  - the new channeled input (a *channeledInput[T] returned as ChanneledInput[T]).
*/
func NewChanneledInput[T any](capacity int) ChanneledInput[T] {
	// The zero value of the mutex and of the closed flag are already the
	// desired initial state, so only the channel and the embedded producer
	// need explicit initialisation.
	input := new(channeledInput[T])
	input.channel = make(chan T, capacity)
	input.DefaultProducer = *NewDefaultProducer[T](input)
	return input
}
/*
Channel exposes the channel that backs this input.

Returns:
  - the underlying channel.
*/
func (ego *channeledInput[T]) Channel() chan T {
	backing := ego.channel
	return backing
}
/*
Get receives the next value from the underlying channel, blocking until a
value is available or the channel has been closed and drained.

Returns:
  - value - the received value (zero value of T when the channel is closed and empty),
  - valid - false when the channel is closed and empty, true otherwise,
  - err - always nil.
*/
func (ego *channeledInput[T]) Get() (value T, valid bool, err error) {
	received, open := <-ego.channel
	return received, open, nil
}
/*
Close closes the underlying channel. It is safe to call more than once:
only the first call closes the channel, later calls do nothing.
*/
func (ego *channeledInput[T]) Close() {
	ego.closingLock.Lock()
	// Deferred so the lock is released even if close panics (e.g. on a nil
	// channel); otherwise every later Close call would block forever.
	defer ego.closingLock.Unlock()

	if ego.closed {
		return
	}
	close(ego.channel)
	ego.closed = true
}
/*
Closed reports whether the input has been closed and fully drained.

Returns:
  - true when Close has been called and no buffered values remain in the channel, false otherwise.
*/
func (ego *channeledInput[T]) Closed() bool {
	// The closed flag is written by Close under closingLock, so it must be
	// read under the same lock to avoid a data race.
	ego.closingLock.Lock()
	defer ego.closingLock.Unlock()

	return ego.closed && len(ego.channel) == 0
}
/*
Write sends the given values to the underlying channel in order, blocking
whenever the channel buffer is full.

Parameters:
  - values - values to send; a nil slice (including a call with no arguments) is rejected.

Returns:
  - n - number of values actually sent before returning,
  - err - non-nil when values is nil or when a send panicked (e.g. the channel was closed).
*/
func (ego *channeledInput[T]) Write(values ...T) (n int, err error) {
	if values == nil {
		return 0, errors.New("input slice is not initialized")
	}

	// Sending on a closed channel panics; convert the panic into an error.
	// n already holds the number of values delivered before the failure.
	defer func() {
		if r := recover(); r != nil {
			switch e := r.(type) {
			case error:
				err = e
			default:
				err = errors.New(fmt.Sprint(e))
			}
		}
	}()

	for _, v := range values {
		ego.channel <- v
		// Count after each successful send so a partial write is reported accurately.
		n++
	}
	return n, nil
}