$ go version

go version go1.20.5 darwin/arm64

DefaultTransport

看一下 DefaultTransport 的 RoundTrip 方法。

  1// src/net/http/roundtrip.go#L8
  2// RoundTrip implements the RoundTripper interface.
  3//
  4// For higher-level HTTP client support (such as handling of cookies
  5// and redirects), see Get, Post, and the Client type.
  6//
  7// Like the RoundTripper interface, the error types returned
  8// by RoundTrip are unspecified.
  9func (t *Transport) RoundTrip(req *Request) (*Response, error) {
 10	return t.roundTrip(req)
 11}
 12
 13// src/net/http/transport.go#L510
 14// roundTrip implements a RoundTripper over HTTP.
 15func (t *Transport) roundTrip(req *Request) (*Response, error) {
 16	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
 17	ctx := req.Context()
 18	trace := httptrace.ContextClientTrace(ctx)
 19
 20	if req.URL == nil {
 21		req.closeBody()
 22		return nil, errors.New("http: nil Request.URL")
 23	}
 24	if req.Header == nil {
 25		req.closeBody()
 26		return nil, errors.New("http: nil Request.Header")
 27	}
 28	scheme := req.URL.Scheme
 29	isHTTP := scheme == "http" || scheme == "https"
 30	if isHTTP {
 31		for k, vv := range req.Header {
 32			if !httpguts.ValidHeaderFieldName(k) {
 33				req.closeBody()
 34				return nil, fmt.Errorf("net/http: invalid header field name %q", k)
 35			}
 36			for _, v := range vv {
 37				if !httpguts.ValidHeaderFieldValue(v) {
 38					req.closeBody()
 39					// Don't include the value in the error, because it may be sensitive.
 40					return nil, fmt.Errorf("net/http: invalid header field value for %q", k)
 41				}
 42			}
 43		}
 44	}
 45
 46	origReq := req
 47	cancelKey := cancelKey{origReq}
 48	req = setupRewindBody(req)
 49
 50	if altRT := t.alternateRoundTripper(req); altRT != nil {
 51		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
 52			return resp, err
 53		}
 54		var err error
 55		req, err = rewindBody(req)
 56		if err != nil {
 57			return nil, err
 58		}
 59	}
 60	if !isHTTP {
 61		req.closeBody()
 62		return nil, badStringError("unsupported protocol scheme", scheme)
 63	}
 64	if req.Method != "" && !validMethod(req.Method) {
 65		req.closeBody()
 66		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
 67	}
 68	if req.URL.Host == "" {
 69		req.closeBody()
 70		return nil, errors.New("http: no Host in request URL")
 71	}
 72
 73	for {
 74		select {
 75		case <-ctx.Done():
 76			req.closeBody()
 77			return nil, ctx.Err()
 78		default:
 79		}
 80
 81		// treq gets modified by roundTrip, so we need to recreate for each retry.
 82		treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
 83		cm, err := t.connectMethodForRequest(treq)
 84		if err != nil {
 85			req.closeBody()
 86			return nil, err
 87		}
 88
 89		// Get the cached or newly-created connection to either the
 90		// host (for http or https), the http proxy, or the http proxy
 91		// pre-CONNECTed to https server. In any case, we'll be ready
 92		// to send it requests.
 93		pconn, err := t.getConn(treq, cm)
 94		if err != nil {
 95			t.setReqCanceler(cancelKey, nil)
 96			req.closeBody()
 97			return nil, err
 98		}
 99
100		var resp *Response
101		if pconn.alt != nil {
102			// HTTP/2 path.
103			t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
104			resp, err = pconn.alt.RoundTrip(req)
105		} else {
106			resp, err = pconn.roundTrip(treq)
107		}
108		if err == nil {
109			resp.Request = origReq
110			return resp, nil
111		}
112
113		// Failed. Clean up and determine whether to retry.
114		if http2isNoCachedConnError(err) {
115			if t.removeIdleConn(pconn) {
116				t.decConnsPerHost(pconn.cacheKey)
117			}
118		} else if !pconn.shouldRetryRequest(req, err) {
119			// Issue 16465: return underlying net.Conn.Read error from peek,
120			// as we've historically done.
121			if e, ok := err.(nothingWrittenError); ok {
122				err = e.error
123			}
124			if e, ok := err.(transportReadFromServerError); ok {
125				err = e.err
126			}
127			return nil, err
128		}
129		testHookRoundTripRetried()
130
131		// Rewind the body if we're able to.
132		req, err = rewindBody(req)
133		if err != nil {
134			return nil, err
135		}
136	}
137}

