An earlier post, 《》, introduced Hadoop's RPC mechanism. This post covers the RPC mechanism in YARN.
Let's get started.
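Hadoop's RPC is built from three main classes, RPC, Client, and Server, which provide the public programming interface, the client-side implementation, and the server-side implementation respectively, as shown in the figure below:
The figure is a class diagram of Hadoop's RPC. The code examples in 《》 show how these classes relate to one another, so I won't repeat them here. Next, let's look at YARN's RPC.
YARN exposes the YarnRPC class, which is abstract. Reading the YarnRPC source shows that the concrete implementation is chosen by the yarn.ipc.rpc.class parameter, whose default value is org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC. Part of the code follows: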
public abstract class YarnRPC {
  // ......

  public static YarnRPC create(Configuration conf) {
    LOG.debug("Creating YarnRPC for " + conf.get(YarnConfiguration.IPC_RPC_IMPL));
    String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
    if (clazzName == null) {
      clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;
    }
    try {
      return (YarnRPC) Class.forName(clazzName).newInstance();
    } catch (Exception e) {
      throw new YarnRuntimeException(e);
    }
  }
}
public class YarnConfiguration extends Configuration {
  // Configurations
  public static final String YARN_PREFIX = "yarn.";

  // IPC Configs
  public static final String IPC_PREFIX = YARN_PREFIX + "ipc.";

  /** RPC class implementation */
  public static final String IPC_RPC_IMPL = IPC_PREFIX + "rpc.class";
  public static final String DEFAULT_IPC_RPC_IMPL =
      "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC";
}
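The create() method above is an instance of a common pattern: read a class name from configuration, fall back to a default, and instantiate it reflectively. Below is a minimal, self-contained sketch of that pattern. It uses java.util.Properties in place of Hadoop's Configuration, and SketchRpc, DefaultSketchRpc, and CustomSketchRpc are hypothetical names invented for illustration; none of this is Hadoop code.

```java
import java.util.Properties;

/** Hypothetical stand-in for YarnRPC: an abstract type resolved by class name. */
abstract class SketchRpc {
    // Same property name as in the listing above; the default class is hypothetical.
    static final String IMPL_KEY = "yarn.ipc.rpc.class";
    static final String DEFAULT_IMPL = DefaultSketchRpc.class.getName();

    abstract String describe();

    /** Same shape as YarnRPC.create(): read the key, fall back, instantiate. */
    static SketchRpc create(Properties conf) {
        String className = conf.getProperty(IMPL_KEY, DEFAULT_IMPL);
        try {
            return (SketchRpc) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create " + className, e);
        }
    }
}

/** Used when the property is not set. */
class DefaultSketchRpc extends SketchRpc {
    @Override
    String describe() {
        return "default";
    }
}

/** Used only when the property names it explicitly. */
class CustomSketchRpc extends SketchRpc {
    @Override
    String describe() {
        return "custom";
    }
}
```

With an empty Properties object, SketchRpc.create() yields the default implementation; setting the key to another class name swaps the implementation without touching any calling code, which is the point of routing the choice through configuration.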
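HadoopYarnProtoRPC in turn uses RpcFactoryProvider to obtain a client factory (set by the yarn.ipc.client.factory.class parameter, default org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) and a server factory (set by the yarn.ipc.server.factory.class parameter, default org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl). These factories create the client and server objects from the Protocol Buffers definition of the communication protocol. Part of the relevant code follows: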
public class HadoopYarnProtoRPC extends YarnRPC {
  private static final Log LOG = LogFactory.getLog(HadoopYarnProtoRPC.class);

  @Override
  public Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf) {
    LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
    return RpcFactoryProvider.getClientFactory(conf)
        .getClient(protocol, 1, addr, conf);
  }

  @Override
  public void stopProxy(Object proxy, Configuration conf) {
    RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
  }

  @Override
  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager secretManager, int numHandlers, String portRangeConfig) {
    LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol
        + " with " + numHandlers + " handlers");
    return RpcFactoryProvider.getServerFactory(conf).getServer(protocol,
        instance, addr, conf, secretManager, numHandlers, portRangeConfig);
  }
}
RpcFactoryProvider:
public class RpcFactoryProvider {
  // ......

  public static RpcClientFactory getClientFactory(Configuration conf) {
    String clientFactoryClassName = conf.get(
        YarnConfiguration.IPC_CLIENT_FACTORY_CLASS,
        YarnConfiguration.DEFAULT_IPC_CLIENT_FACTORY_CLASS);
    return (RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
  }

  // ......
}

// Constants from YarnConfiguration
/** Factory to create client IPC classes. */
public static final String IPC_CLIENT_FACTORY_CLASS =
    IPC_PREFIX + "client.factory.class";
public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS =
    "org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl";
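Once the client factory is chosen, it still has to map a protocol interface such as ResourceTracker to a concrete client class. As far as I can tell, the PB client factory does this by naming convention rather than by explicit registration. The sketch below shows that idea only: PbClientNaming is a hypothetical helper, and the two suffixes are assumptions that should be checked against RpcClientFactoryPBImpl in your Hadoop version.

```java
/** Hypothetical helper that derives a PB client class name; not part of Hadoop. */
final class PbClientNaming {
    // Assumed suffixes; verify against RpcClientFactoryPBImpl before relying on them.
    static final String PACKAGE_SUFFIX = "impl.pb.client";
    static final String CLASS_SUFFIX = "PBClientImpl";

    private PbClientNaming() {
    }

    /** Maps a fully qualified protocol interface name to the assumed client class name. */
    static String clientImplName(String protocolClassName) {
        int lastDot = protocolClassName.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : protocolClassName.substring(0, lastDot);
        String simpleName = protocolClassName.substring(lastDot + 1);
        String targetPkg = pkg.isEmpty() ? PACKAGE_SUFFIX : pkg + "." + PACKAGE_SUFFIX;
        return targetPkg + "." + simpleName + CLASS_SUFFIX;
    }
}
```

Under these assumptions, org.apache.hadoop.yarn.server.api.ResourceTracker maps to org.apache.hadoop.yarn.server.api.impl.pb.client.ResourceTrackerPBClientImpl. A convention like this keeps the factory generic: supporting a new protocol means adding a class in the expected package, not editing the factory.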
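YARN does not use Hadoop's built-in Writable for serialization. It uses Protocol Buffers as the default serialization mechanism instead, a choice that brings several benefits.
YARN's workflow starts by defining a communication protocol interface. Take ResourceTracker as an example: it contains two methods, shown below: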
public interface ResourceTracker {

  @Idempotent
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException, IOException;

  @AtMostOnce
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException;
}
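ResourceTracker comes with both a Protocol Buffers definition and a Java implementation. The Protocol Buffers files involved are ResourceTracker.proto, yarn_server_common_service_protos.proto, and yarn_server_common_protos.proto. They live in the Hadoop source package under hadoop-2.6.0-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto. I won't paste the three files here; you can read them in that directory. Note that compiling these files requires a ProtoBuf build environment. It is simple to set up, so here is a brief outline.
First download the ProtoBuf source package, extract it, change into the extracted directory, and build and install it. The commands are: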
./configure --prefix=/home/work/protobuf/
make && make install
Finally, the command to compile a .proto file:
protoc ./ResourceTracker.proto --java_out=./
Next, we pull the Hadoop source into a local project and run and debug the relevant code.
TestYarnServerApiClasses:
public class TestYarnServerApiClasses {
  // ......

  // Four of the test methods are listed here

  @Test
  public void testRegisterNodeManagerResponsePBImpl() {
    RegisterNodeManagerResponsePBImpl original =
        new RegisterNodeManagerResponsePBImpl();
    original.setContainerTokenMasterKey(getMasterKey());
    original.setNMTokenMasterKey(getMasterKey());
    original.setNodeAction(NodeAction.NORMAL);
    original.setDiagnosticsMessage("testDiagnosticMessage");

    // Rebuild the record from its protobuf message and compare the fields
    RegisterNodeManagerResponsePBImpl copy =
        new RegisterNodeManagerResponsePBImpl(original.getProto());
    assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
    assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
    assertEquals(NodeAction.NORMAL, copy.getNodeAction());
    assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
  }

  @Test
  public void testNodeHeartbeatRequestPBImpl() {
    NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl();
    original.setLastKnownContainerTokenMasterKey(getMasterKey());
    original.setLastKnownNMTokenMasterKey(getMasterKey());
    original.setNodeStatus(getNodeStatus());

    NodeHeartbeatRequestPBImpl copy =
        new NodeHeartbeatRequestPBImpl(original.getProto());
    assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
    assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
    assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
  }

  @Test
  public void testNodeHeartbeatResponsePBImpl() {
    NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
    original.setDiagnosticsMessage("testDiagnosticMessage");
    original.setContainerTokenMasterKey(getMasterKey());
    original.setNMTokenMasterKey(getMasterKey());
    original.setNextHeartBeatInterval(1000);
    original.setNodeAction(NodeAction.NORMAL);
    original.setResponseId(100);

    NodeHeartbeatResponsePBImpl copy =
        new NodeHeartbeatResponsePBImpl(original.getProto());
    assertEquals(100, copy.getResponseId());
    assertEquals(NodeAction.NORMAL, copy.getNodeAction());
    assertEquals(1000, copy.getNextHeartBeatInterval());
    assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
    assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
    assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
  }

  @Test
  public void testRegisterNodeManagerRequestPBImpl() {
    RegisterNodeManagerRequestPBImpl original =
        new RegisterNodeManagerRequestPBImpl();
    original.setHttpPort(8080);
    original.setNodeId(getNodeId());
    Resource resource = recordFactory.newRecordInstance(Resource.class);
    resource.setMemory(10000);
    resource.setVirtualCores(2);
    original.setResource(resource);

    RegisterNodeManagerRequestPBImpl copy =
        new RegisterNodeManagerRequestPBImpl(original.getProto());
    assertEquals(8080, copy.getHttpPort());
    assertEquals(9090, copy.getNodeId().getPort());
    assertEquals(10000, copy.getResource().getMemory());
    assertEquals(2, copy.getResource().getVirtualCores());
  }
}
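The tests in TestYarnServerApiClasses all follow one pattern: populate a record, call getProto(), construct a second record from that message, and assert that the fields survived. Below is a minimal, self-contained sketch of the same round-trip idea. HeartbeatSketch, toBytes(), and fromBytes() are hypothetical names, and the length-prefixed byte layout is an assumption made for illustration; it is not the Protocol Buffers wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical record with a hand-rolled encoding; stands in for a PBImpl class. */
final class HeartbeatSketch {
    private final int responseId;
    private final String diagnostics;

    HeartbeatSketch(int responseId, String diagnostics) {
        this.responseId = responseId;
        this.diagnostics = diagnostics;
    }

    int getResponseId() {
        return responseId;
    }

    String getDiagnostics() {
        return diagnostics;
    }

    /** Serializes as: 4-byte id, 4-byte text length, UTF-8 text (illustrative layout). */
    byte[] toBytes() {
        byte[] text = diagnostics.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(2 * Integer.BYTES + text.length)
                .putInt(responseId)
                .putInt(text.length)
                .put(text)
                .array();
    }

    /** Rebuilds a record from its serialized form, analogous to new XxxPBImpl(proto). */
    static HeartbeatSketch fromBytes(byte[] wire) {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        int id = buf.getInt();
        byte[] text = new byte[buf.getInt()];
        buf.get(text);
        return new HeartbeatSketch(id, new String(text, StandardCharsets.UTF_8));
    }
}
```

A round trip then reads HeartbeatSketch.fromBytes(original.toBytes()), and the copy's getters can be compared with the original's, field by field, just as the JUnit tests do with getProto().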
TestResourceTrackerPBClientImpl:
public class TestResourceTrackerPBClientImpl {

  private static ResourceTracker client;
  private static Server server;
  private final static org.apache.hadoop.yarn.factories.RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  @BeforeClass
  public static void start() {
    System.out.println("Start client test");

    InetSocketAddress address = new InetSocketAddress(0);
    Configuration configuration = new Configuration();
    ResourceTracker instance = new ResourceTrackerTestImpl();
    server = RpcServerFactoryPBImpl.get().getServer(ResourceTracker.class,
        instance, address, configuration, null, 1);
    server.start();

    client = (ResourceTracker) RpcClientFactoryPBImpl.get().getClient(
        ResourceTracker.class, 1, NetUtils.getConnectAddress(server),
        configuration);
  }

  @AfterClass
  public static void stop() {
    System.out.println("Stop client");
    if (server != null) {
      server.stop();
    }
  }

  /**
   * Test the method registerNodeManager. Method should return a not null
   * result.
   */
  @Test
  public void testResourceTrackerPBClientImpl() throws Exception {
    RegisterNodeManagerRequest request =
        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
    assertNotNull(client.registerNodeManager(request));

    ResourceTrackerTestImpl.exception = true;
    try {
      client.registerNodeManager(request);
      fail("there should be YarnException");
    } catch (YarnException e) {
      assertTrue(e.getMessage().startsWith("testMessage"));
    } finally {
      ResourceTrackerTestImpl.exception = false;
    }
  }

  /**
   * Test the method nodeHeartbeat. Method should return a not null result.
   */
  @Test
  public void testNodeHeartbeat() throws Exception {
    NodeHeartbeatRequest request =
        recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
    assertNotNull(client.nodeHeartbeat(request));

    ResourceTrackerTestImpl.exception = true;
    try {
      client.nodeHeartbeat(request);
      fail("there should be YarnException");
    } catch (YarnException e) {
      assertTrue(e.getMessage().startsWith("testMessage"));
    } finally {
      ResourceTrackerTestImpl.exception = false;
    }
  }

  public static class ResourceTrackerTestImpl implements ResourceTracker {

    public static boolean exception = false;

    public RegisterNodeManagerResponse registerNodeManager(
        RegisterNodeManagerRequest request) throws YarnException, IOException {
      if (exception) {
        throw new YarnException("testMessage");
      }
      return recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
    }

    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
        throws YarnException, IOException {
      if (exception) {
        throw new YarnException("testMessage");
      }
      return recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
    }
  }
}
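Next, we run the code under JUnit. A screenshot of the run is shown below:
Debugging the testResourceTrackerPBClientImpl() method
Because the exception flag is set to true here, calling registerNodeManager() throws a YarnException carrying the test message: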
if (exception) {
  throw new YarnException("testMessage");
}
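The failure switch in ResourceTrackerTestImpl is a static field, so it is shared by every test in the class; that is why the tests reset it in a finally block. The sketch below isolates that pattern without any RPC involved. FlakyService and FlagToggleDemo are hypothetical names, and an unchecked IllegalStateException stands in for YarnException to keep the example free of Hadoop dependencies.

```java
/** Hypothetical service with a static failure switch, like ResourceTrackerTestImpl. */
final class FlakyService {
    static boolean failNext = false;

    private FlakyService() {
    }

    /** Returns "ok" normally, or throws while the switch is on. */
    static String call() {
        if (failNext) {
            throw new IllegalStateException("testMessage");
        }
        return "ok";
    }
}

final class FlagToggleDemo {
    private FlagToggleDemo() {
    }

    /** Turns the switch on, captures the failure message, and always resets the switch. */
    static String messageWhenFailing() {
        FlakyService.failNext = true;
        try {
            FlakyService.call();
            return null; // not reached while the switch is on
        } catch (IllegalStateException e) {
            return e.getMessage();
        } finally {
            // Reset shared state so later callers see the normal path again.
            FlakyService.failNext = false;
        }
    }
}
```

Without the finally block, an assertion failure inside the try would leave the switch on, and every later test that uses the same service would fail for an unrelated reason.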
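When learning Hadoop YARN's RPC, it helps to understand Hadoop's RPC mechanism first; YARN's RPC is then much easier to follow. RPC is only one part of YARN, and later posts will cover more of it.
That's all for this post. If you run into problems while studying this material, you can join the group to discuss them or send me an email, and I will do my best to answer. Let's keep learning together!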