Source file src/net/http/internal/http2/netconn_test.go

     1  // Copyright 2024 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http2_test
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"errors"
    11  	"io"
    12  	"math"
    13  	"net"
    14  	"net/netip"
    15  	"os"
    16  	"sync"
    17  	"time"
    18  )
    19  
    20  // synctestNetPipe creates an in-memory, full duplex network connection.
    21  // Read and write timeouts are managed by the synctest group.
    22  //
    23  // Unlike net.Pipe, the connection is not synchronous.
    24  // Writes are made to a buffer, and return immediately.
    25  // By default, the buffer size is unlimited.
    26  func synctestNetPipe() (r, w *synctestNetConn) {
    27  	s1addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8000"))
    28  	s2addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8001"))
    29  	s1 := newSynctestNetConnHalf(s1addr)
    30  	s2 := newSynctestNetConnHalf(s2addr)
    31  	r = &synctestNetConn{loc: s1, rem: s2}
    32  	w = &synctestNetConn{loc: s2, rem: s1}
    33  	r.peer = w
    34  	w.peer = r
    35  	return r, w
    36  }
    37  
    38  // A synctestNetConn is one endpoint of the connection created by synctestNetPipe.
    39  type synctestNetConn struct {
    40  	// local and remote connection halves.
    41  	// Each half contains a buffer.
    42  	// Reads pull from the local buffer, and writes push to the remote buffer.
    43  	loc, rem *synctestNetConnHalf
    44  
    45  	// peer is the other endpoint.
    46  	peer *synctestNetConn
    47  }
    48  
    49  // Read reads data from the connection.
    50  func (c *synctestNetConn) Read(b []byte) (n int, err error) {
    51  	return c.loc.read(b)
    52  }
    53  
    54  // Peek returns the available unread read buffer,
    55  // without consuming its contents.
    56  func (c *synctestNetConn) Peek() []byte {
    57  	return c.loc.peek()
    58  }
    59  
    60  // Write writes data to the connection.
    61  func (c *synctestNetConn) Write(b []byte) (n int, err error) {
    62  	return c.rem.write(b)
    63  }
    64  
    65  // IsClosedByPeer reports whether the peer has closed its end of the connection.
    66  func (c *synctestNetConn) IsClosedByPeer() bool {
    67  	return c.loc.isClosedByPeer()
    68  }
    69  
    70  // Close closes the connection.
    71  func (c *synctestNetConn) Close() error {
    72  	c.loc.setWriteError(errors.New("connection closed by peer"))
    73  	c.rem.setReadError(io.EOF)
    74  	return nil
    75  }
    76  
    77  // LocalAddr returns the (fake) local network address.
    78  func (c *synctestNetConn) LocalAddr() net.Addr {
    79  	return c.loc.addr
    80  }
    81  
    82  // RemoteAddr returns the (fake) remote network address.
    83  func (c *synctestNetConn) RemoteAddr() net.Addr {
    84  	return c.rem.addr
    85  }
    86  
    87  // SetDeadline sets the read and write deadlines for the connection.
    88  func (c *synctestNetConn) SetDeadline(t time.Time) error {
    89  	c.SetReadDeadline(t)
    90  	c.SetWriteDeadline(t)
    91  	return nil
    92  }
    93  
    94  // SetReadDeadline sets the read deadline for the connection.
    95  func (c *synctestNetConn) SetReadDeadline(t time.Time) error {
    96  	c.loc.rctx.setDeadline(t)
    97  	return nil
    98  }
    99  
   100  // SetWriteDeadline sets the write deadline for the connection.
   101  func (c *synctestNetConn) SetWriteDeadline(t time.Time) error {
   102  	c.rem.wctx.setDeadline(t)
   103  	return nil
   104  }
   105  
   106  // SetReadBufferSize sets the read buffer limit for the connection.
   107  // Writes by the peer will block so long as the buffer is full.
   108  func (c *synctestNetConn) SetReadBufferSize(size int) {
   109  	c.loc.setReadBufferSize(size)
   110  }
   111  
   112  // synctestNetConnHalf is one data flow in the connection created by synctestNetPipe.
   113  // Each half contains a buffer. Writes to the half push to the buffer, and reads pull from it.
   114  type synctestNetConnHalf struct {
   115  	addr net.Addr
   116  
   117  	// Read and write timeouts.
   118  	rctx, wctx deadlineContext
   119  
   120  	// A half can be readable and/or writable.
   121  	//
   122  	// These four channels act as a lock,
   123  	// and allow waiting for readability/writability.
   124  	// When the half is unlocked, exactly one channel contains a value.
   125  	// When the half is locked, all channels are empty.
   126  	lockr  chan struct{} // readable
   127  	lockw  chan struct{} // writable
   128  	lockrw chan struct{} // readable and writable
   129  	lockc  chan struct{} // neither readable nor writable
   130  
   131  	bufMax   int // maximum buffer size
   132  	buf      bytes.Buffer
   133  	readErr  error // error returned by reads
   134  	writeErr error // error returned by writes
   135  }
   136  
   137  func newSynctestNetConnHalf(addr net.Addr) *synctestNetConnHalf {
   138  	h := &synctestNetConnHalf{
   139  		addr:   addr,
   140  		lockw:  make(chan struct{}, 1),
   141  		lockr:  make(chan struct{}, 1),
   142  		lockrw: make(chan struct{}, 1),
   143  		lockc:  make(chan struct{}, 1),
   144  		bufMax: math.MaxInt, // unlimited
   145  	}
   146  	h.unlock()
   147  	return h
   148  }
   149  
   150  func (h *synctestNetConnHalf) lock() {
   151  	select {
   152  	case <-h.lockw:
   153  	case <-h.lockr:
   154  	case <-h.lockrw:
   155  	case <-h.lockc:
   156  	}
   157  }
   158  
   159  func (h *synctestNetConnHalf) unlock() {
   160  	canRead := h.readErr != nil || h.buf.Len() > 0
   161  	canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
   162  	switch {
   163  	case canRead && canWrite:
   164  		h.lockrw <- struct{}{}
   165  	case canRead:
   166  		h.lockr <- struct{}{}
   167  	case canWrite:
   168  		h.lockw <- struct{}{}
   169  	default:
   170  		h.lockc <- struct{}{}
   171  	}
   172  }
   173  
   174  func (h *synctestNetConnHalf) readWaitAndLock() error {
   175  	select {
   176  	case <-h.lockr:
   177  		return nil
   178  	case <-h.lockrw:
   179  		return nil
   180  	default:
   181  	}
   182  	ctx := h.rctx.context()
   183  	select {
   184  	case <-h.lockr:
   185  		return nil
   186  	case <-h.lockrw:
   187  		return nil
   188  	case <-ctx.Done():
   189  		return context.Cause(ctx)
   190  	}
   191  }
   192  
   193  func (h *synctestNetConnHalf) writeWaitAndLock() error {
   194  	select {
   195  	case <-h.lockw:
   196  		return nil
   197  	case <-h.lockrw:
   198  		return nil
   199  	default:
   200  	}
   201  	ctx := h.wctx.context()
   202  	select {
   203  	case <-h.lockw:
   204  		return nil
   205  	case <-h.lockrw:
   206  		return nil
   207  	case <-ctx.Done():
   208  		return context.Cause(ctx)
   209  	}
   210  }
   211  
   212  func (h *synctestNetConnHalf) peek() []byte {
   213  	h.lock()
   214  	defer h.unlock()
   215  	return h.buf.Bytes()
   216  }
   217  
   218  func (h *synctestNetConnHalf) isClosedByPeer() bool {
   219  	h.lock()
   220  	defer h.unlock()
   221  	return h.readErr != nil
   222  }
   223  
   224  func (h *synctestNetConnHalf) read(b []byte) (n int, err error) {
   225  	if err := h.readWaitAndLock(); err != nil {
   226  		return 0, err
   227  	}
   228  	defer h.unlock()
   229  	if h.buf.Len() == 0 && h.readErr != nil {
   230  		return 0, h.readErr
   231  	}
   232  	return h.buf.Read(b)
   233  }
   234  
   235  func (h *synctestNetConnHalf) setReadBufferSize(size int) {
   236  	h.lock()
   237  	defer h.unlock()
   238  	h.bufMax = size
   239  }
   240  
   241  func (h *synctestNetConnHalf) write(b []byte) (n int, err error) {
   242  	for n < len(b) {
   243  		nn, err := h.writePartial(b[n:])
   244  		n += nn
   245  		if err != nil {
   246  			return n, err
   247  		}
   248  	}
   249  	return n, nil
   250  }
   251  
   252  func (h *synctestNetConnHalf) writePartial(b []byte) (n int, err error) {
   253  	if err := h.writeWaitAndLock(); err != nil {
   254  		return 0, err
   255  	}
   256  	defer h.unlock()
   257  	if h.writeErr != nil {
   258  		return 0, h.writeErr
   259  	}
   260  	writeMax := h.bufMax - h.buf.Len()
   261  	if writeMax < len(b) {
   262  		b = b[:writeMax]
   263  	}
   264  	return h.buf.Write(b)
   265  }
   266  
   267  func (h *synctestNetConnHalf) setReadError(err error) {
   268  	h.lock()
   269  	defer h.unlock()
   270  	if h.readErr == nil {
   271  		h.readErr = err
   272  	}
   273  }
   274  
   275  func (h *synctestNetConnHalf) setWriteError(err error) {
   276  	h.lock()
   277  	defer h.unlock()
   278  	if h.writeErr == nil {
   279  		h.writeErr = err
   280  	}
   281  }
   282  
   283  // deadlineContext converts a changeable deadline (as in net.Conn.SetDeadline) into a Context.
   284  type deadlineContext struct {
   285  	mu     sync.Mutex
   286  	ctx    context.Context
   287  	cancel context.CancelCauseFunc
   288  	timer  *time.Timer
   289  }
   290  
   291  // context returns a Context which expires when the deadline does.
   292  func (t *deadlineContext) context() context.Context {
   293  	t.mu.Lock()
   294  	defer t.mu.Unlock()
   295  	if t.ctx == nil {
   296  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   297  	}
   298  	return t.ctx
   299  }
   300  
   301  // setDeadline sets the current deadline.
   302  func (t *deadlineContext) setDeadline(deadline time.Time) {
   303  	t.mu.Lock()
   304  	defer t.mu.Unlock()
   305  	// If t.ctx is non-nil and t.cancel is nil, then t.ctx was canceled
   306  	// and we should create a new one.
   307  	if t.ctx == nil || t.cancel == nil {
   308  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   309  	}
   310  	// Stop any existing deadline from expiring.
   311  	if t.timer != nil {
   312  		t.timer.Stop()
   313  	}
   314  	if deadline.IsZero() {
   315  		// No deadline.
   316  		return
   317  	}
   318  	if !deadline.After(time.Now()) {
   319  		// Deadline has already expired.
   320  		t.cancel(os.ErrDeadlineExceeded)
   321  		t.cancel = nil
   322  		return
   323  	}
   324  	if t.timer != nil {
   325  		// Reuse existing deadline timer.
   326  		t.timer.Reset(deadline.Sub(time.Now()))
   327  		return
   328  	}
   329  	// Create a new timer to cancel the context at the deadline.
   330  	t.timer = time.AfterFunc(deadline.Sub(time.Now()), func() {
   331  		t.mu.Lock()
   332  		defer t.mu.Unlock()
   333  		t.cancel(os.ErrDeadlineExceeded)
   334  		t.cancel = nil
   335  	})
   336  }
   337  
   338  type oneConnListener struct {
   339  	ch   chan net.Conn
   340  	err  error
   341  	once sync.Once
   342  	addr net.Addr
   343  }
   344  
   345  func newOneConnListener(conn net.Conn) net.Listener {
   346  	ch := make(chan net.Conn, 1)
   347  	ch <- conn
   348  	return &oneConnListener{ch: ch}
   349  }
   350  
   351  func (li *oneConnListener) Accept() (net.Conn, error) {
   352  	c := <-li.ch
   353  	if c == nil {
   354  		return nil, li.err
   355  	}
   356  	return c, nil
   357  }
   358  
   359  func (li *oneConnListener) Close() error {
   360  	li.once.Do(func() {
   361  		li.err = errors.New("closed")
   362  		close(li.ch)
   363  	})
   364  	return nil
   365  }
   366  
   367  func (li *oneConnListener) Addr() net.Addr {
   368  	return li.addr
   369  }
   370  

View as plain text