Source file src/net/http/internal/http2/transport.go

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Transport code.
     6  
     7  package http2
     8  
     9  import (
    10  	"bufio"
    11  	"bytes"
    12  	"compress/flate"
    13  	"compress/gzip"
    14  	"context"
    15  	"crypto/rand"
    16  	"crypto/tls"
    17  	"errors"
    18  	"fmt"
    19  	"io"
    20  	"io/fs"
    21  	"log"
    22  	"math"
    23  	"math/bits"
    24  	mathrand "math/rand"
    25  	"net"
    26  	"net/http/httptrace"
    27  	"net/http/internal"
    28  	"net/http/internal/httpcommon"
    29  	"net/textproto"
    30  	"slices"
    31  	"strconv"
    32  	"strings"
    33  	"sync"
    34  	"sync/atomic"
    35  	"time"
    36  
    37  	"golang.org/x/net/http/httpguts"
    38  	"golang.org/x/net/http2/hpack"
    39  	"golang.org/x/net/idna"
    40  )
    41  
const (
	// transportDefaultConnFlow is how many connection-level flow control
	// tokens we give the server at start-up, past the default 64k.
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is how many stream-level flow
	// control tokens we announce to the peer, and how many bytes
	// we buffer per stream.
	transportDefaultStreamFlow = 4 << 20

	defaultUserAgent = "Go-http-client/2.0"

	// initialMaxConcurrentStreams is a connection's maxConcurrentStreams until
	// it has received the server's initial SETTINGS frame, which corresponds
	// with the spec's minimum recommended value.
	initialMaxConcurrentStreams = 100

	// defaultMaxConcurrentStreams is a connection's default maxConcurrentStreams
	// if the server doesn't include one in its initial SETTINGS frame.
	defaultMaxConcurrentStreams = 1000
)
    63  
// Transport is an HTTP/2 Transport.
//
// A Transport internally caches connections to servers. It is safe
// for concurrent use by multiple goroutines.
type Transport struct {
	// DialTLSContext specifies an optional dial function with context for
	// creating TLS connections for requests.
	//
	// If DialTLSContext and DialTLS are nil, tls.Dial is used.
	//
	// If the returned net.Conn has a ConnectionState method like tls.Conn,
	// it will be used to set http.Response.TLS.
	DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)

	// DialTLS specifies an optional dial function for creating
	// TLS connections for requests.
	//
	// If DialTLSContext and DialTLS are nil, tls.Dial is used.
	//
	// Deprecated: Use DialTLSContext instead, which allows the transport
	// to cancel dials as soon as they are no longer needed.
	// If both are set, DialTLSContext takes priority.
	DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client. If nil, the default configuration is used.
	TLSClientConfig *tls.Config

	// ConnPool optionally specifies an alternate connection pool to use.
	// If nil, the default is used.
	ConnPool ClientConnPool

	// DisableCompression, if true, prevents the Transport from
	// requesting compression with an "Accept-Encoding: gzip"
	// request header when the Request contains no existing
	// Accept-Encoding value. If the Transport requests gzip on
	// its own and gets a gzipped response, it's transparently
	// decoded in the Response.Body. However, if the user
	// explicitly requested gzip it is not automatically
	// uncompressed.
	DisableCompression bool

	// AllowHTTP, if true, permits HTTP/2 requests using the insecure,
	// plain-text "http" scheme. Note that this does not enable h2c support.
	AllowHTTP bool

	// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
	// send in the initial settings frame. It is how many bytes
	// of response headers are allowed. Unlike the http2 spec, zero here
	// means to use a default limit (currently 10MB). If you actually
	// want to advertise an unlimited value to the peer, Transport
	// interprets the highest possible value here (0xffffffff or 1<<32-1)
	// to mean no limit.
	MaxHeaderListSize uint32

	// MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
	// initial settings frame. It is the size in bytes of the largest frame
	// payload that the sender is willing to receive. If 0, no setting is
	// sent, and the value is provided by the peer, which should be 16384
	// according to the spec:
	// https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
	// Values are bounded in the range 16k to 16M.
	MaxReadFrameSize uint32

	// MaxDecoderHeaderTableSize optionally specifies the http2
	// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
	// informs the remote endpoint of the maximum size of the header compression
	// table used to decode header blocks, in octets. If zero, the default value
	// of 4096 is used.
	MaxDecoderHeaderTableSize uint32

	// MaxEncoderHeaderTableSize optionally specifies an upper limit for the
	// header compression table used for encoding request headers. Received
	// SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
	// the default value of 4096 is used.
	MaxEncoderHeaderTableSize uint32

	// StrictMaxConcurrentStreams controls whether the server's
	// SETTINGS_MAX_CONCURRENT_STREAMS should be respected
	// globally. If false, new TCP connections are created to the
	// server as needed to keep each under the per-connection
	// SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
	// server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
	// a global limit and callers of RoundTrip block when needed,
	// waiting for their turn.
	StrictMaxConcurrentStreams bool

	// IdleConnTimeout is the maximum amount of time an idle
	// (keep-alive) connection will remain idle before closing
	// itself.
	// Zero means no limit.
	IdleConnTimeout time.Duration

	// ReadIdleTimeout is the timeout after which a health check using ping
	// frame will be carried out if no frame is received on the connection.
	// Note that a ping response will be considered a received frame, so if
	// there is no other traffic on the connection, the health check will
	// be performed every ReadIdleTimeout interval.
	// If zero, no health check is performed.
	ReadIdleTimeout time.Duration

	// PingTimeout is the timeout after which the connection will be closed
	// if a response to Ping is not received.
	// Defaults to 15s.
	PingTimeout time.Duration

	// WriteByteTimeout is the timeout after which the connection will be
	// closed if no data can be written to it. The timeout begins when data is
	// available to write, and is extended whenever any bytes are written.
	WriteByteTimeout time.Duration

	// CountError, if non-nil, is called on HTTP/2 transport errors.
	// It's intended to increment a metric for monitoring, such
	// as an expvar or Prometheus metric.
	// The errType consists of only ASCII word characters.
	CountError func(errType string)

	// t1 carries configuration from the enclosing net/http transport
	// (e.g. MaxResponseHeaderBytes, DisableCompression, DisableKeepAlives).
	t1 TransportConfig

	connPoolOnce  sync.Once
	connPoolOrDef ClientConnPool // non-nil version of ConnPool

	*transportTestHooks
}
   188  
// Hook points used for testing.
// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
// Inside tests, see the testSyncHooks function docs.

type transportTestHooks struct {
	// newclientconn is invoked with each ClientConn created by
	// newClientConn, letting tests observe or replace the connection.
	newclientconn func(*ClientConn)
}
   196  
   197  func (t *Transport) maxHeaderListSize() uint32 {
   198  	n := t.t1.MaxResponseHeaderBytes()
   199  	if n > 0 {
   200  		n = adjustHTTP1MaxHeaderSize(n)
   201  	}
   202  	if n <= 0 {
   203  		return 10 << 20
   204  	}
   205  	if n >= 0xffffffff {
   206  		return 0
   207  	}
   208  	return uint32(n)
   209  }
   210  
   211  func (t *Transport) disableCompression() bool {
   212  	return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression())
   213  }
   214  
   215  func NewTransport(t1 TransportConfig) *Transport {
   216  	connPool := new(clientConnPool)
   217  	t2 := &Transport{
   218  		ConnPool: noDialClientConnPool{connPool},
   219  		t1:       t1,
   220  	}
   221  	connPool.t = t2
   222  	return t2
   223  }
   224  
   225  func (t *Transport) AddConn(scheme, authority string, c net.Conn) error {
   226  	connPool, ok := t.ConnPool.(noDialClientConnPool)
   227  	if !ok {
   228  		go c.Close()
   229  		return nil
   230  	}
   231  	addr := authorityAddr(scheme, authority)
   232  	used, err := connPool.addConnIfNeeded(addr, t, c)
   233  	if !used {
   234  		go c.Close()
   235  	}
   236  	return err
   237  }
   238  
// unencryptedTransport is a Transport with a RoundTrip method that
// always permits http:// URLs.
type unencryptedTransport Transport

// RoundTrip sends req, allowing the plain-text "http" scheme regardless
// of the Transport's AllowHTTP setting.
func (t *unencryptedTransport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
}
   246  
// connPool returns the pool to use for this Transport, lazily initializing
// the fallback default pool on first use when ConnPool is unset.
func (t *Transport) connPool() ClientConnPool {
	t.connPoolOnce.Do(t.initConnPool)
	return t.connPoolOrDef
}
   251  
   252  func (t *Transport) initConnPool() {
   253  	if t.ConnPool != nil {
   254  		t.connPoolOrDef = t.ConnPool
   255  	} else {
   256  		t.connPoolOrDef = &clientConnPool{t: t}
   257  	}
   258  }
   259  
// ClientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
type ClientConn struct {
	t             *Transport
	tconn         net.Conn             // usually *tls.Conn, except specialized impls
	tlsState      *tls.ConnectionState // nil only for specialized impls
	atomicReused  uint32               // whether conn is being reused; atomic
	singleUse     bool                 // whether being used for a single http.Request
	getConnCalled bool                 // used by clientConnPool

	// readLoop goroutine fields:
	readerDone chan struct{} // closed on error
	readerErr  error         // set before readerDone is closed

	idleTimeout time.Duration // or 0 for never
	idleTimer   *time.Timer

	mu               sync.Mutex // guards following
	cond             *sync.Cond // hold mu; broadcast on flow/closed changes
	flow             outflow    // our conn-level flow control quota (cs.outflow is per stream)
	inflow           inflow     // peer's conn-level flow control
	doNotReuse       bool       // whether conn is marked to not be reused for any future requests
	closing          bool
	closed           bool
	closedOnIdle     bool                     // true if conn was closed for idleness
	seenSettings     bool                     // true if we've seen a settings frame, false otherwise
	seenSettingsChan chan struct{}            // closed when seenSettings is true or frame reading fails
	wantSettingsAck  bool                     // we sent a SETTINGS frame and haven't heard back
	goAway           *GoAwayFrame             // if non-nil, the GoAwayFrame we received
	goAwayDebug      string                   // goAway frame's debug data, retained as a string
	streams          map[uint32]*clientStream // client-initiated
	streamsReserved  int                      // incr by ReserveNewRequest; decr on RoundTrip
	nextStreamID     uint32
	pendingRequests  int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
	pings            map[[8]byte]chan struct{} // in flight ping data to notification channel
	br               *bufio.Reader
	lastActive       time.Time
	lastIdle         time.Time // time last idle
	// Settings from peer: (also guarded by wmu)
	maxFrameSize                uint32
	maxConcurrentStreams        uint32
	peerMaxHeaderListSize       uint64
	peerMaxHeaderTableSize      uint32
	initialWindowSize           uint32
	initialStreamRecvWindowSize int32
	readIdleTimeout             time.Duration
	pingTimeout                 time.Duration
	extendedConnectAllowed      bool
	strictMaxConcurrentStreams  bool

	// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
	// gRPC strictly limits the number of PING frames that it will receive.
	// The default is two pings per two hours, but the limit resets every time
	// the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
	//
	// rstStreamPingsBlocked is set after receiving a response to a PING frame
	// bundled with an RST_STREAM (see pendingResets below), and cleared after
	// receiving a HEADERS or DATA frame.
	rstStreamPingsBlocked bool

	// pendingResets is the number of RST_STREAM frames we have sent to the peer,
	// without confirming that the peer has received them. When we send a RST_STREAM,
	// we bundle it with a PING frame, unless a PING is already in flight. We count
	// the reset stream against the connection's concurrency limit until we get
	// a PING response. This limits the number of requests we'll try to send to a
	// completely unresponsive connection.
	pendingResets int

	// readBeforeStreamID is the smallest stream ID that has not been followed by
	// a frame read from the peer. We use this to determine when a request may
	// have been sent to a completely unresponsive connection:
	// If the request ID is less than readBeforeStreamID, then we have had some
	// indication of life on the connection since sending the request.
	readBeforeStreamID uint32

	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
	// Write to reqHeaderMu to lock it, read from it to unlock.
	// Lock reqHeaderMu BEFORE mu or wmu.
	reqHeaderMu chan struct{}

	// internalStateHook reports state changes back to the net/http.ClientConn.
	// Note that this is different from the user state hook registered by
	// net/http.ClientConn.SetStateHook: The internal hook calls ClientConn,
	// which calls the user hook.
	internalStateHook func()

	// wmu is held while writing.
	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
	// Only acquire both at the same time when changing peer settings.
	wmu  sync.Mutex
	bw   *bufio.Writer
	fr   *Framer
	werr error        // first write error that has occurred
	hbuf bytes.Buffer // HPACK encoder writes into this
	henc *hpack.Encoder
}
   356  
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
	cc *ClientConn

	// Fields of Request that we may access even after the response body is closed.
	ctx       context.Context
	reqCancel <-chan struct{}

	trace         *httptrace.ClientTrace // or nil
	ID            uint32
	bufPipe       pipe // buffered pipe with the flow-controlled response payload
	requestedGzip bool
	isHead        bool

	abortOnce sync.Once
	abort     chan struct{} // closed to signal stream should end immediately
	abortErr  error         // set if abort is closed

	peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
	donec      chan struct{} // closed after the stream is in the closed state
	on100      chan struct{} // buffered; written to if a 100 is received

	respHeaderRecv chan struct{}   // closed when headers are received
	res            *ClientResponse // set if respHeaderRecv is closed

	flow        outflow // guarded by cc.mu
	inflow      inflow  // guarded by cc.mu
	bytesRemain int64   // -1 means unknown; owned by transportResponseBody.Read
	readErr     error   // sticky read error; owned by transportResponseBody.Read

	reqBody              io.ReadCloser
	reqBodyContentLength int64         // -1 means unknown
	reqBodyClosed        chan struct{} // guarded by cc.mu; non-nil on Close, closed when done

	// owned by writeRequest:
	sentEndStream bool // sent an END_STREAM flag to the peer
	sentHeaders   bool

	// owned by clientConnReadLoop:
	firstByte       bool  // got the first response byte
	pastHeaders     bool  // got first MetaHeadersFrame (actual headers)
	pastTrailers    bool  // got optional second MetaHeadersFrame (trailers)
	readClosed      bool  // peer sent an END_STREAM flag
	readAborted     bool  // read loop reset the stream
	totalHeaderSize int64 // total size of 1xx headers seen

	trailer    Header  // accumulated trailers
	resTrailer *Header // client's Response.Trailer

	// staticResp: presumably a preallocated response to avoid a per-stream
	// allocation — NOTE(review): confirm against the response-construction code.
	staticResp ClientResponse
}
   409  
