Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ env:
GOTOOLCHAIN: local

jobs:
lint:
name: Golang-CI Lint
timeout-minutes: 10
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v6
- name: Install Go
uses: actions/setup-go@v6
with:
go-version: stable
- name: Install golangci-lint
uses: golangci/golangci-lint-action@v9
with:
args: --verbose --show-stats
test:
name: Unit test
timeout-minutes: 10
Expand Down
65 changes: 65 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
version: "2"

linters:
enable:
- bodyclose
- canonicalheader
- errcheck
- errorlint
- forcetypeassert
- gocritic
- godoclint
- nakedret
- perfsprint
- staticcheck
- thelper
- unconvert
- unparam
- whitespace
exclusions:
generated: disable
presets:
- comments
- std-error-handling
rules:
- path: spdy/
linters:
- nakedret
- nonamedreturns
- path: connection.go
linters:
- unparam
text: "result 0 \\(error\\) is always nil" # TODO(thaJeztah): check if errors should be returned
settings:
revive:
rules:
# https://github.com/mgechev/revive/blob/HEAD/RULES_DESCRIPTIONS.md#var-naming
- name: var-naming
severity: warning
disabled: false
exclude: [""]
arguments:
- ["Id"] # AllowList
- [] # DenyList
- - skip-initialism-name-checks: true
upper-case-const: true
staticcheck:
# Enable all options, with some exceptions.
# For defaults, see https://golangci-lint.run/usage/linters/#staticcheck
checks:
- all
- -QF1008 # Omit embedded fields from selector expression; https://staticcheck.dev/docs/checks/#QF1008
- -ST1003 # Poorly chosen identifier; https://staticcheck.dev/docs/checks/#ST1003

formatters:
enable:
- gofumpt
exclusions:
generated: disable

issues:
# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
max-issues-per-linter: 0

# Maximum count of issues with the same text. Set to 0 to disable. Default is 3.
max-same-issues: 0
63 changes: 32 additions & 31 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
)

var (
ErrInvalidStreamId = errors.New("Invalid stream id")
ErrTimeout = errors.New("Timeout occurred")
ErrReset = errors.New("Stream reset")
ErrWriteClosedStream = errors.New("Write on closed stream")
ErrInvalidStreamId = errors.New("invalid stream id") // ErrInvalidStreamId is returned when an operation refers to a stream with an invalid or unknown stream ID.
ErrTimeout = errors.New("timeout occurred") // ErrTimeout is returned when a stream or connection operation exceeds its configured timeout.
ErrReset = errors.New("stream reset") // ErrReset is returned when a stream is reset by the peer or by the local endpoint.
ErrWriteClosedStream = errors.New("write on closed stream") // ErrWriteClosedStream is returned when attempting to write to a stream that has already been closed.
)

