diff --git a/cmd/database/main.go b/cmd/database/main.go index fa59f33..f27c74e 100644 --- a/cmd/database/main.go +++ b/cmd/database/main.go @@ -4,38 +4,17 @@ import ( "context" "fmt" "github.com/polyse/database/internal/api" + "github.com/polyse/database/internal/collection" "github.com/polyse/database/pkg/filters" "github.com/rs/zerolog" "github.com/xujiajun/nutsdb" "os" "strings" - "github.com/google/wire" - "github.com/polyse/database/internal/collection" - "github.com/rs/zerolog/log" "github.com/xlab/closer" ) -var ( - procSetter = wire.NewSet( - initDbConfig, - initConnection, - initTokenizer, - initFilters, - collection.NewSimpleProcessor, - ) - - dbSetter = wire.NewSet( - procSetter, - wire.Bind( - new(collection.Processor), - new(*collection.SimpleProcessor), - ), - collection.NewManagerWithProc, - ) -) - func main() { defer closer.Close() @@ -77,6 +56,10 @@ func main() { log.Debug().Msg("starting db") var connCLoser func() a.Manager, connCLoser, err = initProcessorManager(cfg, "default") + if err != nil { + log.Err(err).Msg("error while init wire manager") + return + } closer.Bind(connCLoser) log.Debug().Msg("starting web application") diff --git a/cmd/database/wire.go b/cmd/database/wire.go index 0e5cfef..fdb2a7c 100644 --- a/cmd/database/wire.go +++ b/cmd/database/wire.go @@ -11,6 +11,25 @@ import ( "github.com/polyse/database/internal/api" ) +var ( + procSetter = wire.NewSet( + initDbConfig, + initConnection, + initTokenizer, + initFilters, + collection.NewSimpleProcessor, + ) + + dbSetter = wire.NewSet( + procSetter, + wire.Bind( + new(collection.Processor), + new(*collection.SimpleProcessor), + ), + collection.NewManagerWithProc, + ) +) + func initWebApp(ctx context.Context, c *config) (*api.API, func(), error) { wire.Build(api.NewApp, initWebAppCfg) return nil, nil, nil diff --git a/cmd/database/wire_gen.go b/cmd/database/wire_gen.go index c2c60a0..cd3c1d5 100644 --- a/cmd/database/wire_gen.go +++ b/cmd/database/wire_gen.go @@ -7,7 +7,6 @@ package main import ( "context" - "github.com/polyse/database/internal/api" "github.com/polyse/database/internal/collection" ) @@ -19,12 +18,11 @@ func initWebApp(ctx context.Context, c *config) (*api.API, func(), error) { if err != nil { return nil, nil, err } - app, err := api.NewApp(ctx, appConfig) + apiAPI, err := api.NewApp(ctx, appConfig) if err != nil { return nil, nil, err } - return app, func() { - app.Close() + return apiAPI, func() { }, nil } @@ -36,8 +34,12 @@ func initProcessorManager(c *config, collName collection.Name) (*collection.Mana } tokenizer := initTokenizer() v := initFilters() - simpleProcessor := collection.NewSimpleProcessor(db, collName, tokenizer, v...) - manager := collection.NewManagerWithProc(simpleProcessor) + simpleProcessor, err := collection.NewSimpleProcessor(db, collName, tokenizer, v...) + if err != nil { + cleanup() + return nil, nil, err + } + manager := collection.NewManagerWithProc(db, simpleProcessor) return manager, func() { cleanup() }, nil diff --git a/internal/api/api.go b/internal/api/api.go index da1f400..6e9425b 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -65,6 +65,11 @@ func (v *Validator) Validate(i interface{}) error { return v.validator.Struct(i) } +type CollectionInfo struct { + Name string `json:"name"` + Metadata collection.Metadata `json:"metadata"` +} + // NewApp returns a new ready-to-launch API object with adjusted settings. func NewApp(ctx context.Context, appCfg AppConfig) (*API, error) { appCfg.checkConfig() @@ -95,6 +100,8 @@ func NewApp(ctx context.Context, appCfg AppConfig) (*API, error) { g := e.Group("/api") g.GET("/:collection/documents", a.handleSearch) g.POST("/:collection/documents", a.handleAddDocuments) + g.POST("/collections", a.handleAddCollection) + g.GET("/collections", a.handleGetAllCollections) log.Debug().Msg("endpoints registered") @@ -173,6 +180,36 @@ func (a *API) handleAddDocuments(c echo.Context) error { return c.JSON(http.StatusCreated, docs) } +func (a *API) handleAddCollection(c echo.Context) error { + cr := &CollectionInfo{} + if err := c.Bind(cr); err != nil { + log.Debug().Err(err).Msg("handleAddCollection Bind err") + return echo.ErrBadRequest + } + p, err := a.InitNewProc(cr.Name, cr.Metadata.Tokenizer, cr.Metadata.ColFilters...) + if err != nil { + return echo.ErrInternalServerError + } + a.AddProcessor(p) + return nil +} + +func (a *API) handleGetAllCollections(c echo.Context) error { + r, err := a.GetAllCollectionsInfo() + if err != nil { + log.Err(err).Msg("can not get all collections") + } + var ci []CollectionInfo + for k := range r { + ci = append(ci, CollectionInfo{ + Metadata: r[k], + Name: k, + }) + } + + return c.JSON(http.StatusOK, ci) +} + // Run start the server. func (a *API) Run() error { return a.e.Start(a.addr) diff --git a/internal/collection/manager.go b/internal/collection/manager.go index 17d7318..1abd0c6 100644 --- a/internal/collection/manager.go +++ b/internal/collection/manager.go @@ -3,10 +3,12 @@ package collection import ( + "encoding/json" "errors" - "sync" - + "github.com/polyse/database/pkg/filters" "github.com/rs/zerolog/log" + "github.com/xujiajun/nutsdb" + "sync" ) var ( @@ -18,16 +20,22 @@ var ( type Manager struct { sync.RWMutex processors map[string]Processor + db *nutsdb.DB } // NewManager function-constructor of Manager -func NewManager() *Manager { - return &Manager{processors: make(map[string]Processor)} +func NewManager(db *nutsdb.DB) *Manager { + m, err := loadCollections(db) + if err != nil { + log.Err(err).Msg("can not load collections") + m = make(map[string]Processor) + } + return &Manager{processors: m, db: db} } // NewManagerWithProc function-constructor of Manager with a given Processor. -func NewManagerWithProc(proc Processor) *Manager { - spm := NewManager() +func NewManagerWithProc(db *nutsdb.DB, proc Processor) *Manager { + spm := NewManager(db) spm.AddProcessor(proc) return spm } @@ -56,3 +64,78 @@ func (spm *Manager) GetProcessor(colName string) (Processor, error) { } return nil, ErrCollectionNotExist } + +func (spm *Manager) InitNewProc(colName string, tokenizer string, filter ...string) (Processor, error) { + return newSimpleProcessorByStrings(spm.db, Name(colName), tokenizer, filter...) +} + +func (spm *Manager) GetAllCollectionsInfo() (map[string]Metadata, error) { + log.Debug().Msg("getting all collections") + collections := make(map[string][]byte) + if err := spm.db.View(func(tx *nutsdb.Tx) error { + e, err := tx.GetAll(collectionBucket) + if err != nil { + return err + } + for i := range e { + collections[string(e[i].Key)] = e[i].Value + } + return nil + }); err != nil { + return nil, err + } + log.Debug().Msg("process get all collections data") + metaMap := make(map[string]Metadata) + for i := range collections { + var p Metadata + if err := json.Unmarshal(collections[i], &p); err != nil { + return nil, err + } + metaMap[i] = p + } + log.Debug().Interface("all collections", metaMap).Msg("getting collections done") + return metaMap, nil +} + +func loadCollections(db *nutsdb.DB) (map[string]Processor, error) { + log.Debug().Msg("start loading collections") + collections := make(map[string][]byte) + if err := db.View(func(tx *nutsdb.Tx) error { + e, err := tx.GetAll(collectionBucket) + if err != nil { + return err + } + for i := range e { + collections[string(e[i].Key)] = e[i].Value + } + return nil + }); err != nil { + return nil, err + } + procMap := make(map[string]Processor) + for i := range collections { + var p Metadata + if err := json.Unmarshal(collections[i], &p); err != nil { + return nil, err + } + log.Debug().Str("collection name", i).Interface("metadata", p).Msg("collection loaded") + var f []filters.Filter + for _, t := range p.ColFilters { + f = append(f, filterMap[t]) + } + sp, err := NewSimpleProcessor(db, Name(i), tokenizerMap[p.Tokenizer], f...) + if err != nil { + return nil, err + } + procMap[i] = sp + } + return procMap, nil +} + +var filterMap = map[string]filters.Filter{ + "github.com/polyse/database/pkg/filters.StemmAndToLower": filters.StemmAndToLower, + "github.com/polyse/database/pkg/filters.StopWords": filters.StopWords, +} +var tokenizerMap = map[string]filters.Tokenizer{ + "github.com/polyse/database/pkg/filters.FilterText": filters.FilterText, +} diff --git a/internal/collection/manager_test.go b/internal/collection/manager_test.go index aa4ed41..c6b2fc5 100644 --- a/internal/collection/manager_test.go +++ b/internal/collection/manager_test.go @@ -1,11 +1,12 @@ package collection import ( + "github.com/xujiajun/nutsdb" + "os" "testing" "github.com/polyse/database/pkg/filters" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -44,9 +45,10 @@ func TestSimpleProcessor_GetCollectionName(t *testing.T) { type processorManagerTestSuite struct { suite.Suite - prm *Manager - tr *MockProcessor - tr2 *MockProcessor + prm *Manager + tr *SimpleProcessor + tr2 *SimpleProcessor + nutsDb *nutsdb.DB } func TestStartProcessorManagerSuit(t *testing.T) { @@ -54,104 +56,69 @@ func TestStartProcessorManagerSuit(t *testing.T) { } func (pts *processorManagerTestSuite) SetupTest() { - testProc := new(MockProcessor) - - testProc. - On("ProcessAndInsertString", mock.Anything, mock.Anything). - Return(nil). - On("GetCollectionName"). - Return("testCollection") - - testProc2 := new(MockProcessor) - testProc2. - On("ProcessAndInsertString", mock.Anything, mock.Anything). - Return(nil). - On("GetCollectionName"). - Return("secondTestCollection") + opt := nutsdb.DefaultOptions + opt.Dir = dbDir + nutsDb, err := nutsdb.Open(opt) + if err != nil { + panic(err) + } + pts.nutsDb = nutsDb + testProc, err := NewSimpleProcessor(nutsDb, Name("test1"), filters.FilterText, filters.StemmAndToLower) + if err != nil { + panic(err) + } + testProc2, err := NewSimpleProcessor(nutsDb, Name("test2"), filters.FilterText, filters.StopWords) + if err != nil { + panic(err) + } - pts.prm = NewManagerWithProc(testProc) + pts.prm = NewManagerWithProc(nutsDb, testProc) pts.prm.AddProcessor(testProc2) pts.tr = testProc pts.tr2 = testProc2 } -func (pts *processorManagerTestSuite) TestSimpleProcessorManager_AddProcessors() { +func (pts *processorManagerTestSuite) TearDownSuite() { + _ = pts.nutsDb.Close() + if err := os.RemoveAll(dbDir); err != nil { + panic(err) + } +} + +func (pts *processorManagerTestSuite) TestSimpleProcessorManager_LoadCollection() { + testNormalProc, err := NewSimpleProcessor(pts.nutsDb, "test", filters.FilterText) + if err != nil { + panic(err) + } pts.Len(pts.prm.processors, 2) - pts.prm.AddProcessor( - NewSimpleProcessor( - nil, - "testCollection3", - filters.FilterText, - filters.StemmAndToLower, - filters.StopWords, - ), - ) + pts.prm.AddProcessor(testNormalProc) pts.Len(pts.prm.processors, 3) + if err := pts.nutsDb.Close(); err != nil { + panic(err) + } + opt := nutsdb.DefaultOptions + opt.Dir = dbDir + nutsDb, err := nutsdb.Open(opt) + pts.NoError(err) + man := NewManager(nutsDb) + p, err := man.GetProcessor("test") + pts.NoError(err) + pts.IsType(testNormalProc, p) + pts.Equal(testNormalProc.colName, p.GetCollectionName()) + meta, err := man.GetAllCollectionsInfo() + pts.NoError(err) + pts.Len(meta, 3) + _, ok := meta["test1"] + pts.True(ok) + _, ok = meta["test2"] + pts.True(ok) } func (pts *processorManagerTestSuite) TestSimpleProcessorManager_GetProcessor() { p, err := pts.prm.GetProcessor( - "testCollection", + "test1", ) pts.NoError(err) - pts.NoError(p.ProcessAndInsertString([]RawData{{Url: "test", Data: "data"}})) - pts.tr.AssertCalled(pts.T(), "ProcessAndInsertString", []RawData{{Url: "test", Data: "data"}}) - pts.tr2.AssertNotCalled(pts.T(), "ProcessAndInsertString", mock.Anything, mock.Anything) -} - -// Processor is an autogenerated mock type for the Processor type -type MockProcessor struct { - mock.Mock -} - -// GetCollectionName provides a mock function with given fields: -func (_m *MockProcessor) GetCollectionName() string { - ret := _m.Called() - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// ProcessAndGet provides a mock function with given fields: query -func (_m *MockProcessor) ProcessAndGet(query string, limit, offset int) ([]ResponseData, error) { - ret := _m.Called(query, limit, offset) - - var r0 []ResponseData - if rf, ok := ret.Get(0).(func(string, int, int) []ResponseData); ok { - r0 = rf(query, limit, offset) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]ResponseData) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(string, int, int) error); ok { - r1 = rf(query, limit, offset) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ProcessAndInsertString provides a mock function with given fields: data -func (_m *MockProcessor) ProcessAndInsertString(data []RawData) error { - ret := _m.Called(data) - - var r0 error - if rf, ok := ret.Get(0).(func([]RawData) error); ok { - r0 = rf(data) - } else { - r0 = ret.Error(0) - } - - return r0 + pts.Equal("test1", p.GetCollectionName()) } diff --git a/internal/collection/proc.go b/internal/collection/proc.go index 93e3866..bf3b5f1 100644 --- a/internal/collection/proc.go +++ b/internal/collection/proc.go @@ -3,7 +3,10 @@ package collection import ( "bytes" "encoding/gob" + "encoding/json" "fmt" + "reflect" + "runtime" "sort" "strings" "sync" @@ -16,8 +19,9 @@ import ( ) var ( - dataPrefix = "d-" - sourceBucket = "sources" + dataPrefix = "d-" + sourceBucket = "sources" + collectionBucket = "collections" ) // Processor an interface designed to process and filter incoming data for subsequent @@ -70,20 +74,79 @@ type WordInfo struct { // Name is type to describe collection name in database type Name string +type Metadata struct { + ColFilters []string `json:"col_filters"` + Tokenizer string `json:"tokenizer"` +} + // NewProcessor function-constructor to SimpleProcessor func NewSimpleProcessor( db *nutsdb.DB, colName Name, tokenizer filters.Tokenizer, textFilters ...filters.Filter, -) *SimpleProcessor { +) (*SimpleProcessor, error) { + return newSimpleProcessorUtil(db, colName, tokenizer, Metadata{ + ColFilters: getFilterNames(textFilters), + Tokenizer: getTokenizerName(tokenizer), + }, textFilters...) +} + +func newSimpleProcessorByStrings( + db *nutsdb.DB, + colName Name, + tokenizer string, + textFilters ...string, +) (*SimpleProcessor, error) { + log.Debug(). + Str("tokenizer name", tokenizer). + Strs("filters name", textFilters). + Msg("init processor from string tokenizer and filters") + t, ok := tokenizerMap[tokenizer] + if !ok { + return nil, fmt.Errorf("incorrect tokenizer name: %s", tokenizer) + } + var f []filters.Filter + for _, fn := range textFilters { + tmp, ok := filterMap[fn] + if !ok { + return nil, fmt.Errorf("incorrect filter name: %s", fn) + } + f = append(f, tmp) + } + return newSimpleProcessorUtil(db, colName, t, Metadata{ + ColFilters: textFilters, + Tokenizer: tokenizer, + }, f...) +} + +func newSimpleProcessorUtil( + db *nutsdb.DB, + colName Name, + tokenizer filters.Tokenizer, + cm Metadata, + textFilters ...filters.Filter, +) (*SimpleProcessor, error) { + log.Debug().Interface("metadata", cm).Msg("init proc with metadata") + rawMeta, err := json.Marshal(cm) + if err != nil { + return nil, err + } + if err := db.Update(func(tx *nutsdb.Tx) error { + if err := tx.Put(collectionBucket, []byte(colName), rawMeta, 0); err != nil { + return err + } + return nil + }); err != nil { + return nil, err + } return &SimpleProcessor{ db: db, filters: textFilters, tokenizer: tokenizer, colName: string(colName), bucketName: dataPrefix + string(colName), - } + }, nil } // ProcessAndInsertString changes the input data using the filters specified in this processor, @@ -372,3 +435,15 @@ func (p *SimpleProcessor) saveData(ent map[string][]*WordInfo) error { return nil }) } + +func getFilterNames(fs []filters.Filter) []string { + var res []string + for _, f := range fs { + res = append(res, runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()) + } + return res +} + +func getTokenizerName(t filters.Tokenizer) string { + return runtime.FuncForPC(reflect.ValueOf(t).Pointer()).Name() +} diff --git a/internal/collection/proc_test.go b/internal/collection/proc_test.go index 2eea5e0..c6acc58 100644 --- a/internal/collection/proc_test.go +++ b/internal/collection/proc_test.go @@ -35,13 +35,16 @@ func (cts *processorTestSuite) SetupTest() { if err != nil { panic(err) } - proc := NewSimpleProcessor( + proc, err := NewSimpleProcessor( nutsDb, Name(nutColl), filters.FilterText, filters.StemmAndToLower, filters.StopWords, ) + if err != nil { + panic(err) + } cts.proc = proc cts.nutsDb = nutsDb }