// got1xxFuncForTests, if non-nil, overrides the callback returned by
// get1xxTraceFunc. Set only from tests.
var got1xxFuncForTests func(int, textproto.MIMEHeader) error

// get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
// if any. It returns nil if not set or if the Go version is too old.
func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
	if fn := got1xxFuncForTests; fn != nil {
		return fn
	}
	return traceGot1xxResponseFunc(cs.trace)
}
   420  
   421  func (cs *clientStream) abortStream(err error) {
   422  	cs.cc.mu.Lock()
   423  	defer cs.cc.mu.Unlock()
   424  	cs.abortStreamLocked(err)
   425  }
   426  
   427  func (cs *clientStream) abortStreamLocked(err error) {
   428  	cs.abortOnce.Do(func() {
   429  		cs.abortErr = err
   430  		close(cs.abort)
   431  	})
   432  	if cs.reqBody != nil {
   433  		cs.closeReqBodyLocked()
   434  	}
   435  	// TODO(dneil): Clean up tests where cs.cc.cond is nil.
   436  	if cs.cc.cond != nil {
   437  		// Wake up writeRequestBody if it is waiting on flow control.
   438  		cs.cc.cond.Broadcast()
   439  	}
   440  }
   441  
   442  func (cs *clientStream) abortRequestBodyWrite() {
   443  	cc := cs.cc
   444  	cc.mu.Lock()
   445  	defer cc.mu.Unlock()
   446  	if cs.reqBody != nil && cs.reqBodyClosed == nil {
   447  		cs.closeReqBodyLocked()
   448  		cc.cond.Broadcast()
   449  	}
   450  }
   451  
   452  func (cs *clientStream) closeReqBodyLocked() {
   453  	if cs.reqBodyClosed != nil {
   454  		return
   455  	}
   456  	cs.reqBodyClosed = make(chan struct{})
   457  	reqBodyClosed := cs.reqBodyClosed
   458  	go func() {
   459  		cs.reqBody.Close()
   460  		close(reqBodyClosed)
   461  	}()
   462  }
   463  
// stickyErrWriter wraps a net.Conn and remembers the first write error,
// failing all subsequent Writes with it.
type stickyErrWriter struct {
	conn    net.Conn      // underlying connection
	timeout time.Duration // per-write byte timeout passed to writeWithByteTimeout
	err     *error        // sticky first error, shared with the writer's owner
}
   469  
   470  func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
   471  	if *sew.err != nil {
   472  		return 0, *sew.err
   473  	}
   474  	n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
   475  	*sew.err = err
   476  	return n, err
   477  }
   478  
   479  // noCachedConnError is the concrete type of ErrNoCachedConn, which
   480  // needs to be detected by net/http regardless of whether it's its
   481  // bundled version (in h2_bundle.go with a rewritten type name) or
   482  // from a user's x/net/http2. As such, as it has a unique method name
   483  // (IsHTTP2NoCachedConnError) that net/http sniffs for via func
   484  // isNoCachedConnError.
   485  type noCachedConnError struct{}
   486  
   487  func (noCachedConnError) IsHTTP2NoCachedConnError() {}
   488  func (noCachedConnError) Error() string             { return "http2: no cached connection was available" }
   489  
   490  // isNoCachedConnError reports whether err is of type noCachedConnError
   491  // or its equivalent renamed type in net/http2's h2_bundle.go. Both types
   492  // may coexist in the same running program.
   493  func isNoCachedConnError(err error) bool {
   494  	_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
   495  	return ok
   496  }
   497  
   498  var ErrNoCachedConn error = noCachedConnError{}
   499  
// RoundTripOpt are options for the Transport.RoundTripOpt method.
type RoundTripOpt struct {
	// OnlyCachedConn controls whether RoundTripOpt may
	// create a new TCP connection. If set true and
	// no cached connection is available, RoundTripOpt
	// will return ErrNoCachedConn.
	OnlyCachedConn bool

	// allowHTTP permits http:// URLs for this call even when
	// Transport.AllowHTTP is false (used by unencryptedTransport).
	allowHTTP bool // allow http:// URLs
}
   510  
   511  func (t *Transport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
   512  	return t.RoundTripOpt(req, RoundTripOpt{})
   513  }
   514  
   515  // authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
   516  // and returns a host:port. The port 443 is added if needed.
   517  func authorityAddr(scheme string, authority string) (addr string) {
   518  	host, port, err := net.SplitHostPort(authority)
   519  	if err != nil { // authority didn't have a port
   520  		host = authority
   521  		port = ""
   522  	}
   523  	if port == "" { // authority's port was empty
   524  		port = "443"
   525  		if scheme == "http" {
   526  			port = "80"
   527  		}
   528  	}
   529  	if a, err := idna.ToASCII(host); err == nil {
   530  		host = a
   531  	}
   532  	// IPv6 address literal, without a port:
   533  	if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
   534  		return host + ":" + port
   535  	}
   536  	return net.JoinHostPort(host, port)
   537  }
   538  
// RoundTripOpt is like RoundTrip, but takes options.
//
// It fetches a connection from the pool, sends the request, and retries
// (up to 6 times, with exponential backoff after the first retry) when
// shouldRetryRequest deems the failure replayable.
func (t *Transport) RoundTripOpt(req *ClientRequest, opt RoundTripOpt) (*ClientResponse, error) {
	switch req.URL.Scheme {
	case "https":
		// Always okay.
	case "http":
		if !t.AllowHTTP && !opt.allowHTTP {
			return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
		}
	default:
		return nil, errors.New("http2: unsupported scheme")
	}

	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
	for retry := 0; ; retry++ {
		cc, err := t.connPool().GetClientConn(req, addr)
		if err != nil {
			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
			return nil, err
		}
		// reused is false only for the very first request on this conn.
		reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
		traceGotConn(req, cc, reused)
		res, err := cc.RoundTrip(req)
		if err != nil && retry <= 6 {
			roundTripErr := err
			// shouldRetryRequest may return the same request or a clone
			// with a rewound body; a non-nil error means "don't retry".
			if req, err = shouldRetryRequest(req, err); err == nil {
				// After the first retry, do exponential backoff with 10% jitter.
				if retry == 0 {
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				}
				backoff := float64(uint(1) << (uint(retry) - 1))
				backoff += backoff * (0.1 * mathrand.Float64())
				d := time.Second * time.Duration(backoff)
				tm := time.NewTimer(d)
				select {
				case <-tm.C:
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				case <-req.Context.Done():
					tm.Stop()
					err = req.Context.Err()
				}
			}
		}
		if err == errClientConnNotEstablished {
			// This ClientConn was created recently,
			// this is the first request to use it,
			// and the connection is closed and not usable.
			//
			// In this state, cc.idleTimer will remove the conn from the pool
			// when it fires. Stop the timer and remove it here so future requests
			// won't try to use this connection.
			//
			// If the timer has already fired and we're racing it, the redundant
			// call to MarkDead is harmless.
			if cc.idleTimer != nil {
				cc.idleTimer.Stop()
			}
			t.connPool().MarkDead(cc)
		}
		if err != nil {
			t.vlogf("RoundTrip failure: %v", err)
			return nil, err
		}
		return res, nil
	}
}
   607  
   608  func (t *Transport) IdleConnStrsForTesting() []string {
   609  	pool, ok := t.connPool().(noDialClientConnPool)
   610  	if !ok {
   611  		return nil
   612  	}
   613  
   614  	var ret []string
   615  	pool.mu.Lock()
   616  	defer pool.mu.Unlock()
   617  	for k, ccs := range pool.conns {
   618  		for _, cc := range ccs {
   619  			if cc.idleState().canTakeNewRequest {
   620  				ret = append(ret, k)
   621  			}
   622  		}
   623  	}
   624  	slices.Sort(ret)
   625  	return ret
   626  }
   627  
   628  // CloseIdleConnections closes any connections which were previously
   629  // connected from previous requests but are now sitting idle.
   630  // It does not interrupt any connections currently in use.
   631  func (t *Transport) CloseIdleConnections() {
   632  	if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
   633  		cp.closeIdleConnections()
   634  	}
   635  }
   636  
// Sentinel errors describing why a connection or request failed.
// They are compared by identity (==) in shouldRetryRequest, canRetryError,
// and RoundTripOpt.
var (
	errClientConnClosed         = errors.New("http2: client conn is closed")
	errClientConnUnusable       = errors.New("http2: client conn not usable")
	errClientConnNotEstablished = errors.New("http2: client conn could not be established")
	errClientConnGotGoAway      = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
	errClientConnForceClosed    = errors.New("http2: client connection force closed via ClientConn.Close")
)
   644  
   645  // shouldRetryRequest is called by RoundTrip when a request fails to get
   646  // response headers. It is always called with a non-nil error.
   647  // It returns either a request to retry (either the same request, or a
   648  // modified clone), or an error if the request can't be replayed.
   649  func shouldRetryRequest(req *ClientRequest, err error) (*ClientRequest, error) {
   650  	if !canRetryError(err) {
   651  		return nil, err
   652  	}
   653  	// If the Body is nil (or http.NoBody), it's safe to reuse
   654  	// this request and its Body.
   655  	if req.Body == nil || req.Body == NoBody {
   656  		return req, nil
   657  	}
   658  
   659  	// If the request body can be reset back to its original
   660  	// state via the optional req.GetBody, do that.
   661  	if req.GetBody != nil {
   662  		body, err := req.GetBody()
   663  		if err != nil {
   664  			return nil, err
   665  		}
   666  		newReq := req.Clone()
   667  		newReq.Body = body
   668  		return newReq, nil
   669  	}
   670  
   671  	// The Request.Body can't reset back to the beginning, but we
   672  	// don't seem to have started to read from it yet, so reuse
   673  	// the request directly.
   674  	if err == errClientConnUnusable {
   675  		return req, nil
   676  	}
   677  
   678  	return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
   679  }
   680  
   681  func canRetryError(err error) bool {
   682  	if err == errClientConnUnusable || err == errClientConnGotGoAway {
   683  		return true
   684  	}
   685  	if se, ok := err.(StreamError); ok {
   686  		return se.Code == ErrCodeRefusedStream
   687  	}
   688  	return false
   689  }
   690  
   691  func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
   692  	if t.transportTestHooks != nil {
   693  		return t.newClientConn(nil, singleUse, nil)
   694  	}
   695  	host, _, err := net.SplitHostPort(addr)
   696  	if err != nil {
   697  		return nil, err
   698  	}
   699  	tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
   700  	if err != nil {
   701  		return nil, err
   702  	}
   703  	return t.newClientConn(tconn, singleUse, nil)
   704  }
   705  
   706  func (t *Transport) newTLSConfig(host string) *tls.Config {
   707  	cfg := new(tls.Config)
   708  	if t.TLSClientConfig != nil {
   709  		*cfg = *t.TLSClientConfig.Clone()
   710  	}
   711  	if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
   712  		cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
   713  	}
   714  	if cfg.ServerName == "" {
   715  		cfg.ServerName = host
   716  	}
   717  	return cfg
   718  }
   719  
// dialTLS establishes a TLS connection to addr, preferring the user's
// DialTLSContext/DialTLS hooks when set; otherwise it dials via
// dialTLSWithContext and verifies that ALPN negotiated HTTP/2.
func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
	if t.DialTLSContext != nil {
		return t.DialTLSContext(ctx, network, addr, tlsCfg)
	} else if t.DialTLS != nil {
		return t.DialTLS(network, addr, tlsCfg)
	}

	tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
	if err != nil {
		return nil, err
	}
	state := tlsCn.ConnectionState()
	if p := state.NegotiatedProtocol; p != NextProtoTLS {
		return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
	}
	// NOTE(review): NegotiatedProtocolIsMutual is deprecated in crypto/tls
	// and documented as always true, so this branch is likely vestigial —
	// confirm before removing.
	if !state.NegotiatedProtocolIsMutual {
		return nil, errors.New("http2: could not negotiate protocol mutually")
	}
	return tlsCn, nil
}
   740  
   741  // disableKeepAlives reports whether connections should be closed as
   742  // soon as possible after handling the first request.
   743  func (t *Transport) disableKeepAlives() bool {
   744  	return t.t1 != nil && t.t1.DisableKeepAlives()
   745  }
   746  
   747  func (t *Transport) expectContinueTimeout() time.Duration {
   748  	if t.t1 == nil {
   749  		return 0
   750  	}
   751  	return t.t1.ExpectContinueTimeout()
   752  }
   753  
   754  func (t *Transport) NewClientConn(c net.Conn, internalStateHook func()) (NetHTTPClientConn, error) {
   755  	cc, err := t.newClientConn(c, t.disableKeepAlives(), internalStateHook)
   756  	if err != nil {
   757  		return NetHTTPClientConn{}, err
   758  	}
   759  
   760  	// RoundTrip should block when the conn is at its concurrency limit,
   761  	// not return an error. Setting strictMaxConcurrentStreams enables this.
   762  	cc.strictMaxConcurrentStreams = true
   763  
   764  	return NetHTTPClientConn{cc}, nil
   765  }
   766  
