Source file src/net/http/internal/http2/writesched_priority_rfc7540.go

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http2
     6  
     7  import (
     8  	"fmt"
     9  	"math"
    10  	"sort"
    11  )
    12  
    13  // RFC 7540, Section 5.3.5: the default weight is 16.
    14  const priorityDefaultWeightRFC7540 = 15 // 16 = 15 + 1
    15  
// PriorityWriteSchedulerConfig configures the RFC 7540 priority write
// scheduler created by NewPriorityWriteScheduler. Passing a nil config to
// NewPriorityWriteScheduler selects the defaults: 10 retained closed nodes,
// 10 retained idle nodes, and no write throttling.
type PriorityWriteSchedulerConfig struct {
	// MaxClosedNodesInTree controls the maximum number of closed streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance: a stream's node is then removed
	// from the tree as soon as the stream closes, so a later PRIORITY update
	// that depends on it gives the dependent stream default priority.
	//
	// See RFC 7540, Section 5.3.4:
	//   "It is possible for a stream to become closed while prioritization
	//   information ... is in transit. ... This potentially creates suboptimal
	//   prioritization, since the stream could be given a priority that is
	//   different from what is intended. To avoid these problems, an endpoint
	//   SHOULD retain stream prioritization state for a period after streams
	//   become closed. The longer state is retained, the lower the chance that
	//   streams are assigned incorrect or default priority values."
	MaxClosedNodesInTree int

	// MaxIdleNodesInTree controls the maximum number of idle streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance: PRIORITY updates for streams that
	// have no node yet are then ignored instead of creating idle grouping
	// nodes.
	//
	// See RFC 7540, Section 5.3.4:
	//   Similarly, streams that are in the "idle" state can be assigned
	//   priority or become a parent of other streams. This allows for the
	//   creation of a grouping node in the dependency tree, which enables
	//   more flexible expressions of priority. Idle streams begin with a
	//   default priority (Section 5.3.5).
	MaxIdleNodesInTree int

	// ThrottleOutOfOrderWrites enables write throttling to help ensure that
	// data is delivered in priority order. This works around a race where
	// stream B depends on stream A and both streams are about to call Write
	// to queue DATA frames. If B wins the race, a naive scheduler would eagerly
	// write as much data from B as possible, but this is suboptimal because A
	// is a higher-priority stream. With throttling enabled, we write a small
	// amount of data from B to minimize the amount of bandwidth that B can
	// steal from A.
	//
	// When enabled, the throttle limit starts at 1024. It grows by 1024 each
	// time a frame is written from a node that has an open ancestor, and is
	// reset to 1024 when a frame is written from a node without one.
	ThrottleOutOfOrderWrites bool
}
    54  
    55  // NewPriorityWriteScheduler constructs a WriteScheduler that schedules
    56  // frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
    57  // If cfg is nil, default options are used.
    58  func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
    59  	return newPriorityWriteSchedulerRFC7540(cfg)
    60  }
    61  
    62  func newPriorityWriteSchedulerRFC7540(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
    63  	if cfg == nil {
    64  		// For justification of these defaults, see:
    65  		// https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
    66  		cfg = &PriorityWriteSchedulerConfig{
    67  			MaxClosedNodesInTree:     10,
    68  			MaxIdleNodesInTree:       10,
    69  			ThrottleOutOfOrderWrites: false,
    70  		}
    71  	}
    72  
    73  	ws := &priorityWriteSchedulerRFC7540{
    74  		nodes:                make(map[uint32]*priorityNodeRFC7540),
    75  		maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
    76  		maxIdleNodesInTree:   cfg.MaxIdleNodesInTree,
    77  		enableWriteThrottle:  cfg.ThrottleOutOfOrderWrites,
    78  	}
    79  	ws.nodes[0] = &ws.root
    80  	if cfg.ThrottleOutOfOrderWrites {
    81  		ws.writeThrottleLimit = 1024
    82  	} else {
    83  		ws.writeThrottleLimit = math.MaxInt32
    84  	}
    85  	return ws
    86  }
    87  
    88  type priorityNodeStateRFC7540 int
    89  
    90  const (
    91  	priorityNodeOpenRFC7540 priorityNodeStateRFC7540 = iota
    92  	priorityNodeClosedRFC7540
    93  	priorityNodeIdleRFC7540
    94  )
    95  
