Source file
src/net/http/internal/http2/writesched_priority_rfc9218.go
1
2
3
4
5 package http2
6
7 import (
8 "fmt"
9 "math"
10 )
11
12 type streamMetadata struct {
13 location *writeQueue
14 priority PriorityParam
15 }
16
17 type priorityWriteSchedulerRFC9218 struct {
18
19 control writeQueue
20
21
22
23
24
25
26 heads [8][2]*writeQueue
27
28
29
30
31 streams map[uint32]streamMetadata
32
33
34 queuePool writeQueuePool
35
36
37
38
39 prioritizeIncremental bool
40
41
42
43 priorityUpdateBuf struct {
44
45
46 streamID uint32
47 priority PriorityParam
48 }
49 }
50
51 func newPriorityWriteSchedulerRFC9218() WriteScheduler {
52 ws := &priorityWriteSchedulerRFC9218{
53 streams: make(map[uint32]streamMetadata),
54 }
55 return ws
56 }
57
58 func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
59 if ws.streams[streamID].location != nil {
60 panic(fmt.Errorf("stream %d already opened", streamID))
61 }
62 if streamID == ws.priorityUpdateBuf.streamID {
63 ws.priorityUpdateBuf.streamID = 0
64 opt.priority = ws.priorityUpdateBuf.priority
65 }
66 q := ws.queuePool.get()
67 ws.streams[streamID] = streamMetadata{
68 location: q,
69 priority: opt.priority,
70 }
71
72 u, i := opt.priority.urgency, opt.priority.incremental
73 if ws.heads[u][i] == nil {
74 ws.heads[u][i] = q
75 q.next = q
76 q.prev = q
77 } else {
78
79
80 q.prev = ws.heads[u][i].prev
81 q.next = ws.heads[u][i]
82 q.prev.next = q
83 q.next.prev = q
84 }
85 }
86
87 func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
88 metadata := ws.streams[streamID]
89 q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
90 if q == nil {
91 return
92 }
93 if q.next == q {
94
95 ws.heads[u][i] = nil
96 } else {
97 q.prev.next = q.next
98 q.next.prev = q.prev
99 if ws.heads[u][i] == q {
100 ws.heads[u][i] = q.next
101 }
102 }
103 delete(ws.streams, streamID)
104 ws.queuePool.put(q)
105 }
106
107 func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
108 metadata := ws.streams[streamID]
109 q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
110 if q == nil {
111 ws.priorityUpdateBuf.streamID = streamID
112 ws.priorityUpdateBuf.priority = priority
113 return
114 }
115
116
117 if q.next == q {
118
119 ws.heads[u][i] = nil
120 } else {
121 q.prev.next = q.next
122 q.next.prev = q.prev
123 if ws.heads[u][i] == q {
124 ws.heads[u][i] = q.next
125 }
126 }
127
128
129 u, i = priority.urgency, priority.incremental
130 if ws.heads[u][i] == nil {
131 ws.heads[u][i] = q
132 q.next = q
133 q.prev = q
134 } else {
135
136
137 q.prev = ws.heads[u][i].prev
138 q.next = ws.heads[u][i]
139 q.prev.next = q
140 q.next.prev = q
141 }
142
143
144 ws.streams[streamID] = streamMetadata{
145 location: q,
146 priority: priority,
147 }
148 }
149
150 func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
151 if wr.isControl() {
152 ws.control.push(wr)
153 return
154 }
155 q := ws.streams[wr.StreamID()].location
156 if q == nil {
157
158
159
160 if wr.DataSize() > 0 {
161 panic("add DATA on non-open stream")
162 }
163 ws.control.push(wr)
164 return
165 }
166 q.push(wr)
167 }
168
169 func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
170
171 if !ws.control.empty() {
172 return ws.control.shift(), true
173 }
174
175
176
177
178
179
180
181 ws.prioritizeIncremental = !ws.prioritizeIncremental
182
183
184 for u := range ws.heads {
185 for i := range ws.heads[u] {
186
187
188 if ws.prioritizeIncremental {
189 i = (i + 1) % 2
190 }
191 q := ws.heads[u][i]
192 if q == nil {
193 continue
194 }
195 for {
196 if wr, ok := q.consume(math.MaxInt32); ok {
197 if i == 1 {
198
199
200
201 ws.heads[u][i] = q.next
202 } else {
203
204
205
206
207
208
209
210
211 ws.heads[u][i] = q
212 }
213 return wr, true
214 }
215 q = q.next
216 if q == ws.heads[u][i] {
217 break
218 }
219 }
220
221 }
222 }
223 return FrameWriteRequest{}, false
224 }
225
View as plain text