1
2
3
4
5
6
7 package http2
8
9 import (
10 "bufio"
11 "bytes"
12 "compress/flate"
13 "compress/gzip"
14 "context"
15 "crypto/rand"
16 "crypto/tls"
17 "errors"
18 "fmt"
19 "io"
20 "io/fs"
21 "log"
22 "math"
23 "math/bits"
24 mathrand "math/rand"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal"
28 "net/http/internal/httpcommon"
29 "net/textproto"
30 "slices"
31 "strconv"
32 "strings"
33 "sync"
34 "sync/atomic"
35 "time"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http2/hpack"
39 "golang.org/x/net/idna"
40 )
41
42 const (
43
44
45 transportDefaultConnFlow = 1 << 30
46
47
48
49
50 transportDefaultStreamFlow = 4 << 20
51
52 defaultUserAgent = "Go-http-client/2.0"
53
54
55
56
57 initialMaxConcurrentStreams = 100
58
59
60
61 defaultMaxConcurrentStreams = 1000
62 )
63
64
65
66
67
68 type Transport struct {
69
70
71
72
73
74
75
76 DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
77
78
79
80
81
82
83
84
85
86 DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
87
88
89
90 TLSClientConfig *tls.Config
91
92
93
94 ConnPool ClientConnPool
95
96
97
98
99
100
101
102
103
104 DisableCompression bool
105
106
107
108 AllowHTTP bool
109
110
111
112
113
114
115
116
117 MaxHeaderListSize uint32
118
119
120
121
122
123
124
125
126 MaxReadFrameSize uint32
127
128
129
130
131
132
133 MaxDecoderHeaderTableSize uint32
134
135
136
137
138
139 MaxEncoderHeaderTableSize uint32
140
141
142
143
144
145
146
147
148
149 StrictMaxConcurrentStreams bool
150
151
152
153
154
155 IdleConnTimeout time.Duration
156
157
158
159
160
161
162
163 ReadIdleTimeout time.Duration
164
165
166
167
168 PingTimeout time.Duration
169
170
171
172
173 WriteByteTimeout time.Duration
174
175
176
177
178
179 CountError func(errType string)
180
181 t1 TransportConfig
182
183 connPoolOnce sync.Once
184 connPoolOrDef ClientConnPool
185
186 *transportTestHooks
187 }
188
189
190
191
192
193 type transportTestHooks struct {
194 newclientconn func(*ClientConn)
195 }
196
197 func (t *Transport) maxHeaderListSize() uint32 {
198 n := t.t1.MaxResponseHeaderBytes()
199 if n > 0 {
200 n = adjustHTTP1MaxHeaderSize(n)
201 }
202 if n <= 0 {
203 return 10 << 20
204 }
205 if n >= 0xffffffff {
206 return 0
207 }
208 return uint32(n)
209 }
210
211 func (t *Transport) disableCompression() bool {
212 return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression())
213 }
214
215 func NewTransport(t1 TransportConfig) *Transport {
216 connPool := new(clientConnPool)
217 t2 := &Transport{
218 ConnPool: noDialClientConnPool{connPool},
219 t1: t1,
220 }
221 connPool.t = t2
222 return t2
223 }
224
225 func (t *Transport) AddConn(scheme, authority string, c net.Conn) error {
226 connPool, ok := t.ConnPool.(noDialClientConnPool)
227 if !ok {
228 go c.Close()
229 return nil
230 }
231 addr := authorityAddr(scheme, authority)
232 used, err := connPool.addConnIfNeeded(addr, t, c)
233 if !used {
234 go c.Close()
235 }
236 return err
237 }
238
239
240
241 type unencryptedTransport Transport
242
243 func (t *unencryptedTransport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
244 return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
245 }
246
247 func (t *Transport) connPool() ClientConnPool {
248 t.connPoolOnce.Do(t.initConnPool)
249 return t.connPoolOrDef
250 }
251
252 func (t *Transport) initConnPool() {
253 if t.ConnPool != nil {
254 t.connPoolOrDef = t.ConnPool
255 } else {
256 t.connPoolOrDef = &clientConnPool{t: t}
257 }
258 }
259
260
261
262 type ClientConn struct {
263 t *Transport
264 tconn net.Conn
265 tlsState *tls.ConnectionState
266 atomicReused uint32
267 singleUse bool
268 getConnCalled bool
269
270
271 readerDone chan struct{}
272 readerErr error
273
274 idleTimeout time.Duration
275 idleTimer *time.Timer
276
277 mu sync.Mutex
278 cond *sync.Cond
279 flow outflow
280 inflow inflow
281 doNotReuse bool
282 closing bool
283 closed bool
284 closedOnIdle bool
285 seenSettings bool
286 seenSettingsChan chan struct{}
287 wantSettingsAck bool
288 goAway *GoAwayFrame
289 goAwayDebug string
290 streams map[uint32]*clientStream
291 streamsReserved int
292 nextStreamID uint32
293 pendingRequests int
294 pings map[[8]byte]chan struct{}
295 br *bufio.Reader
296 lastActive time.Time
297 lastIdle time.Time
298
299 maxFrameSize uint32
300 maxConcurrentStreams uint32
301 peerMaxHeaderListSize uint64
302 peerMaxHeaderTableSize uint32
303 initialWindowSize uint32
304 initialStreamRecvWindowSize int32
305 readIdleTimeout time.Duration
306 pingTimeout time.Duration
307 extendedConnectAllowed bool
308 strictMaxConcurrentStreams bool
309
310
311
312
313
314
315
316
317
318 rstStreamPingsBlocked bool
319
320
321
322
323
324
325
326 pendingResets int
327
328
329
330
331
332
333 readBeforeStreamID uint32
334
335
336
337
338 reqHeaderMu chan struct{}
339
340
341
342
343
344 internalStateHook func()
345
346
347
348
349 wmu sync.Mutex
350 bw *bufio.Writer
351 fr *Framer
352 werr error
353 hbuf bytes.Buffer
354 henc *hpack.Encoder
355 }
356
357
358
359 type clientStream struct {
360 cc *ClientConn
361
362
363 ctx context.Context
364 reqCancel <-chan struct{}
365
366 trace *httptrace.ClientTrace
367 ID uint32
368 bufPipe pipe
369 requestedGzip bool
370 isHead bool
371
372 abortOnce sync.Once
373 abort chan struct{}
374 abortErr error
375
376 peerClosed chan struct{}
377 donec chan struct{}
378 on100 chan struct{}
379
380 respHeaderRecv chan struct{}
381 res *ClientResponse
382
383 flow outflow
384 inflow inflow
385 bytesRemain int64
386 readErr error
387
388 reqBody io.ReadCloser
389 reqBodyContentLength int64
390 reqBodyClosed chan struct{}
391
392
393 sentEndStream bool
394 sentHeaders bool
395
396
397 firstByte bool
398 pastHeaders bool
399 pastTrailers bool
400 readClosed bool
401 readAborted bool
402 totalHeaderSize int64
403
404 trailer Header
405 resTrailer *Header
406
407 staticResp ClientResponse
408 }
409
410 var got1xxFuncForTests func(int, textproto.MIMEHeader) error
411
412
413
414 func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
415 if fn := got1xxFuncForTests; fn != nil {
416 return fn
417 }
418 return traceGot1xxResponseFunc(cs.trace)
419 }
420
421 func (cs *clientStream) abortStream(err error) {
422 cs.cc.mu.Lock()
423 defer cs.cc.mu.Unlock()
424 cs.abortStreamLocked(err)
425 }
426
427 func (cs *clientStream) abortStreamLocked(err error) {
428 cs.abortOnce.Do(func() {
429 cs.abortErr = err
430 close(cs.abort)
431 })
432 if cs.reqBody != nil {
433 cs.closeReqBodyLocked()
434 }
435
436 if cs.cc.cond != nil {
437
438 cs.cc.cond.Broadcast()
439 }
440 }
441
442 func (cs *clientStream) abortRequestBodyWrite() {
443 cc := cs.cc
444 cc.mu.Lock()
445 defer cc.mu.Unlock()
446 if cs.reqBody != nil && cs.reqBodyClosed == nil {
447 cs.closeReqBodyLocked()
448 cc.cond.Broadcast()
449 }
450 }
451
452 func (cs *clientStream) closeReqBodyLocked() {
453 if cs.reqBodyClosed != nil {
454 return
455 }
456 cs.reqBodyClosed = make(chan struct{})
457 reqBodyClosed := cs.reqBodyClosed
458 go func() {
459 cs.reqBody.Close()
460 close(reqBodyClosed)
461 }()
462 }
463
464 type stickyErrWriter struct {
465 conn net.Conn
466 timeout time.Duration
467 err *error
468 }
469
470 func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
471 if *sew.err != nil {
472 return 0, *sew.err
473 }
474 n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
475 *sew.err = err
476 return n, err
477 }
478
479
480
481
482
483
484
485 type noCachedConnError struct{}
486
487 func (noCachedConnError) IsHTTP2NoCachedConnError() {}
488 func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
489
490
491
492
493 func isNoCachedConnError(err error) bool {
494 _, ok := err.(interface{ IsHTTP2NoCachedConnError() })
495 return ok
496 }
497
498 var ErrNoCachedConn error = noCachedConnError{}
499
500
501 type RoundTripOpt struct {
502
503
504
505
506 OnlyCachedConn bool
507
508 allowHTTP bool
509 }
510
511 func (t *Transport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
512 return t.RoundTripOpt(req, RoundTripOpt{})
513 }
514
515
516
517 func authorityAddr(scheme string, authority string) (addr string) {
518 host, port, err := net.SplitHostPort(authority)
519 if err != nil {
520 host = authority
521 port = ""
522 }
523 if port == "" {
524 port = "443"
525 if scheme == "http" {
526 port = "80"
527 }
528 }
529 if a, err := idna.ToASCII(host); err == nil {
530 host = a
531 }
532
533 if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
534 return host + ":" + port
535 }
536 return net.JoinHostPort(host, port)
537 }
538
539
540 func (t *Transport) RoundTripOpt(req *ClientRequest, opt RoundTripOpt) (*ClientResponse, error) {
541 switch req.URL.Scheme {
542 case "https":
543
544 case "http":
545 if !t.AllowHTTP && !opt.allowHTTP {
546 return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
547 }
548 default:
549 return nil, errors.New("http2: unsupported scheme")
550 }
551
552 addr := authorityAddr(req.URL.Scheme, req.URL.Host)
553 for retry := 0; ; retry++ {
554 cc, err := t.connPool().GetClientConn(req, addr)
555 if err != nil {
556 t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
557 return nil, err
558 }
559 reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
560 traceGotConn(req, cc, reused)
561 res, err := cc.RoundTrip(req)
562 if err != nil && retry <= 6 {
563 roundTripErr := err
564 if req, err = shouldRetryRequest(req, err); err == nil {
565
566 if retry == 0 {
567 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
568 continue
569 }
570 backoff := float64(uint(1) << (uint(retry) - 1))
571 backoff += backoff * (0.1 * mathrand.Float64())
572 d := time.Second * time.Duration(backoff)
573 tm := time.NewTimer(d)
574 select {
575 case <-tm.C:
576 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
577 continue
578 case <-req.Context.Done():
579 tm.Stop()
580 err = req.Context.Err()
581 }
582 }
583 }
584 if err == errClientConnNotEstablished {
585
586
587
588
589
590
591
592
593
594
595 if cc.idleTimer != nil {
596 cc.idleTimer.Stop()
597 }
598 t.connPool().MarkDead(cc)
599 }
600 if err != nil {
601 t.vlogf("RoundTrip failure: %v", err)
602 return nil, err
603 }
604 return res, nil
605 }
606 }
607
608 func (t *Transport) IdleConnStrsForTesting() []string {
609 pool, ok := t.connPool().(noDialClientConnPool)
610 if !ok {
611 return nil
612 }
613
614 var ret []string
615 pool.mu.Lock()
616 defer pool.mu.Unlock()
617 for k, ccs := range pool.conns {
618 for _, cc := range ccs {
619 if cc.idleState().canTakeNewRequest {
620 ret = append(ret, k)
621 }
622 }
623 }
624 slices.Sort(ret)
625 return ret
626 }
627
628
629
630
631 func (t *Transport) CloseIdleConnections() {
632 if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
633 cp.closeIdleConnections()
634 }
635 }
636
637 var (
638 errClientConnClosed = errors.New("http2: client conn is closed")
639 errClientConnUnusable = errors.New("http2: client conn not usable")
640 errClientConnNotEstablished = errors.New("http2: client conn could not be established")
641 errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
642 errClientConnForceClosed = errors.New("http2: client connection force closed via ClientConn.Close")
643 )
644
645
646
647
648
649 func shouldRetryRequest(req *ClientRequest, err error) (*ClientRequest, error) {
650 if !canRetryError(err) {
651 return nil, err
652 }
653
654
655 if req.Body == nil || req.Body == NoBody {
656 return req, nil
657 }
658
659
660
661 if req.GetBody != nil {
662 body, err := req.GetBody()
663 if err != nil {
664 return nil, err
665 }
666 newReq := req.Clone()
667 newReq.Body = body
668 return newReq, nil
669 }
670
671
672
673
674 if err == errClientConnUnusable {
675 return req, nil
676 }
677
678 return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
679 }
680
681 func canRetryError(err error) bool {
682 if err == errClientConnUnusable || err == errClientConnGotGoAway {
683 return true
684 }
685 if se, ok := err.(StreamError); ok {
686 return se.Code == ErrCodeRefusedStream
687 }
688 return false
689 }
690
691 func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
692 if t.transportTestHooks != nil {
693 return t.newClientConn(nil, singleUse, nil)
694 }
695 host, _, err := net.SplitHostPort(addr)
696 if err != nil {
697 return nil, err
698 }
699 tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
700 if err != nil {
701 return nil, err
702 }
703 return t.newClientConn(tconn, singleUse, nil)
704 }
705
706 func (t *Transport) newTLSConfig(host string) *tls.Config {
707 cfg := new(tls.Config)
708 if t.TLSClientConfig != nil {
709 *cfg = *t.TLSClientConfig.Clone()
710 }
711 if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
712 cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
713 }
714 if cfg.ServerName == "" {
715 cfg.ServerName = host
716 }
717 return cfg
718 }
719
720 func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
721 if t.DialTLSContext != nil {
722 return t.DialTLSContext(ctx, network, addr, tlsCfg)
723 } else if t.DialTLS != nil {
724 return t.DialTLS(network, addr, tlsCfg)
725 }
726
727 tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
728 if err != nil {
729 return nil, err
730 }
731 state := tlsCn.ConnectionState()
732 if p := state.NegotiatedProtocol; p != NextProtoTLS {
733 return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
734 }
735 if !state.NegotiatedProtocolIsMutual {
736 return nil, errors.New("http2: could not negotiate protocol mutually")
737 }
738 return tlsCn, nil
739 }
740
741
742
743 func (t *Transport) disableKeepAlives() bool {
744 return t.t1 != nil && t.t1.DisableKeepAlives()
745 }
746
747 func (t *Transport) expectContinueTimeout() time.Duration {
748 if t.t1 == nil {
749 return 0
750 }
751 return t.t1.ExpectContinueTimeout()
752 }
753
754 func (t *Transport) NewClientConn(c net.Conn, internalStateHook func()) (NetHTTPClientConn, error) {
755 cc, err := t.newClientConn(c, t.disableKeepAlives(), internalStateHook)
756 if err != nil {
757 return NetHTTPClientConn{}, err
758 }
759
760
761
762 cc.strictMaxConcurrentStreams = true
763
764 return NetHTTPClientConn{cc}, nil
765 }
766
767 func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
768 conf := configFromTransport(t)
769 cc := &ClientConn{
770 t: t,
771 tconn: c,
772 readerDone: make(chan struct{}),
773 nextStreamID: 1,
774 maxFrameSize: 16 << 10,
775 initialWindowSize: 65535,
776 initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
777 maxConcurrentStreams: initialMaxConcurrentStreams,
778 strictMaxConcurrentStreams: conf.StrictMaxConcurrentRequests,
779 peerMaxHeaderListSize: 0xffffffffffffffff,
780 streams: make(map[uint32]*clientStream),
781 singleUse: singleUse,
782 seenSettingsChan: make(chan struct{}),
783 wantSettingsAck: true,
784 readIdleTimeout: conf.SendPingTimeout,
785 pingTimeout: conf.PingTimeout,
786 pings: make(map[[8]byte]chan struct{}),
787 reqHeaderMu: make(chan struct{}, 1),
788 lastActive: time.Now(),
789 internalStateHook: internalStateHook,
790 }
791 if t.transportTestHooks != nil {
792 t.transportTestHooks.newclientconn(cc)
793 c = cc.tconn
794 }
795 if VerboseLogs {
796 t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
797 }
798
799 cc.cond = sync.NewCond(&cc.mu)
800 cc.flow.add(int32(initialWindowSize))
801
802
803
804 cc.bw = bufio.NewWriter(stickyErrWriter{
805 conn: c,
806 timeout: conf.WriteByteTimeout,
807 err: &cc.werr,
808 })
809 cc.br = bufio.NewReader(c)
810 cc.fr = NewFramer(cc.bw, cc.br)
811 cc.fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
812 if t.CountError != nil {
813 cc.fr.countError = t.CountError
814 }
815 maxHeaderTableSize := uint32(conf.MaxDecoderHeaderTableSize)
816 cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
817 cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
818
819 cc.henc = hpack.NewEncoder(&cc.hbuf)
820 cc.henc.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
821 cc.peerMaxHeaderTableSize = initialHeaderTableSize
822
823 if cs, ok := c.(connectionStater); ok {
824 state := cs.ConnectionState()
825 cc.tlsState = &state
826 }
827
828 initialSettings := []Setting{
829 {ID: SettingEnablePush, Val: 0},
830 {ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
831 }
832 initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: uint32(conf.MaxReadFrameSize)})
833 if max := t.maxHeaderListSize(); max != 0 {
834 initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
835 }
836 if maxHeaderTableSize != initialHeaderTableSize {
837 initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
838 }
839
840 cc.bw.Write(clientPreface)
841 cc.fr.WriteSettings(initialSettings...)
842 cc.fr.WriteWindowUpdate(0, uint32(conf.MaxReceiveBufferPerConnection))
843 cc.inflow.init(int32(conf.MaxReceiveBufferPerConnection) + initialWindowSize)
844 cc.bw.Flush()
845 if cc.werr != nil {
846 cc.Close()
847 return nil, cc.werr
848 }
849
850
851 if d := t.idleConnTimeout(); d != 0 {
852 cc.idleTimeout = d
853 cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
854 }
855
856 go cc.readLoop()
857 return cc, nil
858 }
859
860 func (cc *ClientConn) healthCheck() {
861 pingTimeout := cc.pingTimeout
862
863
864 ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
865 defer cancel()
866 cc.vlogf("http2: Transport sending health check")
867 err := cc.Ping(ctx)
868 if err != nil {
869 cc.vlogf("http2: Transport health check failure: %v", err)
870 cc.closeForLostPing()
871 } else {
872 cc.vlogf("http2: Transport health check success")
873 }
874 }
875
876
877 func (cc *ClientConn) SetDoNotReuse() {
878 cc.mu.Lock()
879 defer cc.mu.Unlock()
880 cc.doNotReuse = true
881 }
882
883 func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
884 cc.mu.Lock()
885 defer cc.mu.Unlock()
886
887 old := cc.goAway
888 cc.goAway = f
889
890
891 if cc.goAwayDebug == "" {
892 cc.goAwayDebug = string(f.DebugData())
893 }
894 if old != nil && old.ErrCode != ErrCodeNo {
895 cc.goAway.ErrCode = old.ErrCode
896 }
897 last := f.LastStreamID
898 for streamID, cs := range cc.streams {
899 if streamID <= last {
900
901
902
903 continue
904 }
905 if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
906
907
908
909 cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
910 } else {
911
912
913 cs.abortStreamLocked(errClientConnGotGoAway)
914 }
915 }
916 }
917
918
919
920
921
922
923 func (cc *ClientConn) CanTakeNewRequest() bool {
924 cc.mu.Lock()
925 defer cc.mu.Unlock()
926 return cc.canTakeNewRequestLocked()
927 }
928
929
930
931
932 func (cc *ClientConn) ReserveNewRequest() bool {
933 cc.mu.Lock()
934 defer cc.mu.Unlock()
935 if st := cc.idleStateLocked(); !st.canTakeNewRequest {
936 return false
937 }
938 cc.streamsReserved++
939 return true
940 }
941
942
943 type ClientConnState struct {
944
945 Closed bool
946
947
948
949
950
951 Closing bool
952
953
954 StreamsActive int
955
956
957
958 StreamsReserved int
959
960
961
962
963 StreamsPending int
964
965
966
967
968 MaxConcurrentStreams uint32
969
970
971
972 LastIdle time.Time
973 }
974
975
976 func (cc *ClientConn) State() ClientConnState {
977 cc.wmu.Lock()
978 maxConcurrent := cc.maxConcurrentStreams
979 if !cc.seenSettings {
980 maxConcurrent = 0
981 }
982 cc.wmu.Unlock()
983
984 cc.mu.Lock()
985 defer cc.mu.Unlock()
986 return ClientConnState{
987 Closed: cc.closed,
988 Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
989 StreamsActive: len(cc.streams) + cc.pendingResets,
990 StreamsReserved: cc.streamsReserved,
991 StreamsPending: cc.pendingRequests,
992 LastIdle: cc.lastIdle,
993 MaxConcurrentStreams: maxConcurrent,
994 }
995 }
996
997
998
999 type clientConnIdleState struct {
1000 canTakeNewRequest bool
1001 }
1002
1003 func (cc *ClientConn) idleState() clientConnIdleState {
1004 cc.mu.Lock()
1005 defer cc.mu.Unlock()
1006 return cc.idleStateLocked()
1007 }
1008
1009 func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
1010 if cc.singleUse && cc.nextStreamID > 1 {
1011 return
1012 }
1013 var maxConcurrentOkay bool
1014 if cc.strictMaxConcurrentStreams {
1015
1016
1017
1018
1019 maxConcurrentOkay = true
1020 } else {
1021
1022
1023
1024
1025
1026
1027 maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
1028 }
1029
1030 st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()
1031
1032
1033
1034
1035
1036
1037
1038
1039 if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
1040 st.canTakeNewRequest = true
1041 }
1042
1043 return
1044 }
1045
1046 func (cc *ClientConn) isUsableLocked() bool {
1047 return cc.goAway == nil &&
1048 !cc.closed &&
1049 !cc.closing &&
1050 !cc.doNotReuse &&
1051 int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
1052 !cc.tooIdleLocked()
1053 }
1054
1055
1056
1057
1058
1059
1060
1061 func (cc *ClientConn) canReserveLocked() bool {
1062 if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
1063 return false
1064 }
1065 if !cc.isUsableLocked() {
1066 return false
1067 }
1068 return true
1069 }
1070
1071
1072
1073 func (cc *ClientConn) currentRequestCountLocked() int {
1074 return len(cc.streams) + cc.streamsReserved + cc.pendingResets
1075 }
1076
1077 func (cc *ClientConn) canTakeNewRequestLocked() bool {
1078 st := cc.idleStateLocked()
1079 return st.canTakeNewRequest
1080 }
1081
1082
1083 func (cc *ClientConn) availableLocked() int {
1084 if !cc.canTakeNewRequestLocked() {
1085 return 0
1086 }
1087 return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
1088 }
1089
1090
1091
1092 func (cc *ClientConn) tooIdleLocked() bool {
1093
1094
1095
1096
1097 return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
1098 }
1099
1100
1101
1102
1103
1104
1105
1106 func (cc *ClientConn) onIdleTimeout() {
1107 cc.closeIfIdle()
1108 }
1109
1110 func (cc *ClientConn) closeConn() {
1111 t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
1112 defer t.Stop()
1113 cc.tconn.Close()
1114 cc.maybeCallStateHook()
1115 }
1116
1117
1118
1119 func (cc *ClientConn) forceCloseConn() {
1120 tc, ok := cc.tconn.(*tls.Conn)
1121 if !ok {
1122 return
1123 }
1124 if nc := tc.NetConn(); nc != nil {
1125 nc.Close()
1126 }
1127 }
1128
1129 func (cc *ClientConn) closeIfIdle() {
1130 cc.mu.Lock()
1131 if len(cc.streams) > 0 || cc.streamsReserved > 0 {
1132 cc.mu.Unlock()
1133 return
1134 }
1135 cc.closed = true
1136 cc.closedOnIdle = true
1137 nextID := cc.nextStreamID
1138
1139 cc.mu.Unlock()
1140
1141 if VerboseLogs {
1142 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
1143 }
1144 cc.closeConn()
1145 }
1146
1147 func (cc *ClientConn) isDoNotReuseAndIdle() bool {
1148 cc.mu.Lock()
1149 defer cc.mu.Unlock()
1150 return cc.doNotReuse && len(cc.streams) == 0
1151 }
1152
1153 var shutdownEnterWaitStateHook = func() {}
1154
1155
1156 func (cc *ClientConn) Shutdown(ctx context.Context) error {
1157 if err := cc.sendGoAway(); err != nil {
1158 return err
1159 }
1160
1161 done := make(chan struct{})
1162 cancelled := false
1163 go func() {
1164 cc.mu.Lock()
1165 defer cc.mu.Unlock()
1166 for {
1167 if len(cc.streams) == 0 || cc.closed {
1168 cc.closed = true
1169 close(done)
1170 break
1171 }
1172 if cancelled {
1173 break
1174 }
1175 cc.cond.Wait()
1176 }
1177 }()
1178 shutdownEnterWaitStateHook()
1179 select {
1180 case <-done:
1181 cc.closeConn()
1182 return nil
1183 case <-ctx.Done():
1184 cc.mu.Lock()
1185
1186 cancelled = true
1187 cc.cond.Broadcast()
1188 cc.mu.Unlock()
1189 return ctx.Err()
1190 }
1191 }
1192
1193 func (cc *ClientConn) sendGoAway() error {
1194 cc.mu.Lock()
1195 closing := cc.closing
1196 cc.closing = true
1197 maxStreamID := cc.nextStreamID
1198 cc.mu.Unlock()
1199 if closing {
1200
1201 return nil
1202 }
1203
1204 cc.wmu.Lock()
1205 defer cc.wmu.Unlock()
1206
1207 if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
1208 return err
1209 }
1210 if err := cc.bw.Flush(); err != nil {
1211 return err
1212 }
1213
1214 return nil
1215 }
1216
1217
1218
1219 func (cc *ClientConn) closeForError(err error) {
1220 cc.mu.Lock()
1221 cc.closed = true
1222 for _, cs := range cc.streams {
1223 cs.abortStreamLocked(err)
1224 }
1225 cc.cond.Broadcast()
1226 cc.mu.Unlock()
1227 cc.closeConn()
1228 }
1229
1230
1231
1232
1233 func (cc *ClientConn) Close() error {
1234 cc.closeForError(errClientConnForceClosed)
1235 return nil
1236 }
1237
1238
1239 func (cc *ClientConn) closeForLostPing() {
1240 err := errors.New("http2: client connection lost")
1241 if f := cc.t.CountError; f != nil {
1242 f("conn_close_lost_ping")
1243 }
1244 cc.closeForError(err)
1245 }
1246
1247
1248
1249 var errRequestCanceled = internal.ErrRequestCanceled
1250
1251 func (cc *ClientConn) responseHeaderTimeout() time.Duration {
1252 if cc.t.t1 != nil {
1253 return cc.t.t1.ResponseHeaderTimeout()
1254 }
1255
1256
1257
1258
1259 return 0
1260 }
1261
1262
1263
1264
1265 func actualContentLength(req *ClientRequest) int64 {
1266 if req.Body == nil || req.Body == NoBody {
1267 return 0
1268 }
1269 if req.ContentLength != 0 {
1270 return req.ContentLength
1271 }
1272 return -1
1273 }
1274
1275 func (cc *ClientConn) decrStreamReservations() {
1276 cc.mu.Lock()
1277 defer cc.mu.Unlock()
1278 cc.decrStreamReservationsLocked()
1279 }
1280
1281 func (cc *ClientConn) decrStreamReservationsLocked() {
1282 if cc.streamsReserved > 0 {
1283 cc.streamsReserved--
1284 }
1285 }
1286
1287 func (cc *ClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
1288 return cc.roundTrip(req, nil)
1289 }
1290
1291 func (cc *ClientConn) roundTrip(req *ClientRequest, streamf func(*clientStream)) (*ClientResponse, error) {
1292 ctx := req.Context
1293 req.stream = clientStream{
1294 cc: cc,
1295 ctx: ctx,
1296 reqCancel: req.Cancel,
1297 isHead: req.Method == "HEAD",
1298 reqBody: req.Body,
1299 reqBodyContentLength: actualContentLength(req),
1300 trace: httptrace.ContextClientTrace(ctx),
1301 peerClosed: make(chan struct{}),
1302 abort: make(chan struct{}),
1303 respHeaderRecv: make(chan struct{}),
1304 donec: make(chan struct{}),
1305 resTrailer: req.ResTrailer,
1306 }
1307 cs := &req.stream
1308
1309 cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())
1310
1311 go cs.doRequest(req, streamf)
1312
1313 waitDone := func() error {
1314 select {
1315 case <-cs.donec:
1316 return nil
1317 case <-ctx.Done():
1318 return ctx.Err()
1319 case <-cs.reqCancel:
1320 return errRequestCanceled
1321 }
1322 }
1323
1324 handleResponseHeaders := func() (*ClientResponse, error) {
1325 res := cs.res
1326 if res.StatusCode > 299 {
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336 cs.abortRequestBodyWrite()
1337 }
1338 res.TLS = cc.tlsState
1339 if res.Body == NoBody && actualContentLength(req) == 0 {
1340
1341
1342
1343 if err := waitDone(); err != nil {
1344 return nil, err
1345 }
1346 }
1347 return res, nil
1348 }
1349
1350 cancelRequest := func(cs *clientStream, err error) error {
1351 cs.cc.mu.Lock()
1352 bodyClosed := cs.reqBodyClosed
1353 cs.cc.mu.Unlock()
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367 if bodyClosed != nil {
1368 <-bodyClosed
1369 }
1370 return err
1371 }
1372
1373 for {
1374 select {
1375 case <-cs.respHeaderRecv:
1376 return handleResponseHeaders()
1377 case <-cs.abort:
1378 select {
1379 case <-cs.respHeaderRecv:
1380
1381
1382
1383
1384 return handleResponseHeaders()
1385 default:
1386 waitDone()
1387 return nil, cs.abortErr
1388 }
1389 case <-ctx.Done():
1390 err := ctx.Err()
1391 cs.abortStream(err)
1392 return nil, cancelRequest(cs, err)
1393 case <-cs.reqCancel:
1394 cs.abortStream(errRequestCanceled)
1395 return nil, cancelRequest(cs, errRequestCanceled)
1396 }
1397 }
1398 }
1399
1400
1401
1402
1403 func (cs *clientStream) doRequest(req *ClientRequest, streamf func(*clientStream)) {
1404 err := cs.writeRequest(req, streamf)
1405 cs.cleanupWriteRequest(err)
1406 }
1407
1408 var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
1409
1410
1411
1412
1413
1414
1415
1416
1417 func (cs *clientStream) writeRequest(req *ClientRequest, streamf func(*clientStream)) (err error) {
1418 cc := cs.cc
1419 ctx := cs.ctx
1420
1421
1422
1423 var isExtendedConnect bool
1424 if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
1425 isExtendedConnect = true
1426 }
1427
1428
1429
1430
1431 if cc.reqHeaderMu == nil {
1432 panic("RoundTrip on uninitialized ClientConn")
1433 }
1434 if isExtendedConnect {
1435 select {
1436 case <-cs.reqCancel:
1437 return errRequestCanceled
1438 case <-ctx.Done():
1439 return ctx.Err()
1440 case <-cc.seenSettingsChan:
1441 if !cc.extendedConnectAllowed {
1442 return errExtendedConnectNotSupported
1443 }
1444 }
1445 }
1446 select {
1447 case cc.reqHeaderMu <- struct{}{}:
1448 case <-cs.reqCancel:
1449 return errRequestCanceled
1450 case <-ctx.Done():
1451 return ctx.Err()
1452 }
1453
1454 cc.mu.Lock()
1455 if cc.idleTimer != nil {
1456 cc.idleTimer.Stop()
1457 }
1458 cc.decrStreamReservationsLocked()
1459 if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
1460 cc.mu.Unlock()
1461 <-cc.reqHeaderMu
1462 return err
1463 }
1464 cc.addStreamLocked(cs)
1465 if isConnectionCloseRequest(req) {
1466 cc.doNotReuse = true
1467 }
1468 cc.mu.Unlock()
1469
1470 if streamf != nil {
1471 streamf(cs)
1472 }
1473
1474 continueTimeout := cc.t.expectContinueTimeout()
1475 if continueTimeout != 0 {
1476 if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
1477 continueTimeout = 0
1478 } else {
1479 cs.on100 = make(chan struct{}, 1)
1480 }
1481 }
1482
1483
1484
1485
1486
1487 err = cs.encodeAndWriteHeaders(req)
1488 <-cc.reqHeaderMu
1489 if err != nil {
1490 return err
1491 }
1492
1493 hasBody := cs.reqBodyContentLength != 0
1494 if !hasBody {
1495 cs.sentEndStream = true
1496 } else {
1497 if continueTimeout != 0 {
1498 traceWait100Continue(cs.trace)
1499 timer := time.NewTimer(continueTimeout)
1500 select {
1501 case <-timer.C:
1502 err = nil
1503 case <-cs.on100:
1504 err = nil
1505 case <-cs.abort:
1506 err = cs.abortErr
1507 case <-ctx.Done():
1508 err = ctx.Err()
1509 case <-cs.reqCancel:
1510 err = errRequestCanceled
1511 }
1512 timer.Stop()
1513 if err != nil {
1514 traceWroteRequest(cs.trace, err)
1515 return err
1516 }
1517 }
1518
1519 if err = cs.writeRequestBody(req); err != nil {
1520 if err != errStopReqBodyWrite {
1521 traceWroteRequest(cs.trace, err)
1522 return err
1523 }
1524 } else {
1525 cs.sentEndStream = true
1526 }
1527 }
1528
1529 traceWroteRequest(cs.trace, err)
1530
1531 var respHeaderTimer <-chan time.Time
1532 var respHeaderRecv chan struct{}
1533 if d := cc.responseHeaderTimeout(); d != 0 {
1534 timer := time.NewTimer(d)
1535 defer timer.Stop()
1536 respHeaderTimer = timer.C
1537 respHeaderRecv = cs.respHeaderRecv
1538 }
1539
1540
1541
1542 for {
1543 select {
1544 case <-cs.peerClosed:
1545 return nil
1546 case <-respHeaderTimer:
1547 return errTimeout
1548 case <-respHeaderRecv:
1549 respHeaderRecv = nil
1550 respHeaderTimer = nil
1551 case <-cs.abort:
1552 return cs.abortErr
1553 case <-ctx.Done():
1554 return ctx.Err()
1555 case <-cs.reqCancel:
1556 return errRequestCanceled
1557 }
1558 }
1559 }
1560
1561 func (cs *clientStream) encodeAndWriteHeaders(req *ClientRequest) error {
1562 cc := cs.cc
1563 ctx := cs.ctx
1564
1565 cc.wmu.Lock()
1566 defer cc.wmu.Unlock()
1567
1568
1569 select {
1570 case <-cs.abort:
1571 return cs.abortErr
1572 case <-ctx.Done():
1573 return ctx.Err()
1574 case <-cs.reqCancel:
1575 return errRequestCanceled
1576 default:
1577 }
1578
1579
1580
1581
1582
1583
1584 cc.hbuf.Reset()
1585 res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
1586 cc.writeHeader(name, value)
1587 })
1588 if err != nil {
1589 return fmt.Errorf("http2: %w", err)
1590 }
1591 hdrs := cc.hbuf.Bytes()
1592
1593
1594 endStream := !res.HasBody && !res.HasTrailers
1595 cs.sentHeaders = true
1596 err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
1597 traceWroteHeaders(cs.trace)
1598 return err
1599 }
1600
1601 func encodeRequestHeaders(req *ClientRequest, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
1602 return httpcommon.EncodeHeaders(req.Context, httpcommon.EncodeHeadersParam{
1603 Request: httpcommon.Request{
1604 Header: req.Header,
1605 Trailer: req.Trailer,
1606 URL: req.URL,
1607 Host: req.Host,
1608 Method: req.Method,
1609 ActualContentLength: actualContentLength(req),
1610 },
1611 AddGzipHeader: addGzipHeader,
1612 PeerMaxHeaderListSize: peerMaxHeaderListSize,
1613 DefaultUserAgent: defaultUserAgent,
1614 }, headerf)
1615 }
1616
1617
1618
1619
1620
1621 func (cs *clientStream) cleanupWriteRequest(err error) {
1622 cc := cs.cc
1623
1624 if cs.ID == 0 {
1625
1626 cc.decrStreamReservations()
1627 }
1628
1629
1630
1631
1632
1633 cc.mu.Lock()
1634 mustCloseBody := false
1635 if cs.reqBody != nil && cs.reqBodyClosed == nil {
1636 mustCloseBody = true
1637 cs.reqBodyClosed = make(chan struct{})
1638 }
1639 bodyClosed := cs.reqBodyClosed
1640 closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
1641
1642 readSinceStream := cc.readBeforeStreamID > cs.ID
1643 cc.mu.Unlock()
1644 if mustCloseBody {
1645 cs.reqBody.Close()
1646 close(bodyClosed)
1647 }
1648 if bodyClosed != nil {
1649 <-bodyClosed
1650 }
1651
1652 if err != nil && cs.sentEndStream {
1653
1654
1655
1656 select {
1657 case <-cs.peerClosed:
1658 err = nil
1659 default:
1660 }
1661 }
1662 if err != nil {
1663 cs.abortStream(err)
1664 if cs.sentHeaders {
1665 if se, ok := err.(StreamError); ok {
1666 if se.Cause != errFromPeer {
1667 cc.writeStreamReset(cs.ID, se.Code, false, err)
1668 }
1669 } else {
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687 ping := false
1688 if !closeOnIdle && !readSinceStream {
1689 cc.mu.Lock()
1690
1691
1692 if !cc.rstStreamPingsBlocked {
1693 if cc.pendingResets == 0 {
1694 ping = true
1695 }
1696 cc.pendingResets++
1697 }
1698 cc.mu.Unlock()
1699 }
1700 cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
1701 }
1702 }
1703 cs.bufPipe.CloseWithError(err)
1704 } else {
1705 if cs.sentHeaders && !cs.sentEndStream {
1706 cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
1707 }
1708 cs.bufPipe.CloseWithError(errRequestCanceled)
1709 }
1710 if cs.ID != 0 {
1711 cc.forgetStreamID(cs.ID)
1712 }
1713
1714 cc.wmu.Lock()
1715 werr := cc.werr
1716 cc.wmu.Unlock()
1717 if werr != nil {
1718 cc.Close()
1719 }
1720
1721 close(cs.donec)
1722 cc.maybeCallStateHook()
1723 }
1724
1725
1726
1727 func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
1728 for {
1729 if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
1730
1731
1732 return errClientConnNotEstablished
1733 }
1734 cc.lastActive = time.Now()
1735 if cc.closed || !cc.canTakeNewRequestLocked() {
1736 return errClientConnUnusable
1737 }
1738 cc.lastIdle = time.Time{}
1739 if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
1740 return nil
1741 }
1742 cc.pendingRequests++
1743 cc.cond.Wait()
1744 cc.pendingRequests--
1745 select {
1746 case <-cs.abort:
1747 return cs.abortErr
1748 default:
1749 }
1750 }
1751 }
1752
1753
1754 func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1755 first := true
1756 for len(hdrs) > 0 && cc.werr == nil {
1757 chunk := hdrs
1758 if len(chunk) > maxFrameSize {
1759 chunk = chunk[:maxFrameSize]
1760 }
1761 hdrs = hdrs[len(chunk):]
1762 endHeaders := len(hdrs) == 0
1763 if first {
1764 cc.fr.WriteHeaders(HeadersFrameParam{
1765 StreamID: streamID,
1766 BlockFragment: chunk,
1767 EndStream: endStream,
1768 EndHeaders: endHeaders,
1769 })
1770 first = false
1771 } else {
1772 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1773 }
1774 }
1775 cc.bw.Flush()
1776 return cc.werr
1777 }
1778
1779
1780 var (
1781
1782 errStopReqBodyWrite = errors.New("http2: aborting request body write")
1783
1784
1785 errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
1786
1787 errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
1788 )
1789
1790
1791
1792
1793
1794
1795 func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
1796 const max = 512 << 10
1797 n := int64(maxFrameSize)
1798 if n > max {
1799 n = max
1800 }
1801 if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
1802
1803
1804
1805
1806 n = cl + 1
1807 }
1808 if n < 1 {
1809 return 1
1810 }
1811 return int(n)
1812 }
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822 var bufPools [7]sync.Pool
1823 func bufPoolIndex(size int) int {
1824 if size <= 16384 {
1825 return 0
1826 }
1827 size -= 1
1828 bits := bits.Len(uint(size))
1829 index := bits - 14
1830 if index >= len(bufPools) {
1831 return len(bufPools) - 1
1832 }
1833 return index
1834 }
1835
1836 func (cs *clientStream) writeRequestBody(req *ClientRequest) (err error) {
1837 cc := cs.cc
1838 body := cs.reqBody
1839 sentEnd := false
1840
1841 hasTrailers := req.Trailer != nil
1842 remainLen := cs.reqBodyContentLength
1843 hasContentLen := remainLen != -1
1844
1845 cc.mu.Lock()
1846 maxFrameSize := int(cc.maxFrameSize)
1847 cc.mu.Unlock()
1848
1849
1850 scratchLen := cs.frameScratchBufferLen(maxFrameSize)
1851 var buf []byte
1852 index := bufPoolIndex(scratchLen)
1853 if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
1854 defer bufPools[index].Put(bp)
1855 buf = *bp
1856 } else {
1857 buf = make([]byte, scratchLen)
1858 defer bufPools[index].Put(&buf)
1859 }
1860
1861 var sawEOF bool
1862 for !sawEOF {
1863 n, err := body.Read(buf)
1864 if hasContentLen {
1865 remainLen -= int64(n)
1866 if remainLen == 0 && err == nil {
1867
1868
1869
1870
1871
1872
1873
1874 var scratch [1]byte
1875 var n1 int
1876 n1, err = body.Read(scratch[:])
1877 remainLen -= int64(n1)
1878 }
1879 if remainLen < 0 {
1880 err = errReqBodyTooLong
1881 return err
1882 }
1883 }
1884 if err != nil {
1885 cc.mu.Lock()
1886 bodyClosed := cs.reqBodyClosed != nil
1887 cc.mu.Unlock()
1888 switch {
1889 case bodyClosed:
1890 return errStopReqBodyWrite
1891 case err == io.EOF:
1892 sawEOF = true
1893 err = nil
1894 default:
1895 return err
1896 }
1897 }
1898
1899 remain := buf[:n]
1900 for len(remain) > 0 && err == nil {
1901 var allowed int32
1902 allowed, err = cs.awaitFlowControl(len(remain))
1903 if err != nil {
1904 return err
1905 }
1906 cc.wmu.Lock()
1907 data := remain[:allowed]
1908 remain = remain[allowed:]
1909 sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
1910 err = cc.fr.WriteData(cs.ID, sentEnd, data)
1911 if err == nil {
1912
1913
1914
1915
1916
1917
1918 err = cc.bw.Flush()
1919 }
1920 cc.wmu.Unlock()
1921 }
1922 if err != nil {
1923 return err
1924 }
1925 }
1926
1927 if sentEnd {
1928
1929
1930
1931 return nil
1932 }
1933
1934
1935
1936
1937 cc.mu.Lock()
1938 trailer := req.Trailer
1939 err = cs.abortErr
1940 cc.mu.Unlock()
1941 if err != nil {
1942 return err
1943 }
1944
1945 cc.wmu.Lock()
1946 defer cc.wmu.Unlock()
1947 var trls []byte
1948 if len(trailer) > 0 {
1949 trls, err = cc.encodeTrailers(trailer)
1950 if err != nil {
1951 return err
1952 }
1953 }
1954
1955
1956
1957 if len(trls) > 0 {
1958 err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
1959 } else {
1960 err = cc.fr.WriteData(cs.ID, true, nil)
1961 }
1962 if ferr := cc.bw.Flush(); ferr != nil && err == nil {
1963 err = ferr
1964 }
1965 return err
1966 }
1967
1968
1969
1970
1971
1972 func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1973 cc := cs.cc
1974 ctx := cs.ctx
1975 cc.mu.Lock()
1976 defer cc.mu.Unlock()
1977 for {
1978 if cc.closed {
1979 return 0, errClientConnClosed
1980 }
1981 if cs.reqBodyClosed != nil {
1982 return 0, errStopReqBodyWrite
1983 }
1984 select {
1985 case <-cs.abort:
1986 return 0, cs.abortErr
1987 case <-ctx.Done():
1988 return 0, ctx.Err()
1989 case <-cs.reqCancel:
1990 return 0, errRequestCanceled
1991 default:
1992 }
1993 if a := cs.flow.available(); a > 0 {
1994 take := a
1995 if int(take) > maxBytes {
1996
1997 take = int32(maxBytes)
1998 }
1999 if take > int32(cc.maxFrameSize) {
2000 take = int32(cc.maxFrameSize)
2001 }
2002 cs.flow.take(take)
2003 return take, nil
2004 }
2005 cc.cond.Wait()
2006 }
2007 }
2008
2009
2010 func (cc *ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
2011 cc.hbuf.Reset()
2012
2013 hlSize := uint64(0)
2014 for k, vv := range trailer {
2015 for _, v := range vv {
2016 hf := hpack.HeaderField{Name: k, Value: v}
2017 hlSize += uint64(hf.Size())
2018 }
2019 }
2020 if hlSize > cc.peerMaxHeaderListSize {
2021 return nil, errRequestHeaderListSize
2022 }
2023
2024 for k, vv := range trailer {
2025 lowKey, ascii := httpcommon.LowerHeader(k)
2026 if !ascii {
2027
2028
2029 continue
2030 }
2031
2032
2033 for _, v := range vv {
2034 cc.writeHeader(lowKey, v)
2035 }
2036 }
2037 return cc.hbuf.Bytes(), nil
2038 }
2039
2040 func (cc *ClientConn) writeHeader(name, value string) {
2041 if VerboseLogs {
2042 log.Printf("http2: Transport encoding header %q = %q", name, value)
2043 }
2044 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
2045 }
2046
2047 type resAndError struct {
2048 _ incomparable
2049 res *ClientResponse
2050 err error
2051 }
2052
2053
2054 func (cc *ClientConn) addStreamLocked(cs *clientStream) {
2055 cs.flow.add(int32(cc.initialWindowSize))
2056 cs.flow.setConnFlow(&cc.flow)
2057 cs.inflow.init(cc.initialStreamRecvWindowSize)
2058 cs.ID = cc.nextStreamID
2059 cc.nextStreamID += 2
2060 cc.streams[cs.ID] = cs
2061 if cs.ID == 0 {
2062 panic("assigned stream ID 0")
2063 }
2064 }
2065
2066 func (cc *ClientConn) forgetStreamID(id uint32) {
2067 cc.mu.Lock()
2068 slen := len(cc.streams)
2069 delete(cc.streams, id)
2070 if len(cc.streams) != slen-1 {
2071 panic("forgetting unknown stream id")
2072 }
2073 cc.lastActive = time.Now()
2074 if len(cc.streams) == 0 && cc.idleTimer != nil {
2075 cc.idleTimer.Reset(cc.idleTimeout)
2076 cc.lastIdle = time.Now()
2077 }
2078
2079
2080 cc.cond.Broadcast()
2081
2082 closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
2083 if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
2084 if VerboseLogs {
2085 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
2086 }
2087 cc.closed = true
2088 defer cc.closeConn()
2089 }
2090
2091 cc.mu.Unlock()
2092 }
2093
2094
2095 type clientConnReadLoop struct {
2096 _ incomparable
2097 cc *ClientConn
2098 }
2099
2100
2101 func (cc *ClientConn) readLoop() {
2102 rl := &clientConnReadLoop{cc: cc}
2103 defer rl.cleanup()
2104 cc.readerErr = rl.run()
2105 if ce, ok := cc.readerErr.(ConnectionError); ok {
2106 cc.wmu.Lock()
2107 cc.fr.WriteGoAway(0, ErrCode(ce), nil)
2108 cc.wmu.Unlock()
2109 }
2110 }
2111
2112
2113
2114 type GoAwayError struct {
2115 LastStreamID uint32
2116 ErrCode ErrCode
2117 DebugData string
2118 }
2119
2120 func (e GoAwayError) Error() string {
2121 return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
2122 e.LastStreamID, e.ErrCode, e.DebugData)
2123 }
2124
2125 func isEOFOrNetReadError(err error) bool {
2126 if err == io.EOF {
2127 return true
2128 }
2129 ne, ok := err.(*net.OpError)
2130 return ok && ne.Op == "read"
2131 }
2132
2133 func (rl *clientConnReadLoop) cleanup() {
2134 cc := rl.cc
2135 defer cc.closeConn()
2136 defer close(cc.readerDone)
2137
2138 if cc.idleTimer != nil {
2139 cc.idleTimer.Stop()
2140 }
2141
2142
2143
2144
2145 err := cc.readerErr
2146 cc.mu.Lock()
2147 if cc.goAway != nil && isEOFOrNetReadError(err) {
2148 err = GoAwayError{
2149 LastStreamID: cc.goAway.LastStreamID,
2150 ErrCode: cc.goAway.ErrCode,
2151 DebugData: cc.goAwayDebug,
2152 }
2153 } else if err == io.EOF {
2154 err = io.ErrUnexpectedEOF
2155 }
2156 cc.closed = true
2157
2158
2159
2160
2161
2162
2163
2164 unusedWaitTime := 5 * time.Second
2165 if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
2166 unusedWaitTime = cc.idleTimeout
2167 }
2168 idleTime := time.Now().Sub(cc.lastActive)
2169 if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
2170 cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
2171 cc.t.connPool().MarkDead(cc)
2172 })
2173 } else {
2174 cc.mu.Unlock()
2175 cc.t.connPool().MarkDead(cc)
2176 cc.mu.Lock()
2177 }
2178
2179 for _, cs := range cc.streams {
2180 select {
2181 case <-cs.peerClosed:
2182
2183
2184 default:
2185 cs.abortStreamLocked(err)
2186 }
2187 }
2188 cc.cond.Broadcast()
2189 cc.mu.Unlock()
2190
2191 if !cc.seenSettings {
2192
2193
2194 cc.extendedConnectAllowed = true
2195 close(cc.seenSettingsChan)
2196 }
2197 }
2198
2199
2200
2201 func (cc *ClientConn) countReadFrameError(err error) {
2202 f := cc.t.CountError
2203 if f == nil || err == nil {
2204 return
2205 }
2206 if ce, ok := err.(ConnectionError); ok {
2207 errCode := ErrCode(ce)
2208 f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
2209 return
2210 }
2211 if errors.Is(err, io.EOF) {
2212 f("read_frame_eof")
2213 return
2214 }
2215 if errors.Is(err, io.ErrUnexpectedEOF) {
2216 f("read_frame_unexpected_eof")
2217 return
2218 }
2219 if errors.Is(err, ErrFrameTooLarge) {
2220 f("read_frame_too_large")
2221 return
2222 }
2223 f("read_frame_other")
2224 }
2225
2226 func (rl *clientConnReadLoop) run() error {
2227 cc := rl.cc
2228 gotSettings := false
2229 readIdleTimeout := cc.readIdleTimeout
2230 var t *time.Timer
2231 if readIdleTimeout != 0 {
2232 t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
2233 }
2234 for {
2235 f, err := cc.fr.ReadFrame()
2236 if t != nil {
2237 t.Reset(readIdleTimeout)
2238 }
2239 if err != nil {
2240 cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
2241 }
2242 if se, ok := err.(StreamError); ok {
2243 if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
2244 if se.Cause == nil {
2245 se.Cause = cc.fr.errDetail
2246 }
2247 rl.endStreamError(cs, se)
2248 }
2249 continue
2250 } else if err != nil {
2251 cc.countReadFrameError(err)
2252 return err
2253 }
2254 if VerboseLogs {
2255 cc.vlogf("http2: Transport received %s", summarizeFrame(f))
2256 }
2257 if !gotSettings {
2258 if _, ok := f.(*SettingsFrame); !ok {
2259 cc.logf("protocol error: received %T before a SETTINGS frame", f)
2260 return ConnectionError(ErrCodeProtocol)
2261 }
2262 gotSettings = true
2263 }
2264
2265 switch f := f.(type) {
2266 case *MetaHeadersFrame:
2267 err = rl.processHeaders(f)
2268 case *DataFrame:
2269 err = rl.processData(f)
2270 case *GoAwayFrame:
2271 err = rl.processGoAway(f)
2272 case *RSTStreamFrame:
2273 err = rl.processResetStream(f)
2274 case *SettingsFrame:
2275 err = rl.processSettings(f)
2276 case *PushPromiseFrame:
2277 err = rl.processPushPromise(f)
2278 case *WindowUpdateFrame:
2279 err = rl.processWindowUpdate(f)
2280 case *PingFrame:
2281 err = rl.processPing(f)
2282 default:
2283 cc.logf("Transport: unhandled response frame type %T", f)
2284 }
2285 if err != nil {
2286 if VerboseLogs {
2287 cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
2288 }
2289 return err
2290 }
2291 }
2292 }
2293
2294 func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
2295 cs := rl.streamByID(f.StreamID, headerOrDataFrame)
2296 if cs == nil {
2297
2298
2299
2300 return nil
2301 }
2302 if cs.readClosed {
2303 rl.endStreamError(cs, StreamError{
2304 StreamID: f.StreamID,
2305 Code: ErrCodeProtocol,
2306 Cause: errors.New("protocol error: headers after END_STREAM"),
2307 })
2308 return nil
2309 }
2310 if !cs.firstByte {
2311 if cs.trace != nil {
2312
2313
2314
2315
2316 traceFirstResponseByte(cs.trace)
2317 }
2318 cs.firstByte = true
2319 }
2320 if !cs.pastHeaders {
2321 cs.pastHeaders = true
2322 } else {
2323 return rl.processTrailers(cs, f)
2324 }
2325
2326 res, err := rl.handleResponse(cs, f)
2327 if err != nil {
2328 if _, ok := err.(ConnectionError); ok {
2329 return err
2330 }
2331
2332 rl.endStreamError(cs, StreamError{
2333 StreamID: f.StreamID,
2334 Code: ErrCodeProtocol,
2335 Cause: err,
2336 })
2337 return nil
2338 }
2339 if res == nil {
2340
2341 return nil
2342 }
2343 cs.res = res
2344 close(cs.respHeaderRecv)
2345 if f.StreamEnded() {
2346 rl.endStream(cs)
2347 }
2348 return nil
2349 }
2350
2351
2352
2353
2354
2355
2356
2357 func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*ClientResponse, error) {
2358 if f.Truncated {
2359 return nil, errResponseHeaderListSize
2360 }
2361
2362 status := f.PseudoValue("status")
2363 if status == "" {
2364 return nil, errors.New("malformed response from server: missing status pseudo header")
2365 }
2366 statusCode, err := strconv.Atoi(status)
2367 if err != nil {
2368 return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
2369 }
2370
2371 regularFields := f.RegularFields()
2372 strs := make([]string, len(regularFields))
2373 header := make(Header, len(regularFields))
2374 res := &cs.staticResp
2375 cs.staticResp = ClientResponse{
2376 Header: header,
2377 StatusCode: statusCode,
2378 Status: status,
2379 }
2380 for _, hf := range regularFields {
2381 key := httpcommon.CanonicalHeader(hf.Name)
2382 if key == "Trailer" {
2383 t := res.Trailer
2384 if t == nil {
2385 t = make(Header)
2386 res.Trailer = t
2387 }
2388 foreachHeaderElement(hf.Value, func(v string) {
2389 t[httpcommon.CanonicalHeader(v)] = nil
2390 })
2391 } else {
2392 vv := header[key]
2393 if vv == nil && len(strs) > 0 {
2394
2395
2396
2397
2398 vv, strs = strs[:1:1], strs[1:]
2399 vv[0] = hf.Value
2400 header[key] = vv
2401 } else {
2402 header[key] = append(vv, hf.Value)
2403 }
2404 }
2405 }
2406
2407 if statusCode >= 100 && statusCode <= 199 {
2408 if f.StreamEnded() {
2409 return nil, errors.New("1xx informational response with END_STREAM flag")
2410 }
2411 if fn := cs.get1xxTraceFunc(); fn != nil {
2412
2413
2414
2415 if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
2416 return nil, err
2417 }
2418 } else {
2419
2420
2421
2422
2423
2424
2425
2426 limit := int64(cs.cc.t.maxHeaderListSize())
2427 if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes() > limit {
2428 limit = t1.MaxResponseHeaderBytes()
2429 }
2430 for _, h := range f.Fields {
2431 cs.totalHeaderSize += int64(h.Size())
2432 }
2433 if cs.totalHeaderSize > limit {
2434 if VerboseLogs {
2435 log.Printf("http2: 1xx informational responses too large")
2436 }
2437 return nil, errors.New("header list too large")
2438 }
2439 }
2440 if statusCode == 100 {
2441 traceGot100Continue(cs.trace)
2442 select {
2443 case cs.on100 <- struct{}{}:
2444 default:
2445 }
2446 }
2447 cs.pastHeaders = false
2448 return nil, nil
2449 }
2450
2451 res.ContentLength = -1
2452 if clens := res.Header["Content-Length"]; len(clens) == 1 {
2453 if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
2454 res.ContentLength = int64(cl)
2455 } else {
2456
2457
2458 }
2459 } else if len(clens) > 1 {
2460
2461
2462 } else if f.StreamEnded() && !cs.isHead {
2463 res.ContentLength = 0
2464 }
2465
2466 if cs.isHead {
2467 res.Body = NoBody
2468 return res, nil
2469 }
2470
2471 if f.StreamEnded() {
2472 if res.ContentLength > 0 {
2473 res.Body = missingBody{}
2474 } else {
2475 res.Body = NoBody
2476 }
2477 return res, nil
2478 }
2479
2480 cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
2481 cs.bytesRemain = res.ContentLength
2482 res.Body = transportResponseBody{cs}
2483
2484 if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
2485 res.Header.Del("Content-Encoding")
2486 res.Header.Del("Content-Length")
2487 res.ContentLength = -1
2488 res.Body = &gzipReader{body: res.Body}
2489 res.Uncompressed = true
2490 }
2491 return res, nil
2492 }
2493
2494 func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2495 if cs.pastTrailers {
2496
2497 return ConnectionError(ErrCodeProtocol)
2498 }
2499 cs.pastTrailers = true
2500 if !f.StreamEnded() {
2501
2502
2503 return ConnectionError(ErrCodeProtocol)
2504 }
2505 if len(f.PseudoFields()) > 0 {
2506
2507
2508 return ConnectionError(ErrCodeProtocol)
2509 }
2510
2511 trailer := make(Header)
2512 for _, hf := range f.RegularFields() {
2513 key := httpcommon.CanonicalHeader(hf.Name)
2514 trailer[key] = append(trailer[key], hf.Value)
2515 }
2516 cs.trailer = trailer
2517
2518 rl.endStream(cs)
2519 return nil
2520 }
2521
2522
2523
2524 type transportResponseBody struct {
2525 cs *clientStream
2526 }
2527
2528 func (b transportResponseBody) Read(p []byte) (n int, err error) {
2529 cs := b.cs
2530 cc := cs.cc
2531
2532 if cs.readErr != nil {
2533 return 0, cs.readErr
2534 }
2535 n, err = b.cs.bufPipe.Read(p)
2536 if cs.bytesRemain != -1 {
2537 if int64(n) > cs.bytesRemain {
2538 n = int(cs.bytesRemain)
2539 if err == nil {
2540 err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
2541 cs.abortStream(err)
2542 }
2543 cs.readErr = err
2544 return int(cs.bytesRemain), err
2545 }
2546 cs.bytesRemain -= int64(n)
2547 if err == io.EOF && cs.bytesRemain > 0 {
2548 err = io.ErrUnexpectedEOF
2549 cs.readErr = err
2550 return n, err
2551 }
2552 }
2553 if n == 0 {
2554
2555 return
2556 }
2557
2558 cc.mu.Lock()
2559 connAdd := cc.inflow.add(n)
2560 var streamAdd int32
2561 if err == nil {
2562 streamAdd = cs.inflow.add(n)
2563 }
2564 cc.mu.Unlock()
2565
2566 if connAdd != 0 || streamAdd != 0 {
2567 cc.wmu.Lock()
2568 defer cc.wmu.Unlock()
2569 if connAdd != 0 {
2570 cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
2571 }
2572 if streamAdd != 0 {
2573 cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
2574 }
2575 cc.bw.Flush()
2576 }
2577 return
2578 }
2579
2580 var errClosedResponseBody = errors.New("http2: response body closed")
2581
2582 func (b transportResponseBody) Close() error {
2583 cs := b.cs
2584 cc := cs.cc
2585
2586 cs.bufPipe.BreakWithError(errClosedResponseBody)
2587 cs.abortStream(errClosedResponseBody)
2588
2589 unread := cs.bufPipe.Len()
2590 if unread > 0 {
2591 cc.mu.Lock()
2592
2593 connAdd := cc.inflow.add(unread)
2594 cc.mu.Unlock()
2595
2596
2597
2598 cc.wmu.Lock()
2599
2600 if connAdd > 0 {
2601 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2602 }
2603 cc.bw.Flush()
2604 cc.wmu.Unlock()
2605 }
2606
2607 select {
2608 case <-cs.donec:
2609 case <-cs.ctx.Done():
2610
2611
2612
2613 return nil
2614 case <-cs.reqCancel:
2615 return errRequestCanceled
2616 }
2617 return nil
2618 }
2619
2620 func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2621 cc := rl.cc
2622 cs := rl.streamByID(f.StreamID, headerOrDataFrame)
2623 data := f.Data()
2624 if cs == nil {
2625 cc.mu.Lock()
2626 neverSent := cc.nextStreamID
2627 cc.mu.Unlock()
2628 if f.StreamID >= neverSent {
2629
2630 cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
2631 return ConnectionError(ErrCodeProtocol)
2632 }
2633
2634
2635
2636
2637
2638
2639 if f.Length > 0 {
2640 cc.mu.Lock()
2641 ok := cc.inflow.take(f.Length)
2642 connAdd := cc.inflow.add(int(f.Length))
2643 cc.mu.Unlock()
2644 if !ok {
2645 return ConnectionError(ErrCodeFlowControl)
2646 }
2647 if connAdd > 0 {
2648 cc.wmu.Lock()
2649 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2650 cc.bw.Flush()
2651 cc.wmu.Unlock()
2652 }
2653 }
2654 return nil
2655 }
2656 if cs.readClosed {
2657 cc.logf("protocol error: received DATA after END_STREAM")
2658 rl.endStreamError(cs, StreamError{
2659 StreamID: f.StreamID,
2660 Code: ErrCodeProtocol,
2661 })
2662 return nil
2663 }
2664 if !cs.pastHeaders {
2665 cc.logf("protocol error: received DATA before a HEADERS frame")
2666 rl.endStreamError(cs, StreamError{
2667 StreamID: f.StreamID,
2668 Code: ErrCodeProtocol,
2669 })
2670 return nil
2671 }
2672 if f.Length > 0 {
2673 if cs.isHead && len(data) > 0 {
2674 cc.logf("protocol error: received DATA on a HEAD request")
2675 rl.endStreamError(cs, StreamError{
2676 StreamID: f.StreamID,
2677 Code: ErrCodeProtocol,
2678 })
2679 return nil
2680 }
2681
2682 cc.mu.Lock()
2683 if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
2684 cc.mu.Unlock()
2685 return ConnectionError(ErrCodeFlowControl)
2686 }
2687
2688
2689 var refund int
2690 if pad := int(f.Length) - len(data); pad > 0 {
2691 refund += pad
2692 }
2693
2694 didReset := false
2695 var err error
2696 if len(data) > 0 {
2697 if _, err = cs.bufPipe.Write(data); err != nil {
2698
2699
2700 didReset = true
2701 refund += len(data)
2702 }
2703 }
2704
2705 sendConn := cc.inflow.add(refund)
2706 var sendStream int32
2707 if !didReset {
2708 sendStream = cs.inflow.add(refund)
2709 }
2710 cc.mu.Unlock()
2711
2712 if sendConn > 0 || sendStream > 0 {
2713 cc.wmu.Lock()
2714 if sendConn > 0 {
2715 cc.fr.WriteWindowUpdate(0, uint32(sendConn))
2716 }
2717 if sendStream > 0 {
2718 cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
2719 }
2720 cc.bw.Flush()
2721 cc.wmu.Unlock()
2722 }
2723
2724 if err != nil {
2725 rl.endStreamError(cs, err)
2726 return nil
2727 }
2728 }
2729
2730 if f.StreamEnded() {
2731 rl.endStream(cs)
2732 }
2733 return nil
2734 }
2735
2736 func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2737
2738
2739 if !cs.readClosed {
2740 cs.readClosed = true
2741
2742
2743
2744
2745 rl.cc.mu.Lock()
2746 defer rl.cc.mu.Unlock()
2747 cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
2748 close(cs.peerClosed)
2749 }
2750 }
2751
2752 func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
2753 cs.readAborted = true
2754 cs.abortStream(err)
2755 }
2756
2757 func (rl *clientConnReadLoop) endStreamErrorLocked(cs *clientStream, err error) {
2758 cs.readAborted = true
2759 cs.abortStreamLocked(err)
2760 }
2761
2762
2763 const (
2764 headerOrDataFrame = true
2765 notHeaderOrDataFrame = false
2766 )
2767
2768
2769
2770 func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
2771 rl.cc.mu.Lock()
2772 defer rl.cc.mu.Unlock()
2773 if headerOrData {
2774
2775
2776 rl.cc.rstStreamPingsBlocked = false
2777 }
2778 rl.cc.readBeforeStreamID = rl.cc.nextStreamID
2779 cs := rl.cc.streams[id]
2780 if cs != nil && !cs.readAborted {
2781 return cs
2782 }
2783 return nil
2784 }
2785
2786 func (cs *clientStream) copyTrailers() {
2787 for k, vv := range cs.trailer {
2788 t := cs.resTrailer
2789 if *t == nil {
2790 *t = make(Header)
2791 }
2792 (*t)[k] = vv
2793 }
2794 }
2795
2796 func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2797 cc := rl.cc
2798 cc.t.connPool().MarkDead(cc)
2799 if f.ErrCode != 0 {
2800
2801 cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
2802 if fn := cc.t.CountError; fn != nil {
2803 fn("recv_goaway_" + f.ErrCode.stringToken())
2804 }
2805 }
2806 cc.setGoAway(f)
2807 return nil
2808 }
2809
2810 func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2811 cc := rl.cc
2812
2813
2814 cc.wmu.Lock()
2815 defer cc.wmu.Unlock()
2816
2817 if err := rl.processSettingsNoWrite(f); err != nil {
2818 return err
2819 }
2820 if !f.IsAck() {
2821 cc.fr.WriteSettingsAck()
2822 cc.bw.Flush()
2823 }
2824 return nil
2825 }
2826
2827 func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
2828 cc := rl.cc
2829 defer cc.maybeCallStateHook()
2830 cc.mu.Lock()
2831 defer cc.mu.Unlock()
2832
2833 if f.IsAck() {
2834 if cc.wantSettingsAck {
2835 cc.wantSettingsAck = false
2836 return nil
2837 }
2838 return ConnectionError(ErrCodeProtocol)
2839 }
2840
2841 var seenMaxConcurrentStreams bool
2842 err := f.ForeachSetting(func(s Setting) error {
2843 switch s.ID {
2844 case SettingMaxFrameSize:
2845 cc.maxFrameSize = s.Val
2846 case SettingMaxConcurrentStreams:
2847 cc.maxConcurrentStreams = s.Val
2848 seenMaxConcurrentStreams = true
2849 case SettingMaxHeaderListSize:
2850 cc.peerMaxHeaderListSize = uint64(s.Val)
2851 case SettingInitialWindowSize:
2852
2853
2854
2855
2856 if s.Val > math.MaxInt32 {
2857 return ConnectionError(ErrCodeFlowControl)
2858 }
2859
2860
2861
2862
2863 delta := int32(s.Val) - int32(cc.initialWindowSize)
2864 for _, cs := range cc.streams {
2865 cs.flow.add(delta)
2866 }
2867 cc.cond.Broadcast()
2868
2869 cc.initialWindowSize = s.Val
2870 case SettingHeaderTableSize:
2871 cc.henc.SetMaxDynamicTableSize(s.Val)
2872 cc.peerMaxHeaderTableSize = s.Val
2873 case SettingEnableConnectProtocol:
2874 if err := s.Valid(); err != nil {
2875 return err
2876 }
2877
2878
2879
2880
2881
2882
2883
2884
2885 if !cc.seenSettings {
2886 cc.extendedConnectAllowed = s.Val == 1
2887 }
2888 default:
2889 cc.vlogf("Unhandled Setting: %v", s)
2890 }
2891 return nil
2892 })
2893 if err != nil {
2894 return err
2895 }
2896
2897 if !cc.seenSettings {
2898 if !seenMaxConcurrentStreams {
2899
2900
2901
2902
2903 cc.maxConcurrentStreams = defaultMaxConcurrentStreams
2904 }
2905 close(cc.seenSettingsChan)
2906 cc.seenSettings = true
2907 }
2908
2909 return nil
2910 }
2911
2912 func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2913 cc := rl.cc
2914 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2915 if f.StreamID != 0 && cs == nil {
2916 return nil
2917 }
2918
2919 cc.mu.Lock()
2920 defer cc.mu.Unlock()
2921
2922 fl := &cc.flow
2923 if cs != nil {
2924 fl = &cs.flow
2925 }
2926 if !fl.add(int32(f.Increment)) {
2927
2928 if cs != nil {
2929 rl.endStreamErrorLocked(cs, StreamError{
2930 StreamID: f.StreamID,
2931 Code: ErrCodeFlowControl,
2932 })
2933 return nil
2934 }
2935
2936 return ConnectionError(ErrCodeFlowControl)
2937 }
2938 cc.cond.Broadcast()
2939 return nil
2940 }
2941
2942 func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
2943 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2944 if cs == nil {
2945
2946 return nil
2947 }
2948 serr := streamError(cs.ID, f.ErrCode)
2949 serr.Cause = errFromPeer
2950 if f.ErrCode == ErrCodeProtocol {
2951 rl.cc.SetDoNotReuse()
2952 }
2953 if fn := cs.cc.t.CountError; fn != nil {
2954 fn("recv_rststream_" + f.ErrCode.stringToken())
2955 }
2956 cs.abortStream(serr)
2957
2958 cs.bufPipe.CloseWithError(serr)
2959 return nil
2960 }
2961
2962
2963 func (cc *ClientConn) Ping(ctx context.Context) error {
2964 c := make(chan struct{})
2965
2966 var p [8]byte
2967 for {
2968 if _, err := rand.Read(p[:]); err != nil {
2969 return err
2970 }
2971 cc.mu.Lock()
2972
2973 if _, found := cc.pings[p]; !found {
2974 cc.pings[p] = c
2975 cc.mu.Unlock()
2976 break
2977 }
2978 cc.mu.Unlock()
2979 }
2980 var pingError error
2981 errc := make(chan struct{})
2982 go func() {
2983 cc.wmu.Lock()
2984 defer cc.wmu.Unlock()
2985 if pingError = cc.fr.WritePing(false, p); pingError != nil {
2986 close(errc)
2987 return
2988 }
2989 if pingError = cc.bw.Flush(); pingError != nil {
2990 close(errc)
2991 return
2992 }
2993 }()
2994 select {
2995 case <-c:
2996 return nil
2997 case <-errc:
2998 return pingError
2999 case <-ctx.Done():
3000 return ctx.Err()
3001 case <-cc.readerDone:
3002
3003 return cc.readerErr
3004 }
3005 }
3006
3007 func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
3008 if f.IsAck() {
3009 cc := rl.cc
3010 defer cc.maybeCallStateHook()
3011 cc.mu.Lock()
3012 defer cc.mu.Unlock()
3013
3014 if c, ok := cc.pings[f.Data]; ok {
3015 close(c)
3016 delete(cc.pings, f.Data)
3017 }
3018 if cc.pendingResets > 0 {
3019
3020 cc.pendingResets = 0
3021 cc.rstStreamPingsBlocked = true
3022 cc.cond.Broadcast()
3023 }
3024 return nil
3025 }
3026 cc := rl.cc
3027 cc.wmu.Lock()
3028 defer cc.wmu.Unlock()
3029 if err := cc.fr.WritePing(true, f.Data); err != nil {
3030 return err
3031 }
3032 return cc.bw.Flush()
3033 }
3034
3035 func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
3036
3037
3038
3039
3040
3041
3042
3043 return ConnectionError(ErrCodeProtocol)
3044 }
3045
3046
3047
3048 func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
3049
3050
3051
3052
3053 cc.wmu.Lock()
3054 cc.fr.WriteRSTStream(streamID, code)
3055 if ping {
3056 var payload [8]byte
3057 rand.Read(payload[:])
3058 cc.fr.WritePing(false, payload)
3059 }
3060 cc.bw.Flush()
3061 cc.wmu.Unlock()
3062 }
3063
3064 var (
3065 errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
3066 errRequestHeaderListSize = httpcommon.ErrRequestHeaderListSize
3067 )
3068
3069 func (cc *ClientConn) logf(format string, args ...interface{}) {
3070 cc.t.logf(format, args...)
3071 }
3072
3073 func (cc *ClientConn) vlogf(format string, args ...interface{}) {
3074 cc.t.vlogf(format, args...)
3075 }
3076
3077 func (t *Transport) vlogf(format string, args ...interface{}) {
3078 if VerboseLogs {
3079 t.logf(format, args...)
3080 }
3081 }
3082
3083 func (t *Transport) logf(format string, args ...interface{}) {
3084 log.Printf(format, args...)
3085 }
3086
3087 type missingBody struct{}
3088
3089 func (missingBody) Close() error { return nil }
3090 func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
3091
3092 func strSliceContains(ss []string, s string) bool {
3093 for _, v := range ss {
3094 if v == s {
3095 return true
3096 }
3097 }
3098 return false
3099 }
3100
3101 type erringRoundTripper struct{ err error }
3102
3103 func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
3104 func (rt erringRoundTripper) RoundTrip(*ClientRequest) (*ClientResponse, error) { return nil, rt.err }
3105
3106 var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")
3107
3108
3109
3110
3111
3112 type gzipReader struct {
3113 _ incomparable
3114 body io.ReadCloser
3115 mu sync.Mutex
3116 zr *gzip.Reader
3117 zerr error
3118 }
3119
3120 type eofReader struct{}
3121
3122 func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
3123 func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
3124
3125 var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
3126
3127
3128 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3129 zr := gzipPool.Get().(*gzip.Reader)
3130 if err := zr.Reset(r); err != nil {
3131 gzipPoolPut(zr)
3132 return nil, err
3133 }
3134 return zr, nil
3135 }
3136
3137
3138 func gzipPoolPut(zr *gzip.Reader) {
3139
3140
3141 var r flate.Reader = eofReader{}
3142 zr.Reset(r)
3143 gzipPool.Put(zr)
3144 }
3145
3146
3147
3148 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3149 gz.mu.Lock()
3150 defer gz.mu.Unlock()
3151 if gz.zerr != nil {
3152 return nil, gz.zerr
3153 }
3154 if gz.zr == nil {
3155 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3156 if gz.zerr != nil {
3157 return nil, gz.zerr
3158 }
3159 }
3160 ret := gz.zr
3161 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3162 return ret, nil
3163 }
3164
3165
3166 func (gz *gzipReader) release(zr *gzip.Reader) {
3167 gz.mu.Lock()
3168 defer gz.mu.Unlock()
3169 if gz.zerr == errConcurrentReadOnResBody {
3170 gz.zr, gz.zerr = zr, nil
3171 } else {
3172 gzipPoolPut(zr)
3173 }
3174 }
3175
3176
3177
3178 func (gz *gzipReader) close() {
3179 gz.mu.Lock()
3180 defer gz.mu.Unlock()
3181 if gz.zerr == nil && gz.zr != nil {
3182 gzipPoolPut(gz.zr)
3183 gz.zr = nil
3184 }
3185 gz.zerr = fs.ErrClosed
3186 }
3187
3188 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3189 zr, err := gz.acquire()
3190 if err != nil {
3191 return 0, err
3192 }
3193 defer gz.release(zr)
3194
3195 return zr.Read(p)
3196 }
3197
3198 func (gz *gzipReader) Close() error {
3199 gz.close()
3200
3201 return gz.body.Close()
3202 }
3203
3204
3205
3206 func isConnectionCloseRequest(req *ClientRequest) bool {
3207 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
3208 }
3209
3210
3211
3212 type NetHTTPClientConn struct {
3213 cc *ClientConn
3214 }
3215
3216 func (cc NetHTTPClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
3217 return cc.cc.RoundTrip(req)
3218 }
3219
3220 func (cc NetHTTPClientConn) Close() error {
3221 return cc.cc.Close()
3222 }
3223
3224 func (cc NetHTTPClientConn) Err() error {
3225 cc.cc.mu.Lock()
3226 defer cc.cc.mu.Unlock()
3227 if cc.cc.closed {
3228 return errors.New("connection closed")
3229 }
3230 return nil
3231 }
3232
3233 func (cc NetHTTPClientConn) Reserve() error {
3234 defer cc.cc.maybeCallStateHook()
3235 cc.cc.mu.Lock()
3236 defer cc.cc.mu.Unlock()
3237 if !cc.cc.canReserveLocked() {
3238 return errors.New("connection is unavailable")
3239 }
3240 cc.cc.streamsReserved++
3241 return nil
3242 }
3243
3244 func (cc NetHTTPClientConn) Release() {
3245 defer cc.cc.maybeCallStateHook()
3246 cc.cc.mu.Lock()
3247 defer cc.cc.mu.Unlock()
3248
3249
3250
3251
3252 if cc.cc.streamsReserved > 0 {
3253 cc.cc.streamsReserved--
3254 }
3255 }
3256
3257 func (cc NetHTTPClientConn) Available() int {
3258 cc.cc.mu.Lock()
3259 defer cc.cc.mu.Unlock()
3260 return cc.cc.availableLocked()
3261 }
3262
3263 func (cc NetHTTPClientConn) InFlight() int {
3264 cc.cc.mu.Lock()
3265 defer cc.cc.mu.Unlock()
3266 return cc.cc.currentRequestCountLocked()
3267 }
3268
3269 func (cc *ClientConn) maybeCallStateHook() {
3270 if cc.internalStateHook != nil {
3271 cc.internalStateHook()
3272 }
3273 }
3274
3275 func (t *Transport) idleConnTimeout() time.Duration {
3276
3277
3278
3279 if t.IdleConnTimeout != 0 {
3280 return t.IdleConnTimeout
3281 }
3282
3283 if t.t1 != nil {
3284 return t.t1.IdleConnTimeout()
3285 }
3286
3287 return 0
3288 }
3289
3290 func traceGetConn(req *ClientRequest, hostPort string) {
3291 trace := httptrace.ContextClientTrace(req.Context)
3292 if trace == nil || trace.GetConn == nil {
3293 return
3294 }
3295 trace.GetConn(hostPort)
3296 }
3297
3298 func traceGotConn(req *ClientRequest, cc *ClientConn, reused bool) {
3299 trace := httptrace.ContextClientTrace(req.Context)
3300 if trace == nil || trace.GotConn == nil {
3301 return
3302 }
3303 ci := httptrace.GotConnInfo{Conn: cc.tconn}
3304 ci.Reused = reused
3305 cc.mu.Lock()
3306 ci.WasIdle = len(cc.streams) == 0 && reused
3307 if ci.WasIdle && !cc.lastActive.IsZero() {
3308 ci.IdleTime = time.Since(cc.lastActive)
3309 }
3310 cc.mu.Unlock()
3311
3312 trace.GotConn(ci)
3313 }
3314
3315 func traceWroteHeaders(trace *httptrace.ClientTrace) {
3316 if trace != nil && trace.WroteHeaders != nil {
3317 trace.WroteHeaders()
3318 }
3319 }
3320
3321 func traceGot100Continue(trace *httptrace.ClientTrace) {
3322 if trace != nil && trace.Got100Continue != nil {
3323 trace.Got100Continue()
3324 }
3325 }
3326
3327 func traceWait100Continue(trace *httptrace.ClientTrace) {
3328 if trace != nil && trace.Wait100Continue != nil {
3329 trace.Wait100Continue()
3330 }
3331 }
3332
3333 func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
3334 if trace != nil && trace.WroteRequest != nil {
3335 trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
3336 }
3337 }
3338
3339 func traceFirstResponseByte(trace *httptrace.ClientTrace) {
3340 if trace != nil && trace.GotFirstResponseByte != nil {
3341 trace.GotFirstResponseByte()
3342 }
3343 }
3344
3345 func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
3346 if trace != nil {
3347 return trace.Got1xxResponse
3348 }
3349 return nil
3350 }
3351
3352
3353
3354 func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
3355 dialer := &tls.Dialer{
3356 Config: cfg,
3357 }
3358 cn, err := dialer.DialContext(ctx, network, addr)
3359 if err != nil {
3360 return nil, err
3361 }
3362 tlsCn := cn.(*tls.Conn)
3363 return tlsCn, nil
3364 }
3365
View as plain text