Skip to content

Implement_steam

Geng Zhang edited this page Feb 19, 2017 · 1 revision

流式调用实现细节

设计

需求

流式调用是指客户端和服务端都能在一个方法内发送多次请求,或者多个返回值。
流式调用区别于Callback:

  1. 流式调用服务端客户端都可以发起,Callback是服务端发起。
  2. 流式调用的接口StreamObserver是一次性的,而Callback可以重复利用。

接口设计

public interface StreamObserver<V> {

    /**
     * 传输一段流对象(可调用多次)
     *
     * @param value 值
     */
    public void onValue(V value);

    /**
     * 传输完毕(和onError方法两者只可调一次)
     */
    public void onCompleted();

    /**
     * 传输出现异常,需要终止等(和onCompleted方法两者只可调一次)
     *
     * @param t 异常
     */
    public void onError(Throwable t);
}

实现

1.启动阶段

不管是服务端发布者还是服务调用者,在启动时候都会扫描接口下的每个方法、 如果满足如下条件,则保存,要不然就报错。

  1. 不存在多个StreamObserver参数,只能一个
  2. StreamObserver类型必须指定类型,不能不指定或者泛型。 正确:例如:StreamObserver<List>,StreamObserver 错误:StreamObserver, StreamObserver>, StreamObserver>, StreamObserver<List<List>>

记录到缓存中,格式为:{接口+方法名:实际类型}

2.客户端传递StreamObserver场景

  1. 客户端在调用有StreamObserver参数的方法(以下简称Stream方法)的时候,会传递一个StreamObserver现类,例如:

    String result = helloService.download("paramxxx", new StreamObserver<String>() {
        @Override
        public void onValue(String value) {
        }
        @Override
        public void onCompleted() {
           
        }
        @Override
        public void onError(Throwable t) {
          
        }
    });
  2. 发现这个方法是一个Stream方法(之前启动阶段已解析),进行特殊处理

    if (StreamUtils.hasStreamObserverParameter(key)) {
        StreamUtils.preMsgSend(request, this.channel);
    }
  3. 先对这个实现类生成一个streamInsKey,保留streamInsKey和实现类的关系

  4. 虚拟一个StreamObserverStub对象(保存streamInsKey),然后将Stream方法的StreamObserver参数替换为StreamObserverStub

    request.getArgs()[i] = new StreamObserverStub<>(streamInsKey);
  5. 视为一个正常请求,发送给服务端

服务端收到请求后,会经过服务端处理:

  1. 发现这个方法是一个Stream方法(之前启动阶段已解析),进行特殊处理

    if (StreamUtils.hasStreamObserverParameter(methodKey)) {
        StreamUtils.preMsgHandle(request, channel);
    }
  2. 先根据当前长连接(客户端->服务端)生成一个反向长连接(服务端->客户端),按连接信息缓存起来

    // 使用一个已有的channel虚拟化一个反向长连接
    ClientTransport clientTransport = ClientTransportFactory.getReverseClientTransport(channel);
  3. 读取Stream方法的StreamObserverStub参数,然后将反向长连接设置到Stub中

    stub.setClientTransport(clientTransport).initByMessage(request);
  4. 视为一个正常请求,进行调用。此时服务端实现类能拿到StreamObserver对象。

3.服务端返回StreamObserver场景

  1. 服务端在返回StreamObserver返回值方法(以下简称Stream方法)的时候,会传递一个StreamObserver实现类,例如:

    public StreamObserver<String> upload(String name) {
        return new StreamObserver<String>() {
            @Override
            public void onValue(String value) {
            }
    
            @Override
            public void onCompleted() {
            }
    
            @Override
            public void onError(Throwable t) {
            }
        };
    }
  2. 发现这个方法是一个Stream方法(之前启动阶段已解析),进行特殊处理

    if (StreamUtils.hasStreamObserverReturn(methodKey)) {
        StreamUtils.preMessageReturn(request, response, channel);
    }
  3. 先对这个实现类生成一个streamInsKey,保留streamInsKey和实现类的关系

  4. 虚拟一个StreamObserverStub对象(保存streamInsKey),然后将Stream方法的StreamObserver参数替换为StreamObserverStub

    response.setReturnData(new StreamObserverStub<>(streamInsKey));
  5. 视为一个正常请求,发送给客户端

客户端收到请求后,会经过服务端处理:

  1. 客户端收到Response,只有id,是没有接口名方法名这种,发现等待的Future是个Stream类型的(发送的时候记住类型),进行特殊处理

    if (isStreamReturn() && msg instanceof RpcResponse) {
        StreamUtils.preMsgReceive((RpcResponse) msg, clientTransport);
    }
  2. 注意不需要生成长连接,直接复用客户端到服务端的Channel即可

    stub.setClientTransport(clientTransport).initByMessage(response);
  3. 视为一个正常请求,进行调用。此时客户端能拿到StreamObserverStub对象。

4.StreamObserver使用

  1. Stream发送方拿到StreamObserver代理类(其实是StreamObserverStub)后可以调用onValue,onError,OnCompleted方法。

  2. Stream发送方拦截StreamObserverStub的方法,拼装为RpcRequest,然后设置Head里的streamInsKey为stub的streamInsKey

  3. Stream发送方,通过反向长连接,发送请求到Stream接收方

  4. Stream接收方收到RpcRequest,发现Head里面带streamInsKey关键字,生成一个StreamTask,异步执行

    if (streamInsKey != null) { // 客户端发给服务端的stream请求
        StreamTask task = new StreamTask(request, channel);
        bizThreadPool.submit(task); 
    }
    
    if (streamInsKey != null) {  // 服务端发给客户的stream请求
        StreamTask task = new StreamTask(request, clientTransport.getChannel());
        AsyncContext.getAsyncThreadPool().execute(task);
    }
  5. Stream接收方根据streamInsKey找到真正的StreamObserver实例。

    String streamInsKey = (String) request.getHeadKey(HeadKey.STREAM_INS_KEY);
    StreamObserver observer = StreamContext.getStreamIns(streamInsKey);
  6. 客户端执行真正调用后,作为一个正常的RpcResponse返回。

  7. 服务端收到响应后,根据连接信息,找到反向长连接,执行回复事件。

    ClientTransport clientTransport = ClientTransportFactory.getReverseClientTransport(channelKey);
    clientTransport.receiveRpcResponse(response);

资源回收

服务端

Stream发送方主要维护如下:

  1. Stream方法缓存:启动时扫描加载,不回收。
  2. 反向长连接列表:在客户端断开长连接时候进行回收。

客户端

Stream接收方主要维护如下:

  1. Stream方法缓存:启动时扫描加载,不回收。
  2. StreamObserver实例列表:
    StreamObserver一次性,如果调用了OnError或者onCompleted的方法就标记位可回收
    如果没调用OnError或者onCompleted的方法,那么存在保留了StreamInsKey与真正StreamObserver实例的对应关系**???**,就存在垃圾映射关系

Clone this wiki locally