$ go version
go version go1.20.5 darwin/arm64
DefaultTransport
Let's take a look at the RoundTrip method of DefaultTransport.
1// src/net/http/roundtrip.go#L8
2// RoundTrip implements the RoundTripper interface.
3//
4// For higher-level HTTP client support (such as handling of cookies
5// and redirects), see Get, Post, and the Client type.
6//
7// Like the RoundTripper interface, the error types returned
8// by RoundTrip are unspecified.
9func (t *Transport) RoundTrip(req *Request) (*Response, error) {
10 return t.roundTrip(req)
11}
12
13// src/net/http/transport.go#L510
14// roundTrip implements a RoundTripper over HTTP.
15func (t *Transport) roundTrip(req *Request) (*Response, error) {
16 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
17 ctx := req.Context()
18 trace := httptrace.ContextClientTrace(ctx)
19
20 if req.URL == nil {
21 req.closeBody()
22 return nil, errors.New("http: nil Request.URL")
23 }
24 if req.Header == nil {
25 req.closeBody()
26 return nil, errors.New("http: nil Request.Header")
27 }
28 scheme := req.URL.Scheme
29 isHTTP := scheme == "http" || scheme == "https"
30 if isHTTP {
31 for k, vv := range req.Header {
32 if !httpguts.ValidHeaderFieldName(k) {
33 req.closeBody()
34 return nil, fmt.Errorf("net/http: invalid header field name %q", k)
35 }
36 for _, v := range vv {
37 if !httpguts.ValidHeaderFieldValue(v) {
38 req.closeBody()
39 // Don't include the value in the error, because it may be sensitive.
40 return nil, fmt.Errorf("net/http: invalid header field value for %q", k)
41 }
42 }
43 }
44 }
45
46 origReq := req
47 cancelKey := cancelKey{origReq}
48 req = setupRewindBody(req)
49
50 if altRT := t.alternateRoundTripper(req); altRT != nil {
51 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
52 return resp, err
53 }
54 var err error
55 req, err = rewindBody(req)
56 if err != nil {
57 return nil, err
58 }
59 }
60 if !isHTTP {
61 req.closeBody()
62 return nil, badStringError("unsupported protocol scheme", scheme)
63 }
64 if req.Method != "" && !validMethod(req.Method) {
65 req.closeBody()
66 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
67 }
68 if req.URL.Host == "" {
69 req.closeBody()
70 return nil, errors.New("http: no Host in request URL")
71 }
72
73 for {
74 select {
75 case <-ctx.Done():
76 req.closeBody()
77 return nil, ctx.Err()
78 default:
79 }
80
81 // treq gets modified by roundTrip, so we need to recreate for each retry.
82 treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
83 cm, err := t.connectMethodForRequest(treq)
84 if err != nil {
85 req.closeBody()
86 return nil, err
87 }
88
89 // Get the cached or newly-created connection to either the
90 // host (for http or https), the http proxy, or the http proxy
91 // pre-CONNECTed to https server. In any case, we'll be ready
92 // to send it requests.
93 pconn, err := t.getConn(treq, cm)
94 if err != nil {
95 t.setReqCanceler(cancelKey, nil)
96 req.closeBody()
97 return nil, err
98 }
99
100 var resp *Response
101 if pconn.alt != nil {
102 // HTTP/2 path.
103 t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
104 resp, err = pconn.alt.RoundTrip(req)
105 } else {
106 resp, err = pconn.roundTrip(treq)
107 }
108 if err == nil {
109 resp.Request = origReq
110 return resp, nil
111 }
112
113 // Failed. Clean up and determine whether to retry.
114 if http2isNoCachedConnError(err) {
115 if t.removeIdleConn(pconn) {
116 t.decConnsPerHost(pconn.cacheKey)
117 }
118 } else if !pconn.shouldRetryRequest(req, err) {
119 // Issue 16465: return underlying net.Conn.Read error from peek,
120 // as we've historically done.
121 if e, ok := err.(nothingWrittenError); ok {
122 err = e.error
123 }
124 if e, ok := err.(transportReadFromServerError); ok {
125 err = e.err
126 }
127 return nil, err
128 }
129 testHookRoundTripRetried()
130
131 // Rewind the body if we're able to.
132 req, err = rewindBody(req)
133 if err != nil {
134 return nil, err
135 }
136 }
137}
Because the RoundTripper contract requires the request's URL and Header to be initialized, the method starts by checking both and returning an error if either is nil. Next, if the scheme is http or https, it validates every header key and value for illegal characters. Further down, note the call to setupRewindBody, which makes the request body rewindable so it can be restored for retries; the next section looks at it.
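Before that, a quick demonstration of the early validation. This is a minimal sketch; the host example.com and the X-Token header are made up, and nothing is actually dialed because the request is rejected first:

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Hand-built request; URL and Header are set, but the header value
	// contains a NUL byte, which ValidHeaderFieldValue rejects.
	req := &http.Request{
		Method: "GET",
		URL:    &url.URL{Scheme: "http", Host: "example.com"},
		Header: http.Header{"X-Token": {"bad\x00value"}},
	}

	_, err := http.DefaultTransport.RoundTrip(req)
	fmt.Println(err) // net/http: invalid header field value for "X-Token"
}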
rewindBody
1// src/net/http/transport.go#L635
2var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
3
4type readTrackingBody struct {
5 io.ReadCloser
6 didRead bool
7 didClose bool
8}
9
10func (r *readTrackingBody) Read(data []byte) (int, error) {
11 r.didRead = true
12 return r.ReadCloser.Read(data)
13}
14
15func (r *readTrackingBody) Close() error {
16 r.didClose = true
17 return r.ReadCloser.Close()
18}
19
20// setupRewindBody returns a new request with a custom body wrapper
21// that can report whether the body needs rewinding.
22// This lets rewindBody avoid an error result when the request
23// does not have GetBody but the body hasn't been read at all yet.
24func setupRewindBody(req *Request) *Request {
25 if req.Body == nil || req.Body == NoBody {
26 return req
27 }
28 newReq := *req
29 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
30 return &newReq
31}
32
33// rewindBody returns a new request with the body rewound.
34// It returns req unmodified if the body does not need rewinding.
35// rewindBody takes care of closing req.Body when appropriate
36// (in all cases except when rewindBody returns req unmodified).
37func rewindBody(req *Request) (rewound *Request, err error) {
38 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
39 return req, nil // nothing to rewind
40 }
41 if !req.Body.(*readTrackingBody).didClose {
42 req.closeBody()
43 }
44 if req.GetBody == nil {
45 return nil, errCannotRewind
46 }
47 body, err := req.GetBody()
48 if err != nil {
49 return nil, err
50 }
51 newReq := *req
52 newReq.Body = &readTrackingBody{ReadCloser: body}
53 return &newReq, nil
54}
setupRewindBody is simple: it just wraps the original body in a readTrackingBody. rewindBody first checks whether the body has been used; if it has been neither read nor closed, no rewind is needed. If it has been used but not yet closed, the original body is closed, GetBody is called to obtain a fresh copy, and that copy is wrapped in a new readTrackingBody and returned, ready for the next rewind. Back in roundTrip, there is a check for an alternate round tripper: if one is found, its RoundTrip method is called directly. Let's see how alternateRoundTripper finds one.
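Before looking at alternateRoundTripper, one practical note: whether a body can actually be rewound depends on Request.GetBody. A small sketch, using example.com as a placeholder and never sending anything, only inspecting the field:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	// http.NewRequest recognizes *strings.Reader, *bytes.Reader and
	// *bytes.Buffer and fills in GetBody automatically, so the transport
	// can call it to obtain a fresh body when it needs to rewind.
	req, _ := http.NewRequest("POST", "http://example.com", strings.NewReader("payload"))
	fmt.Println(req.GetBody != nil) // true

	// Wrapping the reader hides its concrete type, so GetBody stays nil;
	// if such a body has already been read when the connection is lost,
	// rewindBody returns errCannotRewind and the request fails.
	opaque, _ := http.NewRequest("POST", "http://example.com", io.NopCloser(bytes.NewReader([]byte("payload"))))
	fmt.Println(opaque.GetBody != nil) // false
}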
alternateRoundTripper
1// src/net/http/transport.go#L486
2// useRegisteredProtocol reports whether an alternate protocol (as registered
3// with Transport.RegisterProtocol) should be respected for this request.
4func (t *Transport) useRegisteredProtocol(req *Request) bool {
5 if req.URL.Scheme == "https" && req.requiresHTTP1() {
6 // If this request requires HTTP/1, don't use the
7 // "https" alternate protocol, which is used by the
8 // HTTP/2 code to take over requests if there's an
9 // existing cached HTTP/2 connection.
10 return false
11 }
12 return true
13}
14
15// alternateRoundTripper returns the alternate RoundTripper to use
16// for this request if the Request's URL scheme requires one,
17// or nil for the normal case of using the Transport.
18func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
19 if !t.useRegisteredProtocol(req) {
20 return nil
21 }
22 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
23 return altProto[req.URL.Scheme]
24}
It simply looks up the request's scheme in the altProto map. Now let's see who stores entries into altProto.
1// src/net/http/transport.go#L742
2// RegisterProtocol registers a new protocol with scheme.
3// The Transport will pass requests using the given scheme to rt.
4// It is rt's responsibility to simulate HTTP request semantics.
5//
6// RegisterProtocol can be used by other packages to provide
7// implementations of protocol schemes like "ftp" or "file".
8//
9// If rt.RoundTrip returns ErrSkipAltProtocol, the Transport will
10// handle the RoundTrip itself for that one request, as if the
11// protocol were not registered.
12func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
13 t.altMu.Lock()
14 defer t.altMu.Unlock()
15 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
16 if _, exists := oldMap[scheme]; exists {
17 panic("protocol " + scheme + " already registered")
18 }
19 newMap := make(map[string]RoundTripper)
20 for k, v := range oldMap {
21 newMap[k] = v
22 }
23 newMap[scheme] = rt
24 t.altProto.Store(newMap)
25}
So we can use RegisterProtocol to add handling for schemes other than http and https. For example, registering a file handler as shown below lets http.Get read the contents of local files directly via file:// URLs.
1http.DefaultTransport.(*http.Transport).RegisterProtocol("file", http.NewFileTransport(http.Dir("/")))
2http.Get("file:///path/to/file")
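A fuller, runnable version of the same idea (the file path below is only an illustration):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Register a RoundTripper for the "file" scheme; NewFileTransport
	// serves requests from the given FileSystem, here the local root.
	t := http.DefaultTransport.(*http.Transport)
	t.RegisterProtocol("file", http.NewFileTransport(http.Dir("/")))

	resp, err := http.Get("file:///etc/hosts")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s", data)
}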
Back to roundTrip. After a few more simple checks, it enters a loop. Each iteration starts with a select on ctx.Done() so cancellation and timeouts are honored, and the loop body mainly calls the following methods; let's look at them one by one.
Transport.connectMethodForRequest
1// src/net/http/transport.go#L839
2func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
3 cm.targetScheme = treq.URL.Scheme
4 cm.targetAddr = canonicalAddr(treq.URL)
5 if t.Proxy != nil {
6 cm.proxyURL, err = t.Proxy(treq.Request)
7 }
8 cm.onlyH1 = treq.requiresHTTP1()
9 return cm, err
10}
This is straightforward: it mainly decides whether a proxy should be used. If Transport.Proxy is not nil, it is called to obtain the proxy URL, and the connection will later be established through that proxy.
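For reference, a sketch of how Transport.Proxy is typically set; the 127.0.0.1:8080 address and the target URL are hypothetical:

package main

import (
	"net/http"
	"net/url"
	"time"
)

func main() {
	proxyURL, _ := url.Parse("http://127.0.0.1:8080")

	client := &http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			// connectMethodForRequest consults this for every request.
			// http.ProxyURL always returns the fixed URL; DefaultTransport
			// instead uses http.ProxyFromEnvironment, which reads
			// HTTP_PROXY / HTTPS_PROXY / NO_PROXY.
			Proxy: http.ProxyURL(proxyURL),
		},
	}

	_, _ = client.Get("http://example.com") // error handling omitted in this sketch
}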
Transport.getConn
1// src/net/http/transport.go#L1333
2// getConn dials and creates a new persistConn to the target as
3// specified in the connectMethod. This includes doing a proxy CONNECT
4// and/or setting up TLS. If this doesn't return an error, the persistConn
5// is ready to write requests to.
6func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
7 req := treq.Request
8 trace := treq.trace
9 ctx := req.Context()
10 if trace != nil && trace.GetConn != nil {
11 trace.GetConn(cm.addr())
12 }
13
14 w := &wantConn{
15 cm: cm,
16 key: cm.key(),
17 ctx: ctx,
18 ready: make(chan struct{}, 1),
19 beforeDial: testHookPrePendingDial,
20 afterDial: testHookPostPendingDial,
21 }
22 defer func() {
23 if err != nil {
24 w.cancel(t, err)
25 }
26 }()
27
28 // Queue for idle connection.
29 if delivered := t.queueForIdleConn(w); delivered {
30 pc := w.pc
31 // Trace only for HTTP/1.
32 // HTTP/2 calls trace.GotConn itself.
33 if pc.alt == nil && trace != nil && trace.GotConn != nil {
34 trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
35 }
36 // set request canceler to some non-nil function so we
37 // can detect whether it was cleared between now and when
38 // we enter roundTrip
39 t.setReqCanceler(treq.cancelKey, func(error) {})
40 return pc, nil
41 }
42
43 cancelc := make(chan error, 1)
44 t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
45
46 // Queue for permission to dial.
47 t.queueForDial(w)
48
49 // Wait for completion or cancellation.
50 select {
51 case <-w.ready:
52 // Trace success but only for HTTP/1.
53 // HTTP/2 calls trace.GotConn itself.
54 if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
55 trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
56 }
57 if w.err != nil {
58 // If the request has been canceled, that's probably
59 // what caused w.err; if so, prefer to return the
60 // cancellation error (see golang.org/issue/16049).
61 select {
62 case <-req.Cancel:
63 return nil, errRequestCanceledConn
64 case <-req.Context().Done():
65 return nil, req.Context().Err()
66 case err := <-cancelc:
67 if err == errRequestCanceled {
68 err = errRequestCanceledConn
69 }
70 return nil, err
71 default:
72 // return below
73 }
74 }
75 return w.pc, w.err
76 case <-req.Cancel:
77 return nil, errRequestCanceledConn
78 case <-req.Context().Done():
79 return nil, req.Context().Err()
80 case err := <-cancelc:
81 if err == errRequestCanceled {
82 err = errRequestCanceledConn
83 }
84 return nil, err
85 }
86}
This method is mainly about connection reuse: it checks whether there is a suitable cached, already-established connection and, if so, uses it directly instead of creating a new one. Otherwise it dials a new connection and tries to make it cacheable. Let's focus on the part that establishes a new connection.
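As an aside, the size and lifetime of that idle pool are controlled by a few Transport fields; a configuration sketch with illustrative values:

package main

import (
	"net/http"
	"time"
)

func main() {
	t := &http.Transport{
		// Upper bound on idle (keep-alive) connections across all hosts.
		MaxIdleConns: 100,
		// Idle connections kept per host; the default of 2 is often too
		// low for services that talk to a single upstream heavily.
		MaxIdleConnsPerHost: 10,
		// How long an idle connection may sit in the pool before being closed.
		IdleConnTimeout: 90 * time.Second,
	}
	client := &http.Client{Transport: t}
	_, _ = client.Get("https://example.com") // error handling omitted in this sketch
}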
dialConn
1// src/net/http/transport.go#L1419
2// queueForDial queues w to wait for permission to begin dialing.
3// Once w receives permission to dial, it will do so in a separate goroutine.
4func (t *Transport) queueForDial(w *wantConn) {
5 w.beforeDial()
6 if t.MaxConnsPerHost <= 0 {
7 go t.dialConnFor(w)
8 return
9 }
10
11 t.connsPerHostMu.Lock()
12 defer t.connsPerHostMu.Unlock()
13
14 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
15 if t.connsPerHost == nil {
16 t.connsPerHost = make(map[connectMethodKey]int)
17 }
18 t.connsPerHost[w.key] = n + 1
19 go t.dialConnFor(w)
20 return
21 }
22
23 if t.connsPerHostWait == nil {
24 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
25 }
26 q := t.connsPerHostWait[w.key]
27 q.cleanFront()
28 q.pushBack(w)
29 t.connsPerHostWait[w.key] = q
30}
31
32// dialConnFor dials on behalf of w and delivers the result to w.
33// dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()].
34// If the dial is canceled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()].
35func (t *Transport) dialConnFor(w *wantConn) {
36 defer w.afterDial()
37
38 pc, err := t.dialConn(w.ctx, w.cm)
39 delivered := w.tryDeliver(pc, err)
40 if err == nil && (!delivered || pc.alt != nil) {
41 // pconn was not passed to w,
42 // or it is HTTP/2 and can be shared.
43 // Add to the idle connection pool.
44 t.putOrCloseIdleConn(pc)
45 }
46 if err != nil {
47 t.decConnsPerHost(w.key)
48 }
49}
50
51// src/net/http/transport.go#L1569
52func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
53 pconn = &persistConn{
54 t: t,
55 cacheKey: cm.key(),
56 reqch: make(chan requestAndChan, 1),
57 writech: make(chan writeRequest, 1),
58 closech: make(chan struct{}),
59 writeErrCh: make(chan error, 1),
60 writeLoopDone: make(chan struct{}),
61 }
62 trace := httptrace.ContextClientTrace(ctx)
63 wrapErr := func(err error) error {
64 if cm.proxyURL != nil {
65 // Return a typed error, per Issue 16997
66 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
67 }
68 return err
69 }
70 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
71 var err error
72 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
73 if err != nil {
74 return nil, wrapErr(err)
75 }
76 if tc, ok := pconn.conn.(*tls.Conn); ok {
77 // Handshake here, in case DialTLS didn't. TLSNextProto below
78 // depends on it for knowing the connection state.
79 if trace != nil && trace.TLSHandshakeStart != nil {
80 trace.TLSHandshakeStart()
81 }
82 if err := tc.HandshakeContext(ctx); err != nil {
83 go pconn.conn.Close()
84 if trace != nil && trace.TLSHandshakeDone != nil {
85 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
86 }
87 return nil, err
88 }
89 cs := tc.ConnectionState()
90 if trace != nil && trace.TLSHandshakeDone != nil {
91 trace.TLSHandshakeDone(cs, nil)
92 }
93 pconn.tlsState = &cs
94 }
95 } else {
96 conn, err := t.dial(ctx, "tcp", cm.addr())
97 if err != nil {
98 return nil, wrapErr(err)
99 }
100 pconn.conn = conn
101 if cm.scheme() == "https" {
102 var firstTLSHost string
103 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
104 return nil, wrapErr(err)
105 }
106 if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
107 return nil, wrapErr(err)
108 }
109 }
110 }
111
112 // Proxy setup.
113 switch {
114 case cm.proxyURL == nil:
115 // Do nothing. Not using a proxy.
116 case cm.proxyURL.Scheme == "socks5":
117 conn := pconn.conn
118 d := socksNewDialer("tcp", conn.RemoteAddr().String())
119 if u := cm.proxyURL.User; u != nil {
120 auth := &socksUsernamePassword{
121 Username: u.Username(),
122 }
123 auth.Password, _ = u.Password()
124 d.AuthMethods = []socksAuthMethod{
125 socksAuthMethodNotRequired,
126 socksAuthMethodUsernamePassword,
127 }
128 d.Authenticate = auth.Authenticate
129 }
130 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
131 conn.Close()
132 return nil, err
133 }
134 case cm.targetScheme == "http":
135 pconn.isProxy = true
136 if pa := cm.proxyAuth(); pa != "" {
137 pconn.mutateHeaderFunc = func(h Header) {
138 h.Set("Proxy-Authorization", pa)
139 }
140 }
141 case cm.targetScheme == "https":
142 conn := pconn.conn
143 var hdr Header
144 if t.GetProxyConnectHeader != nil {
145 var err error
146 hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
147 if err != nil {
148 conn.Close()
149 return nil, err
150 }
151 } else {
152 hdr = t.ProxyConnectHeader
153 }
154 if hdr == nil {
155 hdr = make(Header)
156 }
157 if pa := cm.proxyAuth(); pa != "" {
158 hdr = hdr.Clone()
159 hdr.Set("Proxy-Authorization", pa)
160 }
161 connectReq := &Request{
162 Method: "CONNECT",
163 URL: &url.URL{Opaque: cm.targetAddr},
164 Host: cm.targetAddr,
165 Header: hdr,
166 }
167
168 // If there's no done channel (no deadline or cancellation
169 // from the caller possible), at least set some (long)
170 // timeout here. This will make sure we don't block forever
171 // and leak a goroutine if the connection stops replying
172 // after the TCP connect.
173 connectCtx := ctx
174 if ctx.Done() == nil {
175 newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
176 defer cancel()
177 connectCtx = newCtx
178 }
179
180 didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
181 var (
182 resp *Response
183 err error // write or read error
184 )
185 // Write the CONNECT request & read the response.
186 go func() {
187 defer close(didReadResponse)
188 err = connectReq.Write(conn)
189 if err != nil {
190 return
191 }
192 // Okay to use and discard buffered reader here, because
193 // TLS server will not speak until spoken to.
194 br := bufio.NewReader(conn)
195 resp, err = ReadResponse(br, connectReq)
196 }()
197 select {
198 case <-connectCtx.Done():
199 conn.Close()
200 <-didReadResponse
201 return nil, connectCtx.Err()
202 case <-didReadResponse:
203 // resp or err now set
204 }
205 if err != nil {
206 conn.Close()
207 return nil, err
208 }
209
210 if t.OnProxyConnectResponse != nil {
211 err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
212 if err != nil {
213 return nil, err
214 }
215 }
216
217 if resp.StatusCode != 200 {
218 _, text, ok := strings.Cut(resp.Status, " ")
219 conn.Close()
220 if !ok {
221 return nil, errors.New("unknown status code")
222 }
223 return nil, errors.New(text)
224 }
225 }
226
227 if cm.proxyURL != nil && cm.targetScheme == "https" {
228 if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
229 return nil, err
230 }
231 }
232
233 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
234 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
235 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
236 if e, ok := alt.(erringRoundTripper); ok {
237 // pconn.conn was closed by next (http2configureTransports.upgradeFn).
238 return nil, e.RoundTripErr()
239 }
240 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
241 }
242 }
243
244 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
245 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
246
247 go pconn.readLoop()
248 go pconn.writeLoop()
249 return pconn, nil
250}
This is also clear: it first enforces the per-host connection limit, then deals with proxying and the TLS handshake as needed. Most importantly, once the connection is established, it starts two goroutines responsible for reading from and writing to the connection. Let's see what they actually do.
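Before that, note that the connection limit and the dial step seen above are configurable from the outside; a sketch with made-up values:

package main

import (
	"net"
	"net/http"
	"time"
)

func main() {
	t := &http.Transport{
		// Enforced by queueForDial above: once this many connections to a
		// host exist, further wantConns queue in connsPerHostWait.
		MaxConnsPerHost: 4,
		// Used by t.dial to open the raw TCP connection, before any
		// proxy CONNECT or TLS handshake.
		DialContext: (&net.Dialer{
			Timeout:   5 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		TLSHandshakeTimeout: 5 * time.Second,
	}
	client := &http.Client{Transport: t}
	_, _ = client.Get("https://example.com") // error handling omitted in this sketch
}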
readLoop & writeLoop
1// src/net/http/transport.go#L2071
2func (pc *persistConn) readLoop() {
3 closeErr := errReadLoopExiting // default value, if not changed below
4 defer func() {
5 pc.close(closeErr)
6 pc.t.removeIdleConn(pc)
7 }()
8
9 tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
10 if err := pc.t.tryPutIdleConn(pc); err != nil {
11 closeErr = err
12 if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
13 trace.PutIdleConn(err)
14 }
15 return false
16 }
17 if trace != nil && trace.PutIdleConn != nil {
18 trace.PutIdleConn(nil)
19 }
20 return true
21 }
22
23 // eofc is used to block caller goroutines reading from Response.Body
24 // at EOF until this goroutines has (potentially) added the connection
25 // back to the idle pool.
26 eofc := make(chan struct{})
27 defer close(eofc) // unblock reader on errors
28
29 // Read this once, before loop starts. (to avoid races in tests)
30 testHookMu.Lock()
31 testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
32 testHookMu.Unlock()
33
34 alive := true
35 for alive {
36 pc.readLimit = pc.maxHeaderResponseSize()
37 _, err := pc.br.Peek(1)
38
39 pc.mu.Lock()
40 if pc.numExpectedResponses == 0 {
41 pc.readLoopPeekFailLocked(err)
42 pc.mu.Unlock()
43 return
44 }
45 pc.mu.Unlock()
46
47 rc := <-pc.reqch
48 trace := httptrace.ContextClientTrace(rc.req.Context())
49
50 var resp *Response
51 if err == nil {
52 resp, err = pc.readResponse(rc, trace)
53 } else {
54 err = transportReadFromServerError{err}
55 closeErr = err
56 }
57
58 if err != nil {
59 if pc.readLimit <= 0 {
60 err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
61 }
62
63 select {
64 case rc.ch <- responseAndError{err: err}:
65 case <-rc.callerGone:
66 return
67 }
68 return
69 }
70 pc.readLimit = maxInt64 // effectively no limit for response bodies
71
72 pc.mu.Lock()
73 pc.numExpectedResponses--
74 pc.mu.Unlock()
75
76 bodyWritable := resp.bodyIsWritable()
77 hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
78
79 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
80 // Don't do keep-alive on error if either party requested a close
81 // or we get an unexpected informational (1xx) response.
82 // StatusCode 100 is already handled above.
83 alive = false
84 }
85
86 if !hasBody || bodyWritable {
87 replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
88
89 // Put the idle conn back into the pool before we send the response
90 // so if they process it quickly and make another request, they'll
91 // get this same conn. But we use the unbuffered channel 'rc'
92 // to guarantee that persistConn.roundTrip got out of its select
93 // potentially waiting for this persistConn to close.
94 alive = alive &&
95 !pc.sawEOF &&
96 pc.wroteRequest() &&
97 replaced && tryPutIdleConn(trace)
98
99 if bodyWritable {
100 closeErr = errCallerOwnsConn
101 }
102
103 select {
104 case rc.ch <- responseAndError{res: resp}:
105 case <-rc.callerGone:
106 return
107 }
108
109 // Now that they've read from the unbuffered channel, they're safely
110 // out of the select that also waits on this goroutine to die, so
111 // we're allowed to exit now if needed (if alive is false)
112 testHookReadLoopBeforeNextRead()
113 continue
114 }
115
116 waitForBodyRead := make(chan bool, 2)
117 body := &bodyEOFSignal{
118 body: resp.Body,
119 earlyCloseFn: func() error {
120 waitForBodyRead <- false
121 <-eofc // will be closed by deferred call at the end of the function
122 return nil
123
124 },
125 fn: func(err error) error {
126 isEOF := err == io.EOF
127 waitForBodyRead <- isEOF
128 if isEOF {
129 <-eofc // see comment above eofc declaration
130 } else if err != nil {
131 if cerr := pc.canceled(); cerr != nil {
132 return cerr
133 }
134 }
135 return err
136 },
137 }
138
139 resp.Body = body
140 if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
141 resp.Body = &gzipReader{body: body}
142 resp.Header.Del("Content-Encoding")
143 resp.Header.Del("Content-Length")
144 resp.ContentLength = -1
145 resp.Uncompressed = true
146 }
147
148 select {
149 case rc.ch <- responseAndError{res: resp}:
150 case <-rc.callerGone:
151 return
152 }
153
154 // Before looping back to the top of this function and peeking on
155 // the bufio.Reader, wait for the caller goroutine to finish
156 // reading the response body. (or for cancellation or death)
157 select {
158 case bodyEOF := <-waitForBodyRead:
159 replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
160 alive = alive &&
161 bodyEOF &&
162 !pc.sawEOF &&
163 pc.wroteRequest() &&
164 replaced && tryPutIdleConn(trace)
165 if bodyEOF {
166 eofc <- struct{}{}
167 }
168 case <-rc.req.Cancel:
169 alive = false
170 pc.t.CancelRequest(rc.req)
171 case <-rc.req.Context().Done():
172 alive = false
173 pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
174 case <-pc.closech:
175 alive = false
176 }
177
178 testHookReadLoopBeforeNextRead()
179 }
180}
181
182// src/net/http/transport.go#L2406
183func (pc *persistConn) writeLoop() {
184 defer close(pc.writeLoopDone)
185 for {
186 select {
187 case wr := <-pc.writech:
188 startBytesWritten := pc.nwrite
189 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
190 if bre, ok := err.(requestBodyReadError); ok {
191 err = bre.error
192 // Errors reading from the user's
193 // Request.Body are high priority.
194 // Set it here before sending on the
195 // channels below or calling
196 // pc.close() which tears down
197 // connections and causes other
198 // errors.
199 wr.req.setError(err)
200 }
201 if err == nil {
202 err = pc.bw.Flush()
203 }
204 if err != nil {
205 if pc.nwrite == startBytesWritten {
206 err = nothingWrittenError{err}
207 }
208 }
209 pc.writeErrCh <- err // to the body reader, which might recycle us
210 wr.ch <- err // to the roundTrip function
211 if err != nil {
212 pc.close(err)
213 return
214 }
215 case <-pc.closech:
216 return
217 }
218 }
219}
Much of this is protocol plumbing. The write loop takes new requests from a channel and writes them to the TCP connection; the read loop reads responses from the TCP connection and hands them back out through a channel. Note that whenever the request needs to be modified, a copy is made first, because, as discussed earlier, a RoundTripper may only consume and close the request body and must not modify the Request itself. Also, once a response is read, its body is wrapped so the transport can tell when it has been closed: the next response is only read from this connection after the previous response's body has been closed. That is why failing to close a response body leaks connections: the connection stays alive waiting for the close, the next request cannot find an idle connection and dials a new one, and the number of connections keeps growing.
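Hence the standard pattern on the caller side: always close the response body, and read it to EOF if you want the connection to be reused. A minimal sketch (the URL is a placeholder):

package main

import (
	"io"
	"net/http"
)

func main() {
	resp, err := http.Get("https://example.com")
	if err != nil {
		panic(err)
	}
	// Draining to EOF lets readLoop see bodyEOF and put the connection
	// back into the idle pool; Close releases it in every case.
	defer resp.Body.Close()
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		panic(err)
	}
}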
bodyEOFSignal
1// src/net/http/transport.go#L2758
2// bodyEOFSignal is used by the HTTP/1 transport when reading response
3// bodies to make sure we see the end of a response body before
4// proceeding and reading on the connection again.
5//
6// It wraps a ReadCloser but runs fn (if non-nil) at most
7// once, right before its final (error-producing) Read or Close call
8// returns. fn should return the new error to return from Read or Close.
9//
10// If earlyCloseFn is non-nil and Close is called before io.EOF is
11// seen, earlyCloseFn is called instead of fn, and its return value is
12// the return value from Close.
13type bodyEOFSignal struct {
14 body io.ReadCloser
15 mu sync.Mutex // guards following 4 fields
16 closed bool // whether Close has been called
17 rerr error // sticky Read error
18 fn func(error) error // err will be nil on Read io.EOF
19 earlyCloseFn func() error // optional alt Close func used if io.EOF not seen
20}
21
22var errReadOnClosedResBody = errors.New("http: read on closed response body")
23
24func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
25 es.mu.Lock()
26 closed, rerr := es.closed, es.rerr
27 es.mu.Unlock()
28 if closed {
29 return 0, errReadOnClosedResBody
30 }
31 if rerr != nil {
32 return 0, rerr
33 }
34
35 n, err = es.body.Read(p)
36 if err != nil {
37 es.mu.Lock()
38 defer es.mu.Unlock()
39 if es.rerr == nil {
40 es.rerr = err
41 }
42 err = es.condfn(err)
43 }
44 return
45}
46
47func (es *bodyEOFSignal) Close() error {
48 es.mu.Lock()
49 defer es.mu.Unlock()
50 if es.closed {
51 return nil
52 }
53 es.closed = true
54 if es.earlyCloseFn != nil && es.rerr != io.EOF {
55 return es.earlyCloseFn()
56 }
57 err := es.body.Close()
58 return es.condfn(err)
59}
persistConn.roundTrip
1// src/net/http/transport.go#L2551
2func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
3 testHookEnterRoundTrip()
4 if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
5 pc.t.putOrCloseIdleConn(pc)
6 return nil, errRequestCanceled
7 }
8 pc.mu.Lock()
9 pc.numExpectedResponses++
10 headerFn := pc.mutateHeaderFunc
11 pc.mu.Unlock()
12
13 if headerFn != nil {
14 headerFn(req.extraHeaders())
15 }
16
17 // Ask for a compressed version if the caller didn't set their
18 // own value for Accept-Encoding. We only attempt to
19 // uncompress the gzip stream if we were the layer that
20 // requested it.
21 requestedGzip := false
22 if !pc.t.DisableCompression &&
23 req.Header.Get("Accept-Encoding") == "" &&
24 req.Header.Get("Range") == "" &&
25 req.Method != "HEAD" {
26 // Request gzip only, not deflate. Deflate is ambiguous and
27 // not as universally supported anyway.
28 // See: https://zlib.net/zlib_faq.html#faq39
29 //
30 // Note that we don't request this for HEAD requests,
31 // due to a bug in nginx:
32 // https://trac.nginx.org/nginx/ticket/358
33 // https://golang.org/issue/5522
34 //
35 // We don't request gzip if the request is for a range, since
36 // auto-decoding a portion of a gzipped document will just fail
37 // anyway. See https://golang.org/issue/8923
38 requestedGzip = true
39 req.extraHeaders().Set("Accept-Encoding", "gzip")
40 }
41
42 var continueCh chan struct{}
43 if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
44 continueCh = make(chan struct{}, 1)
45 }
46
47 if pc.t.DisableKeepAlives &&
48 !req.wantsClose() &&
49 !isProtocolSwitchHeader(req.Header) {
50 req.extraHeaders().Set("Connection", "close")
51 }
52
53 gone := make(chan struct{})
54 defer close(gone)
55
56 defer func() {
57 if err != nil {
58 pc.t.setReqCanceler(req.cancelKey, nil)
59 }
60 }()
61
62 const debugRoundTrip = false
63
64 // Write the request concurrently with waiting for a response,
65 // in case the server decides to reply before reading our full
66 // request body.
67 startBytesWritten := pc.nwrite
68 writeErrCh := make(chan error, 1)
69 pc.writech <- writeRequest{req, writeErrCh, continueCh}
70
71 resc := make(chan responseAndError)
72 pc.reqch <- requestAndChan{
73 req: req.Request,
74 cancelKey: req.cancelKey,
75 ch: resc,
76 addedGzip: requestedGzip,
77 continueCh: continueCh,
78 callerGone: gone,
79 }
80
81 var respHeaderTimer <-chan time.Time
82 cancelChan := req.Request.Cancel
83 ctxDoneChan := req.Context().Done()
84 pcClosed := pc.closech
85 canceled := false
86 for {
87 testHookWaitResLoop()
88 select {
89 case err := <-writeErrCh:
90 if debugRoundTrip {
91 req.logf("writeErrCh resv: %T/%#v", err, err)
92 }
93 if err != nil {
94 pc.close(fmt.Errorf("write error: %w", err))
95 return nil, pc.mapRoundTripError(req, startBytesWritten, err)
96 }
97 if d := pc.t.ResponseHeaderTimeout; d > 0 {
98 if debugRoundTrip {
99 req.logf("starting timer for %v", d)
100 }
101 timer := time.NewTimer(d)
102 defer timer.Stop() // prevent leaks
103 respHeaderTimer = timer.C
104 }
105 case <-pcClosed:
106 pcClosed = nil
107 if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
108 if debugRoundTrip {
109 req.logf("closech recv: %T %#v", pc.closed, pc.closed)
110 }
111 return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
112 }
113 case <-respHeaderTimer:
114 if debugRoundTrip {
115 req.logf("timeout waiting for response headers.")
116 }
117 pc.close(errTimeout)
118 return nil, errTimeout
119 case re := <-resc:
120 if (re.res == nil) == (re.err == nil) {
121 panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
122 }
123 if debugRoundTrip {
124 req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
125 }
126 if re.err != nil {
127 return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
128 }
129 return re.res, nil
130 case <-cancelChan:
131 canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
132 cancelChan = nil
133 case <-ctxDoneChan:
134 canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
135 cancelChan = nil
136 ctxDoneChan = nil
137 }
138 }
139}
Connection reuse and the actual sending and receiving of requests and responses are handled above, so this method only needs to do the simple part: hand the request to the write loop, wait for the result from the read loop, and enforce cancellation and timeouts across the whole path.
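The timeout knobs that feed these select branches can be set at several levels; a sketch with illustrative durations and a placeholder URL:

package main

import (
	"context"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{
		// Covers the whole exchange: dialing, writing the request,
		// waiting for headers and reading the response body.
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			// Only the wait between finishing the request write and
			// receiving response headers (the respHeaderTimer branch).
			ResponseHeaderTimeout: 3 * time.Second,
		},
	}

	// A per-request deadline arrives via ctxDoneChan in persistConn.roundTrip.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	req, _ := http.NewRequestWithContext(ctx, "GET", "https://example.com", nil)
	_, _ = client.Do(req) // error handling omitted in this sketch
}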