// newClientConn wires up a ClientConn over c: it allocates connection
// state, configures framing, HPACK, and flow control, writes the client
// preface plus initial SETTINGS and connection-level window update, and
// starts the frame-reading goroutine.
//
// singleUse marks the connection as usable for only a single request.
// internalStateHook, if non-nil, is invoked when observable connection
// state changes.
func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
	conf := configFromTransport(t)
	cc := &ClientConn{
		t:                           t,
		tconn:                       c,
		readerDone:                  make(chan struct{}),
		nextStreamID:                1,
		maxFrameSize:                16 << 10, // spec default
		initialWindowSize:           65535,    // spec default
		initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
		maxConcurrentStreams:        initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
		strictMaxConcurrentStreams:  conf.StrictMaxConcurrentRequests,
		peerMaxHeaderListSize:       0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
		streams:                     make(map[uint32]*clientStream),
		singleUse:                   singleUse,
		seenSettingsChan:            make(chan struct{}),
		wantSettingsAck:             true,
		readIdleTimeout:             conf.SendPingTimeout,
		pingTimeout:                 conf.PingTimeout,
		pings:                       make(map[[8]byte]chan struct{}),
		reqHeaderMu:                 make(chan struct{}, 1),
		lastActive:                  time.Now(),
		internalStateHook:           internalStateHook,
	}
	if t.transportTestHooks != nil {
		t.transportTestHooks.newclientconn(cc)
		c = cc.tconn // the test hook may have replaced the conn
	}
	if VerboseLogs {
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
	}

	cc.cond = sync.NewCond(&cc.mu)
	cc.flow.add(int32(initialWindowSize))

	// TODO: adjust this writer size to account for frame size +
	// MTU + crypto/tls record padding.
	cc.bw = bufio.NewWriter(stickyErrWriter{
		conn:    c,
		timeout: conf.WriteByteTimeout,
		err:     &cc.werr, // write errors stick in cc.werr for the check below
	})
	cc.br = bufio.NewReader(c)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
	if t.CountError != nil {
		cc.fr.countError = t.CountError
	}
	maxHeaderTableSize := uint32(conf.MaxDecoderHeaderTableSize)
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

	cc.henc = hpack.NewEncoder(&cc.hbuf)
	cc.henc.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
	cc.peerMaxHeaderTableSize = initialHeaderTableSize

	// Capture TLS state if the conn exposes it (tls.Conn or a test wrapper).
	if cs, ok := c.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}

	// Announce our settings: disable server push and advertise our
	// per-stream receive window.
	initialSettings := []Setting{
		{ID: SettingEnablePush, Val: 0},
		{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
	}
	initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: uint32(conf.MaxReadFrameSize)})
	if max := t.maxHeaderListSize(); max != 0 {
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
	}
	if maxHeaderTableSize != initialHeaderTableSize {
		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
	}

	// Write the preface, settings, and connection window update in one flush.
	cc.bw.Write(clientPreface)
	cc.fr.WriteSettings(initialSettings...)
	cc.fr.WriteWindowUpdate(0, uint32(conf.MaxReceiveBufferPerConnection))
	cc.inflow.init(int32(conf.MaxReceiveBufferPerConnection) + initialWindowSize)
	cc.bw.Flush()
	if cc.werr != nil {
		cc.Close()
		return nil, cc.werr
	}

	// Start the idle timer after the connection is fully initialized.
	if d := t.idleConnTimeout(); d != 0 {
		cc.idleTimeout = d
		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
	}

	go cc.readLoop()
	return cc, nil
}
   859  
   860  func (cc *ClientConn) healthCheck() {
   861  	pingTimeout := cc.pingTimeout
   862  	// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
   863  	// trigger the healthCheck again if there is no frame received.
   864  	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
   865  	defer cancel()
   866  	cc.vlogf("http2: Transport sending health check")
   867  	err := cc.Ping(ctx)
   868  	if err != nil {
   869  		cc.vlogf("http2: Transport health check failure: %v", err)
   870  		cc.closeForLostPing()
   871  	} else {
   872  		cc.vlogf("http2: Transport health check success")
   873  	}
   874  }
   875  
   876  // SetDoNotReuse marks cc as not reusable for future HTTP requests.
   877  func (cc *ClientConn) SetDoNotReuse() {
   878  	cc.mu.Lock()
   879  	defer cc.mu.Unlock()
   880  	cc.doNotReuse = true
   881  }
   882  
   883  func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
   884  	cc.mu.Lock()
   885  	defer cc.mu.Unlock()
   886  
   887  	old := cc.goAway
   888  	cc.goAway = f
   889  
   890  	// Merge the previous and current GoAway error frames.
   891  	if cc.goAwayDebug == "" {
   892  		cc.goAwayDebug = string(f.DebugData())
   893  	}
   894  	if old != nil && old.ErrCode != ErrCodeNo {
   895  		cc.goAway.ErrCode = old.ErrCode
   896  	}
   897  	last := f.LastStreamID
   898  	for streamID, cs := range cc.streams {
   899  		if streamID <= last {
   900  			// The server's GOAWAY indicates that it received this stream.
   901  			// It will either finish processing it, or close the connection
   902  			// without doing so. Either way, leave the stream alone for now.
   903  			continue
   904  		}
   905  		if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
   906  			// Don't retry the first stream on a connection if we get a non-NO error.
   907  			// If the server is sending an error on a new connection,
   908  			// retrying the request on a new one probably isn't going to work.
   909  			cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
   910  		} else {
   911  			// Aborting the stream with errClentConnGotGoAway indicates that
   912  			// the request should be retried on a new connection.
   913  			cs.abortStreamLocked(errClientConnGotGoAway)
   914  		}
   915  	}
   916  }
   917  
   918  // CanTakeNewRequest reports whether the connection can take a new request,
   919  // meaning it has not been closed or received or sent a GOAWAY.
   920  //
   921  // If the caller is going to immediately make a new request on this
   922  // connection, use ReserveNewRequest instead.
   923  func (cc *ClientConn) CanTakeNewRequest() bool {
   924  	cc.mu.Lock()
   925  	defer cc.mu.Unlock()
   926  	return cc.canTakeNewRequestLocked()
   927  }
   928  
   929  // ReserveNewRequest is like CanTakeNewRequest but also reserves a
   930  // concurrent stream in cc. The reservation is decremented on the
   931  // next call to RoundTrip.
   932  func (cc *ClientConn) ReserveNewRequest() bool {
   933  	cc.mu.Lock()
   934  	defer cc.mu.Unlock()
   935  	if st := cc.idleStateLocked(); !st.canTakeNewRequest {
   936  		return false
   937  	}
   938  	cc.streamsReserved++
   939  	return true
   940  }
   941  
// ClientConnState describes the state of a ClientConn, as returned by
// ClientConn.State. It is a point-in-time snapshot; fields may be stale
// as soon as the method returns.
type ClientConnState struct {
	// Closed is whether the connection is closed.
	Closed bool

	// Closing is whether the connection is in the process of
	// closing. It may be closing due to shutdown, being a
	// single-use connection, being marked as DoNotReuse, or
	// having received a GOAWAY frame.
	Closing bool

	// StreamsActive is how many streams are active. This includes
	// streams that have been reset but whose reset has not yet been
	// acknowledged by the peer.
	StreamsActive int

	// StreamsReserved is how many streams have been reserved via
	// ClientConn.ReserveNewRequest.
	StreamsReserved int

	// StreamsPending is how many requests have been sent in excess
	// of the peer's advertised MaxConcurrentStreams setting and
	// are waiting for other streams to complete.
	StreamsPending int

	// MaxConcurrentStreams is how many concurrent streams the
	// peer advertised as acceptable. Zero means no SETTINGS
	// frame has been received yet.
	MaxConcurrentStreams uint32

	// LastIdle, if non-zero, is when the connection last
	// transitioned to idle state.
	LastIdle time.Time
}
   974  
   975  // State returns a snapshot of cc's state.
   976  func (cc *ClientConn) State() ClientConnState {
   977  	cc.wmu.Lock()
   978  	maxConcurrent := cc.maxConcurrentStreams
   979  	if !cc.seenSettings {
   980  		maxConcurrent = 0
   981  	}
   982  	cc.wmu.Unlock()
   983  
   984  	cc.mu.Lock()
   985  	defer cc.mu.Unlock()
   986  	return ClientConnState{
   987  		Closed:               cc.closed,
   988  		Closing:              cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
   989  		StreamsActive:        len(cc.streams) + cc.pendingResets,
   990  		StreamsReserved:      cc.streamsReserved,
   991  		StreamsPending:       cc.pendingRequests,
   992  		LastIdle:             cc.lastIdle,
   993  		MaxConcurrentStreams: maxConcurrent,
   994  	}
   995  }
   996  
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type clientConnIdleState struct {
	// canTakeNewRequest is whether the connection may be used for a
	// new request right now (see idleStateLocked for the exact rules).
	canTakeNewRequest bool
}
  1002  
  1003  func (cc *ClientConn) idleState() clientConnIdleState {
  1004  	cc.mu.Lock()
  1005  	defer cc.mu.Unlock()
  1006  	return cc.idleStateLocked()
  1007  }
  1008  
  1009  func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
  1010  	if cc.singleUse && cc.nextStreamID > 1 {
  1011  		return
  1012  	}
  1013  	var maxConcurrentOkay bool
  1014  	if cc.strictMaxConcurrentStreams {
  1015  		// We'll tell the caller we can take a new request to
  1016  		// prevent the caller from dialing a new TCP
  1017  		// connection, but then we'll block later before
  1018  		// writing it.
  1019  		maxConcurrentOkay = true
  1020  	} else {
  1021  		// We can take a new request if the total of
  1022  		//   - active streams;
  1023  		//   - reservation slots for new streams; and
  1024  		//   - streams for which we have sent a RST_STREAM and a PING,
  1025  		//     but received no subsequent frame
  1026  		// is less than the concurrency limit.
  1027  		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
  1028  	}
  1029  
  1030  	st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()
  1031  
  1032  	// If this connection has never been used for a request and is closed,
  1033  	// then let it take a request (which will fail).
  1034  	// If the conn was closed for idleness, we're racing the idle timer;
  1035  	// don't try to use the conn. (Issue #70515.)
  1036  	//
  1037  	// This avoids a situation where an error early in a connection's lifetime
  1038  	// goes unreported.
  1039  	if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
  1040  		st.canTakeNewRequest = true
  1041  	}
  1042  
  1043  	return
  1044  }
  1045  
  1046  func (cc *ClientConn) isUsableLocked() bool {
  1047  	return cc.goAway == nil &&
  1048  		!cc.closed &&
  1049  		!cc.closing &&
  1050  		!cc.doNotReuse &&
  1051  		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
  1052  		!cc.tooIdleLocked()
  1053  }
  1054  
  1055  // canReserveLocked reports whether a net/http.ClientConn can reserve a slot on this conn.
  1056  //
  1057  // This follows slightly different rules than clientConnIdleState.canTakeNewRequest.
  1058  // We only permit reservations up to the conn's concurrency limit.
  1059  // This differs from ClientConn.ReserveNewRequest, which permits reservations
  1060  // past the limit when StrictMaxConcurrentStreams is set.
  1061  func (cc *ClientConn) canReserveLocked() bool {
  1062  	if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
  1063  		return false
  1064  	}
  1065  	if !cc.isUsableLocked() {
  1066  		return false
  1067  	}
  1068  	return true
  1069  }
  1070  
  1071  // currentRequestCountLocked reports the number of concurrency slots currently in use,
  1072  // including active streams, reserved slots, and reset streams waiting for acknowledgement.
  1073  func (cc *ClientConn) currentRequestCountLocked() int {
  1074  	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
  1075  }
  1076  
  1077  func (cc *ClientConn) canTakeNewRequestLocked() bool {
  1078  	st := cc.idleStateLocked()
  1079  	return st.canTakeNewRequest
  1080  }
  1081  
  1082  // availableLocked reports the number of concurrency slots available.
  1083  func (cc *ClientConn) availableLocked() int {
  1084  	if !cc.canTakeNewRequestLocked() {
  1085  		return 0
  1086  	}
  1087  	return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
  1088  }
  1089  
  1090  // tooIdleLocked reports whether this connection has been been sitting idle
  1091  // for too much wall time.
  1092  func (cc *ClientConn) tooIdleLocked() bool {
  1093  	// The Round(0) strips the monontonic clock reading so the
  1094  	// times are compared based on their wall time. We don't want
  1095  	// to reuse a connection that's been sitting idle during
  1096  	// VM/laptop suspend if monotonic time was also frozen.
  1097  	return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
  1098  }
  1099  
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time,
// so this simply calls the synchronized closeIfIdle to shut down this
// connection. The timer could just call closeIfIdle directly, but
// naming the intent here is clearer.
func (cc *ClientConn) onIdleTimeout() {
	cc.closeIfIdle()
}
  1109  
// closeConn closes the underlying network connection and notifies the
// state hook. A watchdog timer force-closes the raw connection if
// tconn.Close has not returned within 250ms (see forceCloseConn).
func (cc *ClientConn) closeConn() {
	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
	defer t.Stop()
	cc.tconn.Close()
	cc.maybeCallStateHook()
}
  1116  
  1117  // A tls.Conn.Close can hang for a long time if the peer is unresponsive.
  1118  // Try to shut it down more aggressively.
  1119  func (cc *ClientConn) forceCloseConn() {
  1120  	tc, ok := cc.tconn.(*tls.Conn)
  1121  	if !ok {
  1122  		return
  1123  	}
  1124  	if nc := tc.NetConn(); nc != nil {
  1125  		nc.Close()
  1126  	}
  1127  }
  1128  
  1129  func (cc *ClientConn) closeIfIdle() {
  1130  	cc.mu.Lock()
  1131  	if len(cc.streams) > 0 || cc.streamsReserved > 0 {
  1132  		cc.mu.Unlock()
  1133  		return
  1134  	}
  1135  	cc.closed = true
  1136  	cc.closedOnIdle = true
  1137  	nextID := cc.nextStreamID
  1138  	// TODO: do clients send GOAWAY too? maybe? Just Close:
  1139  	cc.mu.Unlock()
  1140  
  1141  	if VerboseLogs {
  1142  		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
  1143  	}
  1144  	cc.closeConn()
  1145  }
  1146  
  1147  func (cc *ClientConn) isDoNotReuseAndIdle() bool {
  1148  	cc.mu.Lock()
  1149  	defer cc.mu.Unlock()
  1150  	return cc.doNotReuse && len(cc.streams) == 0
  1151  }
  1152  
  1153  var shutdownEnterWaitStateHook = func() {}
  1154  
// Shutdown gracefully closes the client connection, waiting for running streams to complete.
// It returns ctx.Err() if ctx is done before the streams drain; in that
// case the connection is left open (and still closing).
func (cc *ClientConn) Shutdown(ctx context.Context) error {
	// Tell the server we will open no more streams.
	if err := cc.sendGoAway(); err != nil {
		return err
	}
	// Wait for all in-flight streams to complete or connection to close
	done := make(chan struct{})
	cancelled := false // guarded by cc.mu
	go func() {
		cc.mu.Lock()
		defer cc.mu.Unlock()
		for {
			if len(cc.streams) == 0 || cc.closed {
				cc.closed = true
				close(done)
				break
			}
			if cancelled {
				break
			}
			// Woken by cond.Broadcast on stream completion or cancellation.
			cc.cond.Wait()
		}
	}()
	shutdownEnterWaitStateHook()
	select {
	case <-done:
		cc.closeConn()
		return nil
	case <-ctx.Done():
		cc.mu.Lock()
		// Free the goroutine above
		cancelled = true
		cc.cond.Broadcast()
		cc.mu.Unlock()
		return ctx.Err()
	}
}
  1192  
  1193  func (cc *ClientConn) sendGoAway() error {
  1194  	cc.mu.Lock()
  1195  	closing := cc.closing
  1196  	cc.closing = true
  1197  	maxStreamID := cc.nextStreamID
  1198  	cc.mu.Unlock()
  1199  	if closing {
  1200  		// GOAWAY sent already
  1201  		return nil
  1202  	}
  1203  
  1204  	cc.wmu.Lock()
  1205  	defer cc.wmu.Unlock()
  1206  	// Send a graceful shutdown frame to server
  1207  	if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
  1208  		return err
  1209  	}
  1210  	if err := cc.bw.Flush(); err != nil {
  1211  		return err
  1212  	}
  1213  	// Prevent new requests
  1214  	return nil
  1215  }
  1216  
  1217  // closes the client connection immediately. In-flight requests are interrupted.
  1218  // err is sent to streams.
  1219  func (cc *ClientConn) closeForError(err error) {
  1220  	cc.mu.Lock()
  1221  	cc.closed = true
  1222  	for _, cs := range cc.streams {
  1223  		cs.abortStreamLocked(err)
  1224  	}
  1225  	cc.cond.Broadcast()
  1226  	cc.mu.Unlock()
  1227  	cc.closeConn()
  1228  }
  1229  
