1
2
3
4
5 package http2_test
6
7 import (
8 "bytes"
9 "context"
10 "errors"
11 "io"
12 "math"
13 "net"
14 "net/netip"
15 "os"
16 "sync"
17 "time"
18 )
19
20
21
22
23
24
25
26 func synctestNetPipe() (r, w *synctestNetConn) {
27 s1addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8000"))
28 s2addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8001"))
29 s1 := newSynctestNetConnHalf(s1addr)
30 s2 := newSynctestNetConnHalf(s2addr)
31 r = &synctestNetConn{loc: s1, rem: s2}
32 w = &synctestNetConn{loc: s2, rem: s1}
33 r.peer = w
34 w.peer = r
35 return r, w
36 }
37
38
39 type synctestNetConn struct {
40
41
42
43 loc, rem *synctestNetConnHalf
44
45
46 peer *synctestNetConn
47 }
48
49
50 func (c *synctestNetConn) Read(b []byte) (n int, err error) {
51 return c.loc.read(b)
52 }
53
54
55
56 func (c *synctestNetConn) Peek() []byte {
57 return c.loc.peek()
58 }
59
60
61 func (c *synctestNetConn) Write(b []byte) (n int, err error) {
62 return c.rem.write(b)
63 }
64
65
66 func (c *synctestNetConn) IsClosedByPeer() bool {
67 return c.loc.isClosedByPeer()
68 }
69
70
71 func (c *synctestNetConn) Close() error {
72 c.loc.setWriteError(errors.New("connection closed by peer"))
73 c.rem.setReadError(io.EOF)
74 return nil
75 }
76
77
78 func (c *synctestNetConn) LocalAddr() net.Addr {
79 return c.loc.addr
80 }
81
82
83 func (c *synctestNetConn) RemoteAddr() net.Addr {
84 return c.rem.addr
85 }
86
87
88 func (c *synctestNetConn) SetDeadline(t time.Time) error {
89 c.SetReadDeadline(t)
90 c.SetWriteDeadline(t)
91 return nil
92 }
93
94
95 func (c *synctestNetConn) SetReadDeadline(t time.Time) error {
96 c.loc.rctx.setDeadline(t)
97 return nil
98 }
99
100
101 func (c *synctestNetConn) SetWriteDeadline(t time.Time) error {
102 c.rem.wctx.setDeadline(t)
103 return nil
104 }
105
106
107
108 func (c *synctestNetConn) SetReadBufferSize(size int) {
109 c.loc.setReadBufferSize(size)
110 }
111
112
113
114 type synctestNetConnHalf struct {
115 addr net.Addr
116
117
118 rctx, wctx deadlineContext
119
120
121
122
123
124
125
126 lockr chan struct{}
127 lockw chan struct{}
128 lockrw chan struct{}
129 lockc chan struct{}
130
131 bufMax int
132 buf bytes.Buffer
133 readErr error
134 writeErr error
135 }
136
137 func newSynctestNetConnHalf(addr net.Addr) *synctestNetConnHalf {
138 h := &synctestNetConnHalf{
139 addr: addr,
140 lockw: make(chan struct{}, 1),
141 lockr: make(chan struct{}, 1),
142 lockrw: make(chan struct{}, 1),
143 lockc: make(chan struct{}, 1),
144 bufMax: math.MaxInt,
145 }
146 h.unlock()
147 return h
148 }
149
150 func (h *synctestNetConnHalf) lock() {
151 select {
152 case <-h.lockw:
153 case <-h.lockr:
154 case <-h.lockrw:
155 case <-h.lockc:
156 }
157 }
158
159 func (h *synctestNetConnHalf) unlock() {
160 canRead := h.readErr != nil || h.buf.Len() > 0
161 canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
162 switch {
163 case canRead && canWrite:
164 h.lockrw <- struct{}{}
165 case canRead:
166 h.lockr <- struct{}{}
167 case canWrite:
168 h.lockw <- struct{}{}
169 default:
170 h.lockc <- struct{}{}
171 }
172 }
173
174 func (h *synctestNetConnHalf) readWaitAndLock() error {
175 select {
176 case <-h.lockr:
177 return nil
178 case <-h.lockrw:
179 return nil
180 default:
181 }
182 ctx := h.rctx.context()
183 select {
184 case <-h.lockr:
185 return nil
186 case <-h.lockrw:
187 return nil
188 case <-ctx.Done():
189 return context.Cause(ctx)
190 }
191 }
192
193 func (h *synctestNetConnHalf) writeWaitAndLock() error {
194 select {
195 case <-h.lockw:
196 return nil
197 case <-h.lockrw:
198 return nil
199 default:
200 }
201 ctx := h.wctx.context()
202 select {
203 case <-h.lockw:
204 return nil
205 case <-h.lockrw:
206 return nil
207 case <-ctx.Done():
208 return context.Cause(ctx)
209 }
210 }
211
212 func (h *synctestNetConnHalf) peek() []byte {
213 h.lock()
214 defer h.unlock()
215 return h.buf.Bytes()
216 }
217
218 func (h *synctestNetConnHalf) isClosedByPeer() bool {
219 h.lock()
220 defer h.unlock()
221 return h.readErr != nil
222 }
223
224 func (h *synctestNetConnHalf) read(b []byte) (n int, err error) {
225 if err := h.readWaitAndLock(); err != nil {
226 return 0, err
227 }
228 defer h.unlock()
229 if h.buf.Len() == 0 && h.readErr != nil {
230 return 0, h.readErr
231 }
232 return h.buf.Read(b)
233 }
234
235 func (h *synctestNetConnHalf) setReadBufferSize(size int) {
236 h.lock()
237 defer h.unlock()
238 h.bufMax = size
239 }
240
241 func (h *synctestNetConnHalf) write(b []byte) (n int, err error) {
242 for n < len(b) {
243 nn, err := h.writePartial(b[n:])
244 n += nn
245 if err != nil {
246 return n, err
247 }
248 }
249 return n, nil
250 }
251
252 func (h *synctestNetConnHalf) writePartial(b []byte) (n int, err error) {
253 if err := h.writeWaitAndLock(); err != nil {
254 return 0, err
255 }
256 defer h.unlock()
257 if h.writeErr != nil {
258 return 0, h.writeErr
259 }
260 writeMax := h.bufMax - h.buf.Len()
261 if writeMax < len(b) {
262 b = b[:writeMax]
263 }
264 return h.buf.Write(b)
265 }
266
267 func (h *synctestNetConnHalf) setReadError(err error) {
268 h.lock()
269 defer h.unlock()
270 if h.readErr == nil {
271 h.readErr = err
272 }
273 }
274
275 func (h *synctestNetConnHalf) setWriteError(err error) {
276 h.lock()
277 defer h.unlock()
278 if h.writeErr == nil {
279 h.writeErr = err
280 }
281 }
282
283
// A deadlineContext converts a changeable deadline (as in
// net.Conn.SetDeadline) into a Context that is canceled when the deadline
// passes.
type deadlineContext struct {
	mu sync.Mutex // guards the fields below

	// ctx is the current context, created lazily.
	ctx context.Context
	// cancel cancels ctx. It is set to nil once ctx has been canceled.
	cancel context.CancelCauseFunc
	// timer cancels ctx when the deadline passes, if a deadline is set.
	timer *time.Timer
}
290
291
292 func (t *deadlineContext) context() context.Context {
293 t.mu.Lock()
294 defer t.mu.Unlock()
295 if t.ctx == nil {
296 t.ctx, t.cancel = context.WithCancelCause(context.Background())
297 }
298 return t.ctx
299 }
300
301
302 func (t *deadlineContext) setDeadline(deadline time.Time) {
303 t.mu.Lock()
304 defer t.mu.Unlock()
305
306
307 if t.ctx == nil || t.cancel == nil {
308 t.ctx, t.cancel = context.WithCancelCause(context.Background())
309 }
310
311 if t.timer != nil {
312 t.timer.Stop()
313 }
314 if deadline.IsZero() {
315
316 return
317 }
318 if !deadline.After(time.Now()) {
319
320 t.cancel(os.ErrDeadlineExceeded)
321 t.cancel = nil
322 return
323 }
324 if t.timer != nil {
325
326 t.timer.Reset(deadline.Sub(time.Now()))
327 return
328 }
329
330 t.timer = time.AfterFunc(deadline.Sub(time.Now()), func() {
331 t.mu.Lock()
332 defer t.mu.Unlock()
333 t.cancel(os.ErrDeadlineExceeded)
334 t.cancel = nil
335 })
336 }
337
// A oneConnListener is a net.Listener that hands out connections received
// on a channel and reports an error from Accept once it has been closed.
type oneConnListener struct {
	// ch delivers connections to Accept. It is closed by Close.
	ch chan net.Conn
	// err is returned by Accept after the listener is closed.
	// It is written before ch is closed.
	err error
	// once ensures the listener is closed at most once.
	once sync.Once
	// addr is the address returned by Addr.
	addr net.Addr
}
344
345 func newOneConnListener(conn net.Conn) net.Listener {
346 ch := make(chan net.Conn, 1)
347 ch <- conn
348 return &oneConnListener{ch: ch}
349 }
350
351 func (li *oneConnListener) Accept() (net.Conn, error) {
352 c := <-li.ch
353 if c == nil {
354 return nil, li.err
355 }
356 return c, nil
357 }
358
359 func (li *oneConnListener) Close() error {
360 li.once.Do(func() {
361 li.err = errors.New("closed")
362 close(li.ch)
363 })
364 return nil
365 }
366
367 func (li *oneConnListener) Addr() net.Addr {
368 return li.addr
369 }
370
View as plain text