const (
Expand Down Expand Up @@ -101,9 +101,9 @@ Loop:
i.conn.streamCond.L.Unlock()
go func() {
for _, stream := range streams {
stream.resetStream()
_ = stream.resetStream()
}
i.conn.Close()
_ = i.conn.Close()
}()
case <-i.conn.closeChan:
if timer != nil {
Expand Down Expand Up @@ -277,9 +277,9 @@ func (s *Connection) Ping() (time.Duration, error) {
pid := s.pingId
s.pingLock.Lock()
if s.pingId > 0x7ffffffe {
s.pingId = s.pingId - 0x7ffffffe
s.pingId -= 0x7ffffffe
} else {
s.pingId = s.pingId + 2
s.pingId += 2
}
pingChan := make(chan error)
s.pingChans[pid] = pingChan
Expand Down Expand Up @@ -309,14 +309,14 @@ func (s *Connection) Ping() (time.Duration, error) {
}

// Serve handles frames sent from the server, including reply frames
// which are needed to fully initiate connections. Both clients and servers
// which are needed to fully initiate connections. Both clients and servers
// should call Serve in a separate goroutine before creating streams.
func (s *Connection) Serve(newHandler StreamHandler) {
// use a WaitGroup to wait for all frames to be drained after receiving
// go-away.
var wg sync.WaitGroup

// Parition queues to ensure stream frames are handled
// Partition queues to ensure stream frames are handled
// by the same worker, ensuring order is maintained
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
for i := 0; i < FRAME_WORKERS; i++ {
Expand Down Expand Up @@ -345,7 +345,7 @@ Loop:
for {
readFrame, err := s.framer.ReadFrame()
if err != nil {
if err != io.EOF {
if !errors.Is(err, io.EOF) {
debugMessage("frame read error: %s", err)
} else {
debugMessage("(%p) EOF received", s)
Expand Down Expand Up @@ -399,7 +399,7 @@ Loop:
wg.Wait()

if goAwayFrame != nil {
s.handleGoAwayFrame(goAwayFrame)
_ = s.handleGoAwayFrame(goAwayFrame)
}

// now it's safe to close remote channels and empty s.streams
Expand Down Expand Up @@ -506,7 +506,7 @@ func (s *Connection) checkStreamFrame(frame *spdy.SynStreamFrame) bool {
func (s *Connection) handleStreamFrame(frame *spdy.SynStreamFrame, newHandler StreamHandler) error {
stream, ok := s.getStream(frame.StreamId)
if !ok {
return fmt.Errorf("Missing stream: %d", frame.StreamId)
return fmt.Errorf("missing stream: %d", frame.StreamId)
}

newHandler(stream)
Expand Down Expand Up @@ -666,9 +666,9 @@ func (s *Connection) remoteStreamFinish(stream *Stream) {
}

// CreateStream creates a new spdy stream using the parameters for
// creating the stream frame. The stream frame will be sent upon
// creating the stream frame. The stream frame will be sent upon
// calling this function, however this function does not wait for
// the reply frame. If waiting for the reply is desired, use
// the reply frame. If waiting for the reply is desired, use
// the stream Wait or WaitTimeout function on the stream returned
// by this function.
func (s *Connection) CreateStream(headers http.Header, parent *Stream, fin bool) (*Stream, error) {
Expand All @@ -679,7 +679,7 @@ func (s *Connection) CreateStream(headers http.Header, parent *Stream, fin bool)

streamId := s.getNextStreamId()
if streamId == 0 {
return nil, fmt.Errorf("Unable to get new stream id")
return nil, errors.New("unable to get new stream id")
}

stream := &Stream{
Expand Down Expand Up @@ -762,7 +762,8 @@ func (s *Connection) shutdown(closeTimeout time.Duration) {
close(s.shutdownChan)
}

// Closes spdy connection by sending GoAway frame and initiating shutdown
// Close closes the spdy connection by sending a [spdy.GoAwayFrame] frame
// with status [spdy.GoAwayOK] and initiating shutdown.
func (s *Connection) Close() error {
s.receiveIdLock.Lock()
if s.goneAway {
Expand Down Expand Up @@ -792,7 +793,7 @@ func (s *Connection) Close() error {
}

// CloseWait closes the connection and waits for shutdown
// to finish. Note the underlying network Connection
// to finish. Note the underlying network Connection
// is not closed until the end of shutdown.
func (s *Connection) CloseWait() error {
closeErr := s.Close()
Expand All @@ -807,11 +808,11 @@ func (s *Connection) CloseWait() error {
}

// Wait waits for the connection to finish shutdown or for
// the wait timeout duration to expire. This needs to be
// called either after Close has been called or the GOAWAYFRAME
// has been received. If the wait timeout is 0, this function
// will block until shutdown finishes. If wait is never called
// and a shutdown error occurs, that error will be logged as an
// the wait timeout duration to expire. This needs to be
// called either after Close has been called or the GOAWAY frame
// has been received. If the wait timeout is 0, this function
// blocks until shutdown finishes. If wait is never called
// and a shutdown error occurs, that error is logged as an
// unhandled error.
func (s *Connection) Wait(waitTimeout time.Duration) error {
var timeout <-chan time.Time
Expand All @@ -833,9 +834,9 @@ func (s *Connection) Wait(waitTimeout time.Duration) error {
}

// NotifyClose registers a channel to be called when the remote
// peer inidicates connection closure. The last stream to be
// received by the remote will be sent on the channel. The notify
// timeout will determine the duration between go away received
// peer indicates connection closure. The last stream to be
// received by the remote will be sent on the channel. The notify
// timeout determines the duration between go away received
// and the connection being closed.
func (s *Connection) NotifyClose(c chan<- *Stream, timeout time.Duration) {
s.goAwayTimeout = timeout
Expand All @@ -844,7 +845,7 @@ func (s *Connection) NotifyClose(c chan<- *Stream, timeout time.Duration) {

// SetCloseTimeout sets the amount of time close will wait for
// streams to finish before terminating the underlying network
// connection. Setting the timeout to 0 will cause close to
// connection. Setting the timeout to 0 will cause close to
// wait forever, which is the default.
func (s *Connection) SetCloseTimeout(timeout time.Duration) {
s.closeTimeout = timeout
Expand Down Expand Up @@ -912,8 +913,8 @@ func (s *Connection) sendStream(stream *Stream, fin bool) error {
}

streamFrame := &spdy.SynStreamFrame{
StreamId: spdy.StreamId(stream.streamId),
AssociatedToStreamId: spdy.StreamId(parentId),
StreamId: stream.streamId,
AssociatedToStreamId: parentId,
Headers: stream.headers,
CFHeader: spdy.ControlFrameHeader{Flags: flags},
}
Expand All @@ -928,7 +929,7 @@ func (s *Connection) getNextStreamId() spdy.StreamId {
if sid > 0x7fffffff {
return 0
}
s.nextStreamId = s.nextStreamId + 2
s.nextStreamId += 2
return sid
}

Expand Down Expand Up @@ -966,7 +967,7 @@ func (s *Connection) getStream(streamId spdy.StreamId) (stream *Stream, ok bool)
s.streamLock.RLock()
stream, ok = s.streams[streamId]
s.streamLock.RUnlock()
return
return stream, ok
}

// FindStream looks up the given stream id and either waits for the
Expand Down
8 changes: 4 additions & 4 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func MirrorStreamHandler(stream *Stream) {
}

go func() {
io.Copy(stream, stream)
stream.Close()
_, _ = io.Copy(stream, stream)
_ = stream.Close()
}()
go func() {
for {
Expand All @@ -46,7 +46,7 @@ func MirrorStreamHandler(stream *Stream) {
}()
}

// NoopStreamHandler does nothing when stream connects.
// NoOpStreamHandler does nothing when stream connects.
func NoOpStreamHandler(stream *Stream) {
stream.SendReply(http.Header{}, false)
_ = stream.SendReply(http.Header{}, false)
}
6 changes: 3 additions & 3 deletions priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (fq frameQueue) Swap(i, j int) {
}

func (fq *frameQueue) Push(x interface{}) {
*fq = append(*fq, x.(*prioritizedFrame))
*fq = append(*fq, x.(*prioritizedFrame)) //nolint:forcetypeassert
}

func (fq *frameQueue) Pop() interface{} {
Expand Down Expand Up @@ -87,7 +87,7 @@ func (q *PriorityFrameQueue) Push(frame spdy.Frame, priority uint8) {
priority: priority,
insertId: q.nextInsertId,
}
q.nextInsertId = q.nextInsertId + 1
q.nextInsertId++
heap.Push(q.queue, pFrame)
q.c.Signal()
}
Expand All @@ -101,7 +101,7 @@ func (q *PriorityFrameQueue) Pop() spdy.Frame {
}
q.c.Wait()
}
frame := heap.Pop(q.queue).(*prioritizedFrame).frame
frame := heap.Pop(q.queue).(*prioritizedFrame).frame //nolint:forcetypeassert
q.c.Signal()
return frame
}
Expand Down
12 changes: 6 additions & 6 deletions priority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func TestPriorityQueueOrdering(t *testing.T) {
}

for i := spdy.StreamId(0); i < 150; i++ {
frame := queue.Pop()
if frame.(*spdy.DataFrame).StreamId != i {
t.Fatalf("Wrong frame\nActual: %d\nExpecting: %d", frame.(*spdy.DataFrame).StreamId, i)
frame := queue.Pop().(*spdy.DataFrame) //nolint:forcetypeassert
if frame.StreamId != i {
t.Fatalf("Wrong frame\nActual: %d\nExpecting: %d", frame.StreamId, i)
}
}
}
Expand All @@ -85,9 +85,9 @@ func TestPriorityQueueSync(t *testing.T) {

wg.Wait()
for i := spdy.StreamId(0); i < 150; i++ {
frame := queue.Pop()
if frame.(*spdy.DataFrame).StreamId != i {
t.Fatalf("Wrong frame\nActual: %d\nExpecting: %d", frame.(*spdy.DataFrame).StreamId, i)
frame := queue.Pop().(*spdy.DataFrame) //nolint:forcetypeassert
if frame.StreamId != i {
t.Fatalf("Wrong frame\nActual: %d\nExpecting: %d", frame.StreamId, i)
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions spdy/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package spdy
import (
"compress/zlib"
"encoding/binary"
"errors"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (f *Framer) uncorkHeaderDecompressor(payloadSize int64) error {
return nil
}
f.headerReader = io.LimitedReader{R: f.r, N: payloadSize}
decompressor, err := zlib.NewReaderDict(&f.headerReader, []byte(headerDictionary))
decompressor, err := zlib.NewReaderDict(&f.headerReader, headerDictionary)
if err != nil {
return err
}
Expand Down Expand Up @@ -241,7 +242,7 @@ func (f *Framer) readSynStreamFrame(h ControlFrameHeader, frame *SynStreamFrame)
reader = f.headerDecompressor
}
frame.Headers, err = parseHeaderValueBlock(reader, frame.StreamId)
if !f.headerCompressionDisabled && (err == io.EOF && f.headerReader.N == 0 || f.headerReader.N != 0) {
if !f.headerCompressionDisabled && ((errors.Is(err, io.EOF) && f.headerReader.N == 0) || f.headerReader.N != 0) {
err = &Error{WrongCompressedPayloadSize, 0}
}
if err != nil {
Expand Down Expand Up @@ -273,7 +274,7 @@ func (f *Framer) readSynReplyFrame(h ControlFrameHeader, frame *SynReplyFrame) e
reader = f.headerDecompressor
}
frame.Headers, err = parseHeaderValueBlock(reader, frame.StreamId)
if !f.headerCompressionDisabled && (err == io.EOF && f.headerReader.N == 0 || f.headerReader.N != 0) {
if !f.headerCompressionDisabled && ((errors.Is(err, io.EOF) && f.headerReader.N == 0) || f.headerReader.N != 0) {
err = &Error{WrongCompressedPayloadSize, 0}
}
if err != nil {
Expand Down Expand Up @@ -305,7 +306,7 @@ func (f *Framer) readHeadersFrame(h ControlFrameHeader, frame *HeadersFrame) err
reader = f.headerDecompressor
}
frame.Headers, err = parseHeaderValueBlock(reader, frame.StreamId)
if !f.headerCompressionDisabled && (err == io.EOF && f.headerReader.N == 0 || f.headerReader.N != 0) {
if !f.headerCompressionDisabled && ((errors.Is(err, io.EOF) && f.headerReader.N == 0) || f.headerReader.N != 0) {
err = &Error{WrongCompressedPayloadSize, 0}
}
if err != nil {
Expand Down
Loading
Loading