// priorityNodeRFC7540 is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID. The node with id 0 is the
// tree root, which priorityWriteSchedulerRFC7540 embeds directly.
// See RFC 7540, Section 5.3.
type priorityNodeRFC7540 struct {
	q            writeQueue               // queue of pending frames to write
	id           uint32                   // id of the stream, or 0 for the root of the tree
	weight       uint8                    // the actual weight is weight+1, so the value is in [1,256]
	state        priorityNodeStateRFC7540 // open | closed | idle
	bytes        int64                    // DATA bytes popped from this node's queue, or 0 if closed
	subtreeBytes int64                    // sum(node.bytes) of all nodes in this subtree

	// These links form the priority tree. A node's children form a
	// doubly-linked sibling list starting at kids; setParent always inserts
	// a new child at the head of that list.
	parent     *priorityNodeRFC7540
	kids       *priorityNodeRFC7540 // start of the kids list
	prev, next *priorityNodeRFC7540 // doubly-linked list of siblings
}
   112  
   113  func (n *priorityNodeRFC7540) setParent(parent *priorityNodeRFC7540) {
   114  	if n == parent {
   115  		panic("setParent to self")
   116  	}
   117  	if n.parent == parent {
   118  		return
   119  	}
   120  	// Unlink from current parent.
   121  	if parent := n.parent; parent != nil {
   122  		if n.prev == nil {
   123  			parent.kids = n.next
   124  		} else {
   125  			n.prev.next = n.next
   126  		}
   127  		if n.next != nil {
   128  			n.next.prev = n.prev
   129  		}
   130  	}
   131  	// Link to new parent.
   132  	// If parent=nil, remove n from the tree.
   133  	// Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
   134  	n.parent = parent
   135  	if parent == nil {
   136  		n.next = nil
   137  		n.prev = nil
   138  	} else {
   139  		n.next = parent.kids
   140  		n.prev = nil
   141  		if n.next != nil {
   142  			n.next.prev = n
   143  		}
   144  		parent.kids = n
   145  	}
   146  }
   147  
   148  func (n *priorityNodeRFC7540) addBytes(b int64) {
   149  	n.bytes += b
   150  	for ; n != nil; n = n.parent {
   151  		n.subtreeBytes += b
   152  	}
   153  }
   154  
   155  // walkReadyInOrder iterates over the tree in priority order, calling f for each node
   156  // with a non-empty write queue. When f returns true, this function returns true and the
   157  // walk halts. tmp is used as scratch space for sorting.
   158  //
   159  // f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
   160  // if any ancestor p of n is still open (ignoring the root node).
   161  func (n *priorityNodeRFC7540) walkReadyInOrder(openParent bool, tmp *[]*priorityNodeRFC7540, f func(*priorityNodeRFC7540, bool) bool) bool {
   162  	if !n.q.empty() && f(n, openParent) {
   163  		return true
   164  	}
   165  	if n.kids == nil {
   166  		return false
   167  	}
   168  
   169  	// Don't consider the root "open" when updating openParent since
   170  	// we can't send data frames on the root stream (only control frames).
   171  	if n.id != 0 {
   172  		openParent = openParent || (n.state == priorityNodeOpenRFC7540)
   173  	}
   174  
   175  	// Common case: only one kid or all kids have the same weight.
   176  	// Some clients don't use weights; other clients (like web browsers)
   177  	// use mostly-linear priority trees.
   178  	w := n.kids.weight
   179  	needSort := false
   180  	for k := n.kids.next; k != nil; k = k.next {
   181  		if k.weight != w {
   182  			needSort = true
   183  			break
   184  		}
   185  	}
   186  	if !needSort {
   187  		for k := n.kids; k != nil; k = k.next {
   188  			if k.walkReadyInOrder(openParent, tmp, f) {
   189  				return true
   190  			}
   191  		}
   192  		return false
   193  	}
   194  
   195  	// Uncommon case: sort the child nodes. We remove the kids from the parent,
   196  	// then re-insert after sorting so we can reuse tmp for future sort calls.
   197  	*tmp = (*tmp)[:0]
   198  	for n.kids != nil {
   199  		*tmp = append(*tmp, n.kids)
   200  		n.kids.setParent(nil)
   201  	}
   202  	sort.Sort(sortPriorityNodeSiblingsRFC7540(*tmp))
   203  	for i := len(*tmp) - 1; i >= 0; i-- {
   204  		(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
   205  	}
   206  	for k := n.kids; k != nil; k = k.next {
   207  		if k.walkReadyInOrder(openParent, tmp, f) {
   208  			return true
   209  		}
   210  	}
   211  	return false
   212  }
   213  
   214  type sortPriorityNodeSiblingsRFC7540 []*priorityNodeRFC7540
   215  
   216  func (z sortPriorityNodeSiblingsRFC7540) Len() int      { return len(z) }
   217  func (z sortPriorityNodeSiblingsRFC7540) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
   218  func (z sortPriorityNodeSiblingsRFC7540) Less(i, k int) bool {
   219  	// Prefer the subtree that has sent fewer bytes relative to its weight.
   220  	// See sections 5.3.2 and 5.3.4.
   221  	wi, bi := float64(z[i].weight)+1, float64(z[i].subtreeBytes)
   222  	wk, bk := float64(z[k].weight)+1, float64(z[k].subtreeBytes)
   223  	if bi == 0 && bk == 0 {
   224  		return wi >= wk
   225  	}
   226  	if bk == 0 {
   227  		return false
   228  	}
   229  	return bi/bk <= wi/wk
   230  }
   231  
// priorityWriteSchedulerRFC7540 is the WriteScheduler built by
// newPriorityWriteSchedulerRFC7540. Each stream it knows about has a
// priorityNodeRFC7540 in a dependency tree rooted at root. Pop serves frames
// by walking that tree in priority order.
// NOTE(review): the type has no internal locking; presumably callers
// serialize all calls — confirm against the WriteScheduler contract.
type priorityWriteSchedulerRFC7540 struct {
	// root is the root of the priority tree, where root.id = 0.
	// The root queues control frames that are not associated with any stream,
	// as well as non-DATA frames for streams that have no node in the tree.
	root priorityNodeRFC7540

	// nodes maps stream ids to priority tree nodes.
	nodes map[uint32]*priorityNodeRFC7540

	// maxID is the largest stream id that has ever been given a node (by
	// OpenStream or AdjustStream). It is not lowered when nodes are removed.
	// AdjustStream uses it to tell removed closed streams (id <= maxID) from
	// new idle streams (id > maxID).
	maxID uint32

	// lists of nodes that have been closed or are idle, but are kept in
	// the tree for improved prioritization. When the lengths exceed either
	// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
	closedNodes, idleNodes []*priorityNodeRFC7540

	// From the config. writeThrottleLimit is the current limit that Pop
	// passes when consuming from a node with an open ancestor.
	// enableWriteThrottle mirrors ThrottleOutOfOrderWrites.
	maxClosedNodesInTree int
	maxIdleNodesInTree   int
	writeThrottleLimit   int32
	enableWriteThrottle  bool

	// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
	tmp []*priorityNodeRFC7540

	// pool of empty queues for reuse.
	queuePool writeQueuePool
}
   260  
   261  func (ws *priorityWriteSchedulerRFC7540) OpenStream(streamID uint32, options OpenStreamOptions) {
   262  	// The stream may be currently idle but cannot be opened or closed.
   263  	if curr := ws.nodes[streamID]; curr != nil {
   264  		if curr.state != priorityNodeIdleRFC7540 {
   265  			panic(fmt.Sprintf("stream %d already opened", streamID))
   266  		}
   267  		curr.state = priorityNodeOpenRFC7540
   268  		return
   269  	}
   270  
   271  	// RFC 7540, Section 5.3.5:
   272  	//  "All streams are initially assigned a non-exclusive dependency on stream 0x0.
   273  	//  Pushed streams initially depend on their associated stream. In both cases,
   274  	//  streams are assigned a default weight of 16."
   275  	parent := ws.nodes[options.PusherID]
   276  	if parent == nil {
   277  		parent = &ws.root
   278  	}
   279  	n := &priorityNodeRFC7540{
   280  		q:      *ws.queuePool.get(),
   281  		id:     streamID,
   282  		weight: priorityDefaultWeightRFC7540,
   283  		state:  priorityNodeOpenRFC7540,
   284  	}
   285  	n.setParent(parent)
   286  	ws.nodes[streamID] = n
   287  	if streamID > ws.maxID {
   288  		ws.maxID = streamID
   289  	}
   290  }
   291  
   292  func (ws *priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
   293  	if streamID == 0 {
   294  		panic("violation of WriteScheduler interface: cannot close stream 0")
   295  	}
   296  	if ws.nodes[streamID] == nil {
   297  		panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
   298  	}
   299  	if ws.nodes[streamID].state != priorityNodeOpenRFC7540 {
   300  		panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
   301  	}
   302  
   303  	n := ws.nodes[streamID]
   304  	n.state = priorityNodeClosedRFC7540
   305  	n.addBytes(-n.bytes)
   306  
   307  	q := n.q
   308  	ws.queuePool.put(&q)
   309  	if ws.maxClosedNodesInTree > 0 {
   310  		ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
   311  	} else {
   312  		ws.removeNode(n)
   313  	}
   314  }
   315  
// AdjustStream applies a PRIORITY update for streamID. It moves the stream's
// node under the node for priority.StreamDep with weight priority.Weight
// (RFC 7540, Section 5.3).
//
// If streamID has no node, the update is ignored in two cases: the id is not
// above maxID (a closed stream whose node was already removed), or idle nodes
// are disabled (maxIdleNodesInTree == 0). Otherwise a new idle node with the
// default weight is created under the root and recorded in idleNodes, which
// may evict the oldest idle node.
//
// It panics if streamID is 0.
func (ws *priorityWriteSchedulerRFC7540) AdjustStream(streamID uint32, priority PriorityParam) {
	if streamID == 0 {
		panic("adjustPriority on root")
	}

	// If streamID does not exist, there are two cases:
	// - A closed stream that has been removed (this will have ID <= maxID)
	// - An idle stream that is being used for "grouping" (this will have ID > maxID)
	n := ws.nodes[streamID]
	if n == nil {
		if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
			return
		}
		ws.maxID = streamID
		n = &priorityNodeRFC7540{
			q:      *ws.queuePool.get(),
			id:     streamID,
			weight: priorityDefaultWeightRFC7540,
			state:  priorityNodeIdleRFC7540,
		}
		n.setParent(&ws.root)
		ws.nodes[streamID] = n
		// This may evict (removeNode) the oldest idle node. It runs before the
		// parent lookup below, so an evicted node cannot be chosen as parent.
		ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
	}

	// Section 5.3.1: A dependency on a stream that is not currently in the tree
	// results in that stream being given a default priority (Section 5.3.5).
	// The exclusive flag is not applied in this case.
	parent := ws.nodes[priority.StreamDep]
	if parent == nil {
		n.setParent(&ws.root)
		n.weight = priorityDefaultWeightRFC7540
		return
	}

	// Ignore if the client tries to make a node its own parent.
	if n == parent {
		return
	}

	// Section 5.3.3:
	//   "If a stream is made dependent on one of its own dependencies, the
	//   formerly dependent stream is first moved to be dependent on the
	//   reprioritized stream's previous parent. The moved dependency retains
	//   its weight."
	//
	// That is: if parent depends on n, move parent to depend on n.parent.
	// This must happen before n is reparented, or the tree would form a cycle.
	for x := parent.parent; x != nil; x = x.parent {
		if x == n {
			parent.setParent(n.parent)
			break
		}
	}

	// Section 5.3.3: The exclusive flag causes the stream to become the sole
	// dependency of its parent stream, causing other dependencies to become
	// dependent on the exclusive stream.
	if priority.Exclusive {
		k := parent.kids
		for k != nil {
			// Read next before setParent relinks k into n's kid list.
			next := k.next
			if k != n {
				k.setParent(n)
			}
			k = next
		}
	}

	n.setParent(parent)
	// NOTE(review): assumes PriorityParam.Weight uses the same weight-1
	// encoding as priorityNodeRFC7540.weight — confirm against its definition.
	n.weight = priority.Weight
}
   386  
   387  func (ws *priorityWriteSchedulerRFC7540) Push(wr FrameWriteRequest) {
   388  	var n *priorityNodeRFC7540
   389  	if wr.isControl() {
   390  		n = &ws.root
   391  	} else {
   392  		id := wr.StreamID()
   393  		n = ws.nodes[id]
   394  		if n == nil {
   395  			// id is an idle or closed stream. wr should not be a HEADERS or
   396  			// DATA frame. In other case, we push wr onto the root, rather
   397  			// than creating a new priorityNode.
   398  			if wr.DataSize() > 0 {
   399  				panic("add DATA on non-open stream")
   400  			}
   401  			n = &ws.root
   402  		}
   403  	}
   404  	n.q.push(wr)
   405  }
   406  
   407  func (ws *priorityWriteSchedulerRFC7540) Pop() (wr FrameWriteRequest, ok bool) {
   408  	ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNodeRFC7540, openParent bool) bool {
   409  		limit := int32(math.MaxInt32)
   410  		if openParent {
   411  			limit = ws.writeThrottleLimit
   412  		}
   413  		wr, ok = n.q.consume(limit)
   414  		if !ok {
   415  			return false
   416  		}
   417  		n.addBytes(int64(wr.DataSize()))
   418  		// If B depends on A and B continuously has data available but A
   419  		// does not, gradually increase the throttling limit to allow B to
   420  		// steal more and more bandwidth from A.
   421  		if openParent {
   422  			ws.writeThrottleLimit += 1024
   423  			if ws.writeThrottleLimit < 0 {
   424  				ws.writeThrottleLimit = math.MaxInt32
   425  			}
   426  		} else if ws.enableWriteThrottle {
   427  			ws.writeThrottleLimit = 1024
   428  		}
   429  		return true
   430  	})
   431  	return wr, ok
   432  }
   433  
   434  func (ws *priorityWriteSchedulerRFC7540) addClosedOrIdleNode(list *[]*priorityNodeRFC7540, maxSize int, n *priorityNodeRFC7540) {
   435  	if maxSize == 0 {
   436  		return
   437  	}
   438  	if len(*list) == maxSize {
   439  		// Remove the oldest node, then shift left.
   440  		ws.removeNode((*list)[0])
   441  		x := (*list)[1:]
   442  		copy(*list, x)
   443  		*list = (*list)[:len(x)]
   444  	}
   445  	*list = append(*list, n)
   446  }
   447  
   448  func (ws *priorityWriteSchedulerRFC7540) removeNode(n *priorityNodeRFC7540) {
   449  	for n.kids != nil {
   450  		n.kids.setParent(n.parent)
   451  	}
   452  	n.setParent(nil)
   453  	delete(ws.nodes, n.id)
   454  }
   455  

View as plain text