因为 RoundTripper 说明了请求的 URL 和 Header 必须初始化,所以一开始先检查这两项,为 nil 就关闭 body 并报错返回。再往后,如果 scheme 是 http 或 https,会检查 header 的键和值里有没有非法字符。再往后,注意到有一个 setupRewindBody,它给请求 body 包了一层,用来在重试时判断 body 是否需要恢复(rewind)。

rewindBody

 1// src/net/http/transport.go#L635
 2var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
 3
 4type readTrackingBody struct {
 5	io.ReadCloser
 6	didRead  bool
 7	didClose bool
 8}
 9
10func (r *readTrackingBody) Read(data []byte) (int, error) {
11	r.didRead = true
12	return r.ReadCloser.Read(data)
13}
14
15func (r *readTrackingBody) Close() error {
16	r.didClose = true
17	return r.ReadCloser.Close()
18}
19
20// setupRewindBody returns a new request with a custom body wrapper
21// that can report whether the body needs rewinding.
22// This lets rewindBody avoid an error result when the request
23// does not have GetBody but the body hasn't been read at all yet.
24func setupRewindBody(req *Request) *Request {
25	if req.Body == nil || req.Body == NoBody {
26		return req
27	}
28	newReq := *req
29	newReq.Body = &readTrackingBody{ReadCloser: req.Body}
30	return &newReq
31}
32
33// rewindBody returns a new request with the body rewound.
34// It returns req unmodified if the body does not need rewinding.
35// rewindBody takes care of closing req.Body when appropriate
36// (in all cases except when rewindBody returns req unmodified).
37func rewindBody(req *Request) (rewound *Request, err error) {
38	if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
39		return req, nil // nothing to rewind
40	}
41	if !req.Body.(*readTrackingBody).didClose {
42		req.closeBody()
43	}
44	if req.GetBody == nil {
45		return nil, errCannotRewind
46	}
47	body, err := req.GetBody()
48	if err != nil {
49		return nil, err
50	}
51	newReq := *req
52	newReq.Body = &readTrackingBody{ReadCloser: body}
53	return &newReq, nil
54}

setup 很简单,就是用 readTrackingBody 对原来的 body 包装了一下,记录 body 有没有被读过、有没有被关闭过。再看 rewind,先判断是否使用过:如果既没读过也没关闭过,不需要 rewind,原样返回;否则,如果还没关闭就先关闭掉原来的 body,然后调用 GetBody 获取一份新的 body(请求没有设置 GetBody 的话返回 errCannotRewind),重新包装进 readTrackingBody 返回,方便下一次 rewind。 继续看 roundTrip。有一个判断 alternative round tripper 的,如果找到可供替代的 round tripper 就直接调用这个 round tripper 的 RoundTrip 方法;只有它返回 ErrSkipAltProtocol 时,才会 rewind body 之后继续由 Transport 自己处理。看看 alternateRoundTripper 是怎么找的。

alternative round tripper

 1// src/net/http/transport.go#L486
 2// useRegisteredProtocol reports whether an alternate protocol (as registered
 3// with Transport.RegisterProtocol) should be respected for this request.
 4func (t *Transport) useRegisteredProtocol(req *Request) bool {
 5	if req.URL.Scheme == "https" && req.requiresHTTP1() {
 6		// If this request requires HTTP/1, don't use the
 7		// "https" alternate protocol, which is used by the
 8		// HTTP/2 code to take over requests if there's an
 9		// existing cached HTTP/2 connection.
10		return false
11	}
12	return true
13}
14
15// alternateRoundTripper returns the alternate RoundTripper to use
16// for this request if the Request's URL scheme requires one,
17// or nil for the normal case of using the Transport.
18func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
19	if !t.useRegisteredProtocol(req) {
20		return nil
21	}
22	altProto, _ := t.altProto.Load().(map[string]RoundTripper)
23	return altProto[req.URL.Scheme]
24}

从 altProto 这个 map 里按 scheme 去查找(需要 HTTP/1 的 https 请求会跳过这一步,直接返回 nil),再看看谁往 altProto 里存的数据。

 1// src/net/http/transport.go#L742
 2// RegisterProtocol registers a new protocol with scheme.
 3// The Transport will pass requests using the given scheme to rt.
 4// It is rt's responsibility to simulate HTTP request semantics.
 5//
 6// RegisterProtocol can be used by other packages to provide
 7// implementations of protocol schemes like "ftp" or "file".
 8//
 9// If rt.RoundTrip returns ErrSkipAltProtocol, the Transport will
