Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 5 additions & 22 deletions cmd/database/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,17 @@ import (
"context"
"fmt"
"github.com/polyse/database/internal/api"
"github.com/polyse/database/internal/collection"
"github.com/polyse/database/pkg/filters"
"github.com/rs/zerolog"
"github.com/xujiajun/nutsdb"
"os"
"strings"

"github.com/google/wire"
"github.com/polyse/database/internal/collection"

"github.com/rs/zerolog/log"
"github.com/xlab/closer"
)

var (
procSetter = wire.NewSet(
initDbConfig,
initConnection,
initTokenizer,
initFilters,
collection.NewSimpleProcessor,
)

dbSetter = wire.NewSet(
procSetter,
wire.Bind(
new(collection.Processor),
new(*collection.SimpleProcessor),
),
collection.NewManagerWithProc,
)
)

func main() {

defer closer.Close()
Expand Down Expand Up @@ -77,6 +56,10 @@ func main() {
log.Debug().Msg("starting db")
var connCLoser func()
a.Manager, connCLoser, err = initProcessorManager(cfg, "default")
if err != nil {
log.Err(err).Msg("error while init wire manager")
return
}
closer.Bind(connCLoser)

log.Debug().Msg("starting web application")
Expand Down
19 changes: 19 additions & 0 deletions cmd/database/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,25 @@ import (
"github.com/polyse/database/internal/api"
)

var (
procSetter = wire.NewSet(
initDbConfig,
initConnection,
initTokenizer,
initFilters,
collection.NewSimpleProcessor,
)

dbSetter = wire.NewSet(
procSetter,
wire.Bind(
new(collection.Processor),
new(*collection.SimpleProcessor),
),
collection.NewManagerWithProc,
)
)

func initWebApp(ctx context.Context, c *config) (*api.API, func(), error) {
wire.Build(api.NewApp, initWebAppCfg)
return nil, nil, nil
Expand Down
14 changes: 8 additions & 6 deletions cmd/database/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (v *Validator) Validate(i interface{}) error {
return v.validator.Struct(i)
}

type CollectionInfo struct {
Name string `json:"name"`
Metadata collection.Metadata `json:"metadata"`
}

// NewApp returns a new ready-to-launch API object with adjusted settings.
func NewApp(ctx context.Context, appCfg AppConfig) (*API, error) {
appCfg.checkConfig()
Expand Down Expand Up @@ -95,6 +100,8 @@ func NewApp(ctx context.Context, appCfg AppConfig) (*API, error) {
g := e.Group("/api")
g.GET("/:collection/documents", a.handleSearch)
g.POST("/:collection/documents", a.handleAddDocuments)
g.POST("/collections", a.handleAddCollection)
g.GET("/collections", a.handleGetAllCollections)

log.Debug().Msg("endpoints registered")

Expand Down Expand Up @@ -173,6 +180,36 @@ func (a *API) handleAddDocuments(c echo.Context) error {
return c.JSON(http.StatusCreated, docs)
}

func (a *API) handleAddCollection(c echo.Context) error {
cr := &CollectionInfo{}
if err := c.Bind(cr); err != nil {
log.Debug().Err(err).Msg("handleAddCollection Bind err")
return echo.ErrBadRequest
}
p, err := a.InitNewProc(cr.Name, cr.Metadata.Tokenizer, cr.Metadata.ColFilters...)
if err != nil {
return echo.ErrInternalServerError
}
a.AddProcessor(p)
return nil
}

func (a *API) handleGetAllCollections(c echo.Context) error {
r, err := a.GetAllCollectionsInfo()
if err != nil {
log.Err(err).Msg("can not get all collections")
}
var ci []CollectionInfo
for k := range r {
ci = append(ci, CollectionInfo{
Metadata: r[k],
Name: k,
})
}

return c.JSON(http.StatusOK, ci)
}

// Run start the server.
func (a *API) Run() error {
return a.e.Start(a.addr)
Expand Down
95 changes: 89 additions & 6 deletions internal/collection/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
package collection

import (
"encoding/json"
"errors"
"sync"

"github.com/polyse/database/pkg/filters"
"github.com/rs/zerolog/log"
"github.com/xujiajun/nutsdb"
"sync"
)

var (
Expand All @@ -18,16 +20,22 @@ var (
type Manager struct {
sync.RWMutex
processors map[string]Processor
db *nutsdb.DB
}

// NewManager function-constructor of Manager
func NewManager() *Manager {
return &Manager{processors: make(map[string]Processor)}
func NewManager(db *nutsdb.DB) *Manager {
m, err := loadCollections(db)
if err != nil {
log.Err(err).Msg("can not load collections")
m = make(map[string]Processor)
}
return &Manager{processors: m, db: db}
}

// NewManagerWithProc function-constructor of Manager with a given Processor.
func NewManagerWithProc(proc Processor) *Manager {
spm := NewManager()
func NewManagerWithProc(db *nutsdb.DB, proc Processor) *Manager {
spm := NewManager(db)
spm.AddProcessor(proc)
return spm
}
Expand Down Expand Up @@ -56,3 +64,78 @@ func (spm *Manager) GetProcessor(colName string) (Processor, error) {
}
return nil, ErrCollectionNotExist
}

func (spm *Manager) InitNewProc(colName string, tokenizer string, filter ...string) (Processor, error) {
return newSimpleProcessorByStrings(spm.db, Name(colName), tokenizer, filter...)
}

func (spm *Manager) GetAllCollectionsInfo() (map[string]Metadata, error) {
log.Debug().Msg("getting all collections")
collections := make(map[string][]byte)
if err := spm.db.View(func(tx *nutsdb.Tx) error {
e, err := tx.GetAll(collectionBucket)
if err != nil {
return err
}
for i := range e {
collections[string(e[i].Key)] = e[i].Value
}
return nil
}); err != nil {
return nil, err
}
log.Debug().Msg("process get all collections data")
metaMap := make(map[string]Metadata)
for i := range collections {
var p Metadata
if err := json.Unmarshal(collections[i], &p); err != nil {
return nil, err
}
metaMap[i] = p
}
log.Debug().Interface("all collections", metaMap).Msg("getting collections done")
return metaMap, nil
}

func loadCollections(db *nutsdb.DB) (map[string]Processor, error) {
log.Debug().Msg("start loading collections")
collections := make(map[string][]byte)
if err := db.View(func(tx *nutsdb.Tx) error {
e, err := tx.GetAll(collectionBucket)
if err != nil {
return err
}
for i := range e {
collections[string(e[i].Key)] = e[i].Value
}
return nil
}); err != nil {
return nil, err
}
procMap := make(map[string]Processor)
for i := range collections {
var p Metadata
if err := json.Unmarshal(collections[i], &p); err != nil {
return nil, err
}
log.Debug().Str("collection name", i).Interface("metadata", p).Msg("collection loaded")
var f []filters.Filter
for _, t := range p.ColFilters {
f = append(f, filterMap[t])
}
sp, err := NewSimpleProcessor(db, Name(i), tokenizerMap[p.Tokenizer], f...)
if err != nil {
return nil, err
}
procMap[i] = sp
}
return procMap, nil
}

var filterMap = map[string]filters.Filter{
"github.com/polyse/database/pkg/filters.StemmAndToLower": filters.StemmAndToLower,
"github.com/polyse/database/pkg/filters.StopWords": filters.StopWords,
}
var tokenizerMap = map[string]filters.Tokenizer{
"github.com/polyse/database/pkg/filters.FilterText": filters.FilterText,
}
Loading