Skip to content

Commit e14d88c

Browse files
committed
processor: read_file_content
1 parent a2a5b71 commit e14d88c

File tree

5 files changed

+1284
-19
lines changed

5 files changed

+1284
-19
lines changed

coco.yml

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
env:
2-
ES_ENDPOINT: https://localhost:9200
2+
ES_ENDPOINT: http://192.168.3.119:9200
33
ES_USERNAME: admin
44
ES_PASSWORD: $[[keystore.ES_PASSWORD]]
55
WEB_BINDING: 0.0.0.0:9000
@@ -311,7 +311,7 @@ web:
311311
##background jobs
312312
pipeline:
313313
- name: enrich_documents
314-
auto_start: false
314+
auto_start: true
315315
keep_running: true
316316
processor:
317317
- consumer:
@@ -323,21 +323,25 @@ pipeline:
323323
group: enriched_documents
324324
fetch_max_messages: 10
325325
processor:
326-
- document_summarization:
327-
model: $[[env.ENRICHMENT_MODEL]]
328-
input_queue: "indexing_documents"
329-
min_input_document_length: 500
326+
- read_file_content:
330327
output_queue:
331328
name: "enriched_documents"
332-
label:
333-
tag: "enriched"
329+
330+
# - document_summarization:
331+
# model: $[[env.ENRICHMENT_MODEL]]
332+
# input_queue: "indexing_documents"
333+
# min_input_document_length: 500
334+
# output_queue:
335+
# name: "enriched_documents"
336+
# label:
337+
# tag: "enriched"
334338

335339
- name: merge_documents
336340
auto_start: true
337341
keep_running: true
338342
processor:
339343
- indexing_merge:
340-
input_queue: "indexing_documents"
344+
input_queue: "enriched_documents"
341345
idle_timeout_in_seconds: 1
342346
elasticsearch: "prod"
343347
index_name: "coco_document-v2"
@@ -362,6 +366,7 @@ pipeline:
362366
queues:
363367
type: indexing_merge
364368
tag: "merged"
369+
365370
- name: connector_dispatcher
366371
auto_start: true
367372
keep_running: true

plugins/connectors/local_fs/plugin.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,10 @@ func (p *Plugin) saveDocument(ctx *pipeline.Context, currentPath, basePath strin
154154
Source: core.DataSourceReference{ID: datasource.ID, Type: "connector", Name: datasource.Name},
155155
Type: connectors.TypeFile,
156156
Category: filepath.Dir(currentPath),
157-
Content: "", // skip content
158-
URL: currentPath,
159-
Size: int(fileInfo.Size()),
157+
// skip content here, which will be populated by the `read_file_content` processor
158+
Content: "",
159+
URL: currentPath,
160+
Size: int(fileInfo.Size()),
160161
}
161162
doc.System = datasource.System
162163
if doc.System == nil {

plugins/processors/embedding/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (processor *DocumentSummaryProcessor) Process(ctx *pipeline.Context) error
147147

148148
if !processor.config.IncludeSkippedDocumentToOutputQueue {
149149
continue
150-
}+
150+
}
151151

152152
outputBytes = pop
153153
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package read_file_content
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
log "github.com/cihub/seelog"
8+
"infini.sh/coco/core"
9+
"infini.sh/coco/plugins/connectors"
10+
"infini.sh/framework/core/config"
11+
"infini.sh/framework/core/param"
12+
"infini.sh/framework/core/pipeline"
13+
"infini.sh/framework/core/queue"
14+
"infini.sh/framework/core/util"
15+
)
16+
17+
// init registers this processor under the name "read_file_content" so it can
// be referenced from pipeline configuration (e.g. the `processor:` list in coco.yml).
func init() {
	pipeline.RegisterProcessorPlugin("read_file_content", New)
}
20+
21+
// ReadFileContentProcessor populates Document.Content for file-type documents
// by reading the file referenced by Document.URL from the local filesystem,
// then forwards each message to the configured output queue.
type ReadFileContentProcessor struct {
	config      *Config            // unpacked processor configuration
	outputQueue *queue.QueueConfig // resolved output queue; nil when output_queue is not configured
}
}
25+
26+
// Config holds the pipeline configuration for the read_file_content processor.
type Config struct {
	// MessageField is the pipeline-context key under which the upstream
	// consumer stores the batch of queue messages; defaults to "messages".
	MessageField param.ParaKey `config:"message_field"`
	// OutputQueue, when set, receives every (possibly enriched) message.
	OutputQueue *queue.QueueConfig `config:"output_queue"`
}
30+
31+
func New(c *config.Config) (pipeline.Processor, error) {
32+
fmt.Printf("DBG: read_file_content.New invoked\n")
33+
34+
cfg := Config{
35+
MessageField: "messages",
36+
}
37+
if err := c.Unpack(&cfg); err != nil {
38+
return nil, err
39+
}
40+
41+
p := &ReadFileContentProcessor{config: &cfg}
42+
43+
if cfg.OutputQueue != nil {
44+
p.outputQueue = queue.SmartGetOrInitConfig(cfg.OutputQueue)
45+
}
46+
47+
return p, nil
48+
}
49+
50+
func (p *ReadFileContentProcessor) Name() string {
51+
return "read_file_content"
52+
}
53+
54+
func (p *ReadFileContentProcessor) Process(ctx *pipeline.Context) error {
55+
fmt.Printf("DBG: read_file_content.Process invoked.\n")
56+
57+
obj := ctx.Get(p.config.MessageField)
58+
if obj == nil {
59+
fmt.Printf("DBG: read_file_content.Process obj is nil for field: %s\n", p.config.MessageField)
60+
return nil
61+
}
62+
63+
messages, ok := obj.([]queue.Message)
64+
if !ok {
65+
return nil
66+
}
67+
68+
for _, msg := range messages {
69+
doc := core.Document{}
70+
71+
docBytes := msg.Data
72+
err := util.FromJSONBytes(docBytes, &doc)
73+
if err != nil {
74+
log.Error("error on handle document:", err)
75+
continue
76+
}
77+
78+
if doc.Type == connectors.TypeFile {
79+
content, err := os.ReadFile(doc.URL)
80+
if err != nil {
81+
log.Errorf("failed to read file content from %s: %v", doc.URL, err)
82+
continue
83+
}
84+
doc.Content = string(content)
85+
updatedDocBytes := util.MustToJSONBytes(doc)
86+
msg.Data = updatedDocBytes
87+
}
88+
89+
if p.outputQueue != nil {
90+
if err := queue.Push(p.outputQueue, msg.Data); err != nil {
91+
log.Error("failed to push document to [%s]'s output queue: %v\n", p.Name(), err)
92+
}
93+
}
94+
}
95+
return nil
96+
}

0 commit comments

Comments
 (0)