之前一张分析了http1.1协议下的流程,接下来分析http2.0的数据交互流程。
一、重要的类简述 1.FramedTransport包装版的数据请求通道,包含httpConnection成员变量,所有的数据操作都是通过FramedConnection操作。
2.FramedConnection数据操作,包含FrameWriter和FrameReader,FramedConnection操作数据通过FrameWriter和FrameReader处理,而FrameWriter和FrameReader则是通过socket的source和sink处理数据。
3.FramedStream。http2.0支持多路复用技术,所以FramedStream是保存每个请求数据交互数据。
二、流程分析 1.FramedTransport.java
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void writeRequestHeaders (Request request) throws IOException { if (stream != null ) return ; httpEngine.writingRequestHeaders(); boolean permitsRequestBody = httpEngine.permitsRequestBody(request); List<Header> requestHeaders = framedConnection.getProtocol() == Protocol.HTTP_2 ? http2HeadersList(request) : spdy3HeadersList(request); boolean hasResponseBody = true ; stream = framedConnection.newStream(requestHeaders, permitsRequestBody, hasResponseBody); stream.readTimeout().timeout(httpEngine.client.getReadTimeout(), TimeUnit.MILLISECONDS); }
(1)首先生成请求头。 (2)生成stream。
2.FramedConnection.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private FramedStream newStream (int associatedStreamId, List<Header> requestHeaders, boolean out, boolean in) throws IOException { boolean outFinished = !out; boolean inFinished = !in; FramedStream stream; int streamId; synchronized (frameWriter) { synchronized (this ) { if (shutdown) { throw new IOException("shutdown" ); } streamId = nextStreamId; nextStreamId += 2 ; stream = new FramedStream(streamId, this , outFinished, inFinished, requestHeaders); if (stream.isOpen()) { streams.put(streamId, stream); setIdle(false ); } } if (associatedStreamId == 0 ) { frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, requestHeaders); } else if (client) { throw new IllegalArgumentException("client streams shouldn't have associated stream IDs" ); } else { frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders); } } if (!out) { frameWriter.flush(); } return stream; }
(1)创建stream。streamId是stream的唯一标识,每个请求对应一个streamId,传输数据的时候需要传递这个参数。 (2)保存到streams中,并将连接状态设置为非限制。 (3)发送请求头数据。
3.Http2.Writer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override public synchronized void synStream (boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock) throws IOException { if (inFinished) throw new UnsupportedOperationException(); if (closed) throw new IOException("closed" ); headers(outFinished, streamId, headerBlock); } void headers (boolean outFinished, int streamId, List<Header> headerBlock) throws IOException { if (closed) throw new IOException("closed" ); hpackWriter.writeHeaders(headerBlock); long byteCount = hpackBuffer.size(); int length = (int ) Math.min(maxFrameSize, byteCount); byte type = TYPE_HEADERS; byte flags = byteCount == length ? FLAG_END_HEADERS : 0 ; if (outFinished) flags |= FLAG_END_STREAM; frameHeader(streamId, length, type, flags); sink.write(hpackBuffer, length); if (byteCount > length) writeContinuationFrames(streamId, byteCount - length); } void frameHeader (int streamId, int length, byte type, byte flags) throws IOException { if (logger.isLoggable(FINE)) logger.fine(formatHeader(false , streamId, length, type, flags)); if (length > maxFrameSize) { throw illegalArgument("FRAME_SIZE_ERROR length > %d: %d" , maxFrameSize, length); } if ((streamId & 0x80000000 ) != 0 ) throw illegalArgument("reserved bit set: %s" , streamId); writeMedium(sink, length); sink.writeByte(type & 0xff ); sink.writeByte(flags & 0xff ); sink.writeInt(streamId & 0x7fffffff ); } }
(1)首先将请求头写入hpackBuffer中。hpackWriter其实就是将请求头数据压缩处理存放到hpackBuffer中。 (2)frameHeader发送请求数据基本参数。 (3)发送请求头数据。
FramedTransport.java
1 2 3 @Override public Sink createRequestBody (Request request, long contentLength) throws IOException { return stream.getSink(); }
获取Sink, 每个请求对应一个stream。
FramedTransport.java
1 2 3 @Override public void finishRequest () throws IOException { stream.getSink().close(); }
获取Sink, 每个请求对应一个stream。
6.FramedDataSource.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public void close () throws IOException { assert (!Thread.holdsLock(FramedStream.this )); synchronized (FramedStream.this ) { if (closed) return ; } if (!sink.finished) { if (sendBuffer.size() > 0 ) { while (sendBuffer.size() > 0 ) { emitDataFrame(true ); } } else { connection.writeData(id, true , null , 0 ); } } synchronized (FramedStream.this ) { closed = true ; } connection.flush(); cancelStreamIfNecessary(); }
7.FramedTransport.java
1 2 3 4 5 @Override public Response.Builder readResponseHeaders () throws IOException { return framedConnection.getProtocol() == Protocol.HTTP_2 ? readHttp2HeadersList(stream.getResponseHeaders()) : readSpdy3HeadersList(stream.getResponseHeaders()); }
FramedStream.java 1 2 3 4 5 6 7 8 9 10 11 12 public synchronized List<Header> getResponseHeaders () throws IOException { readTimeout.enter(); try { while (responseHeaders == null && errorCode == null ) { waitForIo(); } } finally { readTimeout.exitAndThrowIfTimedOut(); } if (responseHeaders != null ) return responseHeaders; throw new IOException("stream was reset: " + errorCode); }
这里如果响应头还未返回,阻塞等待。接下来看接收数据。
9.Http2.Reader.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 @Override public boolean nextFrame (Handler handler) throws IOException { try { source.require(9 ); } catch (IOException e) { return false ; } int length = readMedium(source); if (length < 0 || length > INITIAL_MAX_FRAME_SIZE) { throw ioException("FRAME_SIZE_ERROR: %s" , length); } byte type = (byte ) (source.readByte() & 0xff ); byte flags = (byte ) (source.readByte() & 0xff ); int streamId = (source.readInt() & 0x7fffffff ); if (logger.isLoggable(FINE)) logger.fine(formatHeader(true , streamId, length, type, flags)); switch (type) { case TYPE_DATA: readData(handler, length, flags, streamId); break ; case TYPE_HEADERS: readHeaders(handler, length, flags, streamId); break ; case TYPE_PRIORITY: readPriority(handler, length, flags, streamId); break ; case TYPE_RST_STREAM: readRstStream(handler, length, flags, streamId); break ; case TYPE_SETTINGS: readSettings(handler, length, flags, streamId); break ; case TYPE_PUSH_PROMISE: readPushPromise(handler, length, flags, streamId); break ; case TYPE_PING: readPing(handler, length, flags, streamId); break ; case TYPE_GOAWAY: readGoAway(handler, length, flags, streamId); break ; case TYPE_WINDOW_UPDATE: readWindowUpdate(handler, length, flags, streamId); break ; default : source.skip(length); } return true ; }
这里根据服务器返回的类型保存数据。
10.Http2.Reader.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void readHeaders (Handler handler, int length, byte flags, int streamId) throws IOException { if (streamId == 0 ) throw ioException("PROTOCOL_ERROR: TYPE_HEADERS streamId == 0" ); boolean endStream = (flags & FLAG_END_STREAM) != 0 ; short padding = (flags & FLAG_PADDED) != 0 ? (short ) (source.readByte() & 0xff ) : 0 ; if ((flags & FLAG_PRIORITY) != 0 ) { readPriority(handler, streamId); length -= 5 ; } length = lengthWithoutPadding(length, flags, padding); List<Header> headerBlock = readHeaderBlock(length, padding, flags, streamId); handler.headers(false , endStream, streamId, -1 , headerBlock, HeadersMode.HTTP_20_HEADERS); }
11.FramedConnection.Reader.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public void headers (boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {... FramedStream stream; synchronized (FramedConnection.this ) { if (shutdown) return ; stream = getStream(streamId); if (stream == null ) { ... } } .... stream.receiveHeaders(headerBlock, headersMode); if (inFinished) stream.receiveFin(); }
(1)根据streamId查找已存在的stream。 (2)将数据保存在stream中。
12.FramedStream.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void receiveHeaders (List<Header> headers, HeadersMode headersMode) { assert (!Thread.holdsLock(FramedStream.this )); ErrorCode errorCode = null ; boolean open = true ; synchronized (this ) { if (responseHeaders == null ) { if (headersMode.failIfHeadersAbsent()) { errorCode = ErrorCode.PROTOCOL_ERROR; } else { responseHeaders = headers; open = isOpen(); notifyAll(); } } else { if (headersMode.failIfHeadersPresent()) { errorCode = ErrorCode.STREAM_IN_USE; } else { List<Header> newHeaders = new ArrayList<>(); newHeaders.addAll(responseHeaders); newHeaders.addAll(headers); this .responseHeaders = newHeaders; } } } if (errorCode != null ) { closeLater(errorCode); } else if (!open) { connection.removeStream(id); } }
(1)保存headers。 (2)唤醒等待。
13.FramedTransport.java
1 2 3 @Override public ResponseBody openResponseBody (Response response) throws IOException { return new RealResponseBody(response.headers(), Okio.buffer(stream.getSource())); }
获取stream中的Source。下面继续分析Source中的数据获取。
14.FramedConnection.Reader.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void readData (Handler handler, int length, byte flags, int streamId) throws IOException { boolean inFinished = (flags & FLAG_END_STREAM) != 0 ; boolean gzipped = (flags & FLAG_COMPRESSED) != 0 ; if (gzipped) { throw ioException("PROTOCOL_ERROR: FLAG_COMPRESSED without SETTINGS_COMPRESS_DATA" ); } short padding = (flags & FLAG_PADDED) != 0 ? (short ) (source.readByte() & 0xff ) : 0 ; length = lengthWithoutPadding(length, flags, padding); handler.data(inFinished, streamId, source, length); source.skip(padding); }
还是通过第9步中获取数据。
15.FramedConnection.Reader.java
1 2 3 4 5 6 7 8 9 10 @Override public void data (boolean inFinished, int streamId, BufferedSource source, int length) throws IOException { ... FramedStream dataStream = getStream(streamId); ... dataStream.receiveData(source, length); if (inFinished) { dataStream.receiveFin(); } }
16.FramedStream.java
1 2 3 4 void receiveData (BufferedSource in, int length) throws IOException { assert (!Thread.holdsLock(FramedStream.this )); this .source.receive(in, length); }
17.FramedDataSource.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void receive (BufferedSource in, long byteCount) throws IOException { assert (!Thread.holdsLock(FramedStream.this )); while (byteCount > 0 ) { boolean finished; boolean flowControlError; synchronized (FramedStream.this ) { finished = this .finished; flowControlError = byteCount + readBuffer.size() > maxByteCount; } if (flowControlError) { in.skip(byteCount); closeLater(ErrorCode.FLOW_CONTROL_ERROR); return ; } if (finished) { in.skip(byteCount); return ; } long read = in.read(receiveBuffer, byteCount); if (read == -1 ) throw new EOFException(); byteCount -= read; synchronized (FramedStream.this ) { boolean wasEmpty = readBuffer.size() == 0 ; readBuffer.writeAll(receiveBuffer); if (wasEmpty) { FramedStream.this .notifyAll(); } } } }
这里是一个自定义的Source来保存数据,不像http1.1直接使用socket包装的source。
18.HttpEngine.java
1 2 3 4 public Response getResponse () { if (userResponse == null ) throw new IllegalStateException(); return userResponse; }
之前几步已经分析,最后请求的数据会赋值给userResponse,最后获取的时候直接返回userResponse。
19.HttpEngine.java
1 2 3 4 5 6 public void releaseConnection () throws IOException { if (transport != null && connection != null ) { transport.releaseConnectionOnIdle(); } connection = null ; }
20.FramedTransport.java
1 2 @Override public void releaseConnectionOnIdle () {}
因为http2.0每个连接默认复用,所以不需要释放。