10// handle the RoundTrip itself for that one request, as if the
11// protocol were not registered.
12func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
13	t.altMu.Lock()
14	defer t.altMu.Unlock()
15	oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
16	if _, exists := oldMap[scheme]; exists {
17		panic("protocol " + scheme + " already registered")
18	}
19	newMap := make(map[string]RoundTripper)
20	for k, v := range oldMap {
21		newMap[k] = v
22	}
23	newMap[scheme] = rt
24	t.altProto.Store(newMap)
25}

所以其实我们可以通过 RegisterProtocol 来增加除 http, https 之外的 scheme 的处理机制(同一个 scheme 重复注册会 panic)。比如下面这样来处理 file:// 类型的链接,http.Get 就可以直接读取本地的文件内容。

1http.DefaultTransport.(*http.Transport).RegisterProtocol("file", http.NewFileTransport(http.Dir("/")))
2http.Get("file:///path/to/file")

再接着看 roundTrip。又是一些简单的检查,然后进入一个用于重试的 for 循环。每次循环开始时用带 default 的 select 非阻塞地检查 ctx 是否已经取消或超时,循环里主要调用了下面这几个方法,具体看看。

Transport.connectMethodForRequest

 1// src/net/http/transport.go#L839
 2func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
 3	cm.targetScheme = treq.URL.Scheme
 4	cm.targetAddr = canonicalAddr(treq.URL)
 5	if t.Proxy != nil {
 6		cm.proxyURL, err = t.Proxy(treq.Request)
 7	}
 8	cm.onlyH1 = treq.requiresHTTP1()
 9	return cm, err
10}

其实很简单,主要是判断了要不要用代理。如果 Transport.Proxy 不为 nil,就调用它来获取代理地址,之后建立连接的时候就会通过代理。

Transport.getConn

 1// src/net/http/transport.go#L1333
 2// getConn dials and creates a new persistConn to the target as
 3// specified in the connectMethod. This includes doing a proxy CONNECT
 4// and/or setting up TLS.  If this doesn't return an error, the persistConn
 5// is ready to write requests to.
 6func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
 7	req := treq.Request
 8	trace := treq.trace
 9	ctx := req.Context()
10	if trace != nil && trace.GetConn != nil {
11		trace.GetConn(cm.addr())
12	}
13
14	w := &wantConn{
15		cm:         cm,
16		key:        cm.key(),
17		ctx:        ctx,
18		ready:      make(chan struct{}, 1),
19		beforeDial: testHookPrePendingDial,
20		afterDial:  testHookPostPendingDial,
21	}
22	defer func() {
23		if err != nil {
24			w.cancel(t, err)
25		}
26	}()
27
28	// Queue for idle connection.
29	if delivered := t.queueForIdleConn(w); delivered {
30		pc := w.pc
31		// Trace only for HTTP/1.
32		// HTTP/2 calls trace.GotConn itself.
33		if pc.alt == nil && trace != nil && trace.GotConn != nil {
34			trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
35		}
36		// set request canceler to some non-nil function so we
37		// can detect whether it was cleared between now and when
38		// we enter roundTrip
39		t.setReqCanceler(treq.cancelKey, func(error) {})
40		return pc, nil
41	}
42
43	cancelc := make(chan error, 1)
44	t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
45
46	// Queue for permission to dial.
47	t.queueForDial(w)
48
49	// Wait for completion or cancellation.
50	select {
51	case <-w.ready:
52		// Trace success but only for HTTP/1.
53		// HTTP/2 calls trace.GotConn itself.
54		if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
55			trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
56		}
57		if w.err != nil {
58			// If the request has been canceled, that's probably
59			// what caused w.err; if so, prefer to return the
60			// cancellation error (see golang.org/issue/16049).
61			select {
62			case <-req.Cancel:
63				return nil, errRequestCanceledConn
64			case <-req.Context().Done():
65				return nil, req.Context().Err()
66			case err := <-cancelc:
67				if err == errRequestCanceled {
68					err = errRequestCanceledConn
69				}
70				return nil, err
71			default:
72				// return below
73			}
74		}
75		return w.pc, w.err
76	case <-req.Cancel:
77		return nil, errRequestCanceledConn
78	case <-req.Context().Done():
79		return nil, req.Context().Err()
80	case err := <-cancelc:
81		if err == errRequestCanceled {
82			err = errRequestCanceledConn
83		}
84		return nil, err
85	}
86}