// Close closes the client connection immediately.
//
// In-flight requests are interrupted (they observe errClientConnForceClosed).
// For a graceful shutdown, use Shutdown instead.
// Close always returns nil; the error return exists for interface compatibility.
func (cc *ClientConn) Close() error {
	cc.closeForError(errClientConnForceClosed)
	return nil
}
  1237  
  1238  // closes the client connection immediately. In-flight requests are interrupted.
  1239  func (cc *ClientConn) closeForLostPing() {
  1240  	err := errors.New("http2: client connection lost")
  1241  	if f := cc.t.CountError; f != nil {
  1242  		f("conn_close_lost_ping")
  1243  	}
  1244  	cc.closeForError(err)
  1245  }
  1246  
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
var errRequestCanceled = internal.ErrRequestCanceled
  1250  
  1251  func (cc *ClientConn) responseHeaderTimeout() time.Duration {
  1252  	if cc.t.t1 != nil {
  1253  		return cc.t.t1.ResponseHeaderTimeout()
  1254  	}
  1255  	// No way to do this (yet?) with just an http2.Transport. Probably
  1256  	// no need. Request.Cancel this is the new way. We only need to support
  1257  	// this for compatibility with the old http.Transport fields when
  1258  	// we're doing transparent http2.
  1259  	return 0
  1260  }
  1261  
  1262  // actualContentLength returns a sanitized version of
  1263  // req.ContentLength, where 0 actually means zero (not unknown) and -1
  1264  // means unknown.
  1265  func actualContentLength(req *ClientRequest) int64 {
  1266  	if req.Body == nil || req.Body == NoBody {
  1267  		return 0
  1268  	}
  1269  	if req.ContentLength != 0 {
  1270  		return req.ContentLength
  1271  	}
  1272  	return -1
  1273  }
  1274  
  1275  func (cc *ClientConn) decrStreamReservations() {
  1276  	cc.mu.Lock()
  1277  	defer cc.mu.Unlock()
  1278  	cc.decrStreamReservationsLocked()
  1279  }
  1280  
  1281  func (cc *ClientConn) decrStreamReservationsLocked() {
  1282  	if cc.streamsReserved > 0 {
  1283  		cc.streamsReserved--
  1284  	}
  1285  }
  1286  
// RoundTrip sends req on the connection and returns the response.
func (cc *ClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.roundTrip(req, nil)
}
  1290  
// roundTrip sends req and waits for the response headers (or the end of
// the stream for bodiless responses). The request itself is written by
// a separate goroutine (doRequest); this function multiplexes between
// response arrival, stream abort, context cancellation, and explicit
// request cancellation. streamf, if non-nil, is invoked with the
// stream once it is created.
func (cc *ClientConn) roundTrip(req *ClientRequest, streamf func(*clientStream)) (*ClientResponse, error) {
	ctx := req.Context
	// The stream is embedded in the request to avoid a separate allocation.
	req.stream = clientStream{
		cc:                   cc,
		ctx:                  ctx,
		reqCancel:            req.Cancel,
		isHead:               req.Method == "HEAD",
		reqBody:              req.Body,
		reqBodyContentLength: actualContentLength(req),
		trace:                httptrace.ContextClientTrace(ctx),
		peerClosed:           make(chan struct{}),
		abort:                make(chan struct{}),
		respHeaderRecv:       make(chan struct{}),
		donec:                make(chan struct{}),
		resTrailer:           req.ResTrailer,
	}
	cs := &req.stream

	cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())

	// Write the request (headers and body) on its own goroutine.
	go cs.doRequest(req, streamf)

	// waitDone blocks until the stream is fully done, the context
	// expires, or the request is cancelled.
	waitDone := func() error {
		select {
		case <-cs.donec:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}

	handleResponseHeaders := func() (*ClientResponse, error) {
		res := cs.res
		if res.StatusCode > 299 {
			// On error or status code 3xx, 4xx, 5xx, etc abort any
			// ongoing write, assuming that the server doesn't care
			// about our request body. If the server replied with 1xx or
			// 2xx, however, then assume the server DOES potentially
			// want our body (e.g. full-duplex streaming:
			// golang.org/issue/13444). If it turns out the server
			// doesn't, they'll RST_STREAM us soon enough. This is a
			// heuristic to avoid adding knobs to Transport. Hopefully
			// we can keep it.
			cs.abortRequestBodyWrite()
		}
		res.TLS = cc.tlsState
		if res.Body == NoBody && actualContentLength(req) == 0 {
			// If there isn't a request or response body still being
			// written, then wait for the stream to be closed before
			// RoundTrip returns.
			if err := waitDone(); err != nil {
				return nil, err
			}
		}
		return res, nil
	}

	cancelRequest := func(cs *clientStream, err error) error {
		cs.cc.mu.Lock()
		bodyClosed := cs.reqBodyClosed
		cs.cc.mu.Unlock()
		// Wait for the request body to be closed.
		//
		// If nothing closed the body before now, abortStreamLocked
		// will have started a goroutine to close it.
		//
		// Closing the body before returning avoids a race condition
		// with net/http checking its readTrackingBody to see if the
		// body was read from or closed. See golang/go#60041.
		//
		// The body is closed in a separate goroutine without the
		// connection mutex held, but dropping the mutex before waiting
		// will keep us from holding it indefinitely if the body
		// close is slow for some reason.
		if bodyClosed != nil {
			<-bodyClosed
		}
		return err
	}

	for {
		select {
		case <-cs.respHeaderRecv:
			return handleResponseHeaders()
		case <-cs.abort:
			select {
			case <-cs.respHeaderRecv:
				// If both cs.respHeaderRecv and cs.abort are signaling,
				// pick respHeaderRecv. The server probably wrote the
				// response and immediately reset the stream.
				// golang.org/issue/49645
				return handleResponseHeaders()
			default:
				waitDone()
				return nil, cs.abortErr
			}
		case <-ctx.Done():
			err := ctx.Err()
			cs.abortStream(err)
			return nil, cancelRequest(cs, err)
		case <-cs.reqCancel:
			cs.abortStream(errRequestCanceled)
			return nil, cancelRequest(cs, errRequestCanceled)
		}
	}
}
  1399  
  1400  // doRequest runs for the duration of the request lifetime.
  1401  //
  1402  // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
  1403  func (cs *clientStream) doRequest(req *ClientRequest, streamf func(*clientStream)) {
  1404  	err := cs.writeRequest(req, streamf)
  1405  	cs.cleanupWriteRequest(err)
  1406  }
  1407  
  1408  var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
  1409  
// writeRequest sends a request.
//
// It returns nil after the request is written, the response read,
// and the request stream is half-closed by the peer.
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
func (cs *clientStream) writeRequest(req *ClientRequest, streamf func(*clientStream)) (err error) {
	cc := cs.cc
	ctx := cs.ctx

	// wait for setting frames to be received, a server can change this value later,
	// but we just wait for the first settings frame
	var isExtendedConnect bool
	if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
		isExtendedConnect = true
	}

	// Acquire the new-request lock by writing to reqHeaderMu.
	// This lock guards the critical section covering allocating a new stream ID
	// (requires mu) and creating the stream (requires wmu).
	if cc.reqHeaderMu == nil {
		panic("RoundTrip on uninitialized ClientConn") // for tests
	}
	if isExtendedConnect {
		// Extended CONNECT needs the server's first SETTINGS frame to know
		// whether the extension is supported at all.
		select {
		case <-cs.reqCancel:
			return errRequestCanceled
		case <-ctx.Done():
			return ctx.Err()
		case <-cc.seenSettingsChan:
			if !cc.extendedConnectAllowed {
				return errExtendedConnectNotSupported
			}
		}
	}
	select {
	case cc.reqHeaderMu <- struct{}{}:
	case <-cs.reqCancel:
		return errRequestCanceled
	case <-ctx.Done():
		return ctx.Err()
	}

	cc.mu.Lock()
	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}
	cc.decrStreamReservationsLocked()
	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
		cc.mu.Unlock()
		<-cc.reqHeaderMu // release the new-request lock on failure
		return err
	}
	cc.addStreamLocked(cs) // assigns stream ID
	if isConnectionCloseRequest(req) {
		cc.doNotReuse = true
	}
	cc.mu.Unlock()

	if streamf != nil {
		streamf(cs)
	}

	// Arm Expect: 100-continue handling only if the header is present.
	continueTimeout := cc.t.expectContinueTimeout()
	if continueTimeout != 0 {
		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
			continueTimeout = 0
		} else {
			cs.on100 = make(chan struct{}, 1)
		}
	}

	// Past this point (where we send request headers), it is possible for
	// RoundTrip to return successfully. Since the RoundTrip contract permits
	// the caller to "mutate or reuse" the Request after closing the Response's Body,
	// we must take care when referencing the Request from here on.
	err = cs.encodeAndWriteHeaders(req)
	<-cc.reqHeaderMu
	if err != nil {
		return err
	}

	hasBody := cs.reqBodyContentLength != 0
	if !hasBody {
		cs.sentEndStream = true
	} else {
		if continueTimeout != 0 {
			// Hold the body until the server sends 100 Continue, the
			// timeout fires, or the request is aborted/cancelled.
			traceWait100Continue(cs.trace)
			timer := time.NewTimer(continueTimeout)
			select {
			case <-timer.C:
				err = nil
			case <-cs.on100:
				err = nil
			case <-cs.abort:
				err = cs.abortErr
			case <-ctx.Done():
				err = ctx.Err()
			case <-cs.reqCancel:
				err = errRequestCanceled
			}
			timer.Stop()
			if err != nil {
				traceWroteRequest(cs.trace, err)
				return err
			}
		}

		if err = cs.writeRequestBody(req); err != nil {
			if err != errStopReqBodyWrite {
				traceWroteRequest(cs.trace, err)
				return err
			}
		} else {
			cs.sentEndStream = true
		}
	}

	traceWroteRequest(cs.trace, err)

	var respHeaderTimer <-chan time.Time
	var respHeaderRecv chan struct{}
	if d := cc.responseHeaderTimeout(); d != 0 {
		timer := time.NewTimer(d)
		defer timer.Stop()
		respHeaderTimer = timer.C
		respHeaderRecv = cs.respHeaderRecv
	}
	// Wait until the peer half-closes its end of the stream,
	// or until the request is aborted (via context, error, or otherwise),
	// whichever comes first.
	for {
		select {
		case <-cs.peerClosed:
			return nil
		case <-respHeaderTimer:
			return errTimeout
		case <-respHeaderRecv:
			respHeaderRecv = nil
			respHeaderTimer = nil // keep waiting for END_STREAM
		case <-cs.abort:
			return cs.abortErr
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}
}
  1560  
// encodeAndWriteHeaders HPACK-encodes the request headers and writes
// the HEADERS (and any CONTINUATION) frames for cs. It holds cc.wmu
// for the whole encode+write so the shared HPACK encoder state and the
// frame writer are not interleaved with other streams.
func (cs *clientStream) encodeAndWriteHeaders(req *ClientRequest) error {
	cc := cs.cc
	ctx := cs.ctx

	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	// If the request was canceled while waiting for cc.mu, just quit.
	select {
	case <-cs.abort:
		return cs.abortErr
	case <-ctx.Done():
		return ctx.Err()
	case <-cs.reqCancel:
		return errRequestCanceled
	default:
	}

	// Encode headers.
	//
	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
	// sent by writeRequestBody below, along with any Trailers,
	// again in form HEADERS{1}, CONTINUATION{0,})
	cc.hbuf.Reset()
	res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
		cc.writeHeader(name, value)
	})
	if err != nil {
		return fmt.Errorf("http2: %w", err)
	}
	hdrs := cc.hbuf.Bytes()

	// Write the request.
	endStream := !res.HasBody && !res.HasTrailers
	cs.sentHeaders = true
	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
	traceWroteHeaders(cs.trace)
	return err
}
  1600  
// encodeRequestHeaders converts req into HTTP/2 header fields, invoking
// headerf for each name/value pair in encoding order. It delegates the
// pseudo-header construction and validation to httpcommon.EncodeHeaders.
func encodeRequestHeaders(req *ClientRequest, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
	return httpcommon.EncodeHeaders(req.Context, httpcommon.EncodeHeadersParam{
		Request: httpcommon.Request{
			Header:              req.Header,
			Trailer:             req.Trailer,
			URL:                 req.URL,
			Host:                req.Host,
			Method:              req.Method,
			ActualContentLength: actualContentLength(req),
		},
		AddGzipHeader:         addGzipHeader,
		PeerMaxHeaderListSize: peerMaxHeaderListSize,
		DefaultUserAgent:      defaultUserAgent,
	}, headerf)
}
  1616  
