Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 90 additions & 25 deletions cmd/tsbs_run_queries_iotdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ var (
usingGroupByApi bool // if using group by api when executing query
singleDatabase bool // if using single database, e.g. only one database: root.db. root.db.cpu, root.db.mem belongs to this databse
useAlignedTimeseries bool // using aligned timeseries if set true.
sessionPoolSize int
)

// Global vars:
var (
runner *query.BenchmarkRunner
runner *query.BenchmarkRunner
sessionPool client.SessionPool
)

// Parse args:
Expand All @@ -41,6 +43,7 @@ func init() {
pflag.Bool("use-groupby", false, "Whether to use group by api")
pflag.Bool("single-database", false, "Whether to use single database")
pflag.Bool("aligned-timeseries", false, "Whether to use aligned time series")
pflag.Uint("session-pool-size", 0, "Session pool size")

pflag.Parse()

Expand All @@ -62,9 +65,10 @@ func init() {
usingGroupByApi = viper.GetBool("use-groupby")
singleDatabase = viper.GetBool("single-database")
useAlignedTimeseries = viper.GetBool("aligned-timeseries")
sessionPoolSize = viper.GetInt("session-pool-size")
timeoutInMs = 0

log.Printf("tsbs_run_queries_iotdb target: %s:%s. Loading with %d workers.\n", host, port, workers)
log.Printf("tsbs_run_queries_iotdb target: %s:%s. Loading with %d workers. session-pool-size: %d\n", host, port, workers, sessionPoolSize)
if workers < 5 {
log.Println("Insertion throughput is strongly related to the number of threads. Use more workers for better performance.")
}
Expand All @@ -76,6 +80,16 @@ func init() {
Password: password,
}

if sessionPoolSize > 0 {
poolConfig := &client.PoolConfig{
Host: host,
Port: port,
UserName: user,
Password: password,
}
sessionPool = client.NewSessionPool(poolConfig, sessionPoolSize, 60000, 60000, false)
}

runner = query.NewBenchmarkRunner(config)
}

Expand All @@ -92,16 +106,33 @@ type processor struct {
func newProcessor() query.Processor { return &processor{} }

func (p *processor) Init(workerNumber int) {
p.session = client.NewSession(&clientConfig)
p.printResponses = runner.DoPrintResponses()
if err := p.session.Open(false, int(timeoutInMs)); err != nil {
errMsg := fmt.Sprintf("query processor init error, session is not open: %v\n", err)
errMsg = errMsg + fmt.Sprintf("timeout setting: %d ms", timeoutInMs)
log.Fatal(errMsg)
}
_, err := p.session.ExecuteStatement("flush")
if err != nil {
log.Fatal(fmt.Sprintf("flush meets error: %v\n", err))

if sessionPoolSize <= 0 {
p.session = client.NewSession(&clientConfig)
if err := p.session.Open(false, int(timeoutInMs)); err != nil {
errMsg := fmt.Sprintf("query processor init error, session is not open: %v\n", err)
errMsg = errMsg + fmt.Sprintf("timeout setting: %d ms", timeoutInMs)
log.Fatal(errMsg)
}
if workerNumber == 0 {
_, err := p.session.ExecuteStatement("flush")
if err != nil {
log.Fatal(fmt.Sprintf("flush meets error: %v\n", err))
}
}
} else {
session, err := sessionPool.GetSession()
if err != nil {
log.Fatal(fmt.Sprintf("flush meets error: %v\n", err))
}
if workerNumber == 0 {
_, err = session.ExecuteStatement("flush")
if err != nil {
log.Fatal(fmt.Sprintf("flush meets error: %v\n", err))
}
}
sessionPool.PutBack(session)
}
}

Expand All @@ -116,16 +147,30 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
var legalNodes = true
var err error

start := time.Now().UnixNano()
start := time.Now()
if startTimeInMills > 0 {
if usingGroupByApi {
idx := strings.LastIndex(aggregatePaths[0], ".")
device := aggregatePaths[0][:idx]
measurement := aggregatePaths[0][idx+1:]
splits := strings.Split(aggregatePaths[0], ".")
db := splits[0] + "." + splits[1]
device := strings.Join(splits[:len(splits)-1], ".")
measurement := splits[len(splits)-1]
dataSet, err = p.session.ExecuteGroupByQueryIntervalQuery(&db, device, measurement,
common.TAggregationType_MAX_VALUE, 1,
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &useAlignedTimeseries)
var err error
if sessionPoolSize > 0 {
session, err := sessionPool.GetSession()
if err == nil {
dataSet, err = session.ExecuteGroupByQueryIntervalQuery(&db, device, measurement,
common.TAggregationType_MAX_VALUE, 1,
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &useAlignedTimeseries)
} else {
log.Printf("Get session meets error.\n")
}
sessionPool.PutBack(session)
} else {
dataSet, err = p.session.ExecuteGroupByQueryIntervalQuery(&db, device, measurement,
common.TAggregationType_MAX_VALUE, 1,
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &useAlignedTimeseries)
}

if err != nil {
fmt.Printf("ExecuteGroupByQueryIntervalQuery meets error, "+
Expand All @@ -142,9 +187,21 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
}
}
} else {
dataSet, err = p.session.ExecuteAggregationQueryWithLegalNodes(aggregatePaths,
[]common.TAggregationType{common.TAggregationType_MAX_VALUE},
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &legalNodes)
if sessionPoolSize > 0 {
session, err := sessionPool.GetSession()
if err == nil {
dataSet, err = session.ExecuteAggregationQueryWithLegalNodes(aggregatePaths,
[]common.TAggregationType{common.TAggregationType_MAX_VALUE},
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &legalNodes)
} else {
log.Printf("Get session meets error.\n")
}
sessionPool.PutBack(session)
} else {
dataSet, err = p.session.ExecuteAggregationQueryWithLegalNodes(aggregatePaths,
[]common.TAggregationType{common.TAggregationType_MAX_VALUE},
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &legalNodes)
}

if err != nil {
fmt.Printf("ExecuteAggregationQueryWithLegalNodes meets error, "+
Expand All @@ -161,19 +218,27 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
}
}
} else {
dataSet, err = p.session.ExecuteQueryStatement(sql, &timeoutInMs)
if sessionPoolSize > 0 {
session, err := sessionPool.GetSession()
if err == nil {
dataSet, err = session.ExecuteQueryStatement(sql, &timeoutInMs)
} else {
log.Printf("Get session meets error.\n")
}
sessionPool.PutBack(session)
} else {
dataSet, err = p.session.ExecuteQueryStatement(sql, &timeoutInMs)
}
}

if err != nil {
log.Printf("An error occurred while executing query SQL: %s\n", iotdbQ.SqlQuery)
return nil, err
}

took := time.Now().UnixNano() - start

lag := float64(took) / float64(time.Millisecond) // in milliseconds
took := float64(time.Since(start).Nanoseconds()) / 1e6
stat := query.GetStat()
stat.Init(q.HumanLabelName(), lag)
stat.Init(q.HumanLabelName(), took)
return []*query.Stat{stat}, err
}

Expand Down
Loading