Source file src/net/http/internal/http2/client_conn_pool.go

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Transport code's client connection pooling.
     6  
     7  package http2
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"net"
    13  	"sync"
    14  )
    15  
    16  // ClientConnPool manages a pool of HTTP/2 client connections.
    17  type ClientConnPool interface {
    18  	// GetClientConn returns a specific HTTP/2 connection (usually
    19  	// a TLS-TCP connection) to an HTTP/2 server. On success, the
    20  	// returned ClientConn accounts for the upcoming RoundTrip
    21  	// call, so the caller should not omit it. If the caller needs
    22  	// to, ClientConn.RoundTrip can be called with a bogus
    23  	// new(ClientRequest) to release the stream reservation.
    24  	GetClientConn(req *ClientRequest, addr string) (*ClientConn, error)
    25  	MarkDead(*ClientConn)
    26  }
    27  
    28  // clientConnPoolIdleCloser is the interface implemented by ClientConnPool
    29  // implementations which can close their idle connections.
    30  type clientConnPoolIdleCloser interface {
    31  	ClientConnPool
    32  	closeIdleConnections()
    33  }
    34  
    35  var (
    36  	_ clientConnPoolIdleCloser = (*clientConnPool)(nil)
    37  	_ clientConnPoolIdleCloser = noDialClientConnPool{}
    38  )
    39  
    40  // TODO: use singleflight for dialing and addConnCalls?
    41  type clientConnPool struct {
    42  	t *Transport
    43  
    44  	mu sync.Mutex // TODO: maybe switch to RWMutex
    45  	// TODO: add support for sharing conns based on cert names
    46  	// (e.g. share conn for googleapis.com and appspot.com)
    47  	conns        map[string][]*ClientConn // key is host:port
    48  	dialing      map[string]*dialCall     // currently in-flight dials
    49  	keys         map[*ClientConn][]string
    50  	addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls
    51  }
    52  
    53  func (p *clientConnPool) GetClientConn(req *ClientRequest, addr string) (*ClientConn, error) {
    54  	return p.getClientConn(req, addr, dialOnMiss)
    55  }
    56  
    57  const (
    58  	dialOnMiss   = true
    59  	noDialOnMiss = false
    60  )
    61  
    62  func (p *clientConnPool) getClientConn(req *ClientRequest, addr string, dialOnMiss bool) (*ClientConn, error) {
    63  	// TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
    64  	if isConnectionCloseRequest(req) && dialOnMiss {
    65  		// It gets its own connection.
    66  		traceGetConn(req, addr)
    67  		const singleUse = true
    68  		cc, err := p.t.dialClientConn(req.Context, addr, singleUse)
    69  		if err != nil {
    70  			return nil, err
    71  		}
    72  		return cc, nil
    73  	}
    74  	for {
    75  		p.mu.Lock()
    76  		for _, cc := range p.conns[addr] {
    77  			if cc.ReserveNewRequest() {
    78  				// When a connection is presented to us by the net/http package,
    79  				// the GetConn hook has already been called.
    80  				// Don't call it a second time here.
    81  				if !cc.getConnCalled {
    82  					traceGetConn(req, addr)
    83  				}
    84  				cc.getConnCalled = false
    85  				p.mu.Unlock()
    86  				return cc, nil
    87  			}
    88  		}
    89  		if !dialOnMiss {
    90  			p.mu.Unlock()
    91  			return nil, ErrNoCachedConn
    92  		}
    93  		traceGetConn(req, addr)
    94  		call := p.getStartDialLocked(req.Context, addr)
    95  		p.mu.Unlock()
    96  		<-call.done
    97  		if shouldRetryDial(call, req) {
    98  			continue
    99  		}
   100  		cc, err := call.res, call.err
   101  		if err != nil {
   102  			return nil, err
   103  		}
   104  		if cc.ReserveNewRequest() {
   105  			return cc, nil
   106  		}
   107  	}
   108  }
   109  
   110  // dialCall is an in-flight Transport dial call to a host.
   111  type dialCall struct {
   112  	_ incomparable
   113  	p *clientConnPool
   114  	// the context associated with the request
   115  	// that created this dialCall
   116  	ctx  context.Context
   117  	done chan struct{} // closed when done
   118  	res  *ClientConn   // valid after done is closed
   119  	err  error         // valid after done is closed
   120  }
   121  
   122  // requires p.mu is held.
   123  func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
   124  	if call, ok := p.dialing[addr]; ok {
   125  		// A dial is already in-flight. Don't start another.
   126  		return call
   127  	}
   128  	call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
   129  	if p.dialing == nil {
   130  		p.dialing = make(map[string]*dialCall)
   131  	}
   132  	p.dialing[addr] = call
   133  	go call.dial(call.ctx, addr)
   134  	return call
   135  }
   136  
   137  // run in its own goroutine.
   138  func (c *dialCall) dial(ctx context.Context, addr string) {
   139  	const singleUse = false // shared conn
   140  	c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
   141  
   142  	c.p.mu.Lock()
   143  	delete(c.p.dialing, addr)
   144  	if c.err == nil {
   145  		c.p.addConnLocked(addr, c.res)
   146  	}
   147  	c.p.mu.Unlock()
   148  
   149  	close(c.done)
   150  }
   151  
   152  // addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
   153  // already exist. It coalesces concurrent calls with the same key.
   154  // This is used by the http1 Transport code when it creates a new connection. Because
   155  // the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
   156  // the protocol), it can get into a situation where it has multiple TLS connections.
   157  // This code decides which ones live or die.
   158  // The return value used is whether c was used.
   159  // c is never closed.
   160  func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c net.Conn) (used bool, err error) {
   161  	p.mu.Lock()
   162  	for _, cc := range p.conns[key] {
   163  		if cc.CanTakeNewRequest() {
   164  			p.mu.Unlock()
   165  			return false, nil
   166  		}
   167  	}
   168  	call, dup := p.addConnCalls[key]
   169  	if !dup {
   170  		if p.addConnCalls == nil {
   171  			p.addConnCalls = make(map[string]*addConnCall)
   172  		}
   173  		call = &addConnCall{
   174  			p:    p,
   175  			done: make(chan struct{}),
   176  		}
   177  		p.addConnCalls[key] = call
   178  		go call.run(t, key, c)
   179  	}
   180  	p.mu.Unlock()
   181  
   182  	<-call.done
   183  	if call.err != nil {
   184  		return false, call.err
   185  	}
   186  	return !dup, nil
   187  }
   188  
   189  type addConnCall struct {
   190  	_    incomparable
   191  	p    *clientConnPool
   192  	done chan struct{} // closed when done
   193  	err  error
   194  }
   195  
   196  func (c *addConnCall) run(t *Transport, key string, nc net.Conn) {
   197  	cc, err := t.newClientConn(nc, t.disableKeepAlives(), nil)
   198  
   199  	p := c.p
   200  	p.mu.Lock()
   201  	if err != nil {
   202  		c.err = err
   203  	} else {
   204  		cc.getConnCalled = true // already called by the net/http package
   205  		p.addConnLocked(key, cc)
   206  	}
   207  	delete(p.addConnCalls, key)
   208  	p.mu.Unlock()
   209  	close(c.done)
   210  }
   211  
   212  // p.mu must be held
   213  func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
   214  	for _, v := range p.conns[key] {
   215  		if v == cc {
   216  			return
   217  		}
   218  	}
   219  	if p.conns == nil {
   220  		p.conns = make(map[string][]*ClientConn)
   221  	}
   222  	if p.keys == nil {
   223  		p.keys = make(map[*ClientConn][]string)
   224  	}
   225  	p.conns[key] = append(p.conns[key], cc)
   226  	p.keys[cc] = append(p.keys[cc], key)
   227  }
   228  
   229  func (p *clientConnPool) MarkDead(cc *ClientConn) {
   230  	p.mu.Lock()
   231  	defer p.mu.Unlock()
   232  	for _, key := range p.keys[cc] {
   233  		vv, ok := p.conns[key]
   234  		if !ok {
   235  			continue
   236  		}
   237  		newList := filterOutClientConn(vv, cc)
   238  		if len(newList) > 0 {
   239  			p.conns[key] = newList
   240  		} else {
   241  			delete(p.conns, key)
   242  		}
   243  	}
   244  	delete(p.keys, cc)
   245  }
   246  
   247  func (p *clientConnPool) closeIdleConnections() {
   248  	p.mu.Lock()
   249  	defer p.mu.Unlock()
   250  	// TODO: don't close a cc if it was just added to the pool
   251  	// milliseconds ago and has never been used. There's currently
   252  	// a small race window with the HTTP/1 Transport's integration
   253  	// where it can add an idle conn just before using it, and
   254  	// somebody else can concurrently call CloseIdleConns and
   255  	// break some caller's RoundTrip.
   256  	for _, vv := range p.conns {
   257  		for _, cc := range vv {
   258  			cc.closeIfIdle()
   259  		}
   260  	}
   261  }
   262  
   263  func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
   264  	out := in[:0]
   265  	for _, v := range in {
   266  		if v != exclude {
   267  			out = append(out, v)
   268  		}
   269  	}
   270  	// If we filtered it out, zero out the last item to prevent
   271  	// the GC from seeing it.
   272  	if len(in) != len(out) {
   273  		in[len(in)-1] = nil
   274  	}
   275  	return out
   276  }
   277  
   278  // noDialClientConnPool is an implementation of http2.ClientConnPool
   279  // which never dials. We let the HTTP/1.1 client dial and use its TLS
   280  // connection instead.
   281  type noDialClientConnPool struct{ *clientConnPool }
   282  
   283  func (p noDialClientConnPool) GetClientConn(req *ClientRequest, addr string) (*ClientConn, error) {
   284  	return p.getClientConn(req, addr, noDialOnMiss)
   285  }
   286  
   287  // shouldRetryDial reports whether the current request should
   288  // retry dialing after the call finished unsuccessfully, for example
   289  // if the dial was canceled because of a context cancellation or
   290  // deadline expiry.
   291  func shouldRetryDial(call *dialCall, req *ClientRequest) bool {
   292  	if call.err == nil {
   293  		// No error, no need to retry
   294  		return false
   295  	}
   296  	if call.ctx == req.Context {
   297  		// If the call has the same context as the request, the dial
   298  		// should not be retried, since any cancellation will have come
   299  		// from this request.
   300  		return false
   301  	}
   302  	if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
   303  		// If the call error is not because of a context cancellation or a deadline expiry,
   304  		// the dial should not be retried.
   305  		return false
   306  	}
   307  	// Only retry if the error is a context cancellation error or deadline expiry
   308  	// and the context associated with the call was canceled or expired.
   309  	return call.ctx.Err() != nil
   310  }
   311  

View as plain text