// cleanupWriteRequest performs post-request tasks.
//
// If err (the result of writeRequest) is non-nil and the stream is not closed,
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
	cc := cs.cc

	if cs.ID == 0 {
		// We were canceled before creating the stream, so return our reservation.
		cc.decrStreamReservations()
	}

	// TODO: write h12Compare test showing whether
	// Request.Body is closed by the Transport,
	// and in multiple cases: server replies <=299 and >299
	// while still writing request body
	cc.mu.Lock()
	// Claim responsibility for closing the request body if nobody has yet.
	mustCloseBody := false
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		mustCloseBody = true
		cs.reqBodyClosed = make(chan struct{})
	}
	bodyClosed := cs.reqBodyClosed
	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	// Have we read any frames from the connection since sending this request?
	readSinceStream := cc.readBeforeStreamID > cs.ID
	cc.mu.Unlock()
	// Close the body outside cc.mu: Close may block.
	if mustCloseBody {
		cs.reqBody.Close()
		close(bodyClosed)
	}
	// If another goroutine owns the close, wait until it has finished.
	if bodyClosed != nil {
		<-bodyClosed
	}

	if err != nil && cs.sentEndStream {
		// If the connection is closed immediately after the response is read,
		// we may be aborted before finishing up here. If the stream was closed
		// cleanly on both sides, there is no error.
		select {
		case <-cs.peerClosed:
			err = nil
		default:
		}
	}
	if err != nil {
		cs.abortStream(err) // possibly redundant, but harmless
		if cs.sentHeaders {
			if se, ok := err.(StreamError); ok {
				if se.Cause != errFromPeer {
					cc.writeStreamReset(cs.ID, se.Code, false, err)
				}
			} else {
				// We're cancelling an in-flight request.
				//
				// This could be due to the server becoming unresponsive.
				// To avoid sending too many requests on a dead connection,
				// if we haven't read any frames from the connection since
				// sending this request, we let it continue to consume
				// a concurrency slot until we can confirm the server is
				// still responding.
				// We do this by sending a PING frame along with the RST_STREAM
				// (unless a ping is already in flight).
				//
				// For simplicity, we don't bother tracking the PING payload:
				// We reset cc.pendingResets any time we receive a PING ACK.
				//
				// We skip this if the conn is going to be closed on idle,
				// because it's short lived and will probably be closed before
				// we get the ping response.
				ping := false
				if !closeOnIdle && !readSinceStream {
					cc.mu.Lock()
					// rstStreamPingsBlocked works around a gRPC behavior:
					// see comment on the field for details.
					if !cc.rstStreamPingsBlocked {
						if cc.pendingResets == 0 {
							ping = true
						}
						cc.pendingResets++
					}
					cc.mu.Unlock()
				}
				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
			}
		}
		cs.bufPipe.CloseWithError(err) // no-op if already closed
	} else {
		// Success path: if we sent headers but never sent END_STREAM
		// (e.g. body writing was abandoned), tell the peer we're done.
		if cs.sentHeaders && !cs.sentEndStream {
			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
		}
		cs.bufPipe.CloseWithError(errRequestCanceled)
	}
	if cs.ID != 0 {
		cc.forgetStreamID(cs.ID)
	}

	// A failed write poisons the whole connection; close it.
	cc.wmu.Lock()
	werr := cc.werr
	cc.wmu.Unlock()
	if werr != nil {
		cc.Close()
	}

	// Signal that all request-write work for this stream is done.
	close(cs.donec)
	cc.maybeCallStateHook()
}
  1724  
// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
// Must hold cc.mu.
//
// It returns nil when a concurrency slot is available, or an error if
// the connection is (or becomes) unusable or the stream is aborted
// while waiting.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
	for {
		if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
			// This is the very first request sent to this connection.
			// Return a fatal error which aborts the retry loop.
			return errClientConnNotEstablished
		}
		cc.lastActive = time.Now()
		if cc.closed || !cc.canTakeNewRequestLocked() {
			return errClientConnUnusable
		}
		// The conn is taking a request, so it is no longer idle.
		cc.lastIdle = time.Time{}
		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
			return nil
		}
		// At the concurrency limit: block until another stream finishes
		// (forgetStreamID broadcasts on cc.cond) and then re-check.
		cc.pendingRequests++
		cc.cond.Wait()
		cc.pendingRequests--
		// The stream may have been aborted while we were waiting.
		select {
		case <-cs.abort:
			return cs.abortErr
		default:
		}
	}
}
  1752  
  1753  // requires cc.wmu be held
  1754  func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
  1755  	first := true // first frame written (HEADERS is first, then CONTINUATION)
  1756  	for len(hdrs) > 0 && cc.werr == nil {
  1757  		chunk := hdrs
  1758  		if len(chunk) > maxFrameSize {
  1759  			chunk = chunk[:maxFrameSize]
  1760  		}
  1761  		hdrs = hdrs[len(chunk):]
  1762  		endHeaders := len(hdrs) == 0
  1763  		if first {
  1764  			cc.fr.WriteHeaders(HeadersFrameParam{
  1765  				StreamID:      streamID,
  1766  				BlockFragment: chunk,
  1767  				EndStream:     endStream,
  1768  				EndHeaders:    endHeaders,
  1769  			})
  1770  			first = false
  1771  		} else {
  1772  			cc.fr.WriteContinuation(streamID, endHeaders, chunk)
  1773  		}
  1774  	}
  1775  	cc.bw.Flush()
  1776  	return cc.werr
  1777  }
  1778  
// internal error values; they don't escape to callers
var (
	// errStopReqBodyWrite aborts the request body write without sending
	// a stream reset to the peer.
	errStopReqBodyWrite = errors.New("http2: aborting request body write")

	// errStopReqBodyWriteAndCancel aborts the request body write and also
	// sends a stream reset with the cancel error code.
	errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")

	// errReqBodyTooLong reports a request body that produced more bytes
	// than its declared Content-Length.
	errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
  1789  
  1790  // frameScratchBufferLen returns the length of a buffer to use for
  1791  // outgoing request bodies to read/write to/from.
  1792  //
  1793  // It returns max(1, min(peer's advertised max frame size,
  1794  // Request.ContentLength+1, 512KB)).
  1795  func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
  1796  	const max = 512 << 10
  1797  	n := int64(maxFrameSize)
  1798  	if n > max {
  1799  		n = max
  1800  	}
  1801  	if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
  1802  		// Add an extra byte past the declared content-length to
  1803  		// give the caller's Request.Body io.Reader a chance to
  1804  		// give us more bytes than they declared, so we can catch it
  1805  		// early.
  1806  		n = cl + 1
  1807  	}
  1808  	if n < 1 {
  1809  		return 1
  1810  	}
  1811  	return int(n) // doesn't truncate; max is 512K
  1812  }
  1813  
  1814  // Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
  1815  // streaming requests using small frame sizes occupy large buffers initially allocated for prior
  1816  // requests needing big buffers. The size ranges are as follows:
  1817  // {0 KB, 16 KB], {16 KB, 32 KB], {32 KB, 64 KB], {64 KB, 128 KB], {128 KB, 256 KB],
  1818  // {256 KB, 512 KB], {512 KB, infinity}
  1819  // In practice, the maximum scratch buffer size should not exceed 512 KB due to
  1820  // frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
  1821  // It exists mainly as a safety measure, for potential future increases in max buffer size.
  1822  var bufPools [7]sync.Pool // of *[]byte
  1823  func bufPoolIndex(size int) int {
  1824  	if size <= 16384 {
  1825  		return 0
  1826  	}
  1827  	size -= 1
  1828  	bits := bits.Len(uint(size))
  1829  	index := bits - 14
  1830  	if index >= len(bufPools) {
  1831  		return len(bufPools) - 1
  1832  	}
  1833  	return index
  1834  }
  1835  
// writeRequestBody streams cs.reqBody to the peer as DATA frames,
// respecting stream and connection flow control, and finishes the
// stream with END_STREAM (on the last DATA frame, an empty DATA frame,
// or a trailers HEADERS block).
func (cs *clientStream) writeRequestBody(req *ClientRequest) (err error) {
	cc := cs.cc
	body := cs.reqBody
	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

	hasTrailers := req.Trailer != nil
	remainLen := cs.reqBodyContentLength
	hasContentLen := remainLen != -1

	cc.mu.Lock()
	maxFrameSize := int(cc.maxFrameSize)
	cc.mu.Unlock()

	// Scratch buffer for reading into & writing from.
	// Reuse a pooled buffer when one of adequate size is available.
	scratchLen := cs.frameScratchBufferLen(maxFrameSize)
	var buf []byte
	index := bufPoolIndex(scratchLen)
	if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
		defer bufPools[index].Put(bp)
		buf = *bp
	} else {
		buf = make([]byte, scratchLen)
		defer bufPools[index].Put(&buf)
	}

	var sawEOF bool
	for !sawEOF {
		n, err := body.Read(buf)
		if hasContentLen {
			remainLen -= int64(n)
			if remainLen == 0 && err == nil {
				// The request body's Content-Length was predeclared and
				// we just finished reading it all, but the underlying io.Reader
				// returned the final chunk with a nil error (which is one of
				// the two valid things a Reader can do at EOF). Because we'd prefer
				// to send the END_STREAM bit early, double-check that we're actually
				// at EOF. Subsequent reads should return (0, EOF) at this point.
				// If either value is different, we return an error in one of two ways below.
				var scratch [1]byte
				var n1 int
				n1, err = body.Read(scratch[:])
				remainLen -= int64(n1)
			}
			if remainLen < 0 {
				err = errReqBodyTooLong
				return err
			}
		}
		if err != nil {
			// Distinguish a body closed elsewhere, normal EOF, and a
			// genuine read error.
			cc.mu.Lock()
			bodyClosed := cs.reqBodyClosed != nil
			cc.mu.Unlock()
			switch {
			case bodyClosed:
				return errStopReqBodyWrite
			case err == io.EOF:
				sawEOF = true
				err = nil
			default:
				return err
			}
		}

		// Write the chunk out in pieces as flow control permits.
		remain := buf[:n]
		for len(remain) > 0 && err == nil {
			var allowed int32
			allowed, err = cs.awaitFlowControl(len(remain))
			if err != nil {
				return err
			}
			cc.wmu.Lock()
			data := remain[:allowed]
			remain = remain[allowed:]
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
			if err == nil {
				// TODO(bradfitz): this flush is for latency, not bandwidth.
				// Most requests won't need this. Make this opt-in or
				// opt-out?  Use some heuristic on the body type? Nagel-like
				// timers?  Based on 'n'? Only last chunk of this for loop,
				// unless flow control tokens are low? For now, always.
				// If we change this, see comment below.
				err = cc.bw.Flush()
			}
			cc.wmu.Unlock()
		}
		if err != nil {
			return err
		}
	}

	if sentEnd {
		// Already sent END_STREAM (which implies we have no
		// trailers) and flushed, because currently all
		// WriteData frames above get a flush. So we're done.
		return nil
	}

	// Since the RoundTrip contract permits the caller to "mutate or reuse"
	// a request after the Response's Body is closed, verify that this hasn't
	// happened before accessing the trailers.
	cc.mu.Lock()
	trailer := req.Trailer
	err = cs.abortErr
	cc.mu.Unlock()
	if err != nil {
		return err
	}

	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	var trls []byte
	if len(trailer) > 0 {
		trls, err = cc.encodeTrailers(trailer)
		if err != nil {
			return err
		}
	}

	// Two ways to send END_STREAM: either with trailers, or
	// with an empty DATA frame.
	if len(trls) > 0 {
		err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
	} else {
		err = cc.fr.WriteData(cs.ID, true, nil)
	}
	if ferr := cc.bw.Flush(); ferr != nil && err == nil {
		err = ferr
	}
	return err
}
  1967  
  1968  // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
  1969  // control tokens from the server.
  1970  // It returns either the non-zero number of tokens taken or an error
  1971  // if the stream is dead.
  1972  func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
  1973  	cc := cs.cc
  1974  	ctx := cs.ctx
  1975  	cc.mu.Lock()
  1976  	defer cc.mu.Unlock()
  1977  	for {
  1978  		if cc.closed {
  1979  			return 0, errClientConnClosed
  1980  		}
  1981  		if cs.reqBodyClosed != nil {
  1982  			return 0, errStopReqBodyWrite
  1983  		}
  1984  		select {
  1985  		case <-cs.abort:
  1986  			return 0, cs.abortErr
  1987  		case <-ctx.Done():
  1988  			return 0, ctx.Err()
  1989  		case <-cs.reqCancel:
  1990  			return 0, errRequestCanceled
  1991  		default:
  1992  		}
  1993  		if a := cs.flow.available(); a > 0 {
  1994  			take := a
  1995  			if int(take) > maxBytes {
  1996  
  1997  				take = int32(maxBytes) // can't truncate int; take is int32
  1998  			}
  1999  			if take > int32(cc.maxFrameSize) {
  2000  				take = int32(cc.maxFrameSize)
  2001  			}
  2002  			cs.flow.take(take)
  2003  			return take, nil
  2004  		}
  2005  		cc.cond.Wait()
  2006  	}
  2007  }
  2008  
  2009  // requires cc.wmu be held.
  2010  func (cc *ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
  2011  	cc.hbuf.Reset()
  2012  
  2013  	hlSize := uint64(0)
  2014  	for k, vv := range trailer {
  2015  		for _, v := range vv {
  2016  			hf := hpack.HeaderField{Name: k, Value: v}
  2017  			hlSize += uint64(hf.Size())
  2018  		}
  2019  	}
  2020  	if hlSize > cc.peerMaxHeaderListSize {
  2021  		return nil, errRequestHeaderListSize
  2022  	}
  2023  
  2024  	for k, vv := range trailer {
  2025  		lowKey, ascii := httpcommon.LowerHeader(k)
  2026  		if !ascii {
  2027  			// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
  2028  			// field names have to be ASCII characters (just as in HTTP/1.x).
  2029  			continue
  2030  		}
  2031  		// Transfer-Encoding, etc.. have already been filtered at the
  2032  		// start of RoundTrip
  2033  		for _, v := range vv {
  2034  			cc.writeHeader(lowKey, v)
  2035  		}
  2036  	}
  2037  	return cc.hbuf.Bytes(), nil
  2038  }
  2039  
  2040  func (cc *ClientConn) writeHeader(name, value string) {
  2041  	if VerboseLogs {
  2042  		log.Printf("http2: Transport encoding header %q = %q", name, value)
  2043  	}
  2044  	cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  2045  }
  2046  
// resAndError pairs a response with the error (if any) that produced it,
// for delivery as a single value.
type resAndError struct {
	_   incomparable // makes the struct non-comparable
	res *ClientResponse
	err error
}
  2052  
// addStreamLocked initializes cs's flow-control state, assigns it the
// connection's next stream ID, and registers it in cc.streams.
//
// requires cc.mu be held.
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
	// Seed the stream's send window with the peer's initial window size
	// and link the stream window to the connection-level window.
	cs.flow.add(int32(cc.initialWindowSize))
	cs.flow.setConnFlow(&cc.flow)
	cs.inflow.init(cc.initialStreamRecvWindowSize)
	// Client-initiated stream IDs advance by two (staying odd; the
	// sequence starts at 1).
	cs.ID = cc.nextStreamID
	cc.nextStreamID += 2
	cc.streams[cs.ID] = cs
	if cs.ID == 0 {
		panic("assigned stream ID 0")
	}
}
  2065  
