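OkHttp in Depth, Part 5

The previous chapter analyzed the request flow under HTTP/1.1. This chapter analyzes the data exchange flow under HTTP/2.

I. Overview of the Key Classes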

1.FramedTransport: a wrapper around the request channel. It holds a framedConnection member, and all data operations go through that FramedConnection.

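2.FramedConnection: performs the data operations. It contains a FrameWriter and a FrameReader and handles data through them, while FrameWriter and FrameReader in turn read and write through the socket's source and sink.

3.FramedStream: HTTP/2 supports multiplexing, so each FramedStream holds the data exchanged for one individual request.

II. Flow Analysis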

1.FramedTransport.java

@Override public void writeRequestHeaders(Request request) throws IOException {
  if (stream != null) return;

  httpEngine.writingRequestHeaders();
  boolean permitsRequestBody = httpEngine.permitsRequestBody(request);
  List<Header> requestHeaders = framedConnection.getProtocol() == Protocol.HTTP_2
      ? http2HeadersList(request)
      : spdy3HeadersList(request);
  boolean hasResponseBody = true;
  stream = framedConnection.newStream(requestHeaders, permitsRequestBody, hasResponseBody);
  stream.readTimeout().timeout(httpEngine.client.getReadTimeout(), TimeUnit.MILLISECONDS);
}

(1) First, build the request headers.
(2) Then create the stream.
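
To make step (1) more concrete, here is a minimal sketch of how an HTTP/1.1-style request could be turned into an HTTP/2 header list. It is not OkHttp's http2HeadersList: the class name Http2HeaderSketch, the method toHttp2Headers and the String[] pair representation are all assumptions made for illustration. It relies only on what the HTTP/2 specification requires: the pseudo-headers come first, field names are lowercase, and connection-specific fields are not sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper for illustration; not part of OkHttp. */
final class Http2HeaderSketch {
  // Connection-specific fields that must not appear in an HTTP/2 message.
  private static final List<String> PROHIBITED = Arrays.asList(
      "connection", "keep-alive", "proxy-connection", "transfer-encoding", "upgrade");

  /** Returns name/value pairs with the four request pseudo-headers first. */
  static List<String[]> toHttp2Headers(String method, String scheme, String authority,
      String path, Map<String, String> headers) {
    List<String[]> result = new ArrayList<>();
    result.add(new String[] {":method", method});
    result.add(new String[] {":path", path});
    result.add(new String[] {":authority", authority});
    result.add(new String[] {":scheme", scheme});
    for (Map.Entry<String, String> entry : headers.entrySet()) {
      String name = entry.getKey().toLowerCase(Locale.US);
      // "host" is dropped here because :authority already carries it.
      if (PROHIBITED.contains(name) || name.equals("host")) continue;
      result.add(new String[] {name, entry.getValue()});
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> headers = new LinkedHashMap<>();
    headers.put("Accept", "*/*");
    headers.put("Connection", "keep-alive");
    for (String[] header : toHttp2Headers("GET", "https", "example.com", "/", headers)) {
      System.out.println(header[0] + ": " + header[1]);
    }
  }
}
```

The resulting list is what would then be handed to the connection when the stream is created.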

2.FramedConnection.java

private FramedStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
    boolean in) throws IOException {
  boolean outFinished = !out;
  boolean inFinished = !in;
  FramedStream stream;
  int streamId;

  synchronized (frameWriter) {
    synchronized (this) {
      if (shutdown) {
        throw new IOException("shutdown");
      }
      streamId = nextStreamId;
      nextStreamId += 2;
      stream = new FramedStream(streamId, this, outFinished, inFinished, requestHeaders);
      if (stream.isOpen()) {
        streams.put(streamId, stream);
        setIdle(false);
      }
    }
    if (associatedStreamId == 0) {
      frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId,
          requestHeaders);
    } else if (client) {
      throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
    } else { // HTTP/2 has a PUSH_PROMISE frame.
      frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
    }
  }

  if (!out) {
    frameWriter.flush();
  }

  return stream;
}

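(1) Create the stream. The streamId uniquely identifies a stream: each request gets its own streamId, and it must be passed along whenever data is transferred.
(2) Save the stream in streams and mark the connection as not idle.
(3) Send the request headers.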
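
The ID allocation in newStream can be illustrated with a small stand-alone model. This is only a sketch under assumptions: StreamRegistrySketch and its methods are hypothetical names, a stream is represented by a plain label, and the counter starts at 1 (the listing above does not show OkHttp's actual starting value). Client-initiated HTTP/2 streams use odd IDs, which is why the counter advances by 2.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of stream ID allocation; not OkHttp's FramedConnection. */
final class StreamRegistrySketch {
  private final Map<Integer, String> streams = new HashMap<>();
  private int nextStreamId = 1; // Assumption: first client stream ID.

  /** Allocates the next odd stream ID and registers the stream under it. */
  synchronized int open(String label) {
    int streamId = nextStreamId;
    nextStreamId += 2;
    streams.put(streamId, label);
    return streamId;
  }

  synchronized String lookup(int streamId) {
    return streams.get(streamId);
  }

  synchronized void remove(int streamId) {
    streams.remove(streamId);
  }

  /** The connection counts as idle when no stream is registered. */
  synchronized boolean isIdle() {
    return streams.isEmpty();
  }

  public static void main(String[] args) {
    StreamRegistrySketch registry = new StreamRegistrySketch();
    int first = registry.open("GET /a");
    int second = registry.open("GET /b");
    System.out.println(first + " -> " + registry.lookup(first));
    System.out.println(second + " -> " + registry.lookup(second));
  }
}
```

Because every frame carries its stream ID, several requests registered this way can share one connection at the same time.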

3.Http2.Writer.java

@Override public synchronized void synStream(boolean outFinished, boolean inFinished,
    int streamId, int associatedStreamId, List<Header> headerBlock)
    throws IOException {
  if (inFinished) throw new UnsupportedOperationException();
  if (closed) throw new IOException("closed");
  headers(outFinished, streamId, headerBlock);
}

void headers(boolean outFinished, int streamId, List<Header> headerBlock) throws IOException {
  if (closed) throw new IOException("closed");
  hpackWriter.writeHeaders(headerBlock);

  long byteCount = hpackBuffer.size();
  int length = (int) Math.min(maxFrameSize, byteCount);
  byte type = TYPE_HEADERS;
  byte flags = byteCount == length ? FLAG_END_HEADERS : 0;
  if (outFinished) flags |= FLAG_END_STREAM;
  frameHeader(streamId, length, type, flags);
  sink.write(hpackBuffer, length);

  if (byteCount > length) writeContinuationFrames(streamId, byteCount - length);
}

void frameHeader(int streamId, int length, byte type, byte flags) throws IOException {
  if (logger.isLoggable(FINE)) logger.fine(formatHeader(false, streamId, length, type, flags));
  if (length > maxFrameSize) {
    throw illegalArgument("FRAME_SIZE_ERROR length > %d: %d", maxFrameSize, length);
  }
  if ((streamId & 0x80000000) != 0) throw illegalArgument("reserved bit set: %s", streamId);
  writeMedium(sink, length);
  sink.writeByte(type & 0xff);
  sink.writeByte(flags & 0xff);
  sink.writeInt(streamId & 0x7fffffff);
}

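(1) First, the request headers are written into hpackBuffer: hpackWriter compresses the headers and stores the result in hpackBuffer.
(2) frameHeader writes the basic frame parameters: payload length, type, flags and stream ID.
(3) The header data itself is then written to the sink.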
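
The 9-byte header written by frameHeader can be reproduced without Okio. The following is a sketch, not OkHttp code: FrameHeaderEncodeSketch, encode and headersFlags are hypothetical names, and the result is returned as a byte array instead of being written to a sink. The constant values are the ones the HTTP/2 specification assigns to the HEADERS type and to the END_STREAM and END_HEADERS flags.

```java
/** Hypothetical stand-alone encoder for the HTTP/2 frame header; not OkHttp's Http2.Writer. */
final class FrameHeaderEncodeSketch {
  static final byte TYPE_HEADERS = 0x1;
  static final byte FLAG_END_STREAM = 0x1;
  static final byte FLAG_END_HEADERS = 0x4;

  /** Mirrors the flag choice in headers(): END_HEADERS only if the block fits in one frame. */
  static byte headersFlags(long byteCount, int maxFrameSize, boolean outFinished) {
    byte flags = byteCount <= maxFrameSize ? FLAG_END_HEADERS : 0;
    if (outFinished) flags |= FLAG_END_STREAM;
    return flags;
  }

  /** Encodes 24-bit length, 8-bit type, 8-bit flags and a 31-bit stream ID, big-endian. */
  static byte[] encode(int length, byte type, byte flags, int streamId) {
    if (length < 0 || length > 0xffffff) {
      throw new IllegalArgumentException("length out of range: " + length);
    }
    if ((streamId & 0x80000000) != 0) {
      throw new IllegalArgumentException("reserved bit set: " + streamId);
    }
    return new byte[] {
        (byte) (length >>> 16), (byte) (length >>> 8), (byte) length,
        type,
        flags,
        (byte) (streamId >>> 24), (byte) (streamId >>> 16), (byte) (streamId >>> 8),
        (byte) streamId
    };
  }

  public static void main(String[] args) {
    byte flags = headersFlags(5, 16384, true);
    StringBuilder hex = new StringBuilder();
    for (byte b : encode(5, TYPE_HEADERS, flags, 3)) {
      hex.append(String.format("%02x ", b));
    }
    System.out.println(hex.toString().trim());
  }
}
```

When the compressed header block is larger than the maximum frame size, END_HEADERS is left unset on the first frame and the remainder is sent in continuation frames, as the listing above does with writeContinuationFrames.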

4.FramedTransport.java

@Override public Sink createRequestBody(Request request, long contentLength) throws IOException {
  return stream.getSink();
}

Gets the Sink. Each request corresponds to one stream.

5.FramedTransport.java

@Override public void finishRequest() throws IOException {
  stream.getSink().close();
}

Closes the stream's Sink, which finishes sending the request.

6.FramedDataSink.java

@Override public void close() throws IOException {
  assert (!Thread.holdsLock(FramedStream.this));
  synchronized (FramedStream.this) {
    if (closed) return;
  }
  if (!sink.finished) {
    // Emit the remaining data, setting the END_STREAM flag on the last frame.
    if (sendBuffer.size() > 0) {
      while (sendBuffer.size() > 0) {
        emitDataFrame(true);
      }
    } else {
      // Send an empty frame just so we can set the END_STREAM flag.
      connection.writeData(id, true, null, 0);
    }
  }
  synchronized (FramedStream.this) {
    closed = true;
  }
  connection.flush();
  cancelStreamIfNecessary();
}
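
The effect of close() on the wire can be sketched as a plan of DATA frames. This is a simplified model under assumptions: DataFrameSplitSketch, Frame and framesOnClose are hypothetical names, the only limit considered is a maximum frame size, and flow-control windows are ignored entirely. It shows the two cases from the comments above: buffered data is flushed with END_STREAM on the last frame, and an empty buffer still produces one empty frame that carries END_STREAM.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the frames emitted when a request body sink is closed. */
final class DataFrameSplitSketch {
  /** One planned DATA frame: payload size and whether END_STREAM is set. */
  static final class Frame {
    final int length;
    final boolean endStream;

    Frame(int length, boolean endStream) {
      this.length = length;
      this.endStream = endStream;
    }

    @Override public String toString() {
      return "DATA(length=" + length + ", endStream=" + endStream + ")";
    }
  }

  /** Plans the frames needed to flush {@code buffered} bytes and finish the stream. */
  static List<Frame> framesOnClose(long buffered, int maxFrameSize) {
    if (buffered < 0 || maxFrameSize <= 0) {
      throw new IllegalArgumentException("invalid sizes");
    }
    List<Frame> frames = new ArrayList<>();
    if (buffered == 0) {
      // Nothing left to send: an empty frame exists only to carry END_STREAM.
      frames.add(new Frame(0, true));
      return frames;
    }
    while (buffered > 0) {
      int toWrite = (int) Math.min(buffered, maxFrameSize);
      buffered -= toWrite;
      frames.add(new Frame(toWrite, buffered == 0)); // END_STREAM on the last frame only.
    }
    return frames;
  }

  public static void main(String[] args) {
    System.out.println(framesOnClose(40000, 16384));
    System.out.println(framesOnClose(0, 16384));
  }
}
```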

7.FramedTransport.java

@Override public Response.Builder readResponseHeaders() throws IOException {
  return framedConnection.getProtocol() == Protocol.HTTP_2
      ? readHttp2HeadersList(stream.getResponseHeaders())
      : readSpdy3HeadersList(stream.getResponseHeaders());
}

8.FramedStream.java

public synchronized List<Header> getResponseHeaders() throws IOException {
  readTimeout.enter();
  try {
    while (responseHeaders == null && errorCode == null) {
      waitForIo();
    }
  } finally {
    readTimeout.exitAndThrowIfTimedOut();
  }
  if (responseHeaders != null) return responseHeaders;
  throw new IOException("stream was reset: " + errorCode);
}

If the response headers have not arrived yet, this call blocks and waits. Next, let's look at how the data is received.
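
The blocking behaviour of getResponseHeaders is the classic monitor wait/notify pattern. Below is a minimal sketch of that pattern only; HeaderSlotSketch, receiveHeaders and awaitHeaders are hypothetical names, headers are plain strings, and the timeout handling is a simplification rather than a copy of OkHttp's readTimeout logic.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical single-slot hand-off between a reader thread and a caller thread. */
final class HeaderSlotSketch {
  private List<String> responseHeaders; // Guarded by this.

  /** Called by the thread that reads frames: store the headers and wake any waiter. */
  synchronized void receiveHeaders(List<String> headers) {
    responseHeaders = headers;
    notifyAll();
  }

  /** Called by the request thread: block until headers arrive or the timeout elapses. */
  synchronized List<String> awaitHeaders(long timeoutMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    try {
      while (responseHeaders == null) { // Loop guards against spurious wakeups.
        long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
        if (remainingMillis <= 0) {
          throw new IllegalStateException("timed out waiting for headers");
        }
        wait(remainingMillis);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return responseHeaders;
  }

  public static void main(String[] args) {
    HeaderSlotSketch slot = new HeaderSlotSketch();
    new Thread(() -> slot.receiveHeaders(Arrays.asList(":status: 200"))).start();
    System.out.println(slot.awaitHeaders(1000));
  }
}
```

The waiting side always re-checks its condition in a loop, which is the same shape as the while loop around waitForIo() above.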

9.Http2.Reader.java

@Override public boolean nextFrame(Handler handler) throws IOException {
  try {
    source.require(9); // Frame header size
  } catch (IOException e) {
    return false; // This might be a normal socket close.
  }

  /*  0                   1                   2                   3
   *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   * |                 Length (24)                   |
   * +---------------+---------------+---------------+
   * |   Type (8)    |   Flags (8)   |
   * +-+-+-----------+---------------+-------------------------------+
   * |R|                 Stream Identifier (31)                      |
   * +=+=============================================================+
   * |                   Frame Payload (0...)                      ...
   * +---------------------------------------------------------------+
   */
  int length = readMedium(source);
  if (length < 0 || length > INITIAL_MAX_FRAME_SIZE) {
    throw ioException("FRAME_SIZE_ERROR: %s", length);
  }
  byte type = (byte) (source.readByte() & 0xff);
  byte flags = (byte) (source.readByte() & 0xff);
  int streamId = (source.readInt() & 0x7fffffff); // Ignore reserved bit.
  if (logger.isLoggable(FINE)) logger.fine(formatHeader(true, streamId, length, type, flags));

  switch (type) {
    case TYPE_DATA:
      readData(handler, length, flags, streamId);
      break;

    case TYPE_HEADERS:
      readHeaders(handler, length, flags, streamId);
      break;

    case TYPE_PRIORITY:
      readPriority(handler, length, flags, streamId);
      break;

    case TYPE_RST_STREAM:
      readRstStream(handler, length, flags, streamId);
      break;

    case TYPE_SETTINGS:
      readSettings(handler, length, flags, streamId);
      break;

    case TYPE_PUSH_PROMISE:
      readPushPromise(handler, length, flags, streamId);
      break;

    case TYPE_PING:
      readPing(handler, length, flags, streamId);
      break;

    case TYPE_GOAWAY:
      readGoAway(handler, length, flags, streamId);
      break;

    case TYPE_WINDOW_UPDATE:
      readWindowUpdate(handler, length, flags, streamId);
      break;

    default:
      // Implementations MUST discard frames that have unknown or unsupported types.
      source.skip(length);
  }
  return true;
}

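Here each frame is dispatched according to the frame type returned by the server, and its data is stored accordingly.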
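
The header parsing at the top of nextFrame can be tried out on a raw byte array. The sketch below is an illustration, not OkHttp's reader: FrameHeaderDecodeSketch, parse and typeName are hypothetical names, and the input is a plain 9-byte array rather than an Okio source. The type codes are those defined by the HTTP/2 specification.

```java
/** Hypothetical stand-alone decoder for the 9-byte HTTP/2 frame header. */
final class FrameHeaderDecodeSketch {
  private static final String[] TYPE_NAMES = {
      "DATA", "HEADERS", "PRIORITY", "RST_STREAM", "SETTINGS",
      "PUSH_PROMISE", "PING", "GOAWAY", "WINDOW_UPDATE", "CONTINUATION"
  };

  final int length;
  final int type;
  final int flags;
  final int streamId;

  private FrameHeaderDecodeSketch(int length, int type, int flags, int streamId) {
    this.length = length;
    this.type = type;
    this.flags = flags;
    this.streamId = streamId;
  }

  /** Reads length (24 bits), type (8), flags (8) and stream ID (31, reserved bit ignored). */
  static FrameHeaderDecodeSketch parse(byte[] header) {
    if (header.length != 9) throw new IllegalArgumentException("frame header is 9 bytes");
    int length = (header[0] & 0xff) << 16 | (header[1] & 0xff) << 8 | (header[2] & 0xff);
    int type = header[3] & 0xff;
    int flags = header[4] & 0xff;
    int streamId = ((header[5] & 0xff) << 24 | (header[6] & 0xff) << 16
        | (header[7] & 0xff) << 8 | (header[8] & 0xff)) & 0x7fffffff;
    return new FrameHeaderDecodeSketch(length, type, flags, streamId);
  }

  /** Unknown types are reported as such; a real reader would skip their payload. */
  static String typeName(int type) {
    return type >= 0 && type < TYPE_NAMES.length ? TYPE_NAMES[type] : "UNKNOWN";
  }

  public static void main(String[] args) {
    FrameHeaderDecodeSketch h = parse(new byte[] {0, 0, 5, 1, 4, 0, 0, 0, 3});
    System.out.println(typeName(h.type) + " length=" + h.length
        + " flags=" + h.flags + " streamId=" + h.streamId);
  }
}
```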

10.Http2.Reader.java

private void readHeaders(Handler handler, int length, byte flags, int streamId)
    throws IOException {
  if (streamId == 0) throw ioException("PROTOCOL_ERROR: TYPE_HEADERS streamId == 0");

  boolean endStream = (flags & FLAG_END_STREAM) != 0;

  short padding = (flags & FLAG_PADDED) != 0 ? (short) (source.readByte() & 0xff) : 0;

  if ((flags & FLAG_PRIORITY) != 0) {
    readPriority(handler, streamId);
    length -= 5; // account for above read.
  }

  length = lengthWithoutPadding(length, flags, padding);

  List<Header> headerBlock = readHeaderBlock(length, padding, flags, streamId);

  handler.headers(false, endStream, streamId, -1, headerBlock, HeadersMode.HTTP_20_HEADERS);
}
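
The length bookkeeping in readHeaders is easy to get wrong, so here is a small worked sketch of it. It assumes the HEADERS payload layout from the HTTP/2 specification (an optional 1-byte Pad Length field, an optional 5-byte priority section, then the header block, then the padding). HeadersPayloadSketch and headerBlockLength are hypothetical names, and the body of OkHttp's lengthWithoutPadding is not shown in this article, so this is not a copy of it.

```java
/** Hypothetical calculator for the header block size inside a HEADERS frame payload. */
final class HeadersPayloadSketch {
  static final int FLAG_PADDED = 0x8;
  static final int FLAG_PRIORITY = 0x20;

  /**
   * Returns the number of payload bytes that belong to the header block.
   * padLength is only meaningful when FLAG_PADDED is set.
   */
  static int headerBlockLength(int frameLength, int flags, int padLength) {
    int length = frameLength;
    int padding = 0;
    if ((flags & FLAG_PADDED) != 0) {
      length -= 1; // The Pad Length field itself.
      padding = padLength;
    }
    if ((flags & FLAG_PRIORITY) != 0) {
      length -= 5; // 4 bytes stream dependency + 1 byte weight.
    }
    if (length < 0 || padding > length) {
      throw new IllegalArgumentException("PROTOCOL_ERROR: padding " + padding
          + " exceeds remaining length " + length);
    }
    return length - padding;
  }

  public static void main(String[] args) {
    System.out.println(headerBlockLength(100, 0, 0));
    System.out.println(headerBlockLength(100, FLAG_PADDED, 10));
    System.out.println(headerBlockLength(100, FLAG_PADDED | FLAG_PRIORITY, 10));
  }
}
```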

11.FramedConnection.Reader.java

@Override public void headers(boolean outFinished, boolean inFinished, int streamId,
    int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {
  ...
  FramedStream stream;
  synchronized (FramedConnection.this) {
    // If we're shutdown, don't bother with this stream.
    if (shutdown) return;

    stream = getStream(streamId);

    if (stream == null) {
      ...
    }
  }

  ....

  // Update an existing stream.
  stream.receiveHeaders(headerBlock, headersMode);
  if (inFinished) stream.receiveFin();
}

(1) Look up the existing stream by streamId.
(2) Store the data in that stream.
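
These two steps are the receiving half of multiplexing: frames from different requests arrive interleaved on one connection and are sorted back into their streams by ID. A minimal sketch of that routing follows. StreamDemuxSketch and its methods are hypothetical names, payloads are strings, and a frame for an unknown stream is simply reported and dropped, which is a simplification of what the elided branches above do.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demultiplexer that routes interleaved frames to per-stream buffers. */
final class StreamDemuxSketch {
  private final Map<Integer, StringBuilder> streams = new HashMap<>();

  synchronized void openStream(int streamId) {
    streams.put(streamId, new StringBuilder());
  }

  /** Appends the payload to its stream; returns false if no such stream is open. */
  synchronized boolean onFrame(int streamId, String payload) {
    StringBuilder stream = streams.get(streamId);
    if (stream == null) return false;
    stream.append(payload);
    return true;
  }

  /** Returns everything received so far for the stream, or null if it is unknown. */
  synchronized String received(int streamId) {
    StringBuilder stream = streams.get(streamId);
    return stream == null ? null : stream.toString();
  }

  public static void main(String[] args) {
    StreamDemuxSketch demux = new StreamDemuxSketch();
    demux.openStream(3);
    demux.openStream(5);
    // Frames of two responses arrive interleaved on the same connection.
    demux.onFrame(3, "hel");
    demux.onFrame(5, "wor");
    demux.onFrame(3, "lo");
    demux.onFrame(5, "ld");
    System.out.println(demux.received(3) + " " + demux.received(5));
  }
}
```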

12.FramedStream.java

void receiveHeaders(List<Header> headers, HeadersMode headersMode) {
  assert (!Thread.holdsLock(FramedStream.this));
  ErrorCode errorCode = null;
  boolean open = true;
  synchronized (this) {
    if (responseHeaders == null) {
      if (headersMode.failIfHeadersAbsent()) {
        errorCode = ErrorCode.PROTOCOL_ERROR;
      } else {
        responseHeaders = headers;
        open = isOpen();
        notifyAll();
      }
    } else {
      if (headersMode.failIfHeadersPresent()) {
        errorCode = ErrorCode.STREAM_IN_USE;
      } else {
        List<Header> newHeaders = new ArrayList<>();
        newHeaders.addAll(responseHeaders);
        newHeaders.addAll(headers);
        this.responseHeaders = newHeaders;
      }
    }
  }
  if (errorCode != null) {
    closeLater(errorCode);
  } else if (!open) {
    connection.removeStream(id);
  }
}

(1) Store the headers.
(2) Wake up the thread that is blocked waiting for them in getResponseHeaders.

13.FramedTransport.java

@Override public ResponseBody openResponseBody(Response response) throws IOException {
  return new RealResponseBody(response.headers(), Okio.buffer(stream.getSource()));
}

Gets the Source from the stream. Next, let's see how that Source receives its data.

14.Http2.Reader.java

private void readData(Handler handler, int length, byte flags, int streamId)
    throws IOException {
  // TODO: checkState open or half-closed (local) or raise STREAM_CLOSED
  boolean inFinished = (flags & FLAG_END_STREAM) != 0;
  boolean gzipped = (flags & FLAG_COMPRESSED) != 0;
  if (gzipped) {
    throw ioException("PROTOCOL_ERROR: FLAG_COMPRESSED without SETTINGS_COMPRESS_DATA");
  }

  short padding = (flags & FLAG_PADDED) != 0 ? (short) (source.readByte() & 0xff) : 0;
  length = lengthWithoutPadding(length, flags, padding);

  handler.data(inFinished, streamId, source, length);
  source.skip(padding);
}

The data again arrives through the frame dispatch shown in step 9.

15.FramedConnection.Reader.java

@Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
    throws IOException {
  ...
  FramedStream dataStream = getStream(streamId);
  ...
  dataStream.receiveData(source, length);
  if (inFinished) {
    dataStream.receiveFin();
  }
}

16.FramedStream.java

void receiveData(BufferedSource in, int length) throws IOException {
  assert (!Thread.holdsLock(FramedStream.this));
  this.source.receive(in, length);
}

17.FramedDataSource.java

void receive(BufferedSource in, long byteCount) throws IOException {
  assert (!Thread.holdsLock(FramedStream.this));

  while (byteCount > 0) {
    boolean finished;
    boolean flowControlError;
    synchronized (FramedStream.this) {
      finished = this.finished;
      flowControlError = byteCount + readBuffer.size() > maxByteCount;
    }

    // If the peer sends more data than we can handle, discard it and close the connection.
    if (flowControlError) {
      in.skip(byteCount);
      closeLater(ErrorCode.FLOW_CONTROL_ERROR);
      return;
    }

    // Discard data received after the stream is finished. It's probably a benign race.
    if (finished) {
      in.skip(byteCount);
      return;
    }

    // Fill the receive buffer without holding any locks.
    long read = in.read(receiveBuffer, byteCount);
    if (read == -1) throw new EOFException();
    byteCount -= read;

    // Move the received data to the read buffer so the reader can read it.
    synchronized (FramedStream.this) {
      boolean wasEmpty = readBuffer.size() == 0;
      readBuffer.writeAll(receiveBuffer);
      if (wasEmpty) {
        FramedStream.this.notifyAll();
      }
    }
  }
}

This is a custom Source that buffers the data itself, unlike HTTP/1.1, which directly uses the source that wraps the socket.
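
The three outcomes of receive (buffer the data, discard it because the stream is finished, or fail on a flow-control violation) can be modelled in a few lines. This is a simplified sketch under assumptions: BoundedReceiveBufferSketch, Result, drain and finish are hypothetical names, a ByteArrayOutputStream stands in for the Okio buffers, and the separate lock-free receiveBuffer stage is left out.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical bounded per-stream receive buffer; not OkHttp's FramedDataSource. */
final class BoundedReceiveBufferSketch {
  enum Result { BUFFERED, DISCARDED, FLOW_CONTROL_ERROR }

  private final int maxByteCount;
  private final ByteArrayOutputStream readBuffer = new ByteArrayOutputStream();
  private boolean finished;

  BoundedReceiveBufferSketch(int maxByteCount) {
    this.maxByteCount = maxByteCount;
  }

  /** Accepts data from the connection's reader thread. */
  synchronized Result receive(byte[] data) {
    // The peer sent more than the advertised window allows.
    if (data.length + readBuffer.size() > maxByteCount) return Result.FLOW_CONTROL_ERROR;
    // Data arriving after the stream finished is dropped.
    if (finished) return Result.DISCARDED;
    boolean wasEmpty = readBuffer.size() == 0;
    readBuffer.write(data, 0, data.length);
    if (wasEmpty) notifyAll(); // Wake a reader that may be waiting for data.
    return Result.BUFFERED;
  }

  /** Hands all buffered bytes to the application and frees the space. */
  synchronized byte[] drain() {
    byte[] out = readBuffer.toByteArray();
    readBuffer.reset();
    return out;
  }

  synchronized void finish() {
    finished = true;
  }

  public static void main(String[] args) {
    BoundedReceiveBufferSketch buffer = new BoundedReceiveBufferSketch(8);
    System.out.println(buffer.receive(new byte[5]));
    System.out.println(buffer.receive(new byte[4]));
    System.out.println(buffer.drain().length);
  }
}
```

Keeping a separate buffer per stream is what lets one slow reader fall behind without blocking the frames of other streams on the same socket.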

18.HttpEngine.java

public Response getResponse() {
  if (userResponse == null) throw new IllegalStateException();
  return userResponse;
}

As analyzed in the earlier steps, the response data is eventually assigned to userResponse, so this method simply returns userResponse.

19.HttpEngine.java

public void releaseConnection() throws IOException {
  if (transport != null && connection != null) {
    transport.releaseConnectionOnIdle();
  }
  connection = null;
}

20.FramedTransport.java

@Override public void releaseConnectionOnIdle() {
}

Because every HTTP/2 connection is reused by default, there is nothing to release here.