这个方法主要是做连接复用,检查有没有适合的缓存住的已经建立好的连接,如果有的话就直接使用,不再创建新连接。如果没有,就通过 queueForDial 排队拨号建立新连接,然后等待连接就绪、请求被取消或者 ctx 结束。主要看看建立新连接的部分。

dialConn

  1// src/net/http/transport.go#L1419
  2// queueForDial queues w to wait for permission to begin dialing.
  3// Once w receives permission to dial, it will do so in a separate goroutine.
  4func (t *Transport) queueForDial(w *wantConn) {
  5	w.beforeDial()
  6	if t.MaxConnsPerHost <= 0 {
  7		go t.dialConnFor(w)
  8		return
  9	}
 10
 11	t.connsPerHostMu.Lock()
 12	defer t.connsPerHostMu.Unlock()
 13
 14	if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
 15		if t.connsPerHost == nil {
 16			t.connsPerHost = make(map[connectMethodKey]int)
 17		}
 18		t.connsPerHost[w.key] = n + 1
 19		go t.dialConnFor(w)
 20		return
 21	}
 22
 23	if t.connsPerHostWait == nil {
 24		t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
 25	}
 26	q := t.connsPerHostWait[w.key]
 27	q.cleanFront()
 28	q.pushBack(w)
 29	t.connsPerHostWait[w.key] = q
 30}
 31
 32// dialConnFor dials on behalf of w and delivers the result to w.
 33// dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()].
 34// If the dial is canceled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()].
 35func (t *Transport) dialConnFor(w *wantConn) {
 36	defer w.afterDial()
 37
 38	pc, err := t.dialConn(w.ctx, w.cm)
 39	delivered := w.tryDeliver(pc, err)
 40	if err == nil && (!delivered || pc.alt != nil) {
 41		// pconn was not passed to w,
 42		// or it is HTTP/2 and can be shared.
 43		// Add to the idle connection pool.
 44		t.putOrCloseIdleConn(pc)
 45	}
 46	if err != nil {
 47		t.decConnsPerHost(w.key)
 48	}
 49}
 50
 51// src/net/http/transport.go#L1569
 52func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
 53	pconn = &persistConn{
 54		t:             t,
 55		cacheKey:      cm.key(),
 56		reqch:         make(chan requestAndChan, 1),
 57		writech:       make(chan writeRequest, 1),
 58		closech:       make(chan struct{}),
 59		writeErrCh:    make(chan error, 1),
 60		writeLoopDone: make(chan struct{}),
 61	}
 62	trace := httptrace.ContextClientTrace(ctx)
 63	wrapErr := func(err error) error {
 64		if cm.proxyURL != nil {
 65			// Return a typed error, per Issue 16997
 66			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
 67		}
 68		return err
 69	}
 70	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
 71		var err error
 72		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
 73		if err != nil {
 74			return nil, wrapErr(err)
 75		}
 76		if tc, ok := pconn.conn.(*tls.Conn); ok {
 77			// Handshake here, in case DialTLS didn't. TLSNextProto below
 78			// depends on it for knowing the connection state.
 79			if trace != nil && trace.TLSHandshakeStart != nil {
 80				trace.TLSHandshakeStart()
 81			}
 82			if err := tc.HandshakeContext(ctx); err != nil {
 83				go pconn.conn.Close()
 84				if trace != nil && trace.TLSHandshakeDone != nil {
 85					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
 86				}
 87				return nil, err
 88			}
 89			cs := tc.ConnectionState()
 90			if trace != nil && trace.TLSHandshakeDone != nil {
 91				trace.TLSHandshakeDone(cs, nil)
 92			}
 93			pconn.tlsState = &cs
 94		}
 95	} else {
 96		conn, err := t.dial(ctx, "tcp", cm.addr())
 97		if err != nil {
 98			return nil, wrapErr(err)
 99		}
100		pconn.conn = conn
101		if cm.scheme() == "https" {
102			var firstTLSHost string
103			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
104				return nil, wrapErr(err)
105			}
106			if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
107				return nil, wrapErr(err)
108			}
109		}
110	}
111
112	// Proxy setup.
113	switch {
114	case cm.proxyURL == nil:
115		// Do nothing. Not using a proxy.
116	case cm.proxyURL.Scheme == "socks5":
117		conn := pconn.conn
118		d := socksNewDialer("tcp", conn.RemoteAddr().String())
119		if u := cm.proxyURL.User; u != nil {
120			auth := &socksUsernamePassword{
121				Username: u.Username(),
122			}
123			auth.Password, _ = u.Password()
124			d.AuthMethods = []socksAuthMethod{
125				socksAuthMethodNotRequired,
126				socksAuthMethodUsernamePassword,
127			}
128			d.Authenticate = auth.Authenticate
129		}
130		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
131			conn.Close()
132			return nil, err
133		}
134	case cm.targetScheme == "http":
135		pconn.isProxy = true
136		if pa := cm.proxyAuth(); pa != "" {
137			pconn.mutateHeaderFunc = func(h Header) {
138				h.Set("Proxy-Authorization", pa)
139			}
140		}
141	case cm.targetScheme == "https":
142		conn := pconn.conn
143		var hdr Header
144		if t.GetProxyConnectHeader != nil {
145			var err error
146			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
147			if err != nil {
148				conn.Close()
149				return nil, err
150			}
151		} else {
152			hdr = t.ProxyConnectHeader
153		}
154		if hdr == nil {
155			hdr = make(Header)
156		}
157		if pa := cm.proxyAuth(); pa != "" {
158			hdr = hdr.Clone()
159			hdr.Set("Proxy-Authorization", pa)
160		}
161		connectReq := &Request{
162			Method: "CONNECT",
163			URL:    &url.URL{Opaque: cm.targetAddr},
164			Host:   cm.targetAddr,
165			Header: hdr,
166		}
167
168		// If there's no done channel (no deadline or cancellation
169		// from the caller possible), at least set some (long)
170		// timeout here. This will make sure we don't block forever
171		// and leak a goroutine if the connection stops replying
172		// after the TCP connect.
173		connectCtx := ctx
174		if ctx.Done() == nil {
175			newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
176			defer cancel()
177			connectCtx = newCtx
178		}
179
180		didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
181		var (
182			resp *Response
183			err  error // write or read error
184		)
185		// Write the CONNECT request & read the response.
186		go func() {
187			defer close(didReadResponse)
188			err = connectReq.Write(conn)
189			if err != nil {
190				return
191			}
192			// Okay to use and discard buffered reader here, because
193			// TLS server will not speak until spoken to.
194			br := bufio.NewReader(conn)
195			resp, err = ReadResponse(br, connectReq)
196		}()
197		select {
198		case <-connectCtx.Done():
199			conn.Close()
200			<-didReadResponse
201			return nil, connectCtx.Err()
202		case <-didReadResponse:
203			// resp or err now set
204		}
205		if err != nil {
206			conn.Close()
207			return nil, err
208		}
209
210		if t.OnProxyConnectResponse != nil {
211			err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
212			if err != nil {
213				return nil, err
214			}
215		}
216
217		if resp.StatusCode != 200 {
218			_, text, ok := strings.Cut(resp.Status, " ")
219			conn.Close()
220			if !ok {
221				return nil, errors.New("unknown status code")
222			}
223			return nil, errors.New(text)
224		}
225	}
226
227	if cm.proxyURL != nil && cm.targetScheme == "https" {
228		if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
229			return nil, err
230		}
231	}
232
233	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
234		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
235			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
236			if e, ok := alt.(erringRoundTripper); ok {
237				// pconn.conn was closed by next (http2configureTransports.upgradeFn).
238				return nil, e.RoundTripErr()
239			}
240			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
241		}
242	}
243
244	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
245	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
246
247	go pconn.readLoop()
248	go pconn.writeLoop()
249	return pconn, nil
250}

