Hadoop源码 - ipc.Client
1、前言
上篇博客分析了ipc包下的RPC类,这篇博客来看看Client类吧。
Hadoop的RPC机制还是挺简单的,简述如下:
- 创建代理对象;
- 代理对象调用相应方法(invoke());
- invoke调用client对象的call方法,向服务器发送请求(参数、方法);
- 再等待call方法的完成;
- 返回请求结果。
具体是怎么实现的,下面来看看吧。
2、Client类分析
有了RPC大致分析,Client我就挑重要的分析了。
1)connections
RPC中就有了ClientCache类,那么client可以复用,所以一个client对象会有多个连接对象,实现中是用HashTable<ConnectionId, Connection>
存储的连接对象。
其中,ConnectionId是用来唯一标识连接的ID,主要由客户端地址、时间戳再结合一个素数生成;Connection是Client中的静态内部类,用以处理远程连接对象。
2)Call
客户端方法调用的实体类,存放了id、参数、返回值等。需要注意的是,Call类中有callComplete()方法,在一次call调用完毕之后调用,并调用notify()通知client接收完毕。
3)call(Writable param, ConnectionId remoteId)
1 | public Object invoke(Object proxy, Method method, Object[] args) |
1 | // client请求方法 |
1 | // Connection线程,等待服务器响应 |
1 | private void receiveResponse() { |
1 | public synchronized void setValue(Writable value) { |
该方法由invoker调用,调用过程如下:
- 构建Call对象
- 用remoteId获得connection对象
2.1 如果connections中有remoteId,取得该connection;反之,创建一个,并添加进connections;
2.2 connection.setupIOstreams()连接到服务器,并配置好连接对象,发送协议头,接着运行connection线程,等待接收工作(waitForWork()); - 调用connection.sendParam发送协议体,等待接收响应,call.wait();;
3.1 receiveResponse(),依次读入call的值(id,value);
3.2 标记接收结束(markClosed),同时notifyAll(); - 获得返回值,返回调用者。
3、异步/同步模型
Hadoop的RPC对外的接口其实是同步的,但是,RPC的内部实现其实是异步消息机制。hadoop用线程wait/notify机制实现异步转同步,发送请求(call)之后wait请求处理完毕,接收完响应(connection.receiveResponse())之后notify,notify()方法在call.setValue中。
但现在有一个问题,一个connection有多个call。可能同时有多个call在等待接收消息,那么是当client接收到response后,怎样确认它到底是之前哪个request的response呢?这个就是依靠的connection中的一个HashTable<Integer, Call>了,其中的Integer是用来标识Call,这样就可以将request和response对应上了。
参考资料: