diff --git a/targets/spiperf/fsm/client.go b/targets/spiperf/fsm/client.go index 93cbbbb7b..f42a8d827 100644 --- a/targets/spiperf/fsm/client.go +++ b/targets/spiperf/fsm/client.go @@ -13,8 +13,6 @@ import ( "github.com/go-openapi/strfmt/conv" ) -const TimeFormatString = time.RFC3339Nano - // Client FSM that handles client mode. type Client struct { // PeerCmdOut sends commands to the peer's Server FSM. @@ -293,7 +291,7 @@ Done: case msg.PeerDisconnectLocalType, msg.PeerDisconnectRemoteType: // Return an error here. Peer should not have disconnected. - err := processUnexpectedPeerDisconnect(notif) + err := c.processUnexpectedPeerDisconnect(notif) return (*Client).cleanup, err default: @@ -408,7 +406,7 @@ Done: case msg.PeerDisconnectLocalType, msg.PeerDisconnectRemoteType: // Return an error here. Peer should not have disconnected. - err := processUnexpectedPeerDisconnect(notif) + err := c.processUnexpectedPeerDisconnect(notif) return (*Client).cleanup, err default: @@ -616,7 +614,7 @@ func (c *Client) waitForPeerResponse(expectedType string) (*msg.Message, error) switch notif.Type { case msg.PeerDisconnectLocalType, msg.PeerDisconnectRemoteType: - return nil, processUnexpectedPeerDisconnect(notif) + return nil, c.processUnexpectedPeerDisconnect(notif) case msg.StatsNotificationType: // Sometimes upstream-only tests can have an in-flight stats notification from @@ -666,7 +664,7 @@ func (c *Client) handleStatsNotification(notif *msg.Message) error { return nil } -func processUnexpectedPeerDisconnect(notif *msg.Message) error { +func (c *Client) processUnexpectedPeerDisconnect(notif *msg.Message) error { switch notif.Type { case msg.PeerDisconnectLocalType: diff --git a/targets/spiperf/fsm/const.go b/targets/spiperf/fsm/const.go index 6062b0f92..39436af85 100644 --- a/targets/spiperf/fsm/const.go +++ b/targets/spiperf/fsm/const.go @@ -8,4 +8,9 @@ const ( DefaultStartDelay = 3 * time.Second DefaultStatsPollInterval = 1 * time.Second DefaultGeneratorPollInterval = 1 * time.Second + TimeFormatString = time.RFC3339Nano + + // MaximumStartTimeDelta maximum time in the future to start traffic. + // This value is somewhat arbitrary. + MaximumStartTimeDelta = 3 * time.Minute ) diff --git a/targets/spiperf/fsm/server.go b/targets/spiperf/fsm/server.go new file mode 100644 index 000000000..f0b5afa70 --- /dev/null +++ b/targets/spiperf/fsm/server.go @@ -0,0 +1,983 @@ +package fsm + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/Spirent/openperf/targets/spiperf/msg" + "github.com/Spirent/openperf/targets/spiperf/openperf" + "github.com/rs/zerolog" +) + +// Server FSM that handles server mode. +type Server struct { + // PeerCmdIn receives commands from the peer's Client FSM. + PeerCmdIn <-chan *msg.Message + + // PeerRespOut sends responses to the peer's Client FSM. + PeerRespOut chan<- *msg.Message + + // PeerNotifOut sends notifications to the peer's Client FSM. + PeerNotifOut chan<- *msg.Message + + // OpenperfCmdOut sends commands to Openperf. + OpenperfCmdOut chan<- *openperf.Command + + // PeerTimeout specifies the maximum time the FSM will wait for responses from the peer's Server FSM. + PeerTimeout time.Duration + + // OpenperfTimeout specifies the maximum time the FSM will wait for responses from Openperf. + OpenperfTimeout time.Duration + + // StartDelay added to start time so server and client start traffic generation and analysis at the same time. + StartDelay time.Duration + + // TestConfiguration keeps track of the test configuration. + TestConfiguration Configuration + + // StatsPollInterval how often to poll local Openperf statistics. This does not impact results output. + StatsPollInterval time.Duration + + // GeneratorPollInterval how often to poll local Openperf generator resource to see if it's still transmitting. + GeneratorPollInterval time.Duration + + // Logger log output facility + Logger *zerolog.Logger + + // state tracks the state machine's current state as a string. + state atomic.Value + + // startTime is the time at which traffic will begin transmitting. + startTime time.Time + + // errorAfterCleanup should the FSM terminate after the cleanup state? + errorAfterCleanup bool +} + +func (s *Server) enter(state string) { + s.Logger.Trace().Msg(state) + s.state.Store(state) +} + +func (s *Server) State() (state string) { + state, _ = s.state.Load().(string) + return +} + +type serverStateFunc func(*Server, context.Context) (serverStateFunc, error) + +func (s *Server) Run(ctx context.Context) error { + + if s.Logger == nil { + return &InvalidParamError{What: "Logger", Actual: "nil", Expected: "non-nil"} + } + if s.PeerCmdIn == nil { + return &InvalidParamError{What: "PeerCmdIn", Actual: "nil", Expected: "non-nil"} + } + if s.PeerRespOut == nil { + return &InvalidParamError{What: "PeerRespOut", Actual: "nil", Expected: "non-nil"} + } + if s.PeerNotifOut == nil { + return &InvalidParamError{What: "PeerNotifOut", Actual: "nil", Expected: "non-nil"} + } + if s.OpenperfCmdOut == nil { + return &InvalidParamError{What: "OpenperfCmdOut", Actual: "nil", Expected: "non-nil"} + } + if s.PeerTimeout < 0 { + return &InvalidParamError{What: "PeerTimeout", Actual: fmt.Sprintf("%s", s.PeerTimeout), Expected: ">= 0"} + } + if s.PeerTimeout == 0 { + s.PeerTimeout = DefaultPeerTimeout + s.Logger.Debug(). + Dur("PeerTimeout", DefaultPeerTimeout). + Msg("using default value for server FSM parameter") + } + if s.OpenperfTimeout < 0 { + return &InvalidParamError{What: "OpenperfTimeout", Actual: fmt.Sprintf("%s", s.OpenperfTimeout), Expected: ">= 0"} + } + if s.OpenperfTimeout == 0 { + s.OpenperfTimeout = DefaultOpenperfTimeout + s.Logger.Debug(). + Dur("OpenperfTimeout", DefaultOpenperfTimeout). + Msg("using default value for server FSM parameter") + } + if s.StartDelay < 0 { + return &InvalidParamError{What: "StartDelay", Actual: fmt.Sprintf("%s", s.StartDelay), Expected: ">= 0"} + } + if s.StartDelay == 0 { + s.StartDelay = DefaultStartDelay + s.Logger.Debug(). + Dur("StartDelay", DefaultStartDelay). + Msg("using default value for server FSM parameter") + } + if s.StatsPollInterval < 0 { + return &InvalidParamError{What: "StatsPollInterval", Actual: fmt.Sprintf("%s", s.StatsPollInterval), Expected: ">= 0"} + } + if s.StatsPollInterval == 0 { + s.StatsPollInterval = DefaultStatsPollInterval + s.Logger.Debug(). + Dur("StatsPollInterval", DefaultStatsPollInterval). + Msg("using default value for server FSM parameter") + } + if s.GeneratorPollInterval < 0 { + return &InvalidParamError{What: "GeneratorPollInterval", Actual: fmt.Sprintf("%s", s.GeneratorPollInterval), Expected: ">= 0"} + } + if s.GeneratorPollInterval == 0 { + s.GeneratorPollInterval = DefaultGeneratorPollInterval + s.Logger.Debug(). + Dur("GeneratorPollInterval", DefaultGeneratorPollInterval). + Msg("using default value for server FSM parameter") + } + + defer close(s.PeerRespOut) + + var retVal error + f := (*Server).connect + for f != nil { + var err error + //Goal here is to keep the first error since any subsequent errors are probably a result of that. + //Note this is only for fatal errors. Non-fatal errors are logged and the FSM resets. + if f, err = f(s, ctx); err != nil && retVal == nil { + s.errorAfterCleanup = true + retVal = err + } + } + + return retVal +} + +func (s *Server) connect(ctx context.Context) (serverStateFunc, error) { + s.enter("connect") + + // Wait for client to connect and send a Hello message. + // Doing this inline since we have no idea when a client will connect, + // and thus can't set a timeout here. + select { + case cmd, ok := <-s.PeerCmdIn: + if !ok { + s.Logger.Error().Msg("error reading from peer command channel.") + return nil, &PeerError{What: "error reading from peer command channel."} + } + + helloMsg, ok := cmd.Value.(*msg.Hello) + if !ok { + s.Logger.Warn(). + Str("actual type", cmd.Type). + Str("expected type", "HelloType"). + Msg("got unexpected message from peer. Connection terminated.") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: fmt.Sprintf("unexpected message type. got %s, expected %s", cmd.Type, msg.HelloType), + } + + return (*Server).connect, nil + } + + if helloMsg.PeerProtocolVersion != msg.Version { + + s.Logger.Warn(). + Str("local version", msg.Version). + Str("remote version", helloMsg.PeerProtocolVersion). + Msg("peer message version mismatch. Connection terminated.") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: fmt.Sprintf("peer version mismatch. got %s, expected %s", helloMsg.PeerProtocolVersion, msg.Version), + } + + return (*Server).connect, nil + } + + case <-ctx.Done(): + // We've been canceled. This is not an error. + s.Logger.Info(). + Msg("Server FSM exiting due to caller cancellation") + + return nil, nil + } + + s.PeerRespOut <- &msg.Message{ + Type: msg.HelloType, + Value: &msg.Hello{ + PeerProtocolVersion: msg.Version, + }, + } + + return (*Server).configure, nil +} + +func (s *Server) configure(ctx context.Context) (serverStateFunc, error) { + s.enter("configure") + + cmd, cmdErr := s.waitForPeerCommand(msg.GetServerParametersType) + if cmdErr != nil { + // was the error fatal? + if cmd == nil { + s.Logger.Error(). + Err(cmdErr). + Msg("fatal server FSM error. terminating.") + + return nil, cmdErr + } + + s.Logger.Error(). + Err(cmdErr). + Str("expected command type", msg.GetServerParametersType). + Msg("error while waiting for peer command") + + // error was not fatal for the FSM, jump back to the connect state. + // but first send an error to the peer. + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: cmdErr.Error(), + } + + return (*Server).connect, nil + } + + s.PeerRespOut <- &msg.Message{ + Type: msg.ServerParametersType, + Value: &msg.ServerParametersResponse{ + //XXX hardcoding a value here for now. This is temporary. + OpenperfURL: "http://localhost:9000", + }, + } + + cmd, cmdErr = s.waitForPeerCommand(msg.SetConfigType) + if cmdErr != nil { + if cmd == nil { + s.Logger.Error(). + Err(cmdErr). + Msg("fatal server FSM error. terminating.") + + return nil, cmdErr + } + + s.Logger.Error(). + Err(cmdErr). + Str("expected command type", msg.SetConfigType). + Msg("error while waiting for peer command") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: cmdErr.Error(), + } + + return (*Server).connect, nil + } + + // Sanity check our inputs. The more we can validate here the less we'd have to + // clean up from Openperf if it all goes wrong. + serverCfg, ok := cmd.Value.(*msg.ServerConfiguration) + if !ok { + s.Logger.Error(). + Interface("actual type", cmd.Value). + Interface("expected type", &msg.ServerConfiguration{}). + Msg("unexpected peer message value") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: "unexpected peer message value", + } + + return (*Server).connect, nil + } + + if err := s.validateTestConfig(serverCfg); err != nil { + s.Logger.Error(). + Err(err). + Msg("error with test configuration") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: err, + } + + return (*Server).connect, nil + } + + s.TestConfiguration.UpstreamRateBps = serverCfg.UpstreamRateBps + s.TestConfiguration.DownstreamRateBps = serverCfg.DownstreamRateBps + + //XXX: local Openperf will be configured here. + + //From here on out any error must jump to the cleanup state before exiting or restarting. + + // Send ACK to the client + s.PeerRespOut <- &msg.Message{ + Type: msg.AckType, + } + + // Switch to Ready state. + return (*Server).ready, nil +} + +func (s *Server) ready(ctx context.Context) (serverStateFunc, error) { + s.enter("ready") + + cmd, cmdErr := s.waitForPeerCommand(msg.StartCommandType) + if cmdErr != nil { + if cmd == nil { + return (*Server).cleanup, cmdErr + } + + s.Logger.Error(). + Err(cmdErr). + Str("expected command type", msg.StartCommandType). + Msg("error while waiting for peer command") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: cmdErr.Error(), + } + + return (*Server).cleanup, nil + } + + startCmd, ok := cmd.Value.(*msg.StartCommand) + if !ok { + s.Logger.Error(). + Interface("actual type", cmd.Value). + Interface("expected type", &msg.StartCommand{}). + Msg("unexpected peer message value") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: "unexpected peer message value", + } + + return (*Server).cleanup, nil + } + + startTime, err := time.Parse(TimeFormatString, startCmd.StartTime) + if err != nil { + s.Logger.Error(). + Err(err). + Msg("error parsing test start time") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: err.Error(), + } + + return (*Server).cleanup, nil + } + + // Is start time in the past? + if startTime.Before(time.Now()) { + s.Logger.Error(). + Time("requested start time", startTime). + Time("now", time.Now()). + Msg("requested start time is in the past") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: "requested start time is in the past", + } + + return (*Server).cleanup, nil + } + + // How about too far in the future? + if !startTime.Before(time.Now().Add(MaximumStartTimeDelta)) { + s.Logger.Error(). + Time("requested start time", startTime). + Time("now", time.Now()). + Dur("maximum delta", MaximumStartTimeDelta). + Msg("requested start time is too far in the future") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: "requested start time is too far in the future", + } + + return (*Server).cleanup, nil + } + + s.startTime = startTime + + // Send ACK to the client + s.PeerRespOut <- &msg.Message{ + Type: msg.AckType, + } + + // switch to armed state + return (*Server).armed, nil +} + +// armed FSM state that waits for the start time to arrive before switching to running state. +func (s *Server) armed(ctx context.Context) (serverStateFunc, error) { + s.enter("armed") + + // Wait for then to become now (aka test to start.) + // Also make sure peer didn't disappear on us. +Done: + for { + select { + case <-time.After(time.Until(s.startTime)): + break Done + + // There's a chance our peer will try to tell us something while we're waiting. + case notif, ok := <-s.PeerCmdIn: + if !ok { + s.Logger.Error().Msg("error reading peer command channel") + return (*Server).cleanup, &PeerError{What: "error reading peer command."} + } + + switch notif.Type { + + case msg.PeerDisconnectLocalType, msg.PeerDisconnectRemoteType: + s.Logger.Error(). + Msg("unexpected peer disconnection") + + return (*Server).cleanup, nil + + default: + s.Logger.Error(). + Str("command type", notif.Type). + Msg("unexpected command from peer while waiting for test to start") + + return (*Server).cleanup, nil + + } + + } + } + + return (*Server).running, nil +} + +// running tracks FSM state where data stream(s) are transmitting (aka test is running.) +func (s *Server) running(ctx context.Context) (serverStateFunc, error) { + s.enter("running") + + // generatorPollResp channel to receive responses from polling Openperf generator resource. + var generatorPollResp chan interface{} + // generatorPollCancel stop generator polling. + var generatorPollCancel context.CancelFunc + + // txStatsPollResp channel to receive responses from polling local Openperf transmit stats. + var txStatsPollResp chan interface{} + // txStatsPollCancel stop polling local Openperf transmit stats. + var txStatsPollCancel context.CancelFunc + + // rxStatsPollResp channel to receive responses from polling local Openperf receive stats. + var rxStatsPollResp chan interface{} + // rxStatsPollCancel stop polling local Openperf receive stats. + var rxStatsPollCancel context.CancelFunc + + var ( + clientRunning bool + serverRunning bool + ) + + switch { + // client <--> server traffic + case s.TestConfiguration.UpstreamRateBps > 0 && s.TestConfiguration.DownstreamRateBps > 0: + // Start up generator polling (to check runstate) + generatorPollResp, generatorPollCancel = s.startOpenperfCmdRepeater(ctx, &openperf.Command{Request: &openperf.GetGeneratorRequest{}}, s.GeneratorPollInterval) + + defer generatorPollCancel() + + // Start up transmit stats polling + txStatsPollResp, txStatsPollCancel = s.startOpenperfCmdRepeater(ctx, &openperf.Command{Request: &openperf.GetTxStatsRequest{}}, s.StatsPollInterval) + + defer txStatsPollCancel() + + // Start up receive stats polling + rxStatsPollResp, rxStatsPollCancel = s.startOpenperfCmdRepeater(ctx, &openperf.Command{Request: &openperf.GetRxStatsRequest{}}, s.StatsPollInterval) + + defer rxStatsPollCancel() + + clientRunning = true + serverRunning = true + + // server -> client traffic + case s.TestConfiguration.DownstreamRateBps > 0: + // Start up generator polling (to check runstate) + generatorPollResp, generatorPollCancel = s.startOpenperfCmdRepeater(ctx, &openperf.Command{Request: &openperf.GetGeneratorRequest{}}, s.GeneratorPollInterval) + + defer generatorPollCancel() + + // Start up transmit stats polling + txStatsPollResp, txStatsPollCancel = s.startOpenperfCmdRepeater(ctx, &openperf.Command{Request: &openperf.GetTxStatsRequest{}}, s.StatsPollInterval) + + defer txStatsPollCancel() + + clientRunning = false + serverRunning = true + + // client -> server traffic + case s.TestConfiguration.UpstreamRateBps > 0: + // Start up receive stats polling + rxStatsPollResp, rxStatsPollCancel = s.startOpenperfCmdRepeater(ctx, &openperf.Command{Request: &openperf.GetRxStatsRequest{}}, s.StatsPollInterval) + + defer rxStatsPollCancel() + + clientRunning = true + serverRunning = false + } + +Done: + for { + select { + case cmd, ok := <-s.PeerCmdIn: + if !ok { + s.Logger.Error().Msg("error reading from peer command channel.") + return (*Server).cleanup, &PeerError{Err: errors.New("error reading peer notifications.")} + } + + switch cmd.Type { + case msg.TransmitDoneType: + + clientRunning = false + rxStatsPollCancel() + + if !serverRunning { + break Done + } + + case msg.PeerDisconnectLocalType, msg.PeerDisconnectRemoteType: + // Return an error here. Peer should not have disconnected. + //XXX: this will be updated when the low level peer interface is implemented. + s.Logger.Error().Msg("unexpected peer disconnection") + return (*Server).cleanup, nil + + default: + s.Logger.Error(). + Str("actual type", cmd.Type). + Str("expected type", "TransmitDoneType"). + Msg("got unexpected message from peer. Connection terminated.") + + return (*Server).cleanup, nil + } + + case txStat, ok := <-txStatsPollResp: + if !ok { + txStatsPollResp = nil + continue + } + + switch stats := txStat.(type) { + case *openperf.GetTxStatsResponse: + s.PeerNotifOut <- &msg.Message{ + Type: msg.StatsNotificationType, + Value: &msg.DataStreamStats{ + TxStats: stats, + }, + } + + case error: + s.Logger.Error(). + Err(stats). + Msg("error occurred while polling Openperf transmit stats.") + return (*Server).cleanup, nil + + default: + s.Logger.Error(). + Interface("received", txStat). + Interface("expected", &openperf.GetTxStatsResponse{}). + Msg("got an unexpected message type while polling Openperf transmit stats") + + return (*Server).cleanup, &OpenperfError{ + Actual: fmt.Sprintf("%T", txStat), + Expected: "openperf.GetTxStatsResponse", + What: "sent an openperf.GetTxStatsRequest and got an unexpected response. ", + } + } + + case rxStat, ok := <-rxStatsPollResp: + if !ok { + rxStatsPollResp = nil + continue + } + + switch stats := rxStat.(type) { + case *openperf.GetRxStatsResponse: + s.PeerNotifOut <- &msg.Message{ + Type: msg.StatsNotificationType, + Value: &msg.DataStreamStats{ + RxStats: stats, + }, + } + + case error: + s.Logger.Error(). + Err(stats). + Msg("error occurred while polling Openperf receive stats.") + return (*Server).cleanup, nil + + default: + s.Logger.Error(). + Interface("received", rxStat). + Interface("expected", &openperf.GetRxStatsResponse{}). + Msg("got an unexpected message type while polling Openperf receive stats") + + return (*Server).cleanup, &OpenperfError{ + Actual: fmt.Sprintf("%T", rxStat), + Expected: "openperf.GetRxStatsResponse", + What: "sent an openperf.GetRxStatsRequest and got an unexpected response. ", + } + + } + + case gen, ok := <-generatorPollResp: + if !ok { + generatorPollResp = nil + continue + } + + switch generator := gen.(type) { + case *openperf.GetGeneratorResponse: + + if generator.Running { + continue + } + + if !serverRunning { + // Handle case where an extra poll response comes through + // even after canceling. This is not an error. + continue + } + + //Stop local tx stats polling. + txStatsPollCancel() + + //Stop runstate polling. + generatorPollCancel() + + // Check if there's an in-flight GetTxStatsResponse waiting for us. + // Not having it is not an error. + lastStat, ok := <-txStatsPollResp + if ok { + switch stat := lastStat.(type) { + case *openperf.GetTxStatsResponse: + s.PeerNotifOut <- &msg.Message{ + Type: msg.StatsNotificationType, + Value: &msg.DataStreamStats{ + TxStats: stat, + }, + } + + case error: + s.Logger.Error(). + Err(stat). + Msg("error occurred while polling Openperf transmit stats.") + return (*Server).cleanup, nil + + default: + s.Logger.Error(). + Interface("received", lastStat). + Interface("expected", &openperf.GetTxStatsResponse{}). + Msg("got an unexpected message type while polling Openperf transmit stats") + + return (*Server).cleanup, &OpenperfError{ + Actual: fmt.Sprintf("%T", lastStat), + Expected: "openperf.GetTxStatsResponse", + What: "sent an openperf.GetTxStatsRequest and got an unexpected response. ", + } + + } + } + + // Tell client we're done transmitting. + s.PeerNotifOut <- &msg.Message{ + Type: msg.TransmitDoneType, + } + + serverRunning = false + if !clientRunning { + break Done + } + + case error: + s.Logger.Error(). + Err(generator). + Msg("error occurred while polling Openperf generator.") + return (*Server).cleanup, nil + + default: + s.Logger.Error(). + Interface("received", generator). + Interface("expected", &openperf.GetGeneratorResponse{}). + Msg("got an unexpected message type while polling Openperf generator") + + return (*Server).cleanup, &OpenperfError{ + Actual: fmt.Sprintf("%T", gen), + Expected: "openperf.GetGeneratorResponse", + What: "sent an openperf.GetGeneratorRequest and got an unexpected response. ", + } + } + } + } + + return (*Server).done, nil +} + +// done tracks the FSM state immediately after traffic has stopped. Used to collate results. +func (s *Server) done(ctx context.Context) (serverStateFunc, error) { + s.enter("done") + + finalStatsValue := &msg.FinalStats{} + + //Get Tx results + if s.TestConfiguration.DownstreamRateBps > 0 { + stats, err := s.getFinalServerTxStats(ctx) + + if err != nil { + s.Logger.Error().Err(err).Msg("error getting final transmit stats") + return (*Server).cleanup, nil + } + finalStatsValue.TxStats = stats + } + + //Get Rx results + if s.TestConfiguration.UpstreamRateBps > 0 { + stats, err := s.getFinalServerRxStats(ctx) + + if err != nil { + s.Logger.Error().Err(err).Msg("error getting final receive stats") + return (*Server).cleanup, nil + } + finalStatsValue.RxStats = stats + } + + //Wait for get final results command. + cmd, err := s.waitForPeerCommand(msg.GetFinalStatsType) + if err != nil { + if cmd == nil { + return (*Server).cleanup, err + } + + s.Logger.Error(). + Err(err). + Str("expected command type", msg.GetFinalStatsType). + Msg("error while waiting for peer command") + + s.PeerRespOut <- &msg.Message{ + Type: msg.ErrorType, + Value: err.Error(), + } + + return (*Server).cleanup, nil + } + + //Send response. + s.PeerRespOut <- &msg.Message{ + Type: msg.FinalStatsType, + Value: finalStatsValue, + } + + return (*Server).cleanup, nil +} + +func (s *Server) cleanup(ctx context.Context) (serverStateFunc, error) { + s.enter("cleanup") + + if s.TestConfiguration.DownstreamRateBps > 0 { + //Delete the generator we created. + reqDone := make(chan struct{}) + req := &openperf.Command{ + Request: &openperf.DeleteGeneratorRequest{ + Id: "Generator-one"}, //XXX: hardcoded for now as a placeholder. + Done: reqDone, + Ctx: ctx, + } + + s.OpenperfCmdOut <- req + + // Wait for request to finish one way or another. OP controller ensures this never gets stuck. + <-reqDone + + //XXX: this is used for unit testing right now. Once the Openperf interface + // is ready this is likely to change. + if req.Response != nil { + return (*Server).error, &OpenperfError{Err: req.Response.(error)} + } + + } + + if s.errorAfterCleanup { + return (*Server).error, nil + } + + return (*Server).connect, nil +} + +// error tracks state where Server FSM has to terminate due to an error. +func (s *Server) error(ctx context.Context) (serverStateFunc, error) { + s.enter("error") + + s.Logger.Error().Msg("Server FSM exiting due to error") + + return nil, nil +} + +func (s *Server) validateTestConfig(cfg *msg.ServerConfiguration) error { + + if cfg.UpstreamRateBps == 0 && cfg.DownstreamRateBps == 0 { + return errors.New("test has upstream and downstream rates of zero") + } + + return nil +} + +// waitForPeerCommand waits for either a command from our peer or a timeout. +// on successful receive of the expected type, return . +// on non-fatal error condition, returns . +// on fatal error condition, returns . +func (s *Server) waitForPeerCommand(expectedType string) (*msg.Message, error) { + + select { + case cmd, ok := <-s.PeerCmdIn: + if !ok { + s.Logger.Error(). + Str("expected command type", expectedType). + Msg("error reading from peer command channel.") + + return nil, &PeerError{What: "error reading from peer command channel."} + } + + switch cmd.Type { + case expectedType: + s.Logger.Trace(). + Str("message type", cmd.Type). + Msg("received message from peer") + + return cmd, nil + + case msg.ErrorType: + val, ok := cmd.Value.(error) + if !ok { + s.Logger.Error(). + Msg("received error command with unexpected content from peer") + + return cmd, &PeerError{} + } + s.Logger.Error(). + Str("error", val.Error()). + Msg("received error command from peer") + + return cmd, &PeerError{Err: val} + + case msg.PeerDisconnectLocalType, msg.PeerDisconnectRemoteType: + //XXX this will be updated once the lower-level peer interface is implemented. + return nil, &PeerError{} + } + + s.Logger.Error(). + Str("actual", cmd.Type). + Str("expected", expectedType). + Msg("unexpected command from peer") + + return cmd, &PeerError{ + What: "unexpected command from peer", + Actual: cmd.Type, + Expected: expectedType} + + case <-time.After(s.PeerTimeout): + s.Logger.Error(). + Str("expected command", expectedType). + Msg("timed out waiting for peer command") + + return &msg.Message{}, &TimeoutError{Operation: "waiting for a reply from peer."} + } +} + +// getFinalServerTxStats gets the final set of server Tx stats after the test has completed. +// returns results, nil on success and nil, error otherwise. +// Errors from this method receiver are not considered fatal to the FSM. +func (s *Server) getFinalServerTxStats(ctx context.Context) (*openperf.GetTxStatsResponse, error) { + + // Do we need to get TxStats? + if s.TestConfiguration.DownstreamRateBps <= 0 { + return nil, nil + } + + reqDone := make(chan struct{}) + opCtx, opCancel := context.WithTimeout(ctx, s.OpenperfTimeout) + defer opCancel() + + req := &openperf.Command{ + Request: &openperf.GetTxStatsRequest{}, + Done: reqDone, + Ctx: opCtx, + } + + s.OpenperfCmdOut <- req + + <-reqDone + + switch resp := req.Response.(type) { + case *openperf.GetTxStatsResponse: + return resp, nil + + case error: + return nil, resp + + default: + return nil, &OpenperfError{ + Actual: fmt.Sprintf("%T", req.Response), + Expected: "openperf.GetTxStatsResponse", + What: "sent an openperf.GetTxStatsRequest and got an unexpected response. ", + } + } + +} + +func (s *Server) getFinalServerRxStats(ctx context.Context) (*openperf.GetRxStatsResponse, error) { + + // Do we need to get RxStats? + if s.TestConfiguration.UpstreamRateBps <= 0 { + return nil, nil + } + + reqDone := make(chan struct{}) + opCtx, opCancel := context.WithTimeout(ctx, s.OpenperfTimeout) + defer opCancel() + + req := &openperf.Command{ + Request: &openperf.GetRxStatsRequest{}, + Done: reqDone, + Ctx: opCtx, + } + + s.OpenperfCmdOut <- req + + <-reqDone + + switch resp := req.Response.(type) { + case *openperf.GetRxStatsResponse: + return resp, nil + + case error: + return nil, resp + + default: + return nil, &OpenperfError{ + Actual: fmt.Sprintf("%T", req.Response), + Expected: "openperf.GetRxStatsResponse", + What: "sent an openperf.GetRxStatsRequest and got an unexpected response. ", + } + } +} + +func (s *Server) startOpenperfCmdRepeater(ctx context.Context, cmd *openperf.Command, interval time.Duration) (responses chan interface{}, cancelFn context.CancelFunc) { + var requestCtx context.Context + requestCtx, cancelFn = context.WithCancel(ctx) + responses = make(chan interface{}) + repeater := &openperf.CommandRepeater{ + Command: cmd, + Interval: interval, + OpenperfCmdOut: s.OpenperfCmdOut, + Responses: responses} + + go repeater.Run(requestCtx) + + return + +} diff --git a/targets/spiperf/fsm/server_test.go b/targets/spiperf/fsm/server_test.go new file mode 100644 index 000000000..8dad11cfa --- /dev/null +++ b/targets/spiperf/fsm/server_test.go @@ -0,0 +1,1201 @@ +package fsm_test + +import ( + "context" + "errors" + "os" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/rs/zerolog" + + . "github.com/Spirent/openperf/targets/spiperf/fsm" + "github.com/Spirent/openperf/targets/spiperf/msg" + "github.com/Spirent/openperf/targets/spiperf/openperf" +) + +var _ = Describe("Server FSM,", func() { + //Test-wide constants + const assertEpsilon = time.Second * 15 + + var ( + peerCmdIn chan *msg.Message + peerRespOut chan *msg.Message + peerNotifOut chan *msg.Message + opCmdOut chan *openperf.Command + fsmReturn chan error + fsm *Server + ctx context.Context + cancelFunc context.CancelFunc + logger zerolog.Logger + ) + + BeforeEach(func() { + peerCmdIn = make(chan *msg.Message, 1) + peerRespOut = make(chan *msg.Message, 1) + peerNotifOut = make(chan *msg.Message, 1) + opCmdOut = make(chan *openperf.Command) + fsmReturn = make(chan error) + // For more verbose output FatalLevel -> TraceLevel or InfoLevel + logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}). + With().Logger().Level(zerolog.FatalLevel) + + fsm = &Server{ + Logger: &logger, + PeerCmdIn: peerCmdIn, + PeerRespOut: peerRespOut, + PeerNotifOut: peerNotifOut, + OpenperfCmdOut: opCmdOut, + PeerTimeout: 500 * time.Millisecond, + OpenperfTimeout: 500 * time.Millisecond, + StartDelay: 1 * time.Second, + StatsPollInterval: 500 * time.Millisecond, + GeneratorPollInterval: 500 * time.Millisecond, + } + + ctx, cancelFunc = context.WithCancel(context.Background()) + + go func() { + fsmReturn <- fsm.Run(ctx) + }() + + }) + + Context("starts in the connect state, when peer sends a different version, ", func() { + It("returns an error to peer, switches to connect state", func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.HelloType, + Value: &msg.Hello{ + PeerProtocolVersion: "9.90", + }, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + Expect(fsm.State()).To(Equal("connect")) + + close(done) + }) + + }) + + Context("starts in the connect state, when FSM is canceled via context, ", func() { + It("terminates without error", func(done Done) { + // Wait for FSM to start up. + Eventually(func() string { return fsm.State() }).Should(Equal("connect")) + + cancelFunc() + + ret := <-fsmReturn + Expect(ret).To(BeNil()) + Expect(fsm.State()).To(Equal("connect")) + + close(done) + }) + }) + + Context("starts in the connect state, when peer sends an unexpected message type, ", func() { + It("returns an error to peer, switches to connect state", func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.HelloType, + Value: &msg.FinalStats{}, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + Expect(fsm.State()).To(Equal("connect")) + + close(done) + }) + }) + + Context("starts in the connect state, when peer command channel returns an error, ", func() { + It("terminates with an error", func(done Done) { + // Wait for FSM to start up. + Eventually(func() string { return fsm.State() }).Should(Equal("connect")) + + close(peerCmdIn) + + Eventually(fsmReturn).Should(Receive(BeAssignableToTypeOf(&PeerError{}))) + + close(done) + }) + + }) + + Context("starts in the connect state, peer receives hello message, peer sends hello reply, it transitions to the configure state, ", func() { + BeforeEach(func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.HelloType, + Value: &msg.Hello{ + PeerProtocolVersion: msg.Version, + }, + } + + resp := <-peerRespOut + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.HelloType)) + Expect(resp.Value).ToNot(BeNil()) + Expect(resp.Value).To(BeAssignableToTypeOf(&msg.Hello{})) + Expect(resp.Value.(*msg.Hello).PeerProtocolVersion).To(Equal(msg.Version)) + + Eventually(func() string { return fsm.State() }).Should(Equal("configure")) + + close(done) + }) + + Context("when client sends a command that's not GetConfigType, ", func() { + It("returns to the connect state", func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.HelloType, + Value: &msg.FinalStats{}, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2).Should(Equal("connect")) + + close(done) + }) + + }) + + Context("when client does not send get param message, ", func() { + It("returns to the connect state", func(done Done) { + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2).Should(Equal("connect")) + + close(done) + }) + }) + + Context("client requests server's parameters, server returns parameters, ", func() { + BeforeEach(func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.GetServerParametersType, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ServerParametersType)) + Expect(resp.Value).ToNot(BeNil()) + Expect(resp.Value).To(BeAssignableToTypeOf(&msg.ServerParametersResponse{})) + + close(done) + }, assertEpsilon.Seconds()) + + Context("when client does not send set config message, ", func() { + It("returns to connect state", func(done Done) { + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2).Should(Equal("connect")) + + close(done) + + }, assertEpsilon.Seconds()) + + }) + + Context("when client sends a command that's not SetConfigType, ", func() { + It("returns to the connect state", func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.HelloType, + Value: &msg.FinalStats{}, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2).Should(Equal("connect")) + + close(done) + }) + + }) + + // Note: this context exercises the server FSM code that sanity checks + // values before sending them to Openperf. + Context("when client sends invalid configuration, ", func() { + It("returns to the connect state", func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.SetConfigType, + Value: &msg.ServerConfiguration{ + DownstreamRateBps: 0, + UpstreamRateBps: 0}, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2).Should(Equal("connect")) + + close(done) + }) + }) + + // At this point the test diverges along three paths based on test traffic flow: + // * server -> client + // * client -> server + // * server <-> client + // As an optimization the common error paths are only exercised on the + // server -> client path. + // This needs to be done prior to returning the server parameters since + // the client builds the configuration immediately after getting this response. + + Context("client sends configuration, test has server -> client traffic, ", func() { + BeforeEach(func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.SetConfigType, + Value: &msg.ServerConfiguration{ + DownstreamRateBps: 100, + UpstreamRateBps: 0}, + } + + close(done) + }) + + XContext("server configures Openperf, when an error occurs, ", func() { + It("returns to the connect state", func(done Done) { + //XXX: implement this once the server OP configuration commands get + // implemented. + + close(done) + }) + + }) + + Context("server configures Openperf, server ACKs configuration from client, it transitions to ready state, ", func() { + BeforeEach(func(done Done) { + + Done: + for { + select { + case opCmd := <-opCmdOut: + close(opCmd.Done) + + case resp := <-peerRespOut: + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.AckType)) + + break Done + } + } + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2).Should(Equal("ready")) + + close(done) + }) + + Context("when client does not send the start command, ", func() { + It("returns to the connect state", func(done Done) { + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + + drainServerOpenperfCommands(opCmdOut, fsm) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }, assertEpsilon.Seconds()) + }) + + Context("when there's an error instead of start command, ", func() { + It("exits with an error", func(done Done) { + + close(peerCmdIn) + + drainServerOpenperfCommands(opCmdOut, fsm) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("error"), "timed out waiting for FSM to switch to error state") + + Eventually(fsmReturn).Should(Receive(BeAssignableToTypeOf(&PeerError{}))) + + close(done) + }, assertEpsilon.Seconds()) + + }) + + Context("when the start command has an invalid time format, ", func() { + It("returns to the connect state", func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.StartCommandType, + Value: &msg.StartCommand{ + StartTime: "hello world"}, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + + drainServerOpenperfCommands(opCmdOut, fsm) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }) + + }) + + Context("when the start command has a time that's passed, ", func() { + It("returns to the connect state, ", func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.StartCommandType, + Value: &msg.StartCommand{ + StartTime: time.Now().Add(time.Hour * -1).Format(TimeFormatString)}, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + + drainServerOpenperfCommands(opCmdOut, fsm) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }) + + }) + + Context("when the start command has a time that's too far in the future, ", func() { + It("returns to the connect state, ", func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.StartCommandType, + Value: &msg.StartCommand{ + StartTime: time.Now().Add(time.Hour * 1).Format(TimeFormatString)}, + } + + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + + drainServerOpenperfCommands(opCmdOut, fsm) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }) + + }) + + Context("client sends start command, it transitions to armed state, ", func() { + BeforeEach(func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.StartCommandType, + Value: &msg.StartCommand{ + StartTime: time.Now().Add(time.Second * 2).Format(TimeFormatString)}, + } + + resp := <-peerRespOut + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.AckType)) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("armed"), "timed out waiting for FSM to switch to armed state") + + close(done) + }) + + Context("when peer disconnects during armed state, ", func() { + It("exits with an error", func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.PeerDisconnectRemoteType, + Value: &msg.PeerDisconnectRemoteNotif{Err: "aborting test."}, + } + + drainServerOpenperfCommands(opCmdOut, fsm) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }, assertEpsilon.Seconds()) + + }) + + Context(" when local peer interface errors during armed state, ", func() { + It("exits with an error", func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.PeerDisconnectLocalType, + Value: &msg.PeerDisconnectLocalNotif{ + Err: errors.New("unexpected local peer communcation error"), + }, + } + + drainServerOpenperfCommands(opCmdOut, fsm) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }, assertEpsilon.Seconds()) + + }) + + // Remember, server -> client traffic here. + Context("sleeps until start time, it transitions to running state, server is transmitting, ", func() { + + // At this point the test is running and we need to respond to + // async requests from the server. Prior to this the Openperf requests + // were done synchronously and the async responder wasn't necessary. + // Also, the async responder will "drain" any requests that aren't poll + // requests. Calling drainOpenperfCommands() is not needed and would + // introduce timing issues. + var ( + runstateStatsResponder *openperfResponder + txStatsResponderError chan struct{} + runstateResponderError chan struct{} + genDoneResponderError chan struct{} + runstateStatsResponderCancel context.CancelFunc + ) + const runstatePollCount = 3 + + BeforeEach(func(done Done) { + + txStatsResponderError = make(chan struct{}) + runstateResponderError = make(chan struct{}) + genDoneResponderError = make(chan struct{}) + runstateStatsResponder = &openperfResponder{ + OPCmdIn: opCmdOut, + ErrorRunstate: runstateResponderError, + ErrorTxStats: txStatsResponderError, + RunstatePollCount: runstatePollCount, + ErrorDeleteGen: genDoneResponderError, + } + var responderCtx context.Context + responderCtx, runstateStatsResponderCancel = context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + handleOpenperfResponses(responderCtx, runstateStatsResponder) + }() + + // Server switches to running state here. + + Eventually(func() string { + return fsm.State() + }, time.Second*5).Should(Equal("running")) + + close(done) + }, assertEpsilon.Seconds()) + + Context("when client disconnects while test is running, ", func() { + It("exits with an error", func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.PeerDisconnectRemoteType, + Value: &msg.PeerDisconnectRemoteNotif{ + Err: "aborting test.", + }, + } + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + + }, assertEpsilon.Seconds()) + + }) + + Context("when openperf returns an error while polling generator, ", func() { + It("exits with an error", func(done Done) { + runstateResponderError <- struct{}{} + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + + }, assertEpsilon.Seconds()) + + }) + + Context("when openperf returns an error while polling Tx stats, ", func() { + It("exits with an error", func(done Done) { + txStatsResponderError <- struct{}{} + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + + }, assertEpsilon.Seconds()) + + }) + + Context("server sends notifications, test completes, it switches to the done state, ", func() { + BeforeEach(func(done Done) { + + Done: + for { + notif := <-peerNotifOut + Expect(notif).ToNot(BeNil()) + + switch notif.Type { + case msg.StatsNotificationType: + //noop + + case msg.TransmitDoneType: + break Done + + } + + } + + Eventually(func() string { + return fsm.State() + }).Should(Equal("done"), "timed out waiting for test to complete") + + close(done) + }, assertEpsilon.Seconds()) + + Context("when client does not send get final stats message, ", func() { + It("returns to the connect state", func(done Done) { + resp := <-peerRespOut + + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.ErrorType)) + Expect(resp.Value).ToNot(BeNil()) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }) + + }) + + Context("client requests final stats, server sends final stats, it switches to the cleanup state, when Openperf errors with cleanup, ", func() { + It("exits with an error", func(done Done) { + + genDoneResponderError <- struct{}{} + + peerCmdIn <- &msg.Message{ + Type: msg.GetFinalStatsType, + } + + finalStats := <-peerRespOut + + Expect(finalStats).ToNot(BeNil()) + Expect(finalStats.Type).To(Equal(msg.FinalStatsType)) + Expect(finalStats.Value).To(BeAssignableToTypeOf(&msg.FinalStats{})) + + Eventually(fsmReturn).Should(Receive( + BeAssignableToTypeOf(&OpenperfError{}))) + + Expect(fsm.State()).To(Equal("error")) + + close(done) + + }) + }) + + Context("server sends final stats, it switches to the cleanup state, when it cleans up, ", func() { + It("returns without error.", func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.GetFinalStatsType, + } + + finalStats := <-peerRespOut + + Expect(finalStats).ToNot(BeNil()) + Expect(finalStats.Type).To(Equal(msg.FinalStatsType)) + Expect(finalStats.Value).To(BeAssignableToTypeOf(&msg.FinalStats{})) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }, assertEpsilon.Seconds()) + }) + }) + + AfterEach(func() { + runstateStatsResponderCancel() + }) + + }) + }) + }) + }) + + Context("server returns valid parameters, test has client -> server traffic, ", func() { + BeforeEach(func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.SetConfigType, + Value: &msg.ServerConfiguration{ + DownstreamRateBps: 0, + UpstreamRateBps: 100}, + } + + close(done) + }) + + Context("server configures Openperf, server ACKs configuration from client, it transitions to ready state, ", func() { + BeforeEach(func(done Done) { + + Done: + for { + select { + case opCmd := <-opCmdOut: + close(opCmd.Done) + + case resp := <-peerRespOut: + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.AckType)) + + break Done + } + } + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2).Should(Equal("ready")) + + close(done) + }) + + Context("client sends start command, it transitions to armed state, ", func() { + BeforeEach(func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.StartCommandType, + Value: &msg.StartCommand{ + StartTime: time.Now().Add(time.Second * 2).Format(TimeFormatString)}, + } + + resp := <-peerRespOut + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.AckType)) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("armed"), "timed out waiting for FSM to switch to armed state") + + close(done) + }) + + // Remember, client -> server traffic here. + Context("sleeps until start time, it transitions to running state, client is transmitting, ", func() { + var runstateStatsResponder *openperfResponder + var rxStatsResponderError chan struct{} + const runstatePollCount = 3 + var runstateStatsResponderCancel context.CancelFunc + //server starts up a runstate poll command here. + BeforeEach(func(done Done) { + + rxStatsResponderError = make(chan struct{}) + runstateStatsResponder = &openperfResponder{ + OPCmdIn: opCmdOut, + ErrorRxStats: rxStatsResponderError, + RunstatePollCount: runstatePollCount, + } + var responderCtx context.Context + responderCtx, runstateStatsResponderCancel = context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + handleOpenperfResponses(responderCtx, runstateStatsResponder) + }() + // Server switches to running state here. + + Eventually(func() string { return fsm.State() }, fsm.PeerTimeout.Seconds()*8).Should(Equal("running"), "timed out waiting for FSM to switch to running state") + + close(done) + }, assertEpsilon.Seconds()) + + Context("when openperf returns an error while polling Rx stats, ", func() { + It("exits with an error", func(done Done) { + rxStatsResponderError <- struct{}{} + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + + }, assertEpsilon.Seconds()) + + }) + + Context("server sends notifications, test completes, it switches to the done state, ", func() { + BeforeEach(func(done Done) { + + peerNotifCount, expectedNotifCount := 0, 3 + + Done: + for { + notif := <-peerNotifOut + Expect(notif).ToNot(BeNil()) + Expect(notif.Type).To(Equal(msg.StatsNotificationType)) + Expect(notif.Value).ToNot(BeNil()) + Expect(notif.Value).To(BeAssignableToTypeOf(&msg.DataStreamStats{})) + + peerNotifCount++ + + if peerNotifCount == expectedNotifCount { + peerCmdIn <- &msg.Message{ + Type: msg.TransmitDoneType} + + break Done + } + + } + + Eventually(func() string { + return fsm.State() + }).Should(Equal("done"), "timed out waiting for FSM to transition to done state") + + close(done) + }, assertEpsilon.Seconds()) + + Context("server sends final stats, ", func() { + It("switches to the cleanup state", func(done Done) { + peerCmdIn <- &msg.Message{ + Type: msg.GetFinalStatsType, + } + + finalStats := <-peerRespOut + + Expect(finalStats).ToNot(BeNil()) + Expect(finalStats.Type).To(Equal(msg.FinalStatsType)) + Expect(finalStats.Value).To(BeAssignableToTypeOf(&msg.FinalStats{})) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + close(done) + }, assertEpsilon.Seconds()) + + }) + }) + + AfterEach(func() { + runstateStatsResponderCancel() + }) + }) + }) + }) + }) + + Context("server returns valid parameters, test has server <-> client traffic, ", func() { + BeforeEach(func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.SetConfigType, + Value: &msg.ServerConfiguration{ + DownstreamRateBps: 100, + UpstreamRateBps: 100}, + } + + close(done) + }) + Context("server configures Openperf, server ACKs configuration from client, it transitions to ready state, ", func() { + + BeforeEach(func(done Done) { + Done: + for { + select { + case opCmd := <-opCmdOut: + close(opCmd.Done) + + case resp := <-peerRespOut: + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.AckType)) + + break Done + } + } + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2).Should(Equal("ready")) + + close(done) + }) + + Context("client sends start command, it transitions to armed state, ", func() { + BeforeEach(func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.StartCommandType, + Value: &msg.StartCommand{ + StartTime: time.Now().Add(time.Second * 2).Format(TimeFormatString)}, + } + + resp := <-peerRespOut + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.AckType)) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("armed"), "timed out waiting for FSM to switch to armed state") + + close(done) + }) + + // Remember, client <-> server traffic here. + Context("sleeps until start time, it transitions to running state, server and client are transmitting, ", func() { + var runstateStatsResponder *openperfResponder + var setRunstatePollCount chan int + const runstatePollCount = 30 //This will be reduced by the channel above. + var runstateStatsResponderCancel context.CancelFunc + //client starts up a runstate poll command here. + BeforeEach(func(done Done) { + + setRunstatePollCount = make(chan int) + runstateStatsResponder = &openperfResponder{ + OPCmdIn: opCmdOut, + RunstatePollCount: runstatePollCount, + NewRunstatePollCount: setRunstatePollCount, + } + var responderCtx context.Context + responderCtx, runstateStatsResponderCancel = context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + handleOpenperfResponses(responderCtx, runstateStatsResponder) + }() + // Client switches to running state here. + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*8). + Should(Equal("running"), "timed out waiting for FSM to switch to running state") + + close(done) + }, assertEpsilon.Seconds()) + + // At this point either server or client will stop first. + // Both are tested below for completeness + + // client stops first + Context("client sends notifications, client stops transmitting, ", func() { + BeforeEach(func(done Done) { + + peerNotifCount, expectedNotifCount := 0, 3 + + Done: + for { + notif := <-peerNotifOut + Expect(notif).ToNot(BeNil()) + Expect(notif.Type).To(Equal(msg.StatsNotificationType)) + Expect(notif.Value).ToNot(BeNil()) + Expect(notif.Value).To(BeAssignableToTypeOf(&msg.DataStreamStats{})) + + peerNotifCount++ + + if peerNotifCount == expectedNotifCount { + peerCmdIn <- &msg.Message{ + Type: msg.TransmitDoneType} + + break Done + } + + } + + Expect(fsm.State()).To(Equal("running")) + + close(done) + }, assertEpsilon.Seconds()) + + Context("server continues to run, server stops, ", func() { + BeforeEach(func(done Done) { + + // Let test run a bit more. + timerChan := time.After(1 * time.Second) + Done: + for { + select { + case notif := <-peerNotifOut: + Expect(notif).ToNot(BeNil()) + + switch notif.Type { + case msg.StatsNotificationType: + + case msg.TransmitDoneType: + break Done + + } + case <-timerChan: + setRunstatePollCount <- int(0) + } + } + + Eventually(func() string { + return fsm.State() + }).Should(Equal("done")) + + close(done) + }, assertEpsilon.Seconds()) + + Context("server sends final stats, ", func() { + It("switches to the cleanup state", func(done Done) { + + peerCmdIn <- &msg.Message{ + Type: msg.GetFinalStatsType, + } + + finalStats := <-peerRespOut + + Expect(finalStats).ToNot(BeNil()) + Expect(finalStats.Type).To(Equal(msg.FinalStatsType)) + Expect(finalStats.Value).To(BeAssignableToTypeOf(&msg.FinalStats{})) + + Eventually(func() string { + return fsm.State() + }, fsm.PeerTimeout.Seconds()*2). + Should(Equal("connect"), "timed out waiting for FSM to return to connect state") + + runstateStatsResponderCancel() + + close(done) + + }, assertEpsilon.Seconds()) + + }) + + }) + }) + + // server stops first. + Context("server sends notifications, server stops transmitting, ", func() { + BeforeEach(func(done Done) { + peerNotifCount, expectedNotifCount := 0, 3 + + Done: + for { + notif := <-peerNotifOut + Expect(notif).ToNot(BeNil()) + Expect(notif.Type).To(Equal(msg.StatsNotificationType)) + Expect(notif.Value).ToNot(BeNil()) + Expect(notif.Value).To(BeAssignableToTypeOf(&msg.DataStreamStats{})) + + peerNotifCount++ + + if peerNotifCount == expectedNotifCount { + setRunstatePollCount <- int(0) + + break Done + } + + } + + setRunstatePollCount <- int(0) + + DoneAgain: + for { + notif := <-peerNotifOut + Expect(notif).NotTo(BeNil()) + + switch notif.Type { + case msg.StatsNotificationType: + //noop + + case msg.TransmitDoneType: + break DoneAgain + + } + } + + Expect(fsm.State()).To(Equal("running")) + + close(done) + }, assertEpsilon.Seconds()) + + Context("client continues to run, client stops, ", func() { + BeforeEach(func(done Done) { + + // Run for a bit more. + timerChan := time.After(1 * time.Second) + Done: + for { + select { + case notif := <-peerNotifOut: + Expect(notif).ToNot(BeNil()) + Expect(notif.Type).To(Equal(msg.StatsNotificationType)) + + case <-timerChan: + peerCmdIn <- &msg.Message{ + Type: msg.TransmitDoneType} + + break Done + } + } + + close(done) + }, assertEpsilon.Seconds()) + + Context("client requests final stats, server sends final stats, ", func() { + It("switches to the cleanup state", func(done Done) { + + peerCmdIn <- &msg.Message{Type: msg.GetFinalStatsType} + + Done: + for { + select { + case notif := <-peerNotifOut: + Expect(notif).ToNot(BeNil()) + Expect(notif.Type).To(Equal(msg.StatsNotificationType)) + + case resp := <-peerRespOut: + Expect(resp).ToNot(BeNil()) + Expect(resp.Type).To(Equal(msg.FinalStatsType)) + Expect(resp.Value).To(BeAssignableToTypeOf(&msg.FinalStats{})) + + break Done + } + } + + Eventually(func() string { + return fsm.State() + }).Should(Equal("connect")) + + close(done) + + }, assertEpsilon.Seconds()) + + }) + }) + }) + + AfterEach(func(done Done) { + runstateStatsResponderCancel() + close(done) + }) + }) + }) + }) + }) + }) + }) + + AfterEach(func() { + cancelFunc() + }) +}) + +// Unit test utilities + +func drainServerOpenperfCommands(opCmd chan *openperf.Command, fsm *Server) { + + //Rather than having a static time window for OP commands, or waiting for a specific cleanup + // message, poll the FSM state to see if we've gotten through the cleanup state. + //Server behaves slightly differently in that it will not terminate in the cleanup state. + fsmDone := make(chan struct{}) + go func() { + for { + state := fsm.State() + + switch state { + case "connect": + close(fsmDone) + return + case "error": + close(fsmDone) + return + default: + + } + + time.Sleep(10 * time.Millisecond) + } + + }() + + for { + select { + case <-fsmDone: + return + case cmd := <-opCmd: + close(cmd.Done) + } + } +} diff --git a/targets/spiperf/msg/spiperf.go b/targets/spiperf/msg/spiperf.go index 1009de08d..ab8bc4d7e 100644 --- a/targets/spiperf/msg/spiperf.go +++ b/targets/spiperf/msg/spiperf.go @@ -62,8 +62,10 @@ type DataStreamStats struct { // ServerConfiguration sends the server's view of the test configuration to the server. type ServerConfiguration struct { - TransmitDuration uint `json:"transmit_duration"` - FixedFrameSize uint `json:"fixed_frame_size"` + TransmitDuration uint `json:"transmit_duration"` + FixedFrameSize uint `json:"fixed_frame_size"` + UpstreamRateBps uint64 `json:"upstream_rate_bps"` + DownstreamRateBps uint64 `json:"downstream_rate_bps"` } // ServerParametersResponse conveys the server's parameters to the client.