也很清楚,queueForDial 先根据 MaxConnsPerHost 做了连接数限制的判断,dialConn 再完成拨号、TLS 握手和代理相关的处理。最主要的是(HTTP/1 的)连接建立完成后,又 go 了两个 goroutine,分别负责从连接读取数据和向连接写入数据。可以看看具体是干什么。

readLoop & writeLoop

  1// src/net/http/transport.go#L2071
  2func (pc *persistConn) readLoop() {
  3	closeErr := errReadLoopExiting // default value, if not changed below
  4	defer func() {
  5		pc.close(closeErr)
  6		pc.t.removeIdleConn(pc)
  7	}()
  8
  9	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
 10		if err := pc.t.tryPutIdleConn(pc); err != nil {
 11			closeErr = err
 12			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
 13				trace.PutIdleConn(err)
 14			}
 15			return false
 16		}
 17		if trace != nil && trace.PutIdleConn != nil {
 18			trace.PutIdleConn(nil)
 19		}
 20		return true
 21	}
 22
 23	// eofc is used to block caller goroutines reading from Response.Body
 24	// at EOF until this goroutines has (potentially) added the connection
 25	// back to the idle pool.
 26	eofc := make(chan struct{})
 27	defer close(eofc) // unblock reader on errors
 28
 29	// Read this once, before loop starts. (to avoid races in tests)
 30	testHookMu.Lock()
 31	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
 32	testHookMu.Unlock()
 33
 34	alive := true
 35	for alive {
 36		pc.readLimit = pc.maxHeaderResponseSize()
 37		_, err := pc.br.Peek(1)
 38
 39		pc.mu.Lock()
 40		if pc.numExpectedResponses == 0 {
 41			pc.readLoopPeekFailLocked(err)
 42			pc.mu.Unlock()
 43			return
 44		}
 45		pc.mu.Unlock()
 46
 47		rc := <-pc.reqch
 48		trace := httptrace.ContextClientTrace(rc.req.Context())
 49
 50		var resp *Response
 51		if err == nil {
 52			resp, err = pc.readResponse(rc, trace)
 53		} else {
 54			err = transportReadFromServerError{err}
 55			closeErr = err
 56		}
 57
 58		if err != nil {
 59			if pc.readLimit <= 0 {
 60				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
 61			}
 62
 63			select {
 64			case rc.ch <- responseAndError{err: err}:
 65			case <-rc.callerGone:
 66				return
 67			}
 68			return
 69		}
 70		pc.readLimit = maxInt64 // effectively no limit for response bodies
 71
 72		pc.mu.Lock()
 73		pc.numExpectedResponses--
 74		pc.mu.Unlock()
 75
 76		bodyWritable := resp.bodyIsWritable()
 77		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
 78
 79		if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
 80			// Don't do keep-alive on error if either party requested a close
 81			// or we get an unexpected informational (1xx) response.
 82			// StatusCode 100 is already handled above.
 83			alive = false
 84		}
 85
 86		if !hasBody || bodyWritable {
 87			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
 88
 89			// Put the idle conn back into the pool before we send the response
 90			// so if they process it quickly and make another request, they'll
 91			// get this same conn. But we use the unbuffered channel 'rc'
 92			// to guarantee that persistConn.roundTrip got out of its select
 93			// potentially waiting for this persistConn to close.
 94			alive = alive &&
 95				!pc.sawEOF &&
 96				pc.wroteRequest() &&
 97				replaced && tryPutIdleConn(trace)
 98
 99			if bodyWritable {
100				closeErr = errCallerOwnsConn
101			}
102
103			select {
104			case rc.ch <- responseAndError{res: resp}:
105			case <-rc.callerGone:
106				return
107			}
108
109			// Now that they've read from the unbuffered channel, they're safely
110			// out of the select that also waits on this goroutine to die, so
111			// we're allowed to exit now if needed (if alive is false)
112			testHookReadLoopBeforeNextRead()
113			continue
114		}
115
116		waitForBodyRead := make(chan bool, 2)
117		body := &bodyEOFSignal{
118			body: resp.Body,
119			earlyCloseFn: func() error {
120				waitForBodyRead <- false
121				<-eofc // will be closed by deferred call at the end of the function
122				return nil
123
124			},
125			fn: func(err error) error {
126				isEOF := err == io.EOF
127				waitForBodyRead <- isEOF
128				if isEOF {
129					<-eofc // see comment above eofc declaration
130				} else if err != nil {
131					if cerr := pc.canceled(); cerr != nil {
132						return cerr
133					}
134				}
135				return err
136			},
137		}
138
139		resp.Body = body
140		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
141			resp.Body = &gzipReader{body: body}
142			resp.Header.Del("Content-Encoding")
143			resp.Header.Del("Content-Length")
144			resp.ContentLength = -1
145			resp.Uncompressed = true
146		}
147
148		select {
149		case rc.ch <- responseAndError{res: resp}:
150		case <-rc.callerGone:
151			return
152		}
153
154		// Before looping back to the top of this function and peeking on
155		// the bufio.Reader, wait for the caller goroutine to finish
156		// reading the response body. (or for cancellation or death)
157		select {
158		case bodyEOF := <-waitForBodyRead:
159			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
160			alive = alive &&
161				bodyEOF &&
162				!pc.sawEOF &&
163				pc.wroteRequest() &&
164				replaced && tryPutIdleConn(trace)
165			if bodyEOF {
166				eofc <- struct{}{}
167			}
168		case <-rc.req.Cancel:
169			alive = false
170			pc.t.CancelRequest(rc.req)
171		case <-rc.req.Context().Done():
172			alive = false
173			pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
174		case <-pc.closech:
175			alive = false
176		}
177
178		testHookReadLoopBeforeNextRead()
179	}
180}
181
182// src/net/http/transport.go#L2406
183func (pc *persistConn) writeLoop() {
184	defer close(pc.writeLoopDone)
185	for {
186		select {
187		case wr := <-pc.writech:
188			startBytesWritten := pc.nwrite
189			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
190			if bre, ok := err.(requestBodyReadError); ok {
191				err = bre.error
192				// Errors reading from the user's
193				// Request.Body are high priority.
194				// Set it here before sending on the
195				// channels below or calling
196				// pc.close() which tears down
197				// connections and causes other
198				// errors.
199				wr.req.setError(err)
200			}
201			if err == nil {
202				err = pc.bw.Flush()
203			}
204			if err != nil {
205				if pc.nwrite == startBytesWritten {
206					err = nothingWrittenError{err}
207				}
208			}
209			pc.writeErrCh <- err // to the body reader, which might recycle us
210			wr.ch <- err         // to the roundTrip function
211			if err != nil {
212				pc.close(err)
213				return
214			}
215		case <-pc.closech:
216			return
217		}
218	}
219}

