Skip to content

Commit 866a5e4

Browse files
committed
processor: extract_file_text
1 parent e14d88c commit 866a5e4

File tree

3 files changed

+101
-1
lines changed

3 files changed

+101
-1
lines changed

coco.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,8 @@ pipeline:
323323
group: enriched_documents
324324
fetch_max_messages: 10
325325
processor:
326-
- read_file_content:
326+
- read_file_content: {}
327+
- extract_file_text:
327328
output_queue:
328329
name: "enriched_documents"
329330

core/document.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Document struct {
4343

4444
Lang string `json:"lang,omitempty" elastic_mapping:"lang:{type:keyword,copy_to:combined_fulltext}"` // Language code (e.g., "en", "fr")
4545
Content string `json:"content,omitempty" elastic_mapping:"content:{type:text,copy_to:combined_fulltext}"` // Document content for full-text indexing
46+
Text string `json:"text,omitempty" elastic_mapping:"content:{type:text,copy_to:combined_fulltext}"` // Document content in text for full-text indexing
4647

4748
Icon string `json:"icon,omitempty" elastic_mapping:"icon:{enabled:false}"` // Icon Key, need work with datasource's assets to get the icon url, if it is a full url, then use it directly
4849
Thumbnail string `json:"thumbnail,omitempty" elastic_mapping:"thumbnail:{enabled:false}"` // Thumbnail image URL, for preview purposes
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package extract_file_text
2+
3+
import (
4+
"fmt"
5+
"os/exec"
6+
7+
log "github.com/cihub/seelog"
8+
"infini.sh/coco/core"
9+
"infini.sh/coco/plugins/connectors"
10+
"infini.sh/framework/core/config"
11+
"infini.sh/framework/core/param"
12+
"infini.sh/framework/core/pipeline"
13+
"infini.sh/framework/core/queue"
14+
"infini.sh/framework/core/util"
15+
)
16+
17+
func init() {
18+
pipeline.RegisterProcessorPlugin("extract_file_text", New)
19+
}
20+
21+
type ExtractFileTextProcessor struct {
22+
config *Config
23+
outputQueue *queue.QueueConfig
24+
}
25+
26+
type Config struct {
27+
MessageField param.ParaKey `config:"message_field"`
28+
OutputQueue *queue.QueueConfig `config:"output_queue"`
29+
}
30+
31+
func New(c *config.Config) (pipeline.Processor, error) {
32+
cfg := Config{
33+
MessageField: "messages",
34+
}
35+
if err := c.Unpack(&cfg); err != nil {
36+
return nil, err
37+
}
38+
39+
p := &ExtractFileTextProcessor{config: &cfg}
40+
41+
if cfg.OutputQueue != nil {
42+
p.outputQueue = queue.SmartGetOrInitConfig(cfg.OutputQueue)
43+
}
44+
45+
return p, nil
46+
}
47+
48+
func (p *ExtractFileTextProcessor) Name() string {
49+
return "extract_file_text"
50+
}
51+
52+
func (p *ExtractFileTextProcessor) Process(ctx *pipeline.Context) error {
53+
obj := ctx.Get(p.config.MessageField)
54+
if obj == nil {
55+
return nil
56+
}
57+
58+
messages, ok := obj.([]queue.Message)
59+
if !ok {
60+
return nil
61+
}
62+
63+
for _, msg := range messages {
64+
doc := core.Document{}
65+
66+
docBytes := msg.Data
67+
err := util.FromJSONBytes(docBytes, &doc)
68+
if err != nil {
69+
log.Error("error on handle document:", err)
70+
continue
71+
}
72+
73+
if doc.Type == connectors.TypeFile {
74+
// Call extract-cli <doc.Url>
75+
cmd := exec.Command("extract-cli", doc.URL)
76+
output, err := cmd.Output()
77+
if err != nil {
78+
log.Errorf("failed to extract text from %s: %v", doc.URL, err)
79+
// We might want to continue even if extraction fails,
80+
// or maybe we just don't update the Text field.
81+
} else {
82+
doc.Text = string(output)
83+
fmt.Printf("DBG: extracted text length: %d\n", len(doc.Text))
84+
85+
// Update msg.Data with the new document content
86+
updatedDocBytes := util.MustToJSONBytes(doc)
87+
msg.Data = updatedDocBytes
88+
}
89+
}
90+
91+
if p.outputQueue != nil {
92+
if err := queue.Push(p.outputQueue, msg.Data); err != nil {
93+
log.Errorf("failed to push document to [%s]'s output queue: %v", p.Name(), err)
94+
}
95+
}
96+
}
97+
return nil
98+
}

0 commit comments

Comments
 (0)