-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprocessor.go
More file actions
67 lines (64 loc) · 1.5 KB
/
processor.go
File metadata and controls
67 lines (64 loc) · 1.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main
import (
`context`
`log`
`sync`
)
// ProcessRow pairs the fields of one record with their header names and sends
// the resulting columns on row. rowWg.Done is always called on return.
//
// When headerFields and rowFields differ in length, only the overlapping
// positions are considered. If there are fewer headers than fields, positions
// with an empty value are skipped; if there are fewer fields than headers,
// positions with an empty header are skipped. A header name that occurs more
// than once yields a single column carrying the last value seen for it.
// Columns are emitted in header order.
//
// When the lengths match, or the mismatched pairing produced no columns at
// all, the fields are paired positionally instead. In that path a record whose
// first field is empty is dropped without sending anything, and a field with
// no corresponding header is logged and skipped.
func ProcessRow(headerFields []string, rowFields []string, rowWg *sync.WaitGroup, row chan []Column) {
	defer rowWg.Done()

	rowData := make([]Column, 0, len(rowFields))

	if len(headerFields) != len(rowFields) {
		headersShort := len(headerFields) < len(rowFields)
		overlap := len(rowFields)
		if headersShort {
			overlap = len(headerFields)
		}

		// Collect by header name first so that a repeated header keeps only
		// its last value.
		byHeader := make(map[string]string, overlap)
		for i := 0; i < overlap; i++ {
			h, v := headerFields[i], rowFields[i]
			if (headersShort && len(v) == 0) || (!headersShort && len(h) == 0) {
				continue
			}
			byHeader[h] = v
		}

		// Walk the headers rather than the map: map iteration order is
		// unspecified and would shuffle the columns from run to run.
		for _, h := range headerFields[:overlap] {
			if v, ok := byHeader[h]; ok {
				rowData = append(rowData, Column{Header: h, Value: v})
				delete(byHeader, h) // emit each header name only once
			}
		}

		if len(rowData) > 0 {
			row <- rowData
			return
		}
	}

	for i, value := range rowFields {
		if i == 0 && len(value) == 0 {
			// A record with an empty first field is dropped entirely.
			return
		}
		// Guard the index itself; comparing with "<" let i == len(headerFields)
		// through and panicked on the lookup below.
		if i >= len(headerFields) {
			log.Printf("skipping rowField %v due to headerFields not matching up properly", value)
			continue
		}
		rowData = append(rowData, Column{headerFields[i], value})
	}
	row <- rowData
}
// ReceiveRows consumes rows from the row channel and passes each one to
// callback until either the channel is closed or ctx is done.
//
// Every callback invocation receives a context derived from ctx that carries
// filename under the key CtxKey("csv_file"). A non-nil result from callback is
// logged and does not stop the loop.
//
// When row is closed, one value is sent on done before returning. When ctx is
// done, the function returns without sending on done.
func ReceiveRows(ctx context.Context, row chan []Column, filename string, callback CallbackFunc, done chan struct{}) {
	// deliver hands a single row to the callback and logs any failure.
	deliver := func(cols []Column) {
		fileCtx := context.WithValue(ctx, CtxKey("csv_file"), filename)
		if err := callback(fileCtx, cols); err != nil {
			log.Printf("failed to insert row %v with error %v", cols, err)
		}
	}

	for {
		select {
		case cols, open := <-row:
			if open {
				deliver(cols)
				continue
			}
			// Channel closed: signal completion to whoever waits on done.
			done <- struct{}{}
			return
		case <-ctx.Done():
			return
		}
	}
}