<strike id="ddzbl"><span id="ddzbl"><em id="ddzbl"></em></span></strike>
    <font id="ddzbl"><sub id="ddzbl"><mark id="ddzbl"></mark></sub></font>

      <address id="ddzbl"></address>
      <listing id="ddzbl"></listing>

        <track id="ddzbl"><span id="ddzbl"><progress id="ddzbl"></progress></span></track>

        首頁技術文章正文

        Java培訓:dubbo源碼解析-網絡通信

        更新時間:2022-09-16 來源:黑馬程序員 瀏覽量:

          在之前的內容中,我們講解了消費者端服務發現與提供者端服務暴露的相關內容,同時也知道消費者端通過內置的負載均衡算法獲取合適的調用invoker進行遠程調用。那么,本章節重點關注的就是遠程調用過程即網絡通信。

          

        1663308753668_1.jpg

          網絡通信位于Remoting模塊:

          - Remoting 實現是 Dubbo 協議的實現,如果你選擇 RMI 協議,整個 Remoting 都不會用上;

          - Remoting 內部再劃為 `Transport 傳輸層` 和 `Exchange 信息交換層`;

          - Transport 層只負責單向消息傳輸,是對 Mina, Netty, Grizzly 的抽象,它也可以擴展 UDP 傳輸;

          - Exchange 層是在傳輸層之上封裝了 Request-Response 語義;

          網絡通信的問題:

          客戶端與服務端連通性問題

          粘包拆包問題

          異步多線程數據一致問題

          通信協議

          dubbo內置,dubbo協議 ,rmi協議,hessian協議,http協議,webservice協議,thrift協議,rest協議,grpc協議,memcached協議,redis協議等10種通訊協議。各個協議特點如下

          dubbo協議

          Dubbo 缺省協議采用單一長連接和 NIO 異步通訊,適合于小數據量大并發的服務調用,以及服務消費者機器數遠大于服務提供者機器數的情況。

          缺省協議,使用基于 mina `1.1.7` 和 hessian `3.2.1` 的 tbremoting 交互。

          - 連接個數:單連接

          - 連接方式:長連接

          - 傳輸協議:TCP

          - 傳輸方式:NIO 異步傳輸

          - 序列化:Hessian 二進制序列化

          - 適用范圍:傳入傳出參數數據包較小(建議小于100K),消費者比提供者個數多,單一消費者無法壓滿提供者,盡量不要用 dubbo 協議傳輸大文件或超大字符串。

          - 適用場景:常規遠程服務方法調用

          rmi協議

          RMI 協議采用 JDK 標準的 `java.rmi.*` 實現,采用阻塞式短連接和 JDK 標準序列化方式。

          - 連接個數:多連接

          - 連接方式:短連接

          - 傳輸協議:TCP

          - 傳輸方式:同步傳輸

          - 序列化:Java 標準二進制序列化

          - 適用范圍:傳入傳出參數數據包大小混合,消費者與提供者個數差不多,可傳文件。

          - 適用場景:常規遠程服務方法調用,與原生RMI服務互操作

          hessian協議

          Hessian 協議用于集成 Hessian 的服務,Hessian 底層采用 Http 通訊,采用 Servlet 暴露服務,Dubbo 缺省內嵌 Jetty 作為服務器實現。

          Dubbo 的 Hessian 協議可以和原生 Hessian 服務互操作,即:

          - 提供者用 Dubbo 的 Hessian 協議暴露服務,消費者直接用標準 Hessian 接口調用

          - 或者提供方用標準 Hessian 暴露服務,消費方用 Dubbo 的 Hessian 協議調用。

          - 連接個數:多連接

          - 連接方式:短連接

          - 傳輸協議:HTTP

          - 傳輸方式:同步傳輸

          - 序列化:Hessian二進制序列化

          - 適用范圍:傳入傳出參數數據包較大,提供者比消費者個數多,提供者壓力較大,可傳文件。

          - 適用場景:頁面傳輸,文件傳輸,或與原生hessian服務互操作

          http協議

          基于 HTTP 表單的遠程調用協議,采用 Spring 的 HttpInvoker 實現

          - 連接個數:多連接

          - 連接方式:短連接

          - 傳輸協議:HTTP

          - 傳輸方式:同步傳輸

          - 序列化:表單序列化

          - 適用范圍:傳入傳出參數數據包大小混合,提供者比消費者個數多,可用瀏覽器查看,可用表單或URL傳入參數,暫不支持傳文件。

          - 適用場景:需同時給應用程序和瀏覽器 JS 使用的服務。

          webservice協議

          基于 WebService 的遠程調用協議,基于 Apache CXF 實現](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/webservice.html#fn2)。

          可以和原生 WebService 服務互操作,即:

          - 提供者用 Dubbo 的 WebService 協議暴露服務,消費者直接用標準 WebService 接口調用,

          - 或者提供方用標準 WebService 暴露服務,消費方用 Dubbo 的 WebService 協議調用。

          - 連接個數:多連接

          - 連接方式:短連接

          - 傳輸協議:HTTP

          - 傳輸方式:同步傳輸

          - 序列化:SOAP 文本序列化(http + xml)

          - 適用場景:系統集成,跨語言調用

          thrift協議

          當前 dubbo 支持 [[1\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn1)的 thrift 協議是對 thrift 原生協議 [[2\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn2) 的擴展,在原生協議的基礎上添加了一些額外的頭信息,比如 service name,magic number 等。

          rest協議

          基于標準的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的簡寫)實現的REST調用支持

          grpc協議

          Dubbo 自 2.7.5 版本開始支持 gRPC 協議,對于計劃使用 HTTP/2 通信,或者想利用 gRPC 帶來的 Stream、反壓、Reactive 編程等能力的開發者來說, 都可以考慮啟用 gRPC 協議。

          - 為期望使用 gRPC 協議的用戶帶來服務治理能力,方便接入 Dubbo 體系

          - 用戶可以使用 Dubbo 風格的,基于接口的編程風格來定義和使用遠程服務

          memcached協議

          基于 memcached實現的 RPC 協議

          redis協議

          基于 Redis 實現的 RPC 協議

          序列化

          序列化就是將對象轉成字節流,用于網絡傳輸,以及將字節流轉為對象,用于在收到字節流數據后還原成對象。序列化的優勢有很多,例如安全性更好、可跨平臺等。我們知道dubbo基于netty進行網絡通訊,在`NettyClient.doOpen()`方法中可以看到Netty的相關類

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });

          然后去看NettyCodecAdapter 類最后進入ExchangeCodec類的encodeRequest方法,如下:

        protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
                Serialization serialization = getSerialization(channel);
                // header.
                byte[] header = new byte[HEADER_LENGTH];

          是的,就是Serialization接口,默認是Hessian2Serialization序列化接口。

        1663309013130_2.jpg

          Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,protostuff其中默認hessian2。其中java、compactedjava、nativejava屬于原生java的序列化。

          - dubbo序列化:阿里尚未開發成熟的高效java序列化實現,阿里不建議在生產環境使用它。

          - **hessian2序列化:hessian是一種跨語言的高效二進制序列化方式。但這里實際不是原生的hessian2序列化,而是阿里修改過的,它是dubbo RPC默認啟用的序列化方式。**

          - json序列化:目前有兩種實現,一種是采用的阿里的fastjson庫,另一種是采用dubbo中自己實現的簡單json庫,但其實現都不是特別成熟,而且json這種文本序列化性能一般不如上面兩種二進制序列化。

          - java序列化:主要是采用JDK自帶的Java序列化實現,性能很不理想。

          最近幾年,各種新的高效序列化方式層出不窮,不斷刷新序列化性能的上限,最典型的包括:

          - 專門針對Java語言的:Kryo,FST等等

          - 跨語言的:Protostuff,ProtoBuf,Thrift,Avro,MsgPack等等

          這些序列化方式的性能多數都顯著優于 hessian2 (甚至包括尚未成熟的dubbo序列化)。所以我們可以為 dubbo 引入 Kryo 和 FST 這兩種高效 Java 來優化 dubbo 的序列化。

          使用Kryo和FST非常簡單,只需要在dubbo RPC的XML配置中添加一個屬性即可:

        <dubbo:protocol name="dubbo" serialization="kryo"/>

          網絡通信

          dubbo中數據格式

          解決socket中數據粘包拆包問題,一般有三種方式

          * 定長協議(數據包長度一致)

          * 定長的協議是指協議內容的長度是固定的,比如協議byte長度是50,當從網絡上讀取50個byte后,就進行decode解碼操作。定長協議在讀取或者寫入時,效率比較高,因為數據緩存的大小基本都確定了,就好比數組一樣,缺陷就是適應性不足,以RPC場景為例,很難估計出定長的長度是多少。

          * 特殊結束符(數據尾:通過特殊的字符標識#)

          * 相比定長協議,如果能夠定義一個特殊字符作為每個協議單元結束的標示,就能夠以變長的方式進行通信,從而在數據傳輸和高效之間取得平衡,比如用特殊字符`\n`。特殊結束符方式的問題是過于簡單的思考了協議傳輸的過程,對于一個協議單元必須要全部讀入才能夠進行處理,除此之外必須要防止用戶傳輸的數據不能同結束符相同,否則就會出現紊亂。

          * 變長協議(協議頭+payload模式)

          * 這種一般是自定義協議,會以定長加不定長的部分組成,其中定長的部分需要描述不定長的內容長度。

          * dubbo就是使用這種形式的數據傳輸格式

          Dubbo 框架定義了私有的RPC協議,其中請求和響應協議的具體內容我們使用表格來展示。

        1663309170930_3.jpg

          Dubbo 數據包分為消息頭和消息體,消息頭用于存儲一些元信息,比如魔數(Magic),數據包類型(Request/Response),消息體長度(Data Length)等。消息體中用于存儲具體的調用消息,比如方法名稱,參數列表等。下面簡單列舉一下消息頭的內容。

          | 偏移量(Bit) | 字段 | 取值 |

          | ----------- | ------------ | ------------------------------------------------------------ |

          | 0 ~ 7 | 魔數高位 | 0xda00 |

          | 8 ~ 15 | 魔數低位 | 0xbb |

          | 16 | 數據包類型 | 0 - Response, 1 - Request |

          | 17 | 調用方式 | 僅在第16位被設為1的情況下有效,0 - 單向調用,1 - 雙向調用 |

          | 18 | 事件標識 | 0 - 當前數據包是請求或響應包,1 - 當前數據包是心跳包 |

          | 19 ~ 23 | 序列化器編號 | 2 - Hessian2Serialization

          3 - JavaSerialization

          4 - CompactedJavaSerialization

          6 - FastJsonSerialization

          7 - NativeJavaSerialization

          8 - KryoSerialization

          9 - FstSerialization |

          | 24 ~ 31 | 狀態 | 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ...... |

          | 32 ~ 95 | 請求編號 | 共8字節,運行時生成 |

          | 96 ~ 127 | 消息體長度 | 運行時計算

          消費方發送請求

          (1)發送請求

          為了便于大家閱讀代碼,這里以 DemoService 為例,將 sayHello 方法的整個調用路徑貼出來。

        proxy0#sayHello(String)
          —> InvokerInvocationHandler#invoke(Object, Method, Object[])
            —> MockClusterInvoker#invoke(Invocation)
              —> AbstractClusterInvoker#invoke(Invocation)
                —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
                  —> Filter#invoke(Invoker, Invocation)  // 包含多個 Filter 調用
                    —> ListenerInvokerWrapper#invoke(Invocation)
                      —> AbstractInvoker#invoke(Invocation)
                        —> DubboInvoker#doInvoke(Invocation)
                          —> ReferenceCountExchangeClient#request(Object, int)
                            —> HeaderExchangeClient#request(Object, int)
                              —> HeaderExchangeChannel#request(Object, int)
                                —> AbstractPeer#send(Object)
                                  —> AbstractClient#send(Object, boolean)
                                    —> NettyChannel#send(Object, boolean)
                                      —> NioClientSocketChannel#write(Object)

          dubbo消費方,自動生成代碼對象如下

        public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
        
            private InvocationHandler handler;
        
            public String sayHello(String string) {
                // 將參數存儲到 Object 數組中
                Object[] arrobject = new Object[]{string};
                // 調用 InvocationHandler 實現類的 invoke 方法得到調用結果
                Object object = this.handler.invoke(this, methods[0], arrobject);
                // 返回調用結果
                return (String)object;
            }
        }

          InvokerInvocationHandler 中的 invoker 成員變量類型為 MockClusterInvoker,MockClusterInvoker 內部封裝了服務降級邏輯。下面簡單看一下:

        public Result invoke(Invocation invocation) throws RpcException {
                Result result = null;
                // 獲取 mock 配置值
                String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
                if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                     // 無 mock 邏輯,直接調用其他 Invoker 對象的 invoke 方法,
                    // 比如 FailoverClusterInvoker
                    result = this.invoker.invoke(invocation);
                } else if (value.startsWith("force")) {
                    // force:xxx 直接執行 mock 邏輯,不發起遠程調用
                    result = doMockInvoke(invocation, null);
                } else {
                     // fail:xxx 表示消費方對調用服務失敗后,再執行 mock 邏輯,不拋出異常
                    try {
                        result = this.invoker.invoke(invocation);
                    } catch (RpcException e) {
                         // 調用失敗,執行 mock 邏輯
                        result = doMockInvoke(invocation, e);
                    }
                }
                return result;
            }

          考慮到前文已經詳細分析過 FailoverClusterInvoker,因此本節略過 FailoverClusterInvoker,直接分析 DubboInvoker。

        public abstract class AbstractInvoker<T> implements Invoker<T> {
           
            public Result invoke(Invocation inv) throws RpcException {
                if (destroyed.get()) {
                    throw new RpcException("Rpc invoker for service ...");
                }
                RpcInvocation invocation = (RpcInvocation) inv;
                // 設置 Invoker
                invocation.setInvoker(this);
                if (attachment != null && attachment.size() > 0) {
                    // 設置 attachment
                    invocation.addAttachmentsIfAbsent(attachment);
                }
                Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
                if (contextAttachments != null && contextAttachments.size() != 0) {
                    // 添加 contextAttachments 到 RpcInvocation#attachment 變量中
                    invocation.addAttachments(contextAttachments);
                }
                if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
                    // 設置異步信息到 RpcInvocation#attachment 中
                    invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
                }
                RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        
                try {
                    // 抽象方法,由子類實現
                    return doInvoke(invocation);
                } catch (InvocationTargetException e) {
                    // ...
                } catch (RpcException e) {
                    // ...
                } catch (Throwable e) {
                    return new RpcResult(e);
                }
            }
        
            protected abstract Result doInvoke(Invocation invocation) throws Throwable;
           
            // 省略其他方法
        }

          上面的代碼來自 AbstractInvoker 類,其中大部分代碼用于添加信息到 RpcInvocation#attachment 變量中,添加完畢后,調用 doInvoke 執行后續的調用。doInvoke 是一個抽象方法,需要由子類實現,下面到 DubboInvoker 中看一下。

        @Override
            protected Result doInvoke(final Invocation invocation) throws Throwable {
                RpcInvocation inv = (RpcInvocation) invocation;
                final String methodName = RpcUtils.getMethodName(invocation);
                //將目標方法以及版本好作為參數放入到Invocation中
                inv.setAttachment(PATH_KEY, getUrl().getPath());
                inv.setAttachment(VERSION_KEY, version);
        
                //獲得客戶端連接
                ExchangeClient currentClient; //初始化invoker的時候,構建的一個遠程通信連接
                if (clients.length == 1) { //默認
                    currentClient = clients[0];
                } else {
                    //通過取模獲得其中一個連接
                    currentClient = clients[index.getAndIncrement() % clients.length];
                }
                try {
                    //表示當前的方法是否存在返回值
                    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                    int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
                    //isOneway 為 true,表示“單向”通信
                    if (isOneway) {//異步無返回值
                        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                        currentClient.send(inv, isSent);
                        RpcContext.getContext().setFuture(null);
                        return AsyncRpcResult.newDefaultAsyncResult(invocation);
                    } else { //存在返回值
                        //是否采用異步
                        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                        CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                        responseFuture.whenComplete((obj, t) -> {
                            if (t != null) {
                                asyncRpcResult.completeExceptionally(t);
                            } else {
                                asyncRpcResult.complete((AppResponse) obj);
                            }
                        });
                        RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                        return asyncRpcResult;
                    }
                }
                //省略無關代碼
            }

          最終進入到HeaderExchangeChannel#request方法,拼裝Request并將請求發送出去

        public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
                if (closed) {
                    throw new RemotingException(this.getLocalAddress(), null, "Failed tosend request " + request + ", cause: The channel " + this + " is closed!");
                }
                // 創建請求對象
                Request req = new Request();
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay(true);
                req.setData(request);
                DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
                try {
                    //NettyClient
                    channel.send(req);
                } catch (RemotingException e) {
                    future.cancel();
                    throw e;
                }
                return future;
            }

          (2)請求編碼

          在netty啟動時,我們設置了編解碼器,其中通過ExchangeCodec完成編解碼工作如下:

        public class ExchangeCodec extends TelnetCodec {
        
            // 消息頭長度
            protected static final int HEADER_LENGTH = 16;
            // 魔數內容
            protected static final short MAGIC = (short) 0xdabb;
            protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
            protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
            protected static final byte FLAG_REQUEST = (byte) 0x80;
            protected static final byte FLAG_TWOWAY = (byte) 0x40;
            protected static final byte FLAG_EVENT = (byte) 0x20;
            protected static final int SERIALIZATION_MASK = 0x1f;
            private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
        
            public Short getMagicCode() {
                return MAGIC;
            }
        
            @Override
            public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
                if (msg instanceof Request) {
                    // 對 Request 對象進行編碼
                    encodeRequest(channel, buffer, (Request) msg);
                } else if (msg instanceof Response) {
                    // 對 Response 對象進行編碼,后面分析
                    encodeResponse(channel, buffer, (Response) msg);
                } else {
                    super.encode(channel, buffer, msg);
                }
            }
        
            protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
                Serialization serialization = getSerialization(channel);
        
                // 創建消息頭字節數組,長度為 16
                byte[] header = new byte[HEADER_LENGTH];
        
                // 設置魔數
                Bytes.short2bytes(MAGIC, header);
        
                // 設置數據包類型(Request/Response)和序列化器編號
                header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
        
                // 設置通信方式(單向/雙向)
                if (req.isTwoWay()) {
                    header[2] |= FLAG_TWOWAY;
                }
               
                // 設置事件標識
                if (req.isEvent()) {
                    header[2] |= FLAG_EVENT;
                }
        
                // 設置請求編號,8個字節,從第4個字節開始設置
                Bytes.long2bytes(req.getId(), header, 4);
        
                // 獲取 buffer 當前的寫位置
                int savedWriteIndex = buffer.writerIndex();
                // 更新 writerIndex,為消息頭預留 16 個字節的空間
                buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
                ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
                // 創建序列化器,比如 Hessian2ObjectOutput
                ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
                if (req.isEvent()) {
                    // 對事件數據進行序列化操作
                    encodeEventData(channel, out, req.getData());
                } else {
                    // 對請求數據進行序列化操作
                    encodeRequestData(channel, out, req.getData(), req.getVersion());
                }
                out.flushBuffer();
                if (out instanceof Cleanable) {
                    ((Cleanable) out).cleanup();
                }
                bos.flush();
                bos.close();
               
                // 獲取寫入的字節數,也就是消息體長度
                int len = bos.writtenBytes();
                checkPayload(channel, len);
        
                // 將消息體長度寫入到消息頭中
                Bytes.int2bytes(len, header, 12);
        
                // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
                buffer.writerIndex(savedWriteIndex);
                // 從 savedWriteIndex 下標處寫入消息頭
                buffer.writeBytes(header);
                // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
                buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
            }
           
            // 省略其他方法
        }

          以上就是請求對象的編碼過程,該過程首先會通過位運算將消息頭寫入到 header 數組中。然后對 Request 對象的 data 字段執行序列化操作,序列化后的數據最終會存儲到 ChannelBuffer 中。序列化操作執行完后,可得到數據序列化后的長度 len,緊接著將 len 寫入到 header 指定位置處。最后再將消息頭字節數組 header 寫入到 ChannelBuffer 中,整個編碼過程就結束了。本節的最后,我們再來看一下 Request 對象的 data 字段序列化過程,也就是 encodeRequestData 方法的邏輯,如下:

        public class DubboCodec extends ExchangeCodec implements Codec2 {
           
            protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
                RpcInvocation inv = (RpcInvocation) data;
        
                // 依次序列化 dubbo version、path、version
                out.writeUTF(version);
                out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
                out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
        
                // 序列化調用方法名
                out.writeUTF(inv.getMethodName());
                // 將參數類型轉換為字符串,并進行序列化
                out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
                Object[] args = inv.getArguments();
                if (args != null)
                    for (int i = 0; i < args.length; i++) {
                        // 對運行時參數進行序列化
                        out.writeObject(encodeInvocationArgument(channel, inv, i));
                    }
               
                // 序列化 attachments
                out.writeObject(inv.getAttachments());
            }
        }

          至此,關于服務消費方發送請求的過程就分析完了,接下來我們來看一下服務提供方是如何接收請求的。

          提供方接收請求

          (1) 請求解碼

          這里直接分析請求數據的解碼邏輯,忽略中間過程,如下:

        public class ExchangeCodec extends TelnetCodec {
           
            @Override
            public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
                int readable = buffer.readableBytes();
                // 創建消息頭字節數組
                byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
                // 讀取消息頭數據
                buffer.readBytes(header);
                // 調用重載方法進行后續解碼工作
                return decode(channel, buffer, readable, header);
            }
        
            @Override
            protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
                // 檢查魔數是否相等
                if (readable > 0 && header[0] != MAGIC_HIGH
                        || readable > 1 && header[1] != MAGIC_LOW) {
                    int length = header.length;
                    if (header.length < readable) {
                        header = Bytes.copyOf(header, readable);
                        buffer.readBytes(header, length, readable - length);
                    }
                    for (int i = 1; i < header.length - 1; i++) {
                        if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                            buffer.readerIndex(buffer.readerIndex() - header.length + i);
                            header = Bytes.copyOf(header, i);
                            break;
                        }
                    }
                    // 通過 telnet 命令行發送的數據包不包含消息頭,所以這里
                    // 調用 TelnetCodec 的 decode 方法對數據包進行解碼
                    return super.decode(channel, buffer, readable, header);
                }
               
                // 檢測可讀數據量是否少于消息頭長度,若小于則立即返回 DecodeResult.NEED_MORE_INPUT
                if (readable < HEADER_LENGTH) {
                    return DecodeResult.NEED_MORE_INPUT;
                }
        
                // 從消息頭中獲取消息體長度
                int len = Bytes.bytes2int(header, 12);
                // 檢測消息體長度是否超出限制,超出則拋出異常
                checkPayload(channel, len);
        
                int tt = len + HEADER_LENGTH;
                // 檢測可讀的字節數是否小于實際的字節數
                if (readable < tt) {
                    return DecodeResult.NEED_MORE_INPUT;
                }
               
                ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
        
                try {
                    // 繼續進行解碼工作
                    return decodeBody(channel, is, header);
                } finally {
                    if (is.available() > 0) {
                        try {
                            StreamUtils.skipUnusedStream(is);
                        } catch (IOException e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                }
            }
        }

          上面方法通過檢測消息頭中的魔數是否與規定的魔數相等,提前攔截掉非常規數據包,比如通過 telnet 命令行發出的數據包。接著再對消息體長度,以及可讀字節數進行檢測。最后調用 decodeBody 方法進行后續的解碼工作,ExchangeCodec 中實現了 decodeBody 方法,但因其子類 DubboCodec 覆寫了該方法,所以在運行時 DubboCodec 中的 decodeBody 方法會被調用。下面我們來看一下該方法的代碼。

        public class DubboCodec extends ExchangeCodec implements Codec2 {
        
            @Override
            protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
                // 獲取消息頭中的第三個字節,并通過邏輯與運算得到序列化器編號
                byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
                Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
                // 獲取調用編號
                long id = Bytes.bytes2long(header, 4);
                // 通過邏輯與運算得到調用類型,0 - Response,1 - Request
                if ((flag & FLAG_REQUEST) == 0) {
                    // 對響應結果進行解碼,得到 Response 對象。這個非本節內容,后面再分析
                    // ...
                } else {
                    // 創建 Request 對象
                    Request req = new Request(id);
                    req.setVersion(Version.getProtocolVersion());
                    // 通過邏輯與運算得到通信方式,并設置到 Request 對象中
                    req.setTwoWay((flag & FLAG_TWOWAY) != 0);
                   
                    // 通過位運算檢測數據包是否為事件類型
                    if ((flag & FLAG_EVENT) != 0) {
                        // 設置心跳事件到 Request 對象中
                        req.setEvent(Request.HEARTBEAT_EVENT);
                    }
                    try {
                        Object data;
                        if (req.isHeartbeat()) {
                            // 對心跳包進行解碼,該方法已被標注為廢棄
                            data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                        } else if (req.isEvent()) {
                            // 對事件數據進行解碼
                            data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                        } else {
                            DecodeableRpcInvocation inv;
                            // 根據 url 參數判斷是否在 IO 線程上對消息體進行解碼
                            if (channel.getUrl().getParameter(
                                    Constants.DECODE_IN_IO_THREAD_KEY,
                                    Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                                inv = new DecodeableRpcInvocation(channel, req, is, proto);
                                // 在當前線程,也就是 IO 線程上進行后續的解碼工作。此工作完成后,可將
                                // 調用方法名、attachment、以及調用參數解析出來
                                inv.decode();
                            } else {
                                // 僅創建 DecodeableRpcInvocation 對象,但不在當前線程上執行解碼邏輯
                                inv = new DecodeableRpcInvocation(channel, req,
                                        new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                            }
                            data = inv;
                        }
                       
                        // 設置 data 到 Request 對象中
                        req.setData(data);
                    } catch (Throwable t) {
                        // 若解碼過程中出現異常,則將 broken 字段設為 true,
                        // 并將異常對象設置到 Reqeust 對象中
                        req.setBroken(true);
                        req.setData(t);
                    }
                    return req;
                }
            }
        }

          如上,decodeBody 對部分字段進行了解碼,并將解碼得到的字段封裝到 Request 中。隨后會調用 DecodeableRpcInvocation 的 decode 方法進行后續的解碼工作。此工作完成后,可將調用方法名、attachment、以及調用參數解析出來。

          (2)調用服務

          解碼器將數據包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會收到這個對象,并將這個對象繼續向下傳遞。整個調用棧如下:

        NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
          —> AbstractPeer#received(Channel, Object)
            —> MultiMessageHandler#received(Channel, Object)
              —> HeartbeatHandler#received(Channel, Object)
                —> AllChannelHandler#received(Channel, Object)
                  —> ExecutorService#execute(Runnable)    // 由線程池執行后續的調用邏輯

          這里我們直接分析調用棧中的分析第一個和最后一個調用方法邏輯。如下:

          考慮到篇幅,以及很多中間調用的邏輯并非十分重要,所以這里就不對調用棧中的每個方法都進行分析了。這里我們直接分析最后一個調用方法邏輯。如下:

        public class ChannelEventRunnable implements Runnable {
           
            private final ChannelHandler handler;
            private final Channel channel;
            private final ChannelState state;
            private final Throwable exception;
            private final Object message;
           
            @Override
            public void run() {
                // 檢測通道狀態,對于請求或響應消息,此時 state = RECEIVED
                if (state == ChannelState.RECEIVED) {
                    try {
                        // 將 channel 和 message 傳給 ChannelHandler 對象,進行后續的調用
                        handler.received(channel, message);
                    } catch (Exception e) {
                        logger.warn("... operation error, channel is ... message is ...");
                    }
                }
               
                // 其他消息類型通過 switch 進行處理
                else {
                    switch (state) {
                    case CONNECTED:
                        try {
                            handler.connected(channel);
                        } catch (Exception e) {
                            logger.warn("... operation error, channel is ...");
                        }
                        break;
                    case DISCONNECTED:
                        // ...
                    case SENT:
                        // ...
                    case CAUGHT:
                        // ...
                    default:
                        logger.warn("unknown state: " + state + ", message is " + message);
                    }
                }
        
            }
        }

          如上,請求和響應消息出現頻率明顯比其他類型消息高,所以這里對該類型的消息進行了針對性判斷。ChannelEventRunnable 僅是一個中轉站,它的 run 方法中并不包含具體的調用邏輯,僅用于將參數傳給其他 ChannelHandler 對象進行處理,該對象類型為 DecodeHandler。

        public class DecodeHandler extends AbstractChannelHandlerDelegate {
        
            public DecodeHandler(ChannelHandler handler) {
                super(handler);
            }
        
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Decodeable) {
                    // 對 Decodeable 接口實現類對象進行解碼
                    decode(message);
                }
        
                if (message instanceof Request) {
                    // 對 Request 的 data 字段進行解碼
                    decode(((Request) message).getData());
                }
        
                if (message instanceof Response) {
                    // 對 Request 的 result 字段進行解碼
                    decode(((Response) message).getResult());
                }
        
                // 執行后續邏輯
                handler.received(channel, message);
            }
        
            private void decode(Object message) {
                // Decodeable 接口目前有兩個實現類,
                // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
                if (message != null && message instanceof Decodeable) {
                    try {
                        // 執行解碼邏輯
                        ((Decodeable) message).decode();
                    } catch (Throwable e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
        }

          DecodeHandler 主要是包含了一些解碼邏輯,完全解碼后的 Request 對象會繼續向后傳遞

        public class DubboProtocol extends AbstractProtocol {
        
            public static final String NAME = "dubbo";
           
            private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        
                @Override
                public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                    if (message instanceof Invocation) {
                        Invocation inv = (Invocation) message;
                        // 獲取 Invoker 實例
                        Invoker<?> invoker = getInvoker(channel, inv);
                        if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                            // 回調相關,忽略
                        }
                        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                        // 通過 Invoker 調用具體的服務
                        return invoker.invoke(inv);
                    }
                    throw new RemotingException(channel, "Unsupported request: ...");
                }
               
                // 忽略其他方法
            }
           
            Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
                // 忽略回調和本地存根相關邏輯
                // ...
               
                int port = channel.getLocalAddress().getPort();
               
                // 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比如:
                //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
                String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
        
                // 從 exporterMap 查找與 serviceKey 相對應的 DubboExporter 對象,
                // 服務導出過程中會將 <serviceKey, DubboExporter> 映射關系存儲到 exporterMap 集合中
                DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        
                if (exporter == null)
                    throw new RemotingException(channel, "Not found exported service ...");
        
                // 獲取 Invoker 對象,并返回
                return exporter.getInvoker();
            }
           
            // 忽略其他方法
        }

          在之前課程中介紹過,服務全部暴露完成之后保存到exporterMap中。這里就是通過serviceKey獲取exporter之后獲取Invoker,并通過 Invoker 的 invoke 方法調用服務邏輯

        public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
        
            @Override
            public Result invoke(Invocation invocation) throws RpcException {
                try {
                    // 調用 doInvoke 執行后續的調用,并將調用結果封裝到 RpcResult 中,并
                    return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
                } catch (InvocationTargetException e) {
                    return new RpcResult(e.getTargetException());
                } catch (Throwable e) {
                    throw new RpcException("Failed to invoke remote proxy method ...");
                }
            }
           
            protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
        }

          如上,doInvoke 是一個抽象方法,這個需要由具體的 Invoker 實例實現。Invoker 實例是在運行時通過 JavassistProxyFactory 創建的,創建邏輯如下:

        public class JavassistProxyFactory extends AbstractProxyFactory {
           
            // 省略其他方法
        
            @Override
            public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
                final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
                // 創建匿名類對象
                return new AbstractProxyInvoker<T>(proxy, type, url) {
                    @Override
                    protected Object doInvoke(T proxy, String methodName,
                                              Class<?>[] parameterTypes,
                                              Object[] arguments) throws Throwable {
                        // 調用 invokeMethod 方法進行后續的調用
                        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                    }
                };
            }
        }

          Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。Dubbo 會在運行時通過 Javassist 框架為 Wrapper 生成實現類,并實現 invokeMethod 方法,該方法最終會根據調用信息調用具體的服務。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

        /** Wrapper0 是在運行時生成的,大家可使用 Arthas 進行反編譯 */
        public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
            public static String[] pns;
            public static Map pts;
            public static String[] mns;
            public static String[] dmns;
            public static Class[] mts0;
        
            // 省略其他方法
        
            public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
                DemoService demoService;
                try {
                    // 類型轉換
                    demoService = (DemoService)object;
                }
                catch (Throwable throwable) {
                    throw new IllegalArgumentException(throwable);
                }
                try {
                    // 根據方法名調用指定的方法
                    if ("sayHello".equals(string) && arrclass.length == 1) {
                        return demoService.sayHello((String)arrobject[0]);
                    }
                }
                catch (Throwable throwable) {
                    throw new InvocationTargetException(throwable);
                }
                throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
            }
        }

          到這里,整個服務調用過程就分析完了。最后把調用過程貼出來,如下:

        ChannelEventRunnable#run()
          —> DecodeHandler#received(Channel, Object)
            —> HeaderExchangeHandler#received(Channel, Object)
              —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
                —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
                  —> Filter#invoke(Invoker, Invocation)
                    —> AbstractProxyInvoker#invoke(Invocation)
                      —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                        —> DemoServiceImpl#sayHello(String)

          提供方返回調用結果

          服務提供方調用指定服務后,會將調用結果封裝到 Response 對象中,并將該對象返回給服務消費方。服務提供方也是通過 NettyChannel 的 send 方法將 Response 對象返回,這里就不在重復分析了。本節我們僅需關注 Response 對象的編碼過程即可。

        public class ExchangeCodec extends TelnetCodec {
            public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
                if (msg instanceof Request) {
                    encodeRequest(channel, buffer, (Request) msg);
                } else if (msg instanceof Response) {
                    // 對響應對象進行編碼
                    encodeResponse(channel, buffer, (Response) msg);
                } else {
                    super.encode(channel, buffer, msg);
                }
            }
           
            protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
                int savedWriteIndex = buffer.writerIndex();
                try {
                    Serialization serialization = getSerialization(channel);
                    // 創建消息頭字節數組
                    byte[] header = new byte[HEADER_LENGTH];
                    // 設置魔數
                    Bytes.short2bytes(MAGIC, header);
                    // 設置序列化器編號
                    header[2] = serialization.getContentTypeId();
                    if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
                    // 獲取響應狀態
                    byte status = res.getStatus();
                    // 設置響應狀態
                    header[3] = status;
                    // 設置請求編號
                    Bytes.long2bytes(res.getId(), header, 4);
        
                    // 更新 writerIndex,為消息頭預留 16 個字節的空間
                    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
                    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
                    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
                   
                    if (status == Response.OK) {
                        if (res.isHeartbeat()) {
                            // 對心跳響應結果進行序列化,已廢棄
                            encodeHeartbeatData(channel, out, res.getResult());
                        } else {
                            // 對調用結果進行序列化
                            encodeResponseData(channel, out, res.getResult(), res.getVersion());
                        }
                    } else {
                        // 對錯誤信息進行序列化
                        out.writeUTF(res.getErrorMessage())
                    };
                    out.flushBuffer();
                    if (out instanceof Cleanable) {
                        ((Cleanable) out).cleanup();
                    }
                    bos.flush();
                    bos.close();
        
                    // 獲取寫入的字節數,也就是消息體長度
                    int len = bos.writtenBytes();
                    checkPayload(channel, len);
                   
                    // 將消息體長度寫入到消息頭中
                    Bytes.int2bytes(len, header, 12);
                    // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
                    buffer.writerIndex(savedWriteIndex);
                    // 從 savedWriteIndex 下標處寫入消息頭
                    buffer.writeBytes(header);
                    // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
                    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
                } catch (Throwable t) {
                    // 異常處理邏輯不是很難理解,但是代碼略多,這里忽略了
                }
            }
        }
        
        public class DubboCodec extends ExchangeCodec implements Codec2 {
           
            protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
                Result result = (Result) data;
                // 檢測當前協議版本是否支持帶有 attachment 集合的 Response 對象
                boolean attach = Version.isSupportResponseAttachment(version);
                Throwable th = result.getException();
               
                // 異常信息為空
                if (th == null) {
                    Object ret = result.getValue();
                    // 調用結果為空
                    if (ret == null) {
                        // 序列化響應類型
                        out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
                    }
                    // 調用結果非空
                    else {
                        // 序列化響應類型
                        out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
                        // 序列化調用結果
                        out.writeObject(ret);
                    }
                }
                // 異常信息非空
                else {
                    // 序列化響應類型
                    out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
                    // 序列化異常對象
                    out.writeObject(th);
                }
        
                if (attach) {
                    // 記錄 Dubbo 協議版本
                    result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
                    // 序列化 attachments 集合
                    out.writeObject(result.getAttachments());
                }
            }
        }

          以上就是 Response 對象編碼的過程,和前面分析的 Request 對象編碼過程很相似。如果大家能看 Request 對象的編碼邏輯,那么這里的 Response 對象的編碼邏輯也不難理解,就不多說了。接下來我們再來分析雙向通信的最后一環 —— 服務消費方接收調用結果。

          消費方接收調用結果

          服務消費方在收到響應數據后,首先要做的事情是對響應數據進行解碼,得到 Response 對象。然后再將該對象傳遞給下一個入站處理器,這個入站處理器就是 NettyHandler。接下來 NettyHandler 會將這個對象繼續向下傳遞,最后 AllChannelHandler 的 received 方法會收到這個對象,并將這個對象派發到線程池中。這個過程和服務提供方接收請求的過程是一樣的,因此這里就不重復分析了。

          (1)響應數據解碼

          響應數據解碼邏輯主要的邏輯封裝在 DubboCodec 中,我們直接分析這個類的代碼。如下:

        public class DubboCodec extends ExchangeCodec implements Codec2 {
        
            @Override
            protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
                byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
                Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
                // 獲取請求編號
                long id = Bytes.bytes2long(header, 4);
                // 檢測消息類型,若下面的條件成立,表明消息類型為 Response
                if ((flag & FLAG_REQUEST) == 0) {
                    // 創建 Response 對象
                    Response res = new Response(id);
                    // 檢測事件標志位
                    if ((flag & FLAG_EVENT) != 0) {
                        // 設置心跳事件
                        res.setEvent(Response.HEARTBEAT_EVENT);
                    }
                    // 獲取響應狀態
                    byte status = header[3];
                    // 設置響應狀態
                    res.setStatus(status);
                   
                    // 如果響應狀態為 OK,表明調用過程正常
                    if (status == Response.OK) {
                        try {
                            Object data;
                            if (res.isHeartbeat()) {
                                // 反序列化心跳數據,已廢棄
                                data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                            } else if (res.isEvent()) {
                                // 反序列化事件數據
                                data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                            } else {
                                DecodeableRpcResult result;
                                // 根據 url 參數決定是否在 IO 線程上執行解碼邏輯
                                if (channel.getUrl().getParameter(
                                        Constants.DECODE_IN_IO_THREAD_KEY,
                                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                                    // 創建 DecodeableRpcResult 對象
                                    result = new DecodeableRpcResult(channel, res, is,
                                            (Invocation) getRequestData(id), proto);
                                    // 進行后續的解碼工作
                                    result.decode();
                                } else {
                                    // 創建 DecodeableRpcResult 對象
                                    result = new DecodeableRpcResult(channel, res,
                                            new UnsafeByteArrayInputStream(readMessageData(is)),
                                            (Invocation) getRequestData(id), proto);
                                }
                                data = result;
                            }
                           
                            // 設置 DecodeableRpcResult 對象到 Response 對象中
                            res.setResult(data);
                        } catch (Throwable t) {
                            // 解碼過程中出現了錯誤,此時設置 CLIENT_ERROR 狀態碼到 Response 對象中
                            res.setStatus(Response.CLIENT_ERROR);
                            res.setErrorMessage(StringUtils.toString(t));
                        }
                    }
                    // 響應狀態非 OK,表明調用過程出現了異常
                    else {
                        // 反序列化異常信息,并設置到 Response 對象中
                        res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
                    }
                    return res;
                } else {
                    // 對請求數據進行解碼,前面已分析過,此處忽略
                }
            }
        }

          以上就是響應數據的解碼過程,上面邏輯看起來是不是似曾相識。對的,我們在前面章節分析過 DubboCodec 的 decodeBody 方法中關于請求數據的解碼過程,該過程和響應數據的解碼過程很相似。下面,我們繼續分析調用結果的反序列化過程

        public class DecodeableRpcResult extends AppResponse implements Codec, Decodeable {
        
            private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class);
        
            private Channel channel;
        
            private byte serializationType;
        
            private InputStream inputStream;
        
            private Response response;
        
            private Invocation invocation;
        
            private volatile boolean hasDecoded;
        
            public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) {
                Assert.notNull(channel, "channel == null");
                Assert.notNull(response, "response == null");
                Assert.notNull(is, "inputStream == null");
                this.channel = channel;
                this.response = response;
                this.inputStream = is;
                this.invocation = invocation;
                this.serializationType = id;
            }
        
            @Override
            public void encode(Channel channel, OutputStream output, Object message) throws IOException {
                throw new UnsupportedOperationException();
            }
        
            @Override
            public Object decode(Channel channel, InputStream input) throws IOException {
                ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                        .deserialize(channel.getUrl(), input);
                // 反序列化響應類型
                byte flag = in.readByte();
                switch (flag) {
                    case DubboCodec.RESPONSE_NULL_VALUE:
                        break;
                    case DubboCodec.RESPONSE_VALUE:
                        handleValue(in);
                        break;
                    case DubboCodec.RESPONSE_WITH_EXCEPTION:
                        handleException(in);
                        break;
                        // 返回值為空,且攜帶了 attachments 集合
                    case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
                        handleAttachment(in);
                        break;
                        //返回值不為空,且攜帶了 attachments 集合
                    case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
                        handleValue(in);
                        handleAttachment(in);
                        break;
                    // 異常對象不為空,且攜帶了 attachments 集合
                    case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
                        handleException(in);
                        handleAttachment(in);
                        break;
                    default:
                        throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag);
                }
                if (in instanceof Cleanable) {
                    ((Cleanable) in).cleanup();
                }
                return this;
            }

          正常調用下,線程會進入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。然后線程會從 invocation 變量(大家探索一下 invocation 變量的由來)中獲取返回值類型,接著對調用結果進行反序列化,并將序列化后的結果存儲起來。最后對 attachments 集合進行反序列化,并存到指定字段中。

          異步轉同步

          Dubbo發送數據至服務方后,在通信層面是異步的,通信線程并不會等待結果數據返回。而我們在使用Dubbo進行RPC調用缺省就是同步的,這其中就涉及到了異步轉同步的操作。

          而在2.7.x版本中,這種自實現的異步轉同步操作進行了修改。新的`DefaultFuture`繼承了`CompletableFuture`,新的`doReceived(Response res)`方法如下:

        private void doReceived(Response res) {
            if (res == null) {
                throw new IllegalStateException("response cannot be null");
            }
            if (res.getStatus() == Response.OK) {
                this.complete(res.getResult());
            } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
            } else {
                this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
            }
        }

          通過`CompletableFuture#complete`方法來設置異步的返回結果,且刪除舊的`get()`方法,使用`CompletableFuture#get()`方法:

        public T get() throws InterruptedException, ExecutionException {
            Object r;
            return reportGet((r = result) == null ? waitingGet(true) : r);
        }

          使用`CompletableFuture`完成了異步轉同步的操作。

          異步多線程數據一致

          這里簡單說明一下。一般情況下,服務消費方會并發調用多個服務,每個用戶線程發送請求后,會調用 get 方法進行等待。 一段時間后,服務消費方的線程池會收到多個響應對象。這個時候要考慮一個問題,如何將每個響應對象傳遞給相應的 Future 對象,不出錯。答案是通過調用**編號**。Future 被創建時,會要求傳入一個 Request 對象。此時 DefaultFuture 可從 Request 對象中獲取調用編號,并將 <調用編號, DefaultFuture 對象> 映射關系存入到靜態 Map 中,即 FUTURES。線程池中的線程在收到 Response 對象后,會根據 Response 對象中的調用編號到 FUTURES 集合中取出相應的 DefaultFuture 對象,然后再將 Response 對象設置到 DefaultFuture 對象中。這樣用戶線程即可從 DefaultFuture 對象中獲取調用結果了。整個過程大致如下圖:

          

        1663310007996_4.jpg

        private DefaultFuture(Channel channel, Request request, int timeout) {
            this.channel = channel;
            this.request = request;
            this.id = request.getId();
            this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            // put into waiting map.
            FUTURES.put(id, this);
            CHANNELS.put(id, channel);
        }

          心跳檢查

          Dubbo采用雙向心跳的方式檢測Client端與Server端的連通性。

          我們再來看看 Dubbo 是如何設計應用層心跳的。Dubbo 的心跳是雙向心跳,客戶端會給服務端發送心跳,反之,服務端也會向客戶端發送心跳。

          創建定時器

        public class HeaderExchangeClient implements ExchangeClient {
        
            private final Client client;
            private final ExchangeChannel channel;
        
            private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
                    new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
           
            private HeartbeatTimerTask heartBeatTimerTask;
            private ReconnectTimerTask reconnectTimerTask;
        
            public HeaderExchangeClient(Client client, boolean startTimer) {
                Assert.notNull(client, "Client can't be null");
                this.client = client;
                this.channel = new HeaderExchangeChannel(client);
        
                if (startTimer) {
                    URL url = client.getUrl();
                    //開啟心跳失敗之后處理重連,斷連的邏輯定時任務
                    startReconnectTask(url);
                    //開啟發送心跳請求定時任務
                    startHeartBeatTask(url);
                }
            }

          Dubbo 在 `HeaderExchangeClient `初始化時開啟了兩個定時任務

          `startReconnectTask` 主要用于定時發送心跳請求

          `startHeartBeatTask` 主要用于心跳失敗之后處理重連,斷連的邏輯

          發送心跳請求

          詳細解析下心跳檢測定時任務的邏輯 `HeartbeatTimerTask#doTask`:

        protected void doTask(Channel channel) {
              Long lastRead = lastRead(channel);
              Long lastWrite = lastWrite(channel);
              if ((lastRead != null && now() - lastRead > heartbeat)
                  || (lastWrite != null && now() - lastWrite > heartbeat)) {
                  Request req = new Request();
                  req.setVersion(Version.getProtocolVersion());
                  req.setTwoWay(true);
                  req.setEvent(Request.HEARTBEAT_EVENT);
                  channel.send(req);
              }
           }

          前面已經介紹過,**Dubbo 采取的是雙向心跳設計**,即服務端會向客戶端發送心跳,客戶端也會向服務端發送心跳,接收的一方更新 lastRead 字段,發送的一方更新 lastWrite 字段,超過心跳間隙的時間,便發送心跳請求給對端。這里的 lastRead/lastWrite 同樣會被同一個通道上的普通調用更新,通過更新這兩個字段,實現了只在連接空閑時才會真正發送空閑報文的機制,符合我們一開始科普的做法。

          處理重連和斷連

          繼續研究下重連和斷連定時器都實現了什么 `ReconnectTimerTask#doTask`。

           protected void doTask(Channel channel) {
               Long lastRead = lastRead(channel);
               Long now = now();
               if (!channel.isConnected()) {
                   ((Client) channel).reconnect();
                   // check pong at client
               } else if (lastRead != null && now - lastRead > idleTimeout) {
                   ((Client) channel).reconnect();
               }
            }

          第二個定時器則負責根據客戶端、服務端類型來對連接做不同的處理,當超過設置的心跳總時間之后,客戶端選擇的是重新連接,服務端則是選擇直接斷開連接。這樣的考慮是合理的,客戶端調用是強依賴可用連接的,而服務端可以等待客戶端重新建立連接。

          Dubbo 對于建立的每一個連接,同時在客戶端和服務端開啟了 2 個定時器,一個用于定時發送心跳,一個用于定時重連、斷連,執行的頻率均為各自檢測周期的 1/3。定時發送心跳的任務負責在連接空閑時,向對端發送心跳包。定時重連、斷連的任務負責檢測 lastRead 是否在超時周期內仍未被更新,如果判定為超時,客戶端處理的邏輯是重連,服務端則采取斷連的措施。

        分享到:
        在線咨詢 我要報名

        CHINESE熟女老女人HD

          <strike id="ddzbl"><span id="ddzbl"><em id="ddzbl"></em></span></strike>
          <font id="ddzbl"><sub id="ddzbl"><mark id="ddzbl"></mark></sub></font>

            <address id="ddzbl"></address>
            <listing id="ddzbl"></listing>

              <track id="ddzbl"><span id="ddzbl"><progress id="ddzbl"></progress></span></track>