很多协议细节的部分,主要是 writeLoop 把通过 channel 收到的新请求写入 tcp conn,readLoop 从 tcp conn 读到 response 后再通过 channel 传出去。需要注意的是在对 request 做一些修改之前,都是先 copy 了副本再做修改,因为前面讲了 RoundTripper 只能消费和关闭请求的 body,不能修改 Request。还有在读到 response 之后,用 bodyEOFSignal 对 body 进行了包装,来感知 body 是否被读完或者被关闭:只有前一个 response 的 body 读到 EOF 或者被关闭之后(或者请求被取消、连接被关闭),readLoop 才会继续往下走;并且只有 body 被完整读到 EOF,这个 conn 才会放回 idle 池复用,没读完就关闭的话连接会被直接关掉。所以既不读完也不关闭 response body 会造成连接泄漏,连接一直是 alive 等待的状态,下一个请求找不到 idle conn,又新创建连接,导致连接越来越多。

bodyEOFSignal

 1// src/net/http/transport.go#L2758
 2// bodyEOFSignal is used by the HTTP/1 transport when reading response
 3// bodies to make sure we see the end of a response body before
 4// proceeding and reading on the connection again.
 5//
 6// It wraps a ReadCloser but runs fn (if non-nil) at most
 7// once, right before its final (error-producing) Read or Close call
 8// returns. fn should return the new error to return from Read or Close.
 9//
