zoukankan      html  css  js  c++  java
  • Hbase源码分析:server端RPC

    server端rpc包括master和RegionServer。接下来主要梳理一下,master和regionserver中有关rpc创建,启动以及处理的过程。

    1,server rpc的初始化过程

    首先看一下上篇rpc概述中有关hbase rpc端的总体流程图。

    由于HMaster继承自HRegionServer,master和region server中有关rpc的成员变量主要在HRegionServer中,主要包括(rpcServices和rpcClient)。当前主要讨论rpcServices,有关RpcClient会另外单独讨论。

    Master和Region Server启动过程中有关rpc初始化和启动过程中的步骤如下:

    1,在HRegionServer构造函数中调用createRpcService生成RSRpcServices对象。如果是master启动,HRegionServer是Hmaster的父类,该函数也会调用。

    1   protected RSRpcServices createRpcServices() throws IOException {
    2     return new RSRpcServices(this);
    3   }

    2,在RSRpcServices的构造函数中,生成RpcServer对象。

    1     rpcServer = new RpcServer(rs, name, getServices(),
    2       bindAddress, // use final bindAddress for this server.
    3       rs.conf,
    4       rpcSchedulerFactory.create(rs.conf, this, rs));

    在构造RpcServer对象的过程中,HMaster和HRegionServer分别实现了getService()函数以使HMaster和HRegionServer响应不同的rpc服务。

    3,在RpcServer的构造函数中,分别生成Listener,responder以及scheduler等几个重要的对象

     1     // Start the listener here and let it bind to the port
     2     listener = new Listener(name);
     3     this.port = listener.getAddress().getPort();
     4 
     5     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
     6     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
     7     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
     8 
     9     this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
    10     this.delayedCalls = new AtomicInteger(0);
    11     this.ipcUtil = new IPCUtil(conf);
    12 
    13 
    14     // Create the responder here
    15     responder = new Responder();
    16     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    17     this.userProvider = UserProvider.instantiate(conf);
    18     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    19     if (isSecurityEnabled) {
    20       HBaseSaslRpcServer.init(conf);
    21     }
    22     this.scheduler = scheduler;
    23     this.scheduler.init(new RpcSchedulerContext(this));

    4,在Listener的构造函数中,还包含了readThreads个reader用来读取请求。

     1 readers = new Reader[readThreads];
     2       readPool = Executors.newFixedThreadPool(readThreads,
     3         new ThreadFactoryBuilder().setNameFormat(
     4           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
     5           ",port=" + port).setDaemon(true).build());
     6       for (int i = 0; i < readThreads; ++i) {
     7         Reader reader = new Reader();
     8         readers[i] = reader;
     9         readPool.execute(reader);
    10       }

    在以上这些对象构造完成以后,在HRegionServer的构造函数中会调用rpcServices.start()---》rpcServer.start(). 在rpcServer start函数中会分别启动responder,listener以及scheduler。

     1   public synchronized void start() {
     2     if (started) return;
     3     authTokenSecretMgr = createSecretManager();
     4     if (authTokenSecretMgr != null) {
     5       setSecretManager(authTokenSecretMgr);
     6       authTokenSecretMgr.start();
     7     }
     8     this.authManager = new ServiceAuthorizationManager();
     9     HBasePolicyProvider.init(conf, authManager);
    10     responder.start();
    11     listener.start();
    12     scheduler.start();
    13     started = true;
    14   }

    2,server rpc的处理过程

    rpcserver监控,读取,请求基于Reactor模式, 流程图如下(来自引用)。 

    2.1 Listener

    对于Listener,有一个acceptChannle的ServerSocketChannel,acceptChannle在selector注册了OP_ACCEPT事件,同时Listener中包含了readThreads的readers线程由线程池管理。Listener的主要处理流程在doRunLoop函数中:

     1 private synchronized void doRunLoop() {
     2         while (running) {
     3           try {
     4             readSelector.select();
     5             while (adding) {
     6               this.wait(1000);
     7             }
     8 
     9             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
    10             while (iter.hasNext()) {
    11               SelectionKey key = iter.next();
    12               iter.remove();
    13               if (key.isValid()) {
    14                 if (key.isReadable()) {
    15                   doRead(key);
    16                 }
    17               }
    18             }
    19           } catch (InterruptedException e) {
    20             LOG.debug("Interrupted while sleeping");
    21             return;
    22           } catch (IOException ex) {
    23             LOG.info(getName() + ": IOException in Reader", ex);
    24           }
    25         }
    26       }

    当没有请求的时候,线程阻塞在第4行的select处。当有请求来临时,在判断请求有效后,会读取该连接上的请求上的数据(具体逻辑在readAndProcess函数中)。读取数据以后,会处理数据,具体在processOneRpc函数中。

     1     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
     2       if (connectionHeaderRead) {
     3         processRequest(buf);
     4       } else {
     5         processConnectionHeader(buf);
     6         this.connectionHeaderRead = true;
     7         if (!authorizeConnection()) {
     8           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
     9           // down the connection instead of trying to read non-existent retun.
    10           throw new AccessDeniedException("Connection from " + this + " for service " +
    11             connectionHeader.getServiceName() + " is unauthorized for user: " + user);
    12         }
    13       }
    14     }

    根据连接头是否已经读取,如果没有读取连接头信息,变通过ProcessConnectionHeader读取连接头信息。如果读取连接头信息以后,会解析请求,并且将请求构造成统一的结构CallRunner,最终这个CallRunnder会被添加到scheduler中任务队列中,根据不同的调度策略(FifoRpcScheduler和SimpleRpcScheduler)进行处理。ProcessRequest的核心代码如下:

    1       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
    2               totalRequestSize, traceInfo, RpcServer.getRemoteIp());
    3       scheduler.dispatch(new CallRunner(RpcServer.this, call));

    2.2 Scheduler

    Scheduler 默认使用了SimpleRpcScheduler。SimpleRpcScheduler包含三个不同的RpcExecutor(callExecutor,priorityExecutor,replicationExecutor)。对于大部分基于用户表的请求都是通过callExecutor来执行,callExecutor从之前添加的请求任务队列中获取请求,并且将请求交流对应的handler进行处理。具体逻辑在RpcExecutor的consumerLoop中,如下:

     1  protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
     2     boolean interrupted = false;
     3     double handlerFailureThreshhold =
     4         conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
     5           HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
     6     try {
     7       while (running) {
     8         try {
     9           MonitoredRPCHandler status = RpcServer.getStatus();
    10           CallRunner task = myQueue.take();
    11           task.setStatus(status);
    12           try {
    13             activeHandlerCount.incrementAndGet();
    14             task.run();
    15           } 

    由于myQueue是阻塞队列,如果没有请求,那么scheduler将阻塞在第10行take处。否则将执行CallRunner中的run函数。而紧接着会调用rpcServer中的call函数。

    1         // make the call
    2         resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
    3           call.timestamp, this.status);

    而在rpcServer的call函数中,首先会根据请求调用本地的对应的实现函数,并且通过阻塞的方法调用,返回结果(result)。

    1 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
    2       Message result = service.callBlockingMethod(md, controller, param);
    3 ...
    4 return new Pair<Message, CellScanner>(result, controller.cellScanner());

    并且在CallRunner的 run函数中,将结果通过调用setResponse函数生成返回结果,将结果通过调用sendResponseIfReady通过responder将结果返回给client端。

          // Set the response for undelayed calls and delayed calls with
          // undelayed responses.
          if (!call.isDelayed() || !call.isReturnValueDelayed()) {
            Message param = resultPair != null ? resultPair.getFirst() : null;
            CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
            call.setResponse(param, cells, errorThrowable, error);
          }

    2.3 responder

    responder负责将结果写回给client端。responder的实现也是通过类似Listener的React模式。上面schedule调度执行完以后生成的结果,将通过doRespond函数加入到返回结果的相应队列里面。在这个函数里面,如果一次channlewrite能够完成操作,则直接完成该写结果请求。否则将该call的connection注册OP_WRITE到selector。

     1 void doRespond(Call call) throws IOException {
     2       boolean added = false;
     3 
     4       // If there is already a write in progress, we don't wait. This allows to free the handlers
     5       //  immediately for other tasks.
     6       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
     7         try {
     8           if (call.connection.responseQueue.isEmpty()) {
     9             // If we're alone, we can try to do a direct call to the socket. It's
    10             //  an optimisation to save on context switches and data transfer between cores..
    11             if (processResponse(call)) {
    12               return; // we're done.
    13             }
    14             // Too big to fit, putting ahead.
    15             call.connection.responseQueue.addFirst(call);
    16             added = true; // We will register to the selector later, outside of the lock.
    17           }
    18         } finally {
    19           call.connection.responseWriteLock.unlock();
    20         }
    21       }
    22 
    23       if (!added) {
    24         call.connection.responseQueue.addLast(call);
    25       }
    26       call.responder.registerForWrite(call.connection);
    27 
    28       // set the serve time when the response has to be sent later
    29       call.timestamp = System.currentTimeMillis();
    30     }
    31   }

    在registerForWrite中会唤醒writeSelect,使得一旦有该连接上的请求数据过来,那么responder将通过doAsSyncWrite--》ProcessAllResponse处理请求,此时便和Listener的处理类似了。

     1           registerWrites();
     2           int keyCt = writeSelector.select(purgeTimeout);
     3           if (keyCt == 0) {
     4             continue;
     5           }
     6 
     7           Set<SelectionKey> keys = writeSelector.selectedKeys();
     8           Iterator<SelectionKey> iter = keys.iterator();
     9           while (iter.hasNext()) {
    10             SelectionKey key = iter.next();
    11             iter.remove();
    12             try {
    13               if (key.isValid() && key.isWritable()) {
    14                 doAsyncWrite(key);
    15               }

    3 小结

    本文结合代码了解了rpcserver的listener,reader,scheduler以及responder处理rpc请求的过程。对server端处理rpc请求有了一个较为清晰的认识。接下来会对client端的rpc请求逻辑做一个梳理,加油!

  • 相关阅读:
    js正则匹配以某字符串开始字符串
    vue+vue-resource+vue-cookie随笔
    [考试反思]1001csp-s模拟测试(b):逃离
    [考试反思]0929csp-s模拟测试55:消逝
    [考试反思]0928csp-s模拟测试54:转瞬
    [考试反思]0927csp-s模拟测试53:沦陷
    [考试反思]0926csp-s模拟测试52:审判
    [考试反思]0924csp-s模拟测试51:破碎
    Function:凸包,决策单调性,题意转化,单峰函数三分,离线处理
    土兵占领:二分答案,最大流
  • 原文地址:https://www.cnblogs.com/superhedantou/p/5840635.html
Copyright © 2011-2022 走看看