// forgetStreamID removes the stream from cc.streams, wakes goroutines
// blocked on the connection, and closes the connection if it is now
// idle and marked not to be reused.
func (cc *ClientConn) forgetStreamID(id uint32) {
	cc.mu.Lock()
	slen := len(cc.streams)
	delete(cc.streams, id)
	if len(cc.streams) != slen-1 {
		panic("forgetting unknown stream id")
	}
	cc.lastActive = time.Now()
	// If this was the last stream, restart the idle timer and record
	// when the conn went idle.
	if len(cc.streams) == 0 && cc.idleTimer != nil {
		cc.idleTimer.Reset(cc.idleTimeout)
		cc.lastIdle = time.Now()
	}
	// Wake up writeRequestBody via clientStream.awaitFlowControl and
	// wake up RoundTrip if there is a pending request.
	cc.cond.Broadcast()

	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
		if VerboseLogs {
			cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
		}
		cc.closed = true
		// closeConn runs after cc.mu is released below.
		defer cc.closeConn()
	}

	cc.mu.Unlock()
}
  2093  
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type clientConnReadLoop struct {
	_  incomparable // makes the struct non-comparable
	cc *ClientConn  // connection whose frames this loop reads and dispatches
}
  2099  
  2100  // readLoop runs in its own goroutine and reads and dispatches frames.
  2101  func (cc *ClientConn) readLoop() {
  2102  	rl := &clientConnReadLoop{cc: cc}
  2103  	defer rl.cleanup()
  2104  	cc.readerErr = rl.run()
  2105  	if ce, ok := cc.readerErr.(ConnectionError); ok {
  2106  		cc.wmu.Lock()
  2107  		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  2108  		cc.wmu.Unlock()
  2109  	}
  2110  }
  2111  
// GoAwayError is returned by the Transport when the server closes the
// TCP connection after sending a GOAWAY frame.
type GoAwayError struct {
	LastStreamID uint32  // LastStreamID field from the GOAWAY frame
	ErrCode      ErrCode // error code carried by the GOAWAY frame
	DebugData    string  // opaque debug data from the GOAWAY frame, if any
}
  2119  
  2120  func (e GoAwayError) Error() string {
  2121  	return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
  2122  		e.LastStreamID, e.ErrCode, e.DebugData)
  2123  }
  2124  
  2125  func isEOFOrNetReadError(err error) bool {
  2126  	if err == io.EOF {
  2127  		return true
  2128  	}
  2129  	ne, ok := err.(*net.OpError)
  2130  	return ok && ne.Op == "read"
  2131  }
  2132  
  2133  func (rl *clientConnReadLoop) cleanup() {
  2134  	cc := rl.cc
  2135  	defer cc.closeConn()
  2136  	defer close(cc.readerDone)
  2137  
  2138  	if cc.idleTimer != nil {
  2139  		cc.idleTimer.Stop()
  2140  	}
  2141  
  2142  	// Close any response bodies if the server closes prematurely.
  2143  	// TODO: also do this if we've written the headers but not
  2144  	// gotten a response yet.
  2145  	err := cc.readerErr
  2146  	cc.mu.Lock()
  2147  	if cc.goAway != nil && isEOFOrNetReadError(err) {
  2148  		err = GoAwayError{
  2149  			LastStreamID: cc.goAway.LastStreamID,
  2150  			ErrCode:      cc.goAway.ErrCode,
  2151  			DebugData:    cc.goAwayDebug,
  2152  		}
  2153  	} else if err == io.EOF {
  2154  		err = io.ErrUnexpectedEOF
  2155  	}
  2156  	cc.closed = true
  2157  
  2158  	// If the connection has never been used, and has been open for only a short time,
  2159  	// leave it in the connection pool for a little while.
  2160  	//
  2161  	// This avoids a situation where new connections are constantly created,
  2162  	// added to the pool, fail, and are removed from the pool, without any error
  2163  	// being surfaced to the user.
  2164  	unusedWaitTime := 5 * time.Second
  2165  	if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
  2166  		unusedWaitTime = cc.idleTimeout
  2167  	}
  2168  	idleTime := time.Now().Sub(cc.lastActive)
  2169  	if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
  2170  		cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
  2171  			cc.t.connPool().MarkDead(cc)
  2172  		})
  2173  	} else {
  2174  		cc.mu.Unlock() // avoid any deadlocks in MarkDead
  2175  		cc.t.connPool().MarkDead(cc)
  2176  		cc.mu.Lock()
  2177  	}
  2178  
  2179  	for _, cs := range cc.streams {
  2180  		select {
  2181  		case <-cs.peerClosed:
  2182  			// The server closed the stream before closing the conn,
  2183  			// so no need to interrupt it.
  2184  		default:
  2185  			cs.abortStreamLocked(err)
  2186  		}
  2187  	}
  2188  	cc.cond.Broadcast()
  2189  	cc.mu.Unlock()
  2190  
  2191  	if !cc.seenSettings {
  2192  		// If we have a pending request that wants extended CONNECT,
  2193  		// let it continue and fail with the connection error.
  2194  		cc.extendedConnectAllowed = true
  2195  		close(cc.seenSettingsChan)
  2196  	}
  2197  }
  2198  
  2199  // countReadFrameError calls Transport.CountError with a string
  2200  // representing err.
  2201  func (cc *ClientConn) countReadFrameError(err error) {
  2202  	f := cc.t.CountError
  2203  	if f == nil || err == nil {
  2204  		return
  2205  	}
  2206  	if ce, ok := err.(ConnectionError); ok {
  2207  		errCode := ErrCode(ce)
  2208  		f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
  2209  		return
  2210  	}
  2211  	if errors.Is(err, io.EOF) {
  2212  		f("read_frame_eof")
  2213  		return
  2214  	}
  2215  	if errors.Is(err, io.ErrUnexpectedEOF) {
  2216  		f("read_frame_unexpected_eof")
  2217  		return
  2218  	}
  2219  	if errors.Is(err, ErrFrameTooLarge) {
  2220  		f("read_frame_too_large")
  2221  		return
  2222  	}
  2223  	f("read_frame_other")
  2224  }
  2225  
// run reads frames from the connection and dispatches them to the
// process* handlers until a connection-level error occurs, which it
// returns. Stream-level errors abort only the affected stream.
func (rl *clientConnReadLoop) run() error {
	cc := rl.cc
	gotSettings := false
	readIdleTimeout := cc.readIdleTimeout
	var t *time.Timer
	if readIdleTimeout != 0 {
		// Run a health check if no frame arrives within the read-idle window.
		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
	}
	for {
		f, err := cc.fr.ReadFrame()
		if t != nil {
			// Something arrived (or errored); push the health check back.
			t.Reset(readIdleTimeout)
		}
		if err != nil {
			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
		}
		if se, ok := err.(StreamError); ok {
			// A stream error kills one stream but keeps the conn alive.
			if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
				if se.Cause == nil {
					se.Cause = cc.fr.errDetail
				}
				rl.endStreamError(cs, se)
			}
			continue
		} else if err != nil {
			cc.countReadFrameError(err)
			return err
		}
		if VerboseLogs {
			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
		}
		if !gotSettings {
			// The server's first frame must be SETTINGS.
			if _, ok := f.(*SettingsFrame); !ok {
				cc.logf("protocol error: received %T before a SETTINGS frame", f)
				return ConnectionError(ErrCodeProtocol)
			}
			gotSettings = true
		}

		// Dispatch on the concrete frame type.
		switch f := f.(type) {
		case *MetaHeadersFrame:
			err = rl.processHeaders(f)
		case *DataFrame:
			err = rl.processData(f)
		case *GoAwayFrame:
			err = rl.processGoAway(f)
		case *RSTStreamFrame:
			err = rl.processResetStream(f)
		case *SettingsFrame:
			err = rl.processSettings(f)
		case *PushPromiseFrame:
			err = rl.processPushPromise(f)
		case *WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
		case *PingFrame:
			err = rl.processPing(f)
		default:
			cc.logf("Transport: unhandled response frame type %T", f)
		}
		if err != nil {
			// A non-nil error from a process* handler is fatal to the conn.
			if VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
			}
			return err
		}
	}
}
  2293  
// processHeaders handles a merged HEADERS (MetaHeadersFrame) from the
// server: the response headers, a 1xx informational response, or —
// for a stream already past its headers — the trailers.
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	if cs == nil {
		// We'd get here if we canceled a request while the
		// server had its response still in flight. So if this
		// was just something we canceled, ignore it.
		return nil
	}
	if cs.readClosed {
		// Headers after END_STREAM are a stream-level protocol error.
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    errors.New("protocol error: headers after END_STREAM"),
		})
		return nil
	}
	if !cs.firstByte {
		if cs.trace != nil {
			// TODO(bradfitz): move first response byte earlier,
			// when we first read the 9 byte header, not waiting
			// until all the HEADERS+CONTINUATION frames have been
			// merged. This works for now.
			traceFirstResponseByte(cs.trace)
		}
		cs.firstByte = true
	}
	if !cs.pastHeaders {
		cs.pastHeaders = true
	} else {
		// A second HEADERS block on this stream carries the trailers.
		return rl.processTrailers(cs, f)
	}

	res, err := rl.handleResponse(cs, f)
	if err != nil {
		if _, ok := err.(ConnectionError); ok {
			return err
		}
		// Any other error type is a stream error.
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    err,
		})
		return nil // return nil from process* funcs to keep conn alive
	}
	if res == nil {
		// (nil, nil) special case. See handleResponse docs.
		return nil
	}
	// Deliver the response to the waiting RoundTrip goroutine.
	cs.res = res
	close(cs.respHeaderRecv)
	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
  2350  
// handleResponse builds a *ClientResponse from a merged HEADERS frame.
//
// may return error types nil, or ConnectionError. Any other error value
// is a StreamError of type ErrCodeProtocol. The returned error in that case
// is the detail.
//
// As a special case, handleResponse may return (nil, nil) to skip the
// frame (currently only used for 1xx responses).
func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*ClientResponse, error) {
	if f.Truncated {
		return nil, errResponseHeaderListSize
	}

	// The :status pseudo-header is mandatory and must be numeric.
	status := f.PseudoValue("status")
	if status == "" {
		return nil, errors.New("malformed response from server: missing status pseudo header")
	}
	statusCode, err := strconv.Atoi(status)
	if err != nil {
		return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
	}

	// Build the header map. strs is a single backing array for the
	// common one-value-per-key case, to reduce allocations.
	regularFields := f.RegularFields()
	strs := make([]string, len(regularFields))
	header := make(Header, len(regularFields))
	res := &cs.staticResp
	cs.staticResp = ClientResponse{
		Header:     header,
		StatusCode: statusCode,
		Status:     status,
	}
	for _, hf := range regularFields {
		key := httpcommon.CanonicalHeader(hf.Name)
		if key == "Trailer" {
			// Record announced trailer keys (with nil values for now).
			t := res.Trailer
			if t == nil {
				t = make(Header)
				res.Trailer = t
			}
			foreachHeaderElement(hf.Value, func(v string) {
				t[httpcommon.CanonicalHeader(v)] = nil
			})
		} else {
			vv := header[key]
			if vv == nil && len(strs) > 0 {
				// More than likely this will be a single-element key.
				// Most headers aren't multi-valued.
				// Set the capacity on strs[0] to 1, so any future append
				// won't extend the slice into the other strings.
				vv, strs = strs[:1:1], strs[1:]
				vv[0] = hf.Value
				header[key] = vv
			} else {
				header[key] = append(vv, hf.Value)
			}
		}
	}

	// 1xx informational responses: deliver to the trace hook if one is
	// registered, otherwise just bound their cumulative size, then wait
	// for the next HEADERS block.
	if statusCode >= 100 && statusCode <= 199 {
		if f.StreamEnded() {
			return nil, errors.New("1xx informational response with END_STREAM flag")
		}
		if fn := cs.get1xxTraceFunc(); fn != nil {
			// If the 1xx response is being delivered to the user,
			// then they're responsible for limiting the number
			// of responses.
			if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
				return nil, err
			}
		} else {
			// If the user didn't examine the 1xx response, then we
			// limit the size of all 1xx headers.
			//
			// This differs a bit from the HTTP/1 implementation, which
			// limits the size of all 1xx headers plus the final response.
			// Use the larger limit of MaxHeaderListSize and
			// net/http.Transport.MaxResponseHeaderBytes.
			limit := int64(cs.cc.t.maxHeaderListSize())
			if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes() > limit {
				limit = t1.MaxResponseHeaderBytes()
			}
			for _, h := range f.Fields {
				cs.totalHeaderSize += int64(h.Size())
			}
			if cs.totalHeaderSize > limit {
				if VerboseLogs {
					log.Printf("http2: 1xx informational responses too large")
				}
				return nil, errors.New("header list too large")
			}
		}
		if statusCode == 100 {
			traceGot100Continue(cs.trace)
			// Non-blocking notify; the body writer may not be listening.
			select {
			case cs.on100 <- struct{}{}:
			default:
			}
		}
		cs.pastHeaders = false // do it all again
		return nil, nil
	}

	// Determine ContentLength. bitSize 63 keeps the parsed value within
	// the non-negative int64 range.
	res.ContentLength = -1
	if clens := res.Header["Content-Length"]; len(clens) == 1 {
		if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
			res.ContentLength = int64(cl)
		} else {
			// TODO: care? unlike http/1, it won't mess up our framing, so it's
			// more safe smuggling-wise to ignore.
		}
	} else if len(clens) > 1 {
		// TODO: care? unlike http/1, it won't mess up our framing, so it's
		// more safe smuggling-wise to ignore.
	} else if f.StreamEnded() && !cs.isHead {
		res.ContentLength = 0
	}

	if cs.isHead {
		res.Body = NoBody
		return res, nil
	}

	if f.StreamEnded() {
		if res.ContentLength > 0 {
			res.Body = missingBody{}
		} else {
			res.Body = NoBody
		}
		return res, nil
	}

	// Normal case: stream the body through the buffered pipe.
	cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
	cs.bytesRemain = res.ContentLength
	res.Body = transportResponseBody{cs}

	// Transparently decompress gzip responses we asked for.
	if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
		res.Header.Del("Content-Encoding")
		res.Header.Del("Content-Length")
		res.ContentLength = -1
		res.Body = &gzipReader{body: res.Body}
		res.Uncompressed = true
	}
	return res, nil
}
  2493  