10// If earlyCloseFn is non-nil and Close is called before io.EOF is
11// seen, earlyCloseFn is called instead of fn, and its return value is
12// the return value from Close.
13type bodyEOFSignal struct {
14	body         io.ReadCloser
15	mu           sync.Mutex        // guards following 4 fields
16	closed       bool              // whether Close has been called
17	rerr         error             // sticky Read error
18	fn           func(error) error // err will be nil on Read io.EOF
19	earlyCloseFn func() error      // optional alt Close func used if io.EOF not seen
20}
21
22var errReadOnClosedResBody = errors.New("http: read on closed response body")
23
24func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
25	es.mu.Lock()
26	closed, rerr := es.closed, es.rerr
27	es.mu.Unlock()
28	if closed {
29		return 0, errReadOnClosedResBody
30	}
31	if rerr != nil {
32		return 0, rerr
33	}
34
35	n, err = es.body.Read(p)
36	if err != nil {
37		es.mu.Lock()
38		defer es.mu.Unlock()
39		if es.rerr == nil {
40			es.rerr = err
41		}
42		err = es.condfn(err)
43	}
44	return
45}
46
47func (es *bodyEOFSignal) Close() error {
48	es.mu.Lock()
49	defer es.mu.Unlock()
50	if es.closed {
51		return nil
52	}
53	es.closed = true
54	if es.earlyCloseFn != nil && es.rerr != io.EOF {
55		return es.earlyCloseFn()
56	}
57	err := es.body.Close()
58	return es.condfn(err)
59}

