Source file
src/net/http/internal/http2/writesched_priority_rfc7540.go
1
2
3
4
5 package http2
6
7 import (
8 "fmt"
9 "math"
10 "sort"
11 )
12
13
14 const priorityDefaultWeightRFC7540 = 15
15
16
17 type PriorityWriteSchedulerConfig struct {
18
19
20
21
22
23
24
25
26
27
28
29
30 MaxClosedNodesInTree int
31
32
33
34
35
36
37
38
39
40
41
42 MaxIdleNodesInTree int
43
44
45
46
47
48
49
50
51
52 ThrottleOutOfOrderWrites bool
53 }
54
55
56
57
58 func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
59 return newPriorityWriteSchedulerRFC7540(cfg)
60 }
61
62 func newPriorityWriteSchedulerRFC7540(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
63 if cfg == nil {
64
65
66 cfg = &PriorityWriteSchedulerConfig{
67 MaxClosedNodesInTree: 10,
68 MaxIdleNodesInTree: 10,
69 ThrottleOutOfOrderWrites: false,
70 }
71 }
72
73 ws := &priorityWriteSchedulerRFC7540{
74 nodes: make(map[uint32]*priorityNodeRFC7540),
75 maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
76 maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
77 enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
78 }
79 ws.nodes[0] = &ws.root
80 if cfg.ThrottleOutOfOrderWrites {
81 ws.writeThrottleLimit = 1024
82 } else {
83 ws.writeThrottleLimit = math.MaxInt32
84 }
85 return ws
86 }
87
88 type priorityNodeStateRFC7540 int
89
90 const (
91 priorityNodeOpenRFC7540 priorityNodeStateRFC7540 = iota
92 priorityNodeClosedRFC7540
93 priorityNodeIdleRFC7540
94 )
95
96
97
98
99 type priorityNodeRFC7540 struct {
100 q writeQueue
101 id uint32
102 weight uint8
103 state priorityNodeStateRFC7540
104 bytes int64
105 subtreeBytes int64
106
107
108 parent *priorityNodeRFC7540
109 kids *priorityNodeRFC7540
110 prev, next *priorityNodeRFC7540
111 }
112
113 func (n *priorityNodeRFC7540) setParent(parent *priorityNodeRFC7540) {
114 if n == parent {
115 panic("setParent to self")
116 }
117 if n.parent == parent {
118 return
119 }
120
121 if parent := n.parent; parent != nil {
122 if n.prev == nil {
123 parent.kids = n.next
124 } else {
125 n.prev.next = n.next
126 }
127 if n.next != nil {
128 n.next.prev = n.prev
129 }
130 }
131
132
133
134 n.parent = parent
135 if parent == nil {
136 n.next = nil
137 n.prev = nil
138 } else {
139 n.next = parent.kids
140 n.prev = nil
141 if n.next != nil {
142 n.next.prev = n
143 }
144 parent.kids = n
145 }
146 }
147
148 func (n *priorityNodeRFC7540) addBytes(b int64) {
149 n.bytes += b
150 for ; n != nil; n = n.parent {
151 n.subtreeBytes += b
152 }
153 }
154
155
156
157
158
159
160
161 func (n *priorityNodeRFC7540) walkReadyInOrder(openParent bool, tmp *[]*priorityNodeRFC7540, f func(*priorityNodeRFC7540, bool) bool) bool {
162 if !n.q.empty() && f(n, openParent) {
163 return true
164 }
165 if n.kids == nil {
166 return false
167 }
168
169
170
171 if n.id != 0 {
172 openParent = openParent || (n.state == priorityNodeOpenRFC7540)
173 }
174
175
176
177
178 w := n.kids.weight
179 needSort := false
180 for k := n.kids.next; k != nil; k = k.next {
181 if k.weight != w {
182 needSort = true
183 break
184 }
185 }
186 if !needSort {
187 for k := n.kids; k != nil; k = k.next {
188 if k.walkReadyInOrder(openParent, tmp, f) {
189 return true
190 }
191 }
192 return false
193 }
194
195
196
197 *tmp = (*tmp)[:0]
198 for n.kids != nil {
199 *tmp = append(*tmp, n.kids)
200 n.kids.setParent(nil)
201 }
202 sort.Sort(sortPriorityNodeSiblingsRFC7540(*tmp))
203 for i := len(*tmp) - 1; i >= 0; i-- {
204 (*tmp)[i].setParent(n)
205 }
206 for k := n.kids; k != nil; k = k.next {
207 if k.walkReadyInOrder(openParent, tmp, f) {
208 return true
209 }
210 }
211 return false
212 }
213
214 type sortPriorityNodeSiblingsRFC7540 []*priorityNodeRFC7540
215
216 func (z sortPriorityNodeSiblingsRFC7540) Len() int { return len(z) }
217 func (z sortPriorityNodeSiblingsRFC7540) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
218 func (z sortPriorityNodeSiblingsRFC7540) Less(i, k int) bool {
219
220
221 wi, bi := float64(z[i].weight)+1, float64(z[i].subtreeBytes)
222 wk, bk := float64(z[k].weight)+1, float64(z[k].subtreeBytes)
223 if bi == 0 && bk == 0 {
224 return wi >= wk
225 }
226 if bk == 0 {
227 return false
228 }
229 return bi/bk <= wi/wk
230 }
231
232 type priorityWriteSchedulerRFC7540 struct {
233
234
235 root priorityNodeRFC7540
236
237
238 nodes map[uint32]*priorityNodeRFC7540
239
240
241 maxID uint32
242
243
244
245
246 closedNodes, idleNodes []*priorityNodeRFC7540
247
248
249 maxClosedNodesInTree int
250 maxIdleNodesInTree int
251 writeThrottleLimit int32
252 enableWriteThrottle bool
253
254
255 tmp []*priorityNodeRFC7540
256
257
258 queuePool writeQueuePool
259 }
260
261 func (ws *priorityWriteSchedulerRFC7540) OpenStream(streamID uint32, options OpenStreamOptions) {
262
263 if curr := ws.nodes[streamID]; curr != nil {
264 if curr.state != priorityNodeIdleRFC7540 {
265 panic(fmt.Sprintf("stream %d already opened", streamID))
266 }
267 curr.state = priorityNodeOpenRFC7540
268 return
269 }
270
271
272
273
274
275 parent := ws.nodes[options.PusherID]
276 if parent == nil {
277 parent = &ws.root
278 }
279 n := &priorityNodeRFC7540{
280 q: *ws.queuePool.get(),
281 id: streamID,
282 weight: priorityDefaultWeightRFC7540,
283 state: priorityNodeOpenRFC7540,
284 }
285 n.setParent(parent)
286 ws.nodes[streamID] = n
287 if streamID > ws.maxID {
288 ws.maxID = streamID
289 }
290 }
291
292 func (ws *priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
293 if streamID == 0 {
294 panic("violation of WriteScheduler interface: cannot close stream 0")
295 }
296 if ws.nodes[streamID] == nil {
297 panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
298 }
299 if ws.nodes[streamID].state != priorityNodeOpenRFC7540 {
300 panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
301 }
302
303 n := ws.nodes[streamID]
304 n.state = priorityNodeClosedRFC7540
305 n.addBytes(-n.bytes)
306
307 q := n.q
308 ws.queuePool.put(&q)
309 if ws.maxClosedNodesInTree > 0 {
310 ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
311 } else {
312 ws.removeNode(n)
313 }
314 }
315
316 func (ws *priorityWriteSchedulerRFC7540) AdjustStream(streamID uint32, priority PriorityParam) {
317 if streamID == 0 {
318 panic("adjustPriority on root")
319 }
320
321
322
323
324 n := ws.nodes[streamID]
325 if n == nil {
326 if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
327 return
328 }
329 ws.maxID = streamID
330 n = &priorityNodeRFC7540{
331 q: *ws.queuePool.get(),
332 id: streamID,
333 weight: priorityDefaultWeightRFC7540,
334 state: priorityNodeIdleRFC7540,
335 }
336 n.setParent(&ws.root)
337 ws.nodes[streamID] = n
338 ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
339 }
340
341
342
343 parent := ws.nodes[priority.StreamDep]
344 if parent == nil {
345 n.setParent(&ws.root)
346 n.weight = priorityDefaultWeightRFC7540
347 return
348 }
349
350
351 if n == parent {
352 return
353 }
354
355
356
357
358
359
360
361
362 for x := parent.parent; x != nil; x = x.parent {
363 if x == n {
364 parent.setParent(n.parent)
365 break
366 }
367 }
368
369
370
371
372 if priority.Exclusive {
373 k := parent.kids
374 for k != nil {
375 next := k.next
376 if k != n {
377 k.setParent(n)
378 }
379 k = next
380 }
381 }
382
383 n.setParent(parent)
384 n.weight = priority.Weight
385 }
386
387 func (ws *priorityWriteSchedulerRFC7540) Push(wr FrameWriteRequest) {
388 var n *priorityNodeRFC7540
389 if wr.isControl() {
390 n = &ws.root
391 } else {
392 id := wr.StreamID()
393 n = ws.nodes[id]
394 if n == nil {
395
396
397
398 if wr.DataSize() > 0 {
399 panic("add DATA on non-open stream")
400 }
401 n = &ws.root
402 }
403 }
404 n.q.push(wr)
405 }
406
407 func (ws *priorityWriteSchedulerRFC7540) Pop() (wr FrameWriteRequest, ok bool) {
408 ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNodeRFC7540, openParent bool) bool {
409 limit := int32(math.MaxInt32)
410 if openParent {
411 limit = ws.writeThrottleLimit
412 }
413 wr, ok = n.q.consume(limit)
414 if !ok {
415 return false
416 }
417 n.addBytes(int64(wr.DataSize()))
418
419
420
421 if openParent {
422 ws.writeThrottleLimit += 1024
423 if ws.writeThrottleLimit < 0 {
424 ws.writeThrottleLimit = math.MaxInt32
425 }
426 } else if ws.enableWriteThrottle {
427 ws.writeThrottleLimit = 1024
428 }
429 return true
430 })
431 return wr, ok
432 }
433
434 func (ws *priorityWriteSchedulerRFC7540) addClosedOrIdleNode(list *[]*priorityNodeRFC7540, maxSize int, n *priorityNodeRFC7540) {
435 if maxSize == 0 {
436 return
437 }
438 if len(*list) == maxSize {
439
440 ws.removeNode((*list)[0])
441 x := (*list)[1:]
442 copy(*list, x)
443 *list = (*list)[:len(x)]
444 }
445 *list = append(*list, n)
446 }
447
448 func (ws *priorityWriteSchedulerRFC7540) removeNode(n *priorityNodeRFC7540) {
449 for n.kids != nil {
450 n.kids.setParent(n.parent)
451 }
452 n.setParent(nil)
453 delete(ws.nodes, n.id)
454 }
455
View as plain text