// processTrailers handles a HEADERS frame received after the response
// headers: it must carry END_STREAM, must contain no pseudo-header fields,
// and its regular fields become the response trailer.
func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
	if cs.pastTrailers {
		// Too many HEADERS frames for this stream.
		return ConnectionError(ErrCodeProtocol)
	}
	cs.pastTrailers = true
	if !f.StreamEnded() {
		// We expect that any headers for trailers also
		// has END_STREAM.
		return ConnectionError(ErrCodeProtocol)
	}
	if len(f.PseudoFields()) > 0 {
		// No pseudo header fields are defined for trailers.
		// TODO: ConnectionError might be overly harsh? Check.
		return ConnectionError(ErrCodeProtocol)
	}

	trailer := make(Header)
	for _, hf := range f.RegularFields() {
		key := httpcommon.CanonicalHeader(hf.Name)
		trailer[key] = append(trailer[key], hf.Value)
	}
	cs.trailer = trailer

	// Trailers always end the stream (checked above).
	rl.endStream(cs)
	return nil
}
  2521  
// transportResponseBody is the concrete type of Transport.RoundTrip's
// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
	cs *clientStream // stream whose buffered DATA this body reads
}
  2527  
// Read copies buffered response DATA into p, enforces any declared
// Content-Length, and returns consumed flow-control tokens to the peer
// with WINDOW_UPDATE frames.
func (b transportResponseBody) Read(p []byte) (n int, err error) {
	cs := b.cs
	cc := cs.cc

	if cs.readErr != nil {
		// A previous Read failed; the error is sticky.
		return 0, cs.readErr
	}
	n, err = b.cs.bufPipe.Read(p)
	if cs.bytesRemain != -1 {
		// bytesRemain tracks the remaining declared Content-Length
		// (-1 means no declared length).
		if int64(n) > cs.bytesRemain {
			// Server sent more than it declared: truncate and abort.
			n = int(cs.bytesRemain)
			if err == nil {
				err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
				cs.abortStream(err)
			}
			cs.readErr = err
			return int(cs.bytesRemain), err
		}
		cs.bytesRemain -= int64(n)
		if err == io.EOF && cs.bytesRemain > 0 {
			// Body ended short of the declared Content-Length.
			err = io.ErrUnexpectedEOF
			cs.readErr = err
			return n, err
		}
	}
	if n == 0 {
		// No flow control tokens to send back.
		return
	}

	cc.mu.Lock()
	connAdd := cc.inflow.add(n)
	var streamAdd int32
	if err == nil { // No need to refresh if the stream is over or failed.
		streamAdd = cs.inflow.add(n)
	}
	cc.mu.Unlock()

	// Send WINDOW_UPDATE frames outside cc.mu but under the write lock.
	if connAdd != 0 || streamAdd != 0 {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if connAdd != 0 {
			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
		}
		if streamAdd != 0 {
			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
		}
		cc.bw.Flush()
	}
	return
}
  2579  
// errClosedResponseBody aborts a stream whose response body is closed
// before being fully consumed.
var errClosedResponseBody = errors.New("http2: response body closed")
  2581  
// Close aborts the stream, refunds any unread connection-level flow
// control, and waits for the request's cleanup to finish.
func (b transportResponseBody) Close() error {
	cs := b.cs
	cc := cs.cc

	cs.bufPipe.BreakWithError(errClosedResponseBody)
	cs.abortStream(errClosedResponseBody)

	unread := cs.bufPipe.Len()
	if unread > 0 {
		cc.mu.Lock()
		// Return connection-level flow control.
		connAdd := cc.inflow.add(unread)
		cc.mu.Unlock()

		// TODO(dneil): Acquiring this mutex can block indefinitely.
		// Move flow control return to a goroutine?
		cc.wmu.Lock()
		// Return connection-level flow control.
		if connAdd > 0 {
			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
		}
		cc.bw.Flush()
		cc.wmu.Unlock()
	}

	select {
	case <-cs.donec:
	case <-cs.ctx.Done():
		// See golang/go#49366: The net/http package can cancel the
		// request context after the response body is fully read.
		// Don't treat this as an error.
		return nil
	case <-cs.reqCancel:
		return errRequestCanceled
	}
	return nil
}
  2619  
// processData handles a DATA frame: it accounts for connection- and
// stream-level flow control, buffers the payload for the response body,
// refunds padding (and data on dead streams) via WINDOW_UPDATE, and ends
// the stream on END_STREAM.
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	data := f.Data()
	if cs == nil {
		cc.mu.Lock()
		neverSent := cc.nextStreamID
		cc.mu.Unlock()
		if f.StreamID >= neverSent {
			// We never asked for this.
			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
			return ConnectionError(ErrCodeProtocol)
		}
		// We probably did ask for this, but canceled. Just ignore it.
		// TODO: be stricter here? only silently ignore things which
		// we canceled, but not things which were closed normally
		// by the peer? Tough without accumulating too much state.

		// But at least return their flow control:
		if f.Length > 0 {
			cc.mu.Lock()
			ok := cc.inflow.take(f.Length)
			connAdd := cc.inflow.add(int(f.Length))
			cc.mu.Unlock()
			if !ok {
				// Peer overflowed our connection window.
				return ConnectionError(ErrCodeFlowControl)
			}
			if connAdd > 0 {
				cc.wmu.Lock()
				cc.fr.WriteWindowUpdate(0, uint32(connAdd))
				cc.bw.Flush()
				cc.wmu.Unlock()
			}
		}
		return nil
	}
	if cs.readClosed {
		cc.logf("protocol error: received DATA after END_STREAM")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if !cs.pastHeaders {
		cc.logf("protocol error: received DATA before a HEADERS frame")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if f.Length > 0 {
		if cs.isHead && len(data) > 0 {
			// HEAD responses must not carry a body.
			cc.logf("protocol error: received DATA on a HEAD request")
			rl.endStreamError(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeProtocol,
			})
			return nil
		}
		// Check connection-level flow control.
		cc.mu.Lock()
		if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
			cc.mu.Unlock()
			return ConnectionError(ErrCodeFlowControl)
		}
		// Return any padded flow control now, since we won't
		// refund it later on body reads.
		var refund int
		if pad := int(f.Length) - len(data); pad > 0 {
			refund += pad
		}

		didReset := false
		var err error
		if len(data) > 0 {
			if _, err = cs.bufPipe.Write(data); err != nil {
				// Return len(data) now if the stream is already closed,
				// since data will never be read.
				didReset = true
				refund += len(data)
			}
		}

		sendConn := cc.inflow.add(refund)
		var sendStream int32
		if !didReset {
			sendStream = cs.inflow.add(refund)
		}
		cc.mu.Unlock()

		// Write WINDOW_UPDATE frames for the refund, if any.
		if sendConn > 0 || sendStream > 0 {
			cc.wmu.Lock()
			if sendConn > 0 {
				cc.fr.WriteWindowUpdate(0, uint32(sendConn))
			}
			if sendStream > 0 {
				cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
			}
			cc.bw.Flush()
			cc.wmu.Unlock()
		}

		if err != nil {
			rl.endStreamError(cs, err)
			return nil
		}
	}

	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
  2735  
// endStream marks the read side of cs closed after the peer half-closes
// (END_STREAM), delivering io.EOF (plus any trailers) to the body reader.
func (rl *clientConnReadLoop) endStream(cs *clientStream) {
	// TODO: check that any declared content-length matches, like
	// server.go's (*stream).endStream method.
	if !cs.readClosed {
		cs.readClosed = true
		// Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
		// race condition: The caller can read io.EOF from Response.Body
		// and close the body before we close cs.peerClosed, causing
		// cleanupWriteRequest to send a RST_STREAM.
		rl.cc.mu.Lock()
		defer rl.cc.mu.Unlock()
		cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
		close(cs.peerClosed)
	}
}
  2751  
// endStreamError aborts the stream's read side with err.
func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStream(err)
}
  2756  
// endStreamErrorLocked is endStreamError for callers already holding cc.mu.
func (rl *clientConnReadLoop) endStreamErrorLocked(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStreamLocked(err)
}
  2761  
// Constants passed to streamByID for documentation purposes.
const (
	headerOrDataFrame    = true  // frame is HEADERS or DATA
	notHeaderOrDataFrame = false // any other frame type
)
  2767  
  2768  // streamByID returns the stream with the given id, or nil if no stream has that id.
  2769  // If headerOrData is true, it clears rst.StreamPingsBlocked.
  2770  func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
  2771  	rl.cc.mu.Lock()
  2772  	defer rl.cc.mu.Unlock()
  2773  	if headerOrData {
  2774  		// Work around an unfortunate gRPC behavior.
  2775  		// See comment on ClientConn.rstStreamPingsBlocked for details.
  2776  		rl.cc.rstStreamPingsBlocked = false
  2777  	}
  2778  	rl.cc.readBeforeStreamID = rl.cc.nextStreamID
  2779  	cs := rl.cc.streams[id]
  2780  	if cs != nil && !cs.readAborted {
  2781  		return cs
  2782  	}
  2783  	return nil
  2784  }
  2785  
  2786  func (cs *clientStream) copyTrailers() {
  2787  	for k, vv := range cs.trailer {
  2788  		t := cs.resTrailer
  2789  		if *t == nil {
  2790  			*t = make(Header)
  2791  		}
  2792  		(*t)[k] = vv
  2793  	}
  2794  }
  2795  
  2796  func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  2797  	cc := rl.cc
  2798  	cc.t.connPool().MarkDead(cc)
  2799  	if f.ErrCode != 0 {
  2800  		// TODO: deal with GOAWAY more. particularly the error code
  2801  		cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  2802  		if fn := cc.t.CountError; fn != nil {
  2803  			fn("recv_goaway_" + f.ErrCode.stringToken())
  2804  		}
  2805  	}
  2806  	cc.setGoAway(f)
  2807  	return nil
  2808  }
  2809  
// processSettings handles a SETTINGS frame: it applies the settings and,
// for a non-ack frame, writes the SETTINGS acknowledgement.
func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
	cc := rl.cc
	// Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
	// Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	if err := rl.processSettingsNoWrite(f); err != nil {
		return err
	}
	if !f.IsAck() {
		cc.fr.WriteSettingsAck()
		cc.bw.Flush()
	}
	return nil
}
  2826  
// processSettingsNoWrite applies the settings in f to the connection
// without writing any frames. For an ack frame it only validates that an
// ack was expected; otherwise it updates the connection's view of the
// peer's settings and closes seenSettingsChan after the first frame.
func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
	cc := rl.cc
	defer cc.maybeCallStateHook()
	cc.mu.Lock()
	defer cc.mu.Unlock()

	if f.IsAck() {
		if cc.wantSettingsAck {
			cc.wantSettingsAck = false
			return nil
		}
		// Unsolicited ack.
		return ConnectionError(ErrCodeProtocol)
	}

	var seenMaxConcurrentStreams bool
	err := f.ForeachSetting(func(s Setting) error {
		switch s.ID {
		case SettingMaxFrameSize:
			cc.maxFrameSize = s.Val
		case SettingMaxConcurrentStreams:
			cc.maxConcurrentStreams = s.Val
			seenMaxConcurrentStreams = true
		case SettingMaxHeaderListSize:
			cc.peerMaxHeaderListSize = uint64(s.Val)
		case SettingInitialWindowSize:
			// Values above the maximum flow-control
			// window size of 2^31-1 MUST be treated as a
			// connection error (Section 5.4.1) of type
			// FLOW_CONTROL_ERROR.
			if s.Val > math.MaxInt32 {
				return ConnectionError(ErrCodeFlowControl)
			}

			// Adjust flow control of currently-open
			// frames by the difference of the old initial
			// window size and this one.
			delta := int32(s.Val) - int32(cc.initialWindowSize)
			for _, cs := range cc.streams {
				cs.flow.add(delta)
			}
			// Wake any writers blocked on stream flow control.
			cc.cond.Broadcast()

			cc.initialWindowSize = s.Val
		case SettingHeaderTableSize:
			cc.henc.SetMaxDynamicTableSize(s.Val)
			cc.peerMaxHeaderTableSize = s.Val
		case SettingEnableConnectProtocol:
			if err := s.Valid(); err != nil {
				return err
			}
			// If the peer wants to send us SETTINGS_ENABLE_CONNECT_PROTOCOL,
			// we require that it do so in the first SETTINGS frame.
			//
			// When we attempt to use extended CONNECT, we wait for the first
			// SETTINGS frame to see if the server supports it. If we let the
			// server enable the feature with a later SETTINGS frame, then
			// users will see inconsistent results depending on whether we've
			// seen that frame or not.
			if !cc.seenSettings {
				cc.extendedConnectAllowed = s.Val == 1
			}
		default:
			cc.vlogf("Unhandled Setting: %v", s)
		}
		return nil
	})
	if err != nil {
		return err
	}

	if !cc.seenSettings {
		if !seenMaxConcurrentStreams {
			// This was the servers initial SETTINGS frame and it
			// didn't contain a MAX_CONCURRENT_STREAMS field so
			// increase the number of concurrent streams this
			// connection can establish to our default.
			cc.maxConcurrentStreams = defaultMaxConcurrentStreams
		}
		close(cc.seenSettingsChan)
		cc.seenSettings = true
	}

	return nil
}
  2911  
// processWindowUpdate handles a WINDOW_UPDATE frame for the connection
// (stream 0) or a single stream, growing the corresponding send
// flow-control window.
func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if f.StreamID != 0 && cs == nil {
		// Update for an unknown or already-aborted stream; ignore.
		return nil
	}

	cc.mu.Lock()
	defer cc.mu.Unlock()

	fl := &cc.flow
	if cs != nil {
		fl = &cs.flow
	}
	if !fl.add(int32(f.Increment)) {
		// For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
		if cs != nil {
			rl.endStreamErrorLocked(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeFlowControl,
			})
			return nil
		}

		return ConnectionError(ErrCodeFlowControl)
	}
	// Wake any writers blocked on flow control.
	cc.cond.Broadcast()
	return nil
}
  2941  
// processResetStream handles a RST_STREAM frame by aborting the associated
// stream with a StreamError carrying the peer's error code.
func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if cs == nil {
		// TODO: return error if server tries to RST_STREAM an idle stream
		return nil
	}
	serr := streamError(cs.ID, f.ErrCode)
	serr.Cause = errFromPeer
	if f.ErrCode == ErrCodeProtocol {
		// A protocol error suggests this connection is in a bad state;
		// don't reuse it for further requests.
		rl.cc.SetDoNotReuse()
	}
	if fn := cs.cc.t.CountError; fn != nil {
		fn("recv_rststream_" + f.ErrCode.stringToken())
	}
	cs.abortStream(serr)

	cs.bufPipe.CloseWithError(serr)
	return nil
}
  2961  
