diff --git a/cmd/database/config.go b/cmd/database/config.go index b5d7151..55e2dba 100644 --- a/cmd/database/config.go +++ b/cmd/database/config.go @@ -10,11 +10,12 @@ import ( // Config is main application configuration structure. type config struct { - Listen string `env:"LISTEN" envDefault:"localhost:9000"` - Timeout time.Duration `env:"TIMEOUT" envDefault:"10ms"` - LogLevel string `env:"LOG_LEVEL" envDefault:"info"` - LogFmt string `env:"LOG_FMT" envDefault:"console"` - DbFile string `env:"DB_FILE" envDefault:"./tmp/nutsdb"` + Listen string `env:"LISTEN" envDefault:"localhost:9000"` + Timeout time.Duration `env:"TIMEOUT" envDefault:"10ms"` + MergeTimeout time.Duration `env:"TIMEOUT" envDefault:"1h"` + LogLevel string `env:"LOG_LEVEL" envDefault:"info"` + LogFmt string `env:"LOG_FMT" envDefault:"console"` + DbFile string `env:"DB_FILE" envDefault:"./tmp/nutsdb"` } func load() (*config, error) { diff --git a/cmd/database/main.go b/cmd/database/main.go index fa59f33..6f1fb78 100644 --- a/cmd/database/main.go +++ b/cmd/database/main.go @@ -9,33 +9,14 @@ import ( "github.com/xujiajun/nutsdb" "os" "strings" + "time" - "github.com/google/wire" "github.com/polyse/database/internal/collection" "github.com/rs/zerolog/log" "github.com/xlab/closer" ) -var ( - procSetter = wire.NewSet( - initDbConfig, - initConnection, - initTokenizer, - initFilters, - collection.NewSimpleProcessor, - ) - - dbSetter = wire.NewSet( - procSetter, - wire.Bind( - new(collection.Processor), - new(*collection.SimpleProcessor), - ), - collection.NewManagerWithProc, - ) -) - func main() { defer closer.Close() @@ -77,6 +58,10 @@ func main() { log.Debug().Msg("starting db") var connCLoser func() a.Manager, connCLoser, err = initProcessorManager(cfg, "default") + if err != nil { + log.Err(err).Msg("error while init wire manager") + return + } closer.Bind(connCLoser) log.Debug().Msg("starting web application") @@ -105,7 +90,24 @@ func initConnection(cfg collection.Config) (*nutsdb.DB, func(), error) { return nil, nil, err } log.Info().Msg("connection opened") + + ctx, cancel := context.WithCancel(context.Background()) + + go func(ctx context.Context, db *nutsdb.DB) { + for { + select { + case <-time.After(cfg.MergeTimeout): + if err = db.Merge(); err != nil { + log.Warn().Err(err).Msg("can not merge database") + } + case <-ctx.Done(): + return + } + } + }(ctx, nutsDb) + return nutsDb, func() { + cancel() log.Info().Msg("start closing database connection") if err = nutsDb.Merge(); err != nil { log.Err(err).Msg("can not merge database") @@ -117,7 +119,7 @@ func initConnection(cfg collection.Config) (*nutsdb.DB, func(), error) { } func initDbConfig(c *config) collection.Config { - return collection.Config{File: c.DbFile} + return collection.Config{File: c.DbFile, MergeTimeout: c.MergeTimeout} } func initWebAppCfg(c *config) (api.AppConfig, error) { diff --git a/cmd/database/wire.go b/cmd/database/wire.go index 0e5cfef..fdb2a7c 100644 --- a/cmd/database/wire.go +++ b/cmd/database/wire.go @@ -11,6 +11,25 @@ import ( "github.com/polyse/database/internal/api" ) +var ( + procSetter = wire.NewSet( + initDbConfig, + initConnection, + initTokenizer, + initFilters, + collection.NewSimpleProcessor, + ) + + dbSetter = wire.NewSet( + procSetter, + wire.Bind( + new(collection.Processor), + new(*collection.SimpleProcessor), + ), + collection.NewManagerWithProc, + ) +) + func initWebApp(ctx context.Context, c *config) (*api.API, func(), error) { wire.Build(api.NewApp, initWebAppCfg) return nil, nil, nil diff --git a/internal/collection/proc.go b/internal/collection/proc.go index 93e3866..424f8e0 100644 --- a/internal/collection/proc.go +++ b/internal/collection/proc.go @@ -40,7 +40,8 @@ type SimpleProcessor struct { // Config describes the basic database configuration. type Config struct { - File string + File string + MergeTimeout time.Duration } // Source structure for domain\article\site\source description