Source file src/net/http/internal/http2/writesched_priority_rfc9218.go

     1  // Copyright 2025 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http2
     6  
     7  import (
     8  	"fmt"
     9  	"math"
    10  )
    11  
// streamMetadata records, for each open stream, the queue that holds its
// pending frames and the priority most recently assigned to it, so that
// CloseStream and AdjustStream can locate and relocate the stream quickly.
type streamMetadata struct {
	// location is the stream's write queue, linked into one of the
	// circular lists rooted at priorityWriteSchedulerRFC9218.heads.
	location *writeQueue
	// priority is the stream's current RFC 9218 priority parameters
	// (urgency and incremental), used to find which ring holds location.
	priority PriorityParam
}
    16  
// priorityWriteSchedulerRFC9218 is a WriteScheduler implementing the
// extensible prioritization scheme of RFC 9218: each stream carries an
// urgency (0 is most urgent, 7 least) and an incremental flag, and frames
// are popped from the most urgent non-empty level first.
type priorityWriteSchedulerRFC9218 struct {
	// control contains control frames (SETTINGS, PING, etc.).
	control writeQueue

	// heads contain the head of a circular list of streams.
	// We put these heads within a nested array that represents urgency and
	// incremental, as defined in
	// https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters.
	// 8 represents u=0 up to u=7, and 2 represents i=false and i=true.
	heads [8][2]*writeQueue

	// streams contains a mapping between each stream ID and their metadata, so
	// we can quickly locate them when needing to, for example, adjust their
	// priority.
	streams map[uint32]streamMetadata

	// queuePool are empty queues for reuse.
	queuePool writeQueuePool

	// prioritizeIncremental is used to determine whether we should prioritize
	// incremental streams or not, when urgency is the same in a given Pop()
	// call. It is flipped on every Pop so the two kinds alternate.
	prioritizeIncremental bool

	// priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we
	// receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
	// It holds at most one pending update, for a stream not yet opened.
	priorityUpdateBuf struct {
		// streamID being 0 means that the buffer is empty. This is a safe
		// assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
		streamID uint32
		priority PriorityParam
	}
}
    50  
    51  func newPriorityWriteSchedulerRFC9218() WriteScheduler {
    52  	ws := &priorityWriteSchedulerRFC9218{
    53  		streams: make(map[uint32]streamMetadata),
    54  	}
    55  	return ws
    56  }
    57  
    58  func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
    59  	if ws.streams[streamID].location != nil {
    60  		panic(fmt.Errorf("stream %d already opened", streamID))
    61  	}
    62  	if streamID == ws.priorityUpdateBuf.streamID {
    63  		ws.priorityUpdateBuf.streamID = 0
    64  		opt.priority = ws.priorityUpdateBuf.priority
    65  	}
    66  	q := ws.queuePool.get()
    67  	ws.streams[streamID] = streamMetadata{
    68  		location: q,
    69  		priority: opt.priority,
    70  	}
    71  
    72  	u, i := opt.priority.urgency, opt.priority.incremental
    73  	if ws.heads[u][i] == nil {
    74  		ws.heads[u][i] = q
    75  		q.next = q
    76  		q.prev = q
    77  	} else {
    78  		// Queues are stored in a ring.
    79  		// Insert the new stream before ws.head, putting it at the end of the list.
    80  		q.prev = ws.heads[u][i].prev
    81  		q.next = ws.heads[u][i]
    82  		q.prev.next = q
    83  		q.next.prev = q
    84  	}
    85  }
    86  
    87  func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
    88  	metadata := ws.streams[streamID]
    89  	q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
    90  	if q == nil {
    91  		return
    92  	}
    93  	if q.next == q {
    94  		// This was the only open stream.
    95  		ws.heads[u][i] = nil
    96  	} else {
    97  		q.prev.next = q.next
    98  		q.next.prev = q.prev
    99  		if ws.heads[u][i] == q {
   100  			ws.heads[u][i] = q.next
   101  		}
   102  	}
   103  	delete(ws.streams, streamID)
   104  	ws.queuePool.put(q)
   105  }
   106  
   107  func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
   108  	metadata := ws.streams[streamID]
   109  	q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
   110  	if q == nil {
   111  		ws.priorityUpdateBuf.streamID = streamID
   112  		ws.priorityUpdateBuf.priority = priority
   113  		return
   114  	}
   115  
   116  	// Remove stream from current location.
   117  	if q.next == q {
   118  		// This was the only open stream.
   119  		ws.heads[u][i] = nil
   120  	} else {
   121  		q.prev.next = q.next
   122  		q.next.prev = q.prev
   123  		if ws.heads[u][i] == q {
   124  			ws.heads[u][i] = q.next
   125  		}
   126  	}
   127  
   128  	// Insert stream to the new queue.
   129  	u, i = priority.urgency, priority.incremental
   130  	if ws.heads[u][i] == nil {
   131  		ws.heads[u][i] = q
   132  		q.next = q
   133  		q.prev = q
   134  	} else {
   135  		// Queues are stored in a ring.
   136  		// Insert the new stream before ws.head, putting it at the end of the list.
   137  		q.prev = ws.heads[u][i].prev
   138  		q.next = ws.heads[u][i]
   139  		q.prev.next = q
   140  		q.next.prev = q
   141  	}
   142  
   143  	// Update the metadata.
   144  	ws.streams[streamID] = streamMetadata{
   145  		location: q,
   146  		priority: priority,
   147  	}
   148  }
   149  
   150  func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
   151  	if wr.isControl() {
   152  		ws.control.push(wr)
   153  		return
   154  	}
   155  	q := ws.streams[wr.StreamID()].location
   156  	if q == nil {
   157  		// This is a closed stream.
   158  		// wr should not be a HEADERS or DATA frame.
   159  		// We push the request onto the control queue.
   160  		if wr.DataSize() > 0 {
   161  			panic("add DATA on non-open stream")
   162  		}
   163  		ws.control.push(wr)
   164  		return
   165  	}
   166  	q.push(wr)
   167  }
   168  
   169  func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
   170  	// Control and RST_STREAM frames first.
   171  	if !ws.control.empty() {
   172  		return ws.control.shift(), true
   173  	}
   174  
   175  	// On the next Pop(), we want to prioritize incremental if we prioritized
   176  	// non-incremental request of the same urgency this time. Vice-versa.
   177  	// i.e. when there are incremental and non-incremental requests at the same
   178  	// priority, we give 50% of our bandwidth to the incremental ones in
   179  	// aggregate and 50% to the first non-incremental one (since
   180  	// non-incremental streams do not use round-robin writes).
   181  	ws.prioritizeIncremental = !ws.prioritizeIncremental
   182  
   183  	// Always prioritize lowest u (i.e. highest urgency level).
   184  	for u := range ws.heads {
   185  		for i := range ws.heads[u] {
   186  			// When we want to prioritize incremental, we try to pop i=true
   187  			// first before i=false when u is the same.
   188  			if ws.prioritizeIncremental {
   189  				i = (i + 1) % 2
   190  			}
   191  			q := ws.heads[u][i]
   192  			if q == nil {
   193  				continue
   194  			}
   195  			for {
   196  				if wr, ok := q.consume(math.MaxInt32); ok {
   197  					if i == 1 {
   198  						// For incremental streams, we update head to q.next so
   199  						// we can round-robin between multiple streams that can
   200  						// immediately benefit from partial writes.
   201  						ws.heads[u][i] = q.next
   202  					} else {
   203  						// For non-incremental streams, we try to finish one to
   204  						// completion rather than doing round-robin. However,
   205  						// we update head here so that if q.consume() is !ok
   206  						// (e.g. the stream has no more frame to consume), head
   207  						// is updated to the next q that has frames to consume
   208  						// on future iterations. This way, we do not prioritize
   209  						// writing to unavailable stream on next Pop() calls,
   210  						// preventing head-of-line blocking.
   211  						ws.heads[u][i] = q
   212  					}
   213  					return wr, true
   214  				}
   215  				q = q.next
   216  				if q == ws.heads[u][i] {
   217  					break
   218  				}
   219  			}
   220  
   221  		}
   222  	}
   223  	return FrameWriteRequest{}, false
   224  }
   225  

View as plain text