persistConn.roundTrip

  1// src/net/http/transport.go#L2551
  2func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
  3	testHookEnterRoundTrip()
  4	if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
  5		pc.t.putOrCloseIdleConn(pc)
  6		return nil, errRequestCanceled
  7	}
  8	pc.mu.Lock()
  9	pc.numExpectedResponses++
 10	headerFn := pc.mutateHeaderFunc
 11	pc.mu.Unlock()
 12
 13	if headerFn != nil {
 14		headerFn(req.extraHeaders())
 15	}
 16
 17	// Ask for a compressed version if the caller didn't set their
 18	// own value for Accept-Encoding. We only attempt to
 19	// uncompress the gzip stream if we were the layer that
 20	// requested it.
 21	requestedGzip := false
 22	if !pc.t.DisableCompression &&
 23		req.Header.Get("Accept-Encoding") == "" &&
 24		req.Header.Get("Range") == "" &&
 25		req.Method != "HEAD" {
 26		// Request gzip only, not deflate. Deflate is ambiguous and
 27		// not as universally supported anyway.
 28		// See: https://zlib.net/zlib_faq.html#faq39
 29		//
 30		// Note that we don't request this for HEAD requests,
 31		// due to a bug in nginx:
 32		//   https://trac.nginx.org/nginx/ticket/358
 33		//   https://golang.org/issue/5522
 34		//
 35		// We don't request gzip if the request is for a range, since
 36		// auto-decoding a portion of a gzipped document will just fail
 37		// anyway. See https://golang.org/issue/8923
 38		requestedGzip = true
 39		req.extraHeaders().Set("Accept-Encoding", "gzip")
 40	}
 41
 42	var continueCh chan struct{}
 43	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
 44		continueCh = make(chan struct{}, 1)
 45	}
 46
 47	if pc.t.DisableKeepAlives &&
 48		!req.wantsClose() &&
 49		!isProtocolSwitchHeader(req.Header) {
 50		req.extraHeaders().Set("Connection", "close")
 51	}
 52
 53	gone := make(chan struct{})
 54	defer close(gone)
 55
 56	defer func() {
 57		if err != nil {
 58			pc.t.setReqCanceler(req.cancelKey, nil)
 59		}
 60	}()
 61
 62	const debugRoundTrip = false
 63
 64	// Write the request concurrently with waiting for a response,
 65	// in case the server decides to reply before reading our full
 66	// request body.
 67	startBytesWritten := pc.nwrite
 68	writeErrCh := make(chan error, 1)
 69	pc.writech <- writeRequest{req, writeErrCh, continueCh}
 70
 71	resc := make(chan responseAndError)
 72	pc.reqch <- requestAndChan{
 73		req:        req.Request,
 74		cancelKey:  req.cancelKey,
 75		ch:         resc,
 76		addedGzip:  requestedGzip,
 77		continueCh: continueCh,
 78		callerGone: gone,
 79	}
 80
 81	var respHeaderTimer <-chan time.Time
 82	cancelChan := req.Request.Cancel
 83	ctxDoneChan := req.Context().Done()
 84	pcClosed := pc.closech
 85	canceled := false
 86	for {
 87		testHookWaitResLoop()
 88		select {
 89		case err := <-writeErrCh:
 90			if debugRoundTrip {
 91				req.logf("writeErrCh resv: %T/%#v", err, err)
 92			}
 93			if err != nil {
 94				pc.close(fmt.Errorf("write error: %w", err))
 95				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
 96			}
 97			if d := pc.t.ResponseHeaderTimeout; d > 0 {
 98				if debugRoundTrip {
 99					req.logf("starting timer for %v", d)
100				}
101				timer := time.NewTimer(d)
102				defer timer.Stop() // prevent leaks
103				respHeaderTimer = timer.C
104			}
105		case <-pcClosed:
106			pcClosed = nil
107			if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
108				if debugRoundTrip {
109					req.logf("closech recv: %T %#v", pc.closed, pc.closed)
110				}
111				return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
112			}
113		case <-respHeaderTimer:
114			if debugRoundTrip {
115				req.logf("timeout waiting for response headers.")
116			}
117			pc.close(errTimeout)
118			return nil, errTimeout
119		case re := <-resc:
120			if (re.res == nil) == (re.err == nil) {
121				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
122			}
123			if debugRoundTrip {
124				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
125			}
126			if re.err != nil {
127				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
128			}
129			return re.res, nil
130		case <-cancelChan:
131			canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
132			cancelChan = nil
133		case <-ctxDoneChan:
134			canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
135			cancelChan = nil
136			ctxDoneChan = nil
137		}
138	}
139}

前面已经实现了连接的复用和 req resp 的收发,这里就只要简单处理,把请求交给 writeLoop 和 readLoop,然后等待结果就好了,同时在 select 里处理写入错误、连接关闭、响应头超时(ResponseHeaderTimeout)以及请求取消和 ctx 结束。