1、前言 昨天分析了ipc包下的RPC、Client类,今天来分析下ipc.Server。Server类因为是Hadoop自己使用,所以代码结构以及流程都很清晰,可以清楚的看到实例化、停止、运行等过程。
2、Server类结构 上面是Server的五个内部类,分别介绍一下:
1)Call
用以存储客户端发来的请求,这个请求会放入一个BlockQueue中;
2)Listener
监听类,用以监听客户端发来的请求。同时Listener下面还有一个静态类,Listener.Reader,当监听器监听到用户请求,便用让Reader读取用户请求。
3)Responder
响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
4)Connection
连接类,真正的客户端请求读取逻辑在这个类中。
5)Handler
请求(blockQueueCall)处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。
3、Server初始化
第一篇博客 说了,Server的初始化入口在RPC.getServer中,getServer其实是调用的RPC.Server静态类中的构造方法,我们看看Namenode创建RPCServer的方法和RPC.Server构造方法代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void initialize (Configuration conf) throws IOException { … this .serviceRpcServer = RPC.getServer(this , dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false , conf, namesystem.getDelegationTokenSecretManager()); this .serviceRpcServer.start(); } ``` ``` java public Server (Object instance, Configuration conf, String bindAddress, int port, int numHandlers, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { super (bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()), secretManager); this .instance = instance; this .verbose = verbose; }
该方法调用了父类的构造方法,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 protected Server (String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { this .bindAddress = bindAddress; this .conf = conf; this .port = port; this .paramClass = paramClass; this .handlerCount = handlerCount; this .socketSendBufferSize = 0 ; this .maxQueueSize = handlerCount * conf.getInt( IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); this .maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT); this .readThreads = conf.getInt( IPC_SERVER_RPC_READ_THREADS_KEY, IPC_SERVER_RPC_READ_THREADS_DEFAULT); this .callQueue = new LinkedBlockingQueue<Call>(maxQueueSize); this .maxIdleTime = 2 *conf.getInt("ipc.client.connection.maxidletime" , 1000 ); this .maxConnectionsToNuke = conf.getInt("ipc.client.kill.max" , 10 ); this .thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold" , 4000 ); this .secretManager = (SecretManager<TokenIdentifier>) secretManager; this .authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false ); this .isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); listener = new Listener(); this .port = listener.getAddress().getPort(); this .rpcMetrics = RpcInstrumentation.create(serverName, this .port); this .tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay" , false ); responder = new Responder(); if (isSecurityEnabled) { SaslRpcServer.init(conf); } }
不难看出,父类的构造方法就初始化了一些配置和变量。
4、Server运行 在上面第一段代码中,还有一句RpcServer.start()的方法,在调用构造函数初始化一些变量之后,Server就可以正式运行起来了:
1 2 3 4 5 6 7 8 9 10 public synchronized void start () { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0 ; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }
responder、listener、handlers三个对象的线程均阻塞了,前两个阻塞在selector.select()方法上,handler阻塞在callQueue.take()方法,都在等待客户端请求。Responder设置了超时时间,为15分钟。而listener还开启了Reader线程,该线程也阻塞了。
4、Server接受请求流程 1)监听到请求
Listener监听到请求,获得所有请求的SelectionKey,执行doAccept(key)方法,该方法将所有的连接对象放入list中,并将connection对象与key绑定,以供reader使用。初始化玩所有的conne对象之后,就可以激活Reader线程了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void doAccept (SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null ; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null ) { channel.configureBlocking(false ); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis()); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } … } finally { reader.finishAdd(); } } }
2)接收请求
Reader的run方法和Listener基本一致,也是获得所有的SelectionKey,再执行doRead(key)方法。该方法获得key中绑定的connection,并执行conection的readAndProcess()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 void doRead (SelectionKey key) throws InterruptedException { int count = 0 ; Connection c = (Connection)key.attachment(); if (c == null ) { return ; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); } catch (InterruptedException ieo) { … } if (count < 0 ) { … closeConnection(c); c = null ; } else { c.setLastContact(System.currentTimeMillis()); } } ``` ``` java public int readAndProcess () throws IOException, InterruptedException { while (true ) { int count = –1 ; if (dataLengthBuffer.remaining() > 0 ) { count = channelRead(channel, dataLengthBuffer); … if (!rpcHeaderRead) { if (rpcHeaderBuffer == null ) { rpcHeaderBuffer = ByteBuffer.allocate(2 ); } count = channelRead(channel, rpcHeaderBuffer); if (count < 0 || rpcHeaderBuffer.remaining() > 0 ) { return count; } int version = rpcHeaderBuffer.get(0 ); byte [] method = new byte [] {rpcHeaderBuffer.get(1 )}; authMethod = AuthMethod.read(new DataInputStream( new ByteArrayInputStream(method))); dataLengthBuffer.flip(); … dataLengthBuffer.clear(); … rpcHeaderBuffer = null ; rpcHeaderRead = true ; continue ; } … data = ByteBuffer.allocate(dataLength); } count = channelRead(channel, data); if (data.remaining() == 0 ) { … if (useSasl) { saslReadAndProcess(data.array()); } else { processOneRpc(data.array()); } … } return count; } }
3)获得call请求
在Connection中解析param请求中,解析了请求数据,并构造Call对象,将其加入callQueue。
1 2 3 4 5 6 7 8 9 10 11 12 13 private void processData (byte [] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); … Writable param = ReflectionUtils.newInstance(paramClass, conf); param.readFields(dis); Call call = new Call(id, param, this ); callQueue.put(call); incRpcCount(); }
4)处理call对象
Connection给callQueue添加了call对象,阻塞的Handler可以继续运行了,拿出一个call对象,并调用RPC.Call方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 while (running) { final Call call = callQueue.take(); CurCall.set(call); value = call(call.connection.protocol, call.param, call.timestamp); CurCall.set(null ); synchronized (call.connection.responseQueue) { setupResponse(buf, call, (error == null ) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); … responder.doRespond(call); } }
5)响应请求
上面代码中的setupResponse将call的id和状态发送回去,再设置了call中的response:ByteBuffer,之后就开始responder.doRespond(call)了,processResponse以及Responder.run()没太弄明白,就先不说了。
1 2 3 4 5 6 7 8 9 10 void doRespond (Call call) throws IOException { synchronized (call.connection.responseQueue) { call.connection.responseQueue.addLast(call); if (call.connection.responseQueue.size() == 1 ) { processResponse(call.connection.responseQueue, true ); } } }
6、总结 Server用的标准的Java TCP/IP NIO通信,同时请求的超时使用基于BlockingQueue以及wait/notify机制实现。使用的模式是reactor模式,关于nio和reactor可以参考这个博客 。
对于服务器端接收多个连接请求的需求,Server采用Listener来监听连接的事件,并用Listener.Reader来监听网络流读以及Responder监听写的事件,当有实际的网络流读写时间发生之后,解析了请求Call之后,添加进阻塞队列,并交由多个Handlers来处理请求。
这个方法比TCP/IP BIO好处就是可接受很多的连接,而这些连接只在真实的请求时才会创建线程处理,称之为一请求一处理。但是,连接上的请求发送非常频繁时,TCP/IP NIO的方法并不会带来太大的优势。
但是Hadoop实际场景中,通常是服务器端支持大量的连接数(Namenode连上几千个Datanode),但是连接发送的请求并不会太多(heartbeat、blockreport都有较长间隔)。这样就造成了Hadoop不适合实时的、多请求的运算,带来的代价是模型、实现简单,但是这也为以后的扩展埋下了祸根。
P.S.: 以上分析基于稳定版0.20.203.0rc1。