// Ping sends a PING frame to the server and waits for the ack.
// It returns early with an error if ctx is done, the frame cannot be
// written, or the connection's read loop exits.
func (cc *ClientConn) Ping(ctx context.Context) error {
	c := make(chan struct{})
	// Generate a random payload; retry until one not already in flight
	// is found so the ack can be matched to this call.
	var p [8]byte
	for {
		if _, err := rand.Read(p[:]); err != nil {
			return err
		}
		cc.mu.Lock()
		// check for dup before insert
		if _, found := cc.pings[p]; !found {
			cc.pings[p] = c
			cc.mu.Unlock()
			break
		}
		cc.mu.Unlock()
	}
	var pingError error
	errc := make(chan struct{})
	// Write the frame in a goroutine so a blocked writer doesn't prevent
	// us from honoring ctx cancellation below.
	go func() {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if pingError = cc.fr.WritePing(false, p); pingError != nil {
			close(errc)
			return
		}
		if pingError = cc.bw.Flush(); pingError != nil {
			close(errc)
			return
		}
	}()
	select {
	case <-c:
		// Ack received (see processPing).
		return nil
	case <-errc:
		return pingError
	case <-ctx.Done():
		return ctx.Err()
	case <-cc.readerDone:
		// connection closed
		return cc.readerErr
	}
}
  3006  
// processPing handles a PING frame: an ack wakes any matching Ping caller
// and unblocks pending RST_STREAM bookkeeping; a non-ack is echoed back
// with the ack flag set.
func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
	if f.IsAck() {
		cc := rl.cc
		defer cc.maybeCallStateHook()
		cc.mu.Lock()
		defer cc.mu.Unlock()
		// If ack, notify listener if any
		if c, ok := cc.pings[f.Data]; ok {
			close(c)
			delete(cc.pings, f.Data)
		}
		if cc.pendingResets > 0 {
			// See clientStream.cleanupWriteRequest.
			cc.pendingResets = 0
			cc.rstStreamPingsBlocked = true
			cc.cond.Broadcast()
		}
		return nil
	}
	cc := rl.cc
	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	if err := cc.fr.WritePing(true, f.Data); err != nil {
		return err
	}
	return cc.bw.Flush()
}
  3034  
// processPushPromise rejects any PUSH_PROMISE frame: we disable server
// push in our SETTINGS, so receiving one is a connection error.
func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
	// We told the peer we don't want them.
	// Spec says:
	// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
	// setting of the peer endpoint is set to 0. An endpoint that
	// has set this setting and has received acknowledgement MUST
	// treat the receipt of a PUSH_PROMISE frame as a connection
	// error (Section 5.4.1) of type PROTOCOL_ERROR."
	return ConnectionError(ErrCodeProtocol)
}
  3045  
  3046  // writeStreamReset sends a RST_STREAM frame.
  3047  // When ping is true, it also sends a PING frame with a random payload.
  3048  func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
  3049  	// TODO: map err to more interesting error codes, once the
  3050  	// HTTP community comes up with some. But currently for
  3051  	// RST_STREAM there's no equivalent to GOAWAY frame's debug
  3052  	// data, and the error codes are all pretty vague ("cancel").
  3053  	cc.wmu.Lock()
  3054  	cc.fr.WriteRSTStream(streamID, code)
  3055  	if ping {
  3056  		var payload [8]byte
  3057  		rand.Read(payload[:])
  3058  		cc.fr.WritePing(false, payload)
  3059  	}
  3060  	cc.bw.Flush()
  3061  	cc.wmu.Unlock()
  3062  }
  3063  
var (
	// errResponseHeaderListSize: the peer's response headers exceeded the
	// limit we advertised in SETTINGS_MAX_HEADER_LIST_SIZE.
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
	// errRequestHeaderListSize: our request headers exceeded the limit
	// the peer advertised.
	errRequestHeaderListSize = httpcommon.ErrRequestHeaderListSize
)
  3068  
// logf logs unconditionally via the connection's Transport.
func (cc *ClientConn) logf(format string, args ...interface{}) {
	cc.t.logf(format, args...)
}
  3072  
// vlogf logs via the connection's Transport only when VerboseLogs is set.
func (cc *ClientConn) vlogf(format string, args ...interface{}) {
	cc.t.vlogf(format, args...)
}
  3076  
  3077  func (t *Transport) vlogf(format string, args ...interface{}) {
  3078  	if VerboseLogs {
  3079  		t.logf(format, args...)
  3080  	}
  3081  }
  3082  
// logf logs to the standard logger.
func (t *Transport) logf(format string, args ...interface{}) {
	log.Printf(format, args...)
}
  3086  
  3087  type missingBody struct{}
  3088  
  3089  func (missingBody) Close() error             { return nil }
  3090  func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
  3091  
  3092  func strSliceContains(ss []string, s string) bool {
  3093  	for _, v := range ss {
  3094  		if v == s {
  3095  			return true
  3096  		}
  3097  	}
  3098  	return false
  3099  }
  3100  
// erringRoundTripper is a RoundTripper whose RoundTrip always fails with
// a fixed error, used when connection setup fails.
type erringRoundTripper struct{ err error }

// RoundTripErr exposes the underlying error.
func (rt erringRoundTripper) RoundTripErr() error                               { return rt.err }
func (rt erringRoundTripper) RoundTrip(*ClientRequest) (*ClientResponse, error) { return nil, rt.err }
  3105  
// errConcurrentReadOnResBody is a sentinel stored in gzipReader.zerr while
// a Read is in flight; a second concurrent Read observes it and fails.
var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")
  3107  
// gzipReader wraps a response body so it can lazily
// get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
//
// zerr sentinels: errConcurrentReadOnResBody marks a Read in flight,
// fs.ErrClosed marks the body as closed.
type gzipReader struct {
	_    incomparable
	body io.ReadCloser // underlying Response.Body
	mu   sync.Mutex    // guards zr and zerr
	zr   *gzip.Reader  // stores gzip reader from the pool between reads
	zerr error         // sticky gzip reader init error or sentinel value to detect concurrent read and read after close
}
  3119  
  3120  type eofReader struct{}
  3121  
  3122  func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
  3123  func (eofReader) ReadByte() (byte, error)  { return 0, io.EOF }
  3124  
// gzipPool reuses gzip.Readers across response bodies.
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
  3126  
// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
// On a Reset failure (bad gzip header, read error) the reader is returned
// to the pool and the error is reported to the caller.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
	zr := gzipPool.Get().(*gzip.Reader)
	if err := zr.Reset(r); err != nil {
		gzipPoolPut(zr)
		return nil, err
	}
	return zr, nil
}
  3136  
// gzipPoolPut puts a gzip.Reader back into the pool, first detaching it
// from the response body it was reading.
func gzipPoolPut(zr *gzip.Reader) {
	// Reset will allocate bufio.Reader if we pass it anything
	// other than a flate.Reader, so ensure that it's getting one.
	var r flate.Reader = eofReader{}
	zr.Reset(r)
	gzipPool.Put(zr)
}
  3145  
  3146  // acquire returns a gzip.Reader for reading response body.
  3147  // The reader must be released after use.
  3148  func (gz *gzipReader) acquire() (*gzip.Reader, error) {
  3149  	gz.mu.Lock()
  3150  	defer gz.mu.Unlock()
  3151  	if gz.zerr != nil {
  3152  		return nil, gz.zerr
  3153  	}
  3154  	if gz.zr == nil {
  3155  		gz.zr, gz.zerr = gzipPoolGet(gz.body)
  3156  		if gz.zerr != nil {
  3157  			return nil, gz.zerr
  3158  		}
  3159  	}
  3160  	ret := gz.zr
  3161  	gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
  3162  	return ret, nil
  3163  }
  3164  
  3165  // release returns the gzip.Reader to the pool if Close was called during Read.
  3166  func (gz *gzipReader) release(zr *gzip.Reader) {
  3167  	gz.mu.Lock()
  3168  	defer gz.mu.Unlock()
  3169  	if gz.zerr == errConcurrentReadOnResBody {
  3170  		gz.zr, gz.zerr = zr, nil
  3171  	} else { // fs.ErrClosed
  3172  		gzipPoolPut(zr)
  3173  	}
  3174  }
  3175  
  3176  // close returns the gzip.Reader to the pool immediately or
  3177  // signals release to do so after Read completes.
  3178  func (gz *gzipReader) close() {
  3179  	gz.mu.Lock()
  3180  	defer gz.mu.Unlock()
  3181  	if gz.zerr == nil && gz.zr != nil {
  3182  		gzipPoolPut(gz.zr)
  3183  		gz.zr = nil
  3184  	}
  3185  	gz.zerr = fs.ErrClosed
  3186  }
  3187  
  3188  func (gz *gzipReader) Read(p []byte) (n int, err error) {
  3189  	zr, err := gz.acquire()
  3190  	if err != nil {
  3191  		return 0, err
  3192  	}
  3193  	defer gz.release(zr)
  3194  
  3195  	return zr.Read(p)
  3196  }
  3197  
// Close recycles the gzip.Reader (now or after an in-flight Read) and
// closes the underlying response body.
func (gz *gzipReader) Close() error {
	gz.close()

	return gz.body.Close()
}
  3203  
// isConnectionCloseRequest reports whether req should use its own
// connection for a single request and then close the connection.
func isConnectionCloseRequest(req *ClientRequest) bool {
	return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
}
  3209  
// NetHTTPClientConn wraps ClientConn and implements the interface net/http
// expects from the RoundTripper returned by NewClientConn.
type NetHTTPClientConn struct {
	cc *ClientConn
}
  3215  
// RoundTrip delegates to the wrapped ClientConn.
func (cc NetHTTPClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.cc.RoundTrip(req)
}
  3219  
// Close delegates to the wrapped ClientConn.
func (cc NetHTTPClientConn) Close() error {
	return cc.cc.Close()
}
  3223  
  3224  func (cc NetHTTPClientConn) Err() error {
  3225  	cc.cc.mu.Lock()
  3226  	defer cc.cc.mu.Unlock()
  3227  	if cc.cc.closed {
  3228  		return errors.New("connection closed")
  3229  	}
  3230  	return nil
  3231  }
  3232  
// Reserve reserves a concurrent-stream slot on the connection, failing if
// the connection cannot currently take on a new request.
func (cc NetHTTPClientConn) Reserve() error {
	defer cc.cc.maybeCallStateHook()
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	if !cc.cc.canReserveLocked() {
		return errors.New("connection is unavailable")
	}
	cc.cc.streamsReserved++
	return nil
}
  3243  
// Release returns a reservation previously taken by Reserve, if any.
func (cc NetHTTPClientConn) Release() {
	defer cc.cc.maybeCallStateHook()
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	// We don't complain if streamsReserved is 0.
	//
	// This is consistent with RoundTrip: both Release and RoundTrip will
	// consume a reservation iff one exists.
	if cc.cc.streamsReserved > 0 {
		cc.cc.streamsReserved--
	}
}
  3256  
// Available reports how many additional requests the connection can take.
func (cc NetHTTPClientConn) Available() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.availableLocked()
}
  3262  
// InFlight reports the number of requests currently using the connection.
func (cc NetHTTPClientConn) InFlight() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.currentRequestCountLocked()
}
  3268  
  3269  func (cc *ClientConn) maybeCallStateHook() {
  3270  	if cc.internalStateHook != nil {
  3271  		cc.internalStateHook()
  3272  	}
  3273  }
  3274  
  3275  func (t *Transport) idleConnTimeout() time.Duration {
  3276  	// to keep things backwards compatible, we use non-zero values of
  3277  	// IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
  3278  	// http1 transport, followed by 0
  3279  	if t.IdleConnTimeout != 0 {
  3280  		return t.IdleConnTimeout
  3281  	}
  3282  
  3283  	if t.t1 != nil {
  3284  		return t.t1.IdleConnTimeout()
  3285  	}
  3286  
  3287  	return 0
  3288  }
  3289  
  3290  func traceGetConn(req *ClientRequest, hostPort string) {
  3291  	trace := httptrace.ContextClientTrace(req.Context)
  3292  	if trace == nil || trace.GetConn == nil {
  3293  		return
  3294  	}
  3295  	trace.GetConn(hostPort)
  3296  }
  3297  
// traceGotConn invokes the client trace's GotConn hook, if set, reporting
// whether the connection was reused and, if it was idle, for how long.
func traceGotConn(req *ClientRequest, cc *ClientConn, reused bool) {
	trace := httptrace.ContextClientTrace(req.Context)
	if trace == nil || trace.GotConn == nil {
		return
	}
	ci := httptrace.GotConnInfo{Conn: cc.tconn}
	ci.Reused = reused
	cc.mu.Lock()
	// Idle means reused with no streams currently open.
	ci.WasIdle = len(cc.streams) == 0 && reused
	if ci.WasIdle && !cc.lastActive.IsZero() {
		ci.IdleTime = time.Since(cc.lastActive)
	}
	cc.mu.Unlock()

	trace.GotConn(ci)
}
  3314  
  3315  func traceWroteHeaders(trace *httptrace.ClientTrace) {
  3316  	if trace != nil && trace.WroteHeaders != nil {
  3317  		trace.WroteHeaders()
  3318  	}
  3319  }
  3320  
  3321  func traceGot100Continue(trace *httptrace.ClientTrace) {
  3322  	if trace != nil && trace.Got100Continue != nil {
  3323  		trace.Got100Continue()
  3324  	}
  3325  }
  3326  
  3327  func traceWait100Continue(trace *httptrace.ClientTrace) {
  3328  	if trace != nil && trace.Wait100Continue != nil {
  3329  		trace.Wait100Continue()
  3330  	}
  3331  }
  3332  
  3333  func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
  3334  	if trace != nil && trace.WroteRequest != nil {
  3335  		trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
  3336  	}
  3337  }
  3338  
  3339  func traceFirstResponseByte(trace *httptrace.ClientTrace) {
  3340  	if trace != nil && trace.GotFirstResponseByte != nil {
  3341  		trace.GotFirstResponseByte()
  3342  	}
  3343  }
  3344  
  3345  func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
  3346  	if trace != nil {
  3347  		return trace.Got1xxResponse
  3348  	}
  3349  	return nil
  3350  }
  3351  
  3352  // dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
  3353  // connection.
  3354  func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
  3355  	dialer := &tls.Dialer{
  3356  		Config: cfg,
  3357  	}
  3358  	cn, err := dialer.DialContext(ctx, network, addr)
  3359  	if err != nil {
  3360  		return nil, err
  3361  	}
  3362  	tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
  3363  	return tlsCn, nil
  3364  }
  3365  
