forked from trustmaster/goflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgraph_iip.go
More file actions
93 lines (83 loc) · 2.15 KB
/
graph_iip.go
File metadata and controls
93 lines (83 loc) · 2.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package goflow
import (
"fmt"
"reflect"
)
// iip stands for Initial Information Packet representation
// within the network. Values are registered with AddIIP, dropped with
// RemoveIIP, and delivered by sendIIPs.
type iip struct {
// data is the payload; sendIIPs wraps it with reflect.ValueOf and sends it
// on the channel resolved for addr.
data interface{}
// addr is the target of the packet, as produced by parseAddress from a
// process name and a port name.
addr address
}
// AddIIP registers an Initial Information Packet carrying data for the
// address that parseAddress builds from processName and portName.
//
// Only the existence of the process in the network is verified here; the
// port is not looked up at this point. An error is returned when no process
// named processName is registered.
func (n *Graph) AddIIP(processName, portName string, data interface{}) error {
	target := parseAddress(processName, portName)

	// Guard clause: refuse IIPs aimed at a process the network does not know.
	_, known := n.procs[processName]
	if !known {
		return fmt.Errorf("AddIIP: could not find '%s'", target)
	}

	packet := iip{addr: target, data: data}
	n.iips = append(n.iips, packet)

	return nil
}
// RemoveIIP detaches an IIP from a specific process and port.
//
// The address to match is built by parseAddress from processName and
// portName. Only the first IIP with a matching address is removed. Removal
// moves the last element into the vacated slot, so the relative order of the
// remaining IIPs is not preserved. An error is returned when no IIP is
// registered for the address.
func (n *Graph) RemoveIIP(processName, portName string) error {
	addr := parseAddress(processName, portName)
	for i := range n.iips {
		if n.iips[i].addr != addr {
			continue
		}
		last := len(n.iips) - 1
		// Swap first, zero second. Doing both in a single tuple assignment
		// would re-write the removed packet into the tail slot when i == last,
		// keeping its data reachable through the backing array.
		n.iips[i] = n.iips[last]
		n.iips[last] = iip{}
		n.iips = n.iips[:last]
		return nil
	}
	return fmt.Errorf("RemoveIIP: could not find IIP for '%s'", addr)
}
// sendIIPs sends Initial Information Packets upon network start.
//
// For every registered IIP the receiving channel is resolved in this order:
//  1. a network inport whose address equals the IIP address;
//  2. an existing connection whose target equals the IIP address;
//  3. a new channel attached to the target process port via attachPort.
//
// Each packet is sent from its own goroutine, so this method does not wait
// for delivery. A channel created in step 3 is closed after the send;
// channels found in steps 1 and 2 are left open.
//
// An error is returned when the target process port cannot be obtained or
// when a new channel cannot be attached to it. IIPs handled before the
// failing one have already had their send goroutines started.
func (n *Graph) sendIIPs() error {
	for _, ip := range n.iips {
		ip := ip // per-iteration copy captured by the goroutine below

		// Get the receiver port channel.
		var channel reflect.Value
		found := false
		// shouldClose is set only for channels created here, which have no
		// other sender.
		shouldClose := false

		// Try to find it among network inports.
		for _, inPort := range n.inPorts {
			if inPort.addr == ip.addr {
				channel = inPort.channel
				found = true
				break
			}
		}

		if !found {
			// Try to find it among connections.
			for _, conn := range n.connections {
				if conn.tgt == ip.addr {
					channel = conn.channel
					found = true
					break
				}
			}
		}

		if !found {
			// Fall back to the process itself and attach a new channel to it.
			recvPort, err := n.getProcPort(ip.addr.proc, ip.addr.port, reflect.RecvDir)
			if err != nil {
				return err
			}
			channel, err = attachPort(recvPort, ip.addr, reflect.RecvDir, reflect.ValueOf(nil), n.conf.BufferSize)
			if err != nil {
				// Without this check a failed attach would lead to a Send on
				// an unusable channel value inside the goroutine.
				return err
			}
			shouldClose = true
		}

		// Send data to the port without blocking network start-up.
		go func() {
			channel.Send(reflect.ValueOf(ip.data))
			if shouldClose {
				channel.Close()
			}
		}()
	}
	return nil
}