diff --git a/CHANGELOG.md b/CHANGELOG.md index 42746418..49379f32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * New types for MessagePack extensions compatible with go-option (#459). * Added `box.MustNew` wrapper for `box.New` without an error (#448). +* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496). ### Changed @@ -23,8 +24,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * Removed deprecated `box.session.push()` support: Future.AppendPush() and Future.GetIterator() methods, ResponseIterator and TimeoutResponseIterator types, Future.pushes[], Future.ready (#480, #497). -* `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480) -* Removed deprecated `Connection` methods, related interfaces and tests are updated (#479) +* `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480). +* Removed deprecated `Connection` methods, related interfaces and tests are updated (#479). +* Future.done replaced with Future.cond (sync.Cond) + Future.finished bool (#496). ### Fixed diff --git a/MIGRATION.md b/MIGRATION.md index 0065f0b7..2a31878e 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -15,6 +15,7 @@ TODO * Removed `box.session.push()` support: Future.AppendPush() and Future.GetIterator() methods, ResponseIterator and TimeoutResponseIterator types. * Removed deprecated `Connection` methods, related interfaces and tests are updated. + *NOTE*: due to Future.GetTyped() doesn't decode SelectRequest into structure, substitute Connection.GetTyped() following the example: ```Go var singleTpl = Tuple{} @@ -30,6 +31,7 @@ TODO ).GetTyped(&tpl) singleTpl := tpl[0] ``` +* Future.done replaced with Future.cond (sync.Cond) + Future.finished bool. ## Migration from v1.x.x to v2.x.x diff --git a/connection.go b/connection.go index 3525bfda..8471643d 100644 --- a/connection.go +++ b/connection.go @@ -934,7 +934,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { ErrRateLimited, "Request is rate limited on client", } - fut.done = nil + fut.finish() return } } @@ -948,7 +948,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { ErrConnectionClosed, "using closed connection", } - fut.done = nil + fut.finish() shard.rmut.Unlock() return case connDisconnected: @@ -956,7 +956,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { ErrConnectionNotReady, "client connection is not ready", } - fut.done = nil + fut.finish() shard.rmut.Unlock() return case connShutdown: @@ -964,7 +964,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { ErrConnectionShutdown, "server shutdown in progress", } - fut.done = nil + fut.finish() shard.rmut.Unlock() return } @@ -993,7 +993,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { runtime.Gosched() select { case conn.rlimit <- struct{}{}: - case <-fut.done: + case <-fut.WaitChan(): if fut.err == nil { panic("fut.done is closed, but err is nil") } @@ -1007,12 +1007,12 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { // is "done" before the response is come. func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) { select { - case <-fut.done: + case <-fut.WaitChan(): case <-ctx.Done(): } select { - case <-fut.done: + case <-fut.WaitChan(): return default: conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w", @@ -1034,7 +1034,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future { conn.incrementRequestCnt() fut := conn.newFuture(req) - if fut.done == nil { + if fut.isDone() { conn.decrementRequestCnt() return fut } @@ -1057,12 +1057,12 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { shardn := fut.requestId & (conn.opts.Concurrency - 1) shard := &conn.shard[shardn] shard.bufmut.Lock() - select { - case <-fut.done: + + if fut.isDone() { shard.bufmut.Unlock() return - default: } + firstWritten := shard.buf.Len() == 0 if shard.buf.Cap() == 0 { shard.buf.b = make([]byte, 0, 128) diff --git a/future.go b/future.go index 0f882014..17055ff6 100644 --- a/future.go +++ b/future.go @@ -3,6 +3,7 @@ package tarantool import ( "io" "sync" + "sync/atomic" "time" ) @@ -15,32 +16,43 @@ type Future struct { mutex sync.Mutex resp Response err error + cond sync.Cond + finished atomic.Bool done chan struct{} } func (fut *Future) wait() { - if fut.done == nil { - return + fut.mutex.Lock() + defer fut.mutex.Unlock() + + for !fut.isDone() { + fut.cond.Wait() } - <-fut.done } func (fut *Future) isDone() bool { - if fut.done == nil { - return true - } - select { - case <-fut.done: - return true - default: - return false + return fut.finished.Load() +} + +func (fut *Future) finish() { + fut.mutex.Lock() + defer fut.mutex.Unlock() + + fut.finished.Store(true) + + if fut.done != nil { + close(fut.done) } + + fut.cond.Broadcast() } // NewFuture creates a new empty Future for a given Request. func NewFuture(req Request) (fut *Future) { fut = &Future{} - fut.done = make(chan struct{}) + fut.done = nil + fut.finished.Store(false) + fut.cond = *sync.NewCond(&fut.mutex) fut.req = req return fut } @@ -60,7 +72,14 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error { } fut.resp = resp - close(fut.done) + fut.finished.Store(true) + + if fut.done != nil { + close(fut.done) + } + + fut.cond.Broadcast() + return nil } @@ -74,7 +93,13 @@ func (fut *Future) SetError(err error) { } fut.err = err - close(fut.done) + fut.finished.Store(true) + + if fut.done != nil { + close(fut.done) + } + + fut.cond.Broadcast() } // GetResponse waits for Future to be filled and returns Response and error. @@ -122,8 +147,16 @@ func init() { // WaitChan returns channel which becomes closed when response arrived or error occurred. func (fut *Future) WaitChan() <-chan struct{} { - if fut.done == nil { + fut.mutex.Lock() + defer fut.mutex.Unlock() + + if fut.isDone() { return closedChan } + + if fut.done == nil { + fut.done = make(chan struct{}) + } + return fut.done }