Hadoop-MapReduce-YarnChild Startup

I. Source Code Download

The official Hadoop source download address is below. I downloaded hadoop-3.2.4, so let's go through it together.

Index of /dist/hadoop/core

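II. Context

The previous article, <Hadoop-MapReduce-MRAppMaster Startup>, covered how MRAppMaster starts. So how are the containers (YarnChild) that run MapTask and ReduceTask started? Let's take a look.

III. Conclusions

MRJobConfig holds the configuration of an MR job: the Map, Reduce, and Combine classes, plus the job name, user name, queue name, number of MapTasks, number of ReduceTasks, working directory, local path of the jar, task timeout, task id, input and output directories, the memory size and CPU cores of each task, and so on.

It also defines the following two constants: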

package org.apache.hadoop.mapreduce;

public interface MRJobConfig {
  //......omitted......
  public static final String APPLICATION_MASTER_CLASS =
      "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";
  public static final String MAPREDUCE_V2_CHILD_CLASS =
      "org.apache.hadoop.mapred.YarnChild";
  //......omitted......
}

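MRAppMaster is the MapReduce implementation of ApplicationMaster. It is responsible for scheduling the whole MapReduce job and coordinating its state.

YarnChild is the process that runs inside each container and executes a single MapTask or ReduceTask.

If you are interested, look at the YARN logs of a job, or at the logs in my article <Hadoop-MapReduce-Understanding the Overall Flow Through Logs>. You will find that the log of the ApplicationMaster container begins with MRAppMaster, and the logs of the containers running MapTask and ReduceTask begin with YarnChild.

The launch command of MRAppMaster is assembled in YARNRunner: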

public class YARNRunner implements ClientProtocol {
  private List<String> setupAMCommand(Configuration jobConf) {
    List<String> vargs = new ArrayList<>(8);
    vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
        + "/bin/java");
    //......omitted......
    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
    //......omitted......
    return vargs;
  }
}

The launch command of YarnChild is assembled in MapReduceChildJVM:

public class MapReduceChildJVM {
  public static List<String> getVMCommand(
      InetSocketAddress taskAttemptListenerAddr, Task task, JVMId jvmID) {
    TaskAttemptID attemptID = task.getTaskID();
    JobConf conf = task.conf;
    Vector<String> vargs = new Vector<String>(8);
    vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
        + "/bin/java");
    //......omitted......
    vargs.add(YarnChild.class.getName());  // main of Child
    //......omitted......
    return vargsFinal;
  }
}

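Once started, YarnChild runs the MapTask or ReduceTask.

IV. Call Details (Source Walkthrough)

1. MRAppMaster

MRAppMaster is the Map-Reduce Application Master. The state machine is encapsulated in the implementation of the Job interface. All state changes happen through the Job interface. Each event results in a finite state transition in the Job.

The MR AppMaster is a composition of loosely coupled services that interact with each other through events. The components resemble the Actors model: a component acts on the events it receives and sends events to other components.

This keeps it highly concurrent with no or minimal synchronization.

Events are dispatched by a central dispatch mechanism. All components register with the Dispatcher.

Information is shared across components through AppContext.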
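The event-driven design described above can be pictured with a toy dispatcher. This is a minimal plain-Java sketch, not Hadoop's dispatcher: MiniDispatcher, register, dispatch, and the two event classes are hypothetical names, and events are delivered synchronously on the caller's thread for brevity, whereas the real dispatcher queues events and delivers them on its own thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy dispatcher; NOT the Hadoop implementation.
class MiniDispatcher {
    // Handlers keyed by event class: every component registers here.
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new HashMap<>();

    <E> void register(Class<E> eventType, Consumer<E> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>())
                .add(event -> handler.accept(eventType.cast(event)));
    }

    // Deliver the event to every handler registered for its exact class.
    void dispatch(Object event) {
        for (Consumer<Object> h : handlers.getOrDefault(event.getClass(), List.of())) {
            h.accept(event);
        }
    }

    // Two illustrative event types.
    static final class JobStart {
        final String jobId;
        JobStart(String jobId) { this.jobId = jobId; }
    }

    static final class TaskSchedule {
        final String taskId;
        TaskSchedule(String taskId) { this.taskId = taskId; }
    }

    // Wires two "components" together and returns what they recorded.
    static List<String> demo() {
        MiniDispatcher d = new MiniDispatcher();
        List<String> log = new ArrayList<>();
        // A component reacts to an event and sends a follow-up event to another component.
        d.register(JobStart.class, e -> {
            log.add("job:" + e.jobId);
            d.dispatch(new TaskSchedule(e.jobId + "_m_0"));
        });
        d.register(TaskSchedule.class, e -> log.add("task:" + e.taskId));
        d.dispatch(new JobStart("job_1"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [job:job_1, task:job_1_m_0]
    }
}
```

The components never call each other directly; they only know the dispatcher and the event types, which is what keeps the services loosely coupled.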

Let's start tracing from MRAppMaster's main method.

public static void main(String[] args) {
  try {
    mainStarted = true;
    //Set the default uncaught-exception handler
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    //Read the container information: container id, the NodeManager hosting the container, application submit time
    String containerIdStr =
        System.getenv(Environment.CONTAINER_ID.name());
    String nodeHostString = System.getenv(Environment.NM_HOST.name());
    String nodePortString = System.getenv(Environment.NM_PORT.name());
    String nodeHttpPortString =
        System.getenv(Environment.NM_HTTP_PORT.name());
    String appSubmitTimeStr =
        System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
    //Validate the container information
    validateInputParam(containerIdStr,
        Environment.CONTAINER_ID.name());
    validateInputParam(nodeHostString, Environment.NM_HOST.name());
    validateInputParam(nodePortString, Environment.NM_PORT.name());
    validateInputParam(nodeHttpPortString,
        Environment.NM_HTTP_PORT.name());
    validateInputParam(appSubmitTimeStr,
        ApplicationConstants.APP_SUBMIT_TIME_ENV);

    ContainerId containerId = ContainerId.fromString(containerIdStr);
    //Get the ApplicationAttemptId from the containerId
    //ContainerId: the globally unique identifier of a container in the cluster
    //ApplicationAttemptId: a particular attempt of an ApplicationMaster for a given ApplicationId. Several attempts may be needed to run an application because of transient ApplicationMaster failures, such as hardware failures or connectivity problems on the node where it was scheduled.
    ApplicationAttemptId applicationAttemptId =
        containerId.getApplicationAttemptId();
    if (applicationAttemptId != null) {
      CallerContext.setCurrent(new CallerContext.Builder(
          "mr_appmaster_" + applicationAttemptId.toString()).build());
    }
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);

    //Construct the MRAppMaster
    MRAppMaster appMaster =
        new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString), appSubmitTime);
    //Shutdown hook that runs when a signal is received and during normal JVM shutdown
    ShutdownHookManager.get().addShutdownHook(
        new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
    //Build the map/reduce job configuration
    JobConf conf = new JobConf(new YarnConfiguration());
    //Add the job configuration resource (prepared when MRAppMaster was launched)
    conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));

    MRWebAppUtil.initialize(conf);
    //Log the system properties
    String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
    if (systemPropsToLog != null) {
      LOG.info(systemPropsToLog);
    }

    String jobUserName = System
        .getenv(ApplicationConstants.Environment.USER.name());
    conf.set(MRJobConfig.USER_NAME, jobUserName);
    //Initialize and start the AppMaster for this job
    initAndStartAppMaster(appMaster, conf, jobUserName);
  } catch (Throwable t) {
    LOG.error("Error starting MRAppMaster", t);
    ExitUtil.terminate(1, t);
  }
}

Next, let's look at initAndStartAppMaster().

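protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName) throws IOException,
    InterruptedException {
  //Set the static configuration of UGI
  UserGroupInformation.setConfiguration(conf);
  // MAPREDUCE-6565: the configuration of SecurityUtil must be set as well.
  SecurityUtil.setConfiguration(conf);
  //The security framework has already loaded the tokens into the current UGI; just use them
  Credentials credentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  LOG.info("Executing with tokens: {}", credentials.getAllTokens());
  //Create a user from the login name. It is meant for remote users in RPC, since it carries no credentials.
  UserGroupInformation appMasterUgi = UserGroupInformation
      .createRemoteUser(jobUserName);
  //Add the given credentials to this user.
  appMasterUgi.addCredentials(credentials);

  //Now remove the AM->RM token so that tasks do not have it
  Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
  while (iter.hasNext()) {
    Token<?> token = iter.next();
    if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      iter.remove();
    }
  }
  //Copy all credentials from one Credentials object to another. Existing secrets and tokens are overwritten.
  conf.getCredentials().addAll(credentials);
  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      //All initialization code needed by the service.
      //This method is called only once in the lifecycle of a given service instance.
      //Feel free to read through it; here is a rough summary of what it does:
      //    1. Creates the job class loader
      //    2. Gets the tokens the job needs and puts them into the UGI
      //    3. Creates the event dispatcher, which dispatches events to the registered handlers by event type
      //    4. Adds the event dispatcher to the list of managed services
      //    5. Creates the task-attempt finishing monitor (it generates TA_TIMED_OUT if a task attempt stays in a FINISHING state for too long)
      //    6. Adds the task-attempt finishing monitor to the list of managed services
      //    7. Creates the new job id from the application attempt
      //    8. Decides whether the new API or the old API is in use
      //    9. Gets the output committer for the job's output format; it makes sure the output is committed correctly
      //    10. Checks whether the job's staging directory exists on HDFS
      //    11. Builds the service that handles requests from JobClient
      //    12. Creates the service that handles output commits and adds it to the list of managed services
      //    13. Handles preemption requests from the RM
      //    14. Creates the service that handles TaskUmbilicalProtocol requests and adds it to the list of managed services
      //    15. Creates the service that records job history events and registers it with the event dispatcher
      //    16. Creates the job event dispatcher and registers it with the event dispatcher
      //    17. Creates the speculator event dispatcher (task attempt status updates are sent to this component) and registers it with the event dispatcher
      //    18. Starts the staging directory cleaner
      //    19. Builds the service that allocates containers from the ResourceManager (in uber mode the containers are fake) and registers it with the event dispatcher
      //    20. Builds the service that launches the allocated containers through the NodeManager and registers it with the event dispatcher
      //    21. Finally adds the JobHistoryEventHandler to the list of managed services
      appMaster.init(conf);
      //Start the AppMaster for this job
      appMaster.start();
      if(appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}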
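The token-stripping loop above relies on Iterator.remove(), the safe way to delete elements from a collection while iterating over it. Below is a minimal plain-Java sketch of the same pattern. FakeToken and the kind strings "AM_RM" and "JOB" are hypothetical stand-ins, not Hadoop's Token class or its real kind names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class TokenFilterSketch {
    // Hypothetical stand-in for a security token; only its kind matters here.
    static final class FakeToken {
        final String kind;
        FakeToken(String kind) { this.kind = kind; }
    }

    // Removes every token of the given kind in place via Iterator.remove().
    static void removeKind(List<FakeToken> tokens, String kind) {
        Iterator<FakeToken> iter = tokens.iterator();
        while (iter.hasNext()) {
            if (iter.next().kind.equals(kind)) {
                iter.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<FakeToken> tokens = new ArrayList<>();
        tokens.add(new FakeToken("AM_RM")); // illustrative kind names
        tokens.add(new FakeToken("JOB"));
        removeKind(tokens, "AM_RM");
        for (FakeToken t : tokens) {
            System.out.println(t.kind); // only JOB remains
        }
    }
}
```

Calling tokens.remove(...) inside a for-each loop over the same list would instead risk a ConcurrentModificationException, which is why the iterator form is used.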

Next, let's look at appMaster.start(), which ultimately calls this class's serviceStart().

protected void serviceStart() throws Exception {

  amInfos = new LinkedList<AMInfo>();
  //The AM can be retried after a failure. If this is a retry, recover the tasks completed by the previous attempt and clean up the previous attempt's temporary directories and output
  completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
  processRecovery();
  cleanUpPreviousJobOutput();

  //The AMInfo generated for the current AM (it holds the AppAttemptId, start time, container id, and the NodeManager host and ports)
  AMInfo amInfo =
      MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
          nmPort, nmHttpPort);

  //Create and initialize (but do not start) the single job.
  //This also registers the job-finished event and its handler with the event dispatcher
  job = createJob(getConfig(), forcedState, shutDownMessage);

  //Send an MR AM started event for every previous AM.
  for (AMInfo info : amInfos) {
    dispatcher.getEventHandler().handle(
        new JobHistoryEvent(job.getID(), new AMStartedEvent(
            info.getAppAttemptId(), info.getStartTime(), info.getContainerId(),
            info.getNodeManagerHost(), info.getNodeManagerPort(),
            info.getNodeManagerHttpPort(), appSubmitTime)));
  }

  //Send an MR AM started event for this AM.
  dispatcher.getEventHandler().handle(
      new JobHistoryEvent(job.getID(), new AMStartedEvent(
          amInfo.getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
          amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(),
          amInfo.getNodeManagerHttpPort(),
          this.forcedState == null ? null : this.forcedState.toString(),
          appSubmitTime)));
  amInfos.add(amInfo);

  //Initialize and start the metrics system
  DefaultMetricsSystem.initialize("MRAppMaster");

  boolean initFailed = false;
  if (!errorHappenedShutDown) {
    // create a job event for job initialization
    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
    // Send init to the job (this does NOT trigger job execution)
    // This is a synchronous call, not an event through dispatcher. We want
    // job-init to be done completely here.
    jobEventDispatcher.handle(initJobEvent);

    // If job is still not initialized, an error happened during
    // initialization. Must complete starting all of the services so failure
    // events can be processed.
    initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);

    // JobImpl's InitTransition is done (call above is synchronous), so the
    // "uber-decision" (MR-1220) has been made.  Query job and switch to
    // ubermode if appropriate (by registering different container-allocator
    // and container-launcher services/event-handlers).
    if (job.isUber()) {
      speculatorEventDispatcher.disableSpeculation();
      LOG.info("MRAppMaster uberizing job " + job.getID()
          + " in local container (\"uber-AM\") on node "
          + nmHost + ":" + nmPort + ".");
    } else {
      // send init to speculator only for non-uber jobs.
      // This won't yet start as dispatcher isn't started yet.
      dispatcher.getEventHandler().handle(
          new SpeculatorEvent(job.getID(), clock.getTime()));
      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
          + "job " + job.getID() + ".");
    }
    // Start ClientService here, since it's not initialized if
    // errorHappenedShutDown is true
    clientService.start();
  }
  //Start all the components
  super.serviceStart();

  //Finally set the job class loader
  MRApps.setClassLoader(jobClassLoader, getConfig());

  if (initFailed) {
    JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
    jobEventDispatcher.handle(initFailedEvent);
  } else {
    //Once all the components have started, start the job
    startJobs();
  }
}

Next, let's look at startJobs(). It can be overridden to instantiate multiple jobs and create a workflow.

protected void startJobs() {
  /** Create a job-start event (JobEventType.JOB_START) to get things going */
  JobEvent startJobEvent = new JobStartEvent(job.getID(),
      recoveredJobStartTime);
  /** Send the job-start event. This triggers the job execution. */
  dispatcher.getEventHandler().handle(startJobEvent);
}

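Now let's look at how the JobEventType.JOB_START event is handled. The transition for the job-start event and its handler are defined in JobImpl.

2. JobImpl

JobImpl is the implementation of the Job interface. It maintains the job's state machine. Read and write calls use a ReadWriteLock for concurrency.

A job's states include NEW, INITED, SETUP, RUNNING, KILL_WAIT, COMMITTING, SUCCEEDED, FAIL_WAIT, FAIL_ABORT, KILL_ABORT, FAILED, KILLED, INTERNAL_ERROR, AM_REBOOT, and so on. If you are interested, trace the transitions of each state yourself; we won't follow every one of them here.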

protected static final
    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
    stateMachineFactory
    = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
        (JobStateInternal.NEW)

    // Transitions from NEW state

    // Transitions from INITED state
    .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
        JobEventType.JOB_START,
        new StartTransition())

    // Transitions from SETUP state
    .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
        JobEventType.JOB_SETUP_COMPLETED,
        new SetupCompletedTransition())

    // Transitions from RUNNING state
    // Transitions from KILL_WAIT state.
    // Transitions from COMMITTING state
    // Transitions from SUCCEEDED state
    // Transitions from FAIL_WAIT state
    //Transitions from FAIL_ABORT state
    // Transitions from KILL_ABORT state
    // Transitions from FAILED state
    // Transitions from KILLED state
    // No transitions from INTERNAL_ERROR state. Ignore all.
    // No transitions from AM_REBOOT state. Ignore all.

    // create the topology tables
    .installTopology();
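The table built with addTransition() can be read as a lookup from (current state, event type) to (next state, hook). The sketch below shows that idea in plain Java. MiniJobStateMachine is a hypothetical, heavily simplified stand-in and not Hadoop's StateMachineFactory: it has a handful of states, single-target arcs only, and no locking.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified transition table; NOT Hadoop's StateMachineFactory.
class MiniJobStateMachine {
    enum State { NEW, INITED, SETUP, RUNNING }
    enum Event { JOB_INIT, JOB_START, JOB_SETUP_COMPLETED }

    // One arc of the table: the target state plus the hook to run on the way.
    private static final class Arc {
        final State to;
        final Runnable hook;
        Arc(State to, Runnable hook) { this.to = to; this.hook = hook; }
    }

    private final Map<State, Map<Event, Arc>> table = new EnumMap<>(State.class);
    private State current = State.NEW;

    // Registers "from --on--> to" and returns this, so calls can be chained.
    MiniJobStateMachine addTransition(State from, State to, Event on, Runnable hook) {
        table.computeIfAbsent(from, k -> new EnumMap<>(Event.class)).put(on, new Arc(to, hook));
        return this;
    }

    // Looks up the arc for (current state, event), runs its hook, then moves on.
    State handle(Event event) {
        Arc arc = table.getOrDefault(current, Map.of()).get(event);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + event + " at " + current);
        }
        arc.hook.run();
        current = arc.to;
        return current;
    }

    public static void main(String[] args) {
        MiniJobStateMachine job = new MiniJobStateMachine()
            .addTransition(State.NEW, State.INITED, Event.JOB_INIT, () -> { })
            .addTransition(State.INITED, State.SETUP, Event.JOB_START,
                () -> System.out.println("start hook: ask the committer to set up the job"))
            .addTransition(State.SETUP, State.RUNNING, Event.JOB_SETUP_COMPLETED,
                () -> System.out.println("setup-completed hook: schedule the tasks"));
        job.handle(Event.JOB_INIT);
        job.handle(Event.JOB_START);
        System.out.println(job.handle(Event.JOB_SETUP_COMPLETED)); // RUNNING
    }
}
```

An event that has no arc from the current state is rejected, which is how a table like this keeps a job from, say, starting twice.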

Below is the handler for the JobEventType.JOB_START event.

public static class StartTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  /**
   * This transition executes in the event-dispatcher thread, although it is
   * triggered in MRAppMaster's startJobs() method.
   */
  @Override
  public void transition(JobImpl job, JobEvent event) {
    JobStartEvent jse = (JobStartEvent) event;
    if (jse.getRecoveredJobStartTime() != -1L) {
      job.startTime = jse.getRecoveredJobStartTime();
    } else {
      job.startTime = job.clock.getTime();
    }
    JobInitedEvent jie =
        new JobInitedEvent(job.oldJobId,
            job.startTime,
            job.numMapTasks, job.numReduceTasks,
            job.getState().toString(),
            job.isUber());
    job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
    JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
        job.appSubmitTime, job.startTime);
    job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
    job.metrics.runningJob(job);

    //Send the CommitterEventType.JOB_SETUP event
    job.eventHandler.handle(new CommitterJobSetupEvent(
        job.jobId, job.jobContext));
  }
}

The JOB_SETUP event is handled by CommitterEventHandler.

3. CommitterEventHandler

CommitterEventHandler handles the JOB_SETUP, JOB_COMMIT, JOB_ABORT, and TASK_ABORT events.

public class CommitterEventHandler extends AbstractService
    implements EventHandler<CommitterEvent> {

  public void run() {
    LOG.info("Processing the event " + event.toString());
    switch (event.getType()) {
    case JOB_SETUP:
      handleJobSetup((CommitterJobSetupEvent) event);
      break;
    case JOB_COMMIT:
      handleJobCommit((CommitterJobCommitEvent) event);
      break;
    case JOB_ABORT:
      handleJobAbort((CommitterJobAbortEvent) event);
      break;
    case TASK_ABORT:
      handleTaskAbort((CommitterTaskAbortEvent) event);
      break;
    default:
      throw new YarnRuntimeException("Unexpected committer event "
          + event.toString());
    }
  }

  //Handle the JOB_SETUP event
  protected void handleJobSetup(CommitterJobSetupEvent event) {
    try {
      committer.setupJob(event.getJobContext());
      //Setup succeeded: send the JobEventType.JOB_SETUP_COMPLETED event to the job
      context.getEventHandler().handle(
          new JobSetupCompletedEvent(event.getJobID()));
    } catch (Exception e) {
      LOG.warn("Job setup failed", e);
      context.getEventHandler().handle(new JobSetupFailedEvent(
          event.getJobID(), StringUtils.stringifyException(e)));
    }
  }
}

4. Back to JobImpl

The handler for JobEventType.JOB_SETUP_COMPLETED is SetupCompletedTransition, which was registered in the state machine shown in step 2.

private static class SetupCompletedTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.setupProgress = 1.0f;
    //Schedule the MapTasks
    job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
    //Schedule the ReduceTasks
    job.scheduleTasks(job.reduceTasks, true);

    //If there are no tasks, just transition to the job-completed state
    if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
      job.eventHandler.handle(new JobEvent(job.jobId,
          JobEventType.JOB_COMPLETED));
    }
  }
}

protected void scheduleTasks(Set<TaskId> taskIDs,
    boolean recoverTaskOutput) {
  for (TaskId taskID : taskIDs) {
    TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
    if (taskInfo != null) {
      //The task was already completed by a previous AM attempt: recover it
      eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
          committer, recoverTaskOutput));
    } else {
      //A new task that must be scheduled. This is the path we follow.
      eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
    }
  }
}
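The branch in scheduleTasks() can be reduced to one question per task: did a previous AM attempt already finish it? The following plain-Java sketch imitates only that decision. The class, the string task ids, and the string "events" are hypothetical simplifications; the real method sends TaskRecoverEvent and TaskEvent objects to the event handler.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ScheduleTasksSketch {
    // Returns, in order, the kind of event each task id would receive.
    static List<String> scheduleTasks(List<String> taskIds,
                                      Map<String, String> completedFromPreviousRun) {
        List<String> events = new ArrayList<>();
        for (String id : taskIds) {
            // remove() returns null when no previous attempt completed this task
            String info = completedFromPreviousRun.remove(id);
            events.add((info != null ? "RECOVER " : "SCHEDULE ") + id);
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, String> completed = new HashMap<>();
        completed.put("m_0", "output of the previous run"); // illustrative data
        System.out.println(scheduleTasks(List.of("m_0", "m_1"), completed));
        // [RECOVER m_0, SCHEDULE m_1]
    }
}
```

On a first AM attempt the map of completed tasks is empty, so every task takes the SCHEDULE path that the rest of this article follows.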

5. TaskImpl

TaskImpl is the implementation of the Task interface and maintains the task's state machine. A task's states are NEW, SCHEDULED, RUNNING, KILL_WAIT, SUCCEEDED, and FAILED.

private static final StateMachineFactory
    <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
    stateMachineFactory
    = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
        (TaskStateInternal.NEW)

    // Define the Task state machine

    // Transitions from NEW state
    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
        TaskEventType.T_SCHEDULE, new InitialScheduleTransition())

    // Transitions from SCHEDULED state
    //When the first attempt is launched, the task state is set to RUNNING
    // Transitions from RUNNING state
    // Transitions from KILL_WAIT state
    // Transitions from SUCCEEDED state
    // Transitions from FAILED state

    // create the topology tables
    .installTopology();

As you can see, the T_SCHEDULE event is handled by InitialScheduleTransition.

private static class InitialScheduleTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {

  @Override
  public void transition(TaskImpl task, TaskEvent event) {
    task.addAndScheduleAttempt(Avataar.VIRGIN);
    task.scheduledTime = task.clock.getTime();
    task.sendTaskStartedEvent();
  }
}

//(the one-argument overload called above is omitted here)
private void addAndScheduleAttempt(Avataar avataar, boolean reschedule) {
  TaskAttempt attempt = addAttempt(avataar);
  inProgressAttempts.add(attempt.getID());
  //schedule the nextAttemptNumber
  if (failedAttempts.size() > 0 || reschedule) {
    eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
        TaskAttemptEventType.TA_RESCHEDULE));
  } else {
    //Send the TaskAttemptEventType.TA_SCHEDULE event to the task attempt
    eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
        TaskAttemptEventType.TA_SCHEDULE));
  }
}

private void sendTaskStartedEvent() {
  launchTime = getLaunchTime();
  //Create the event that records the start of the task
  TaskStartedEvent tse = new TaskStartedEvent(
      TypeConverter.fromYarn(taskId), launchTime,
      TypeConverter.fromYarn(taskId.getTaskType()),
      getSplitsAsString());
  eventHandler
      .handle(new JobHistoryEvent(taskId.getJobId(), tse));
  historyTaskStartGenerated = true;
}

//The TypeConverter.fromYarn() helper used above
public static org.apache.hadoop.mapreduce.TaskType fromYarn(
    TaskType taskType) {
  switch (taskType) {
  case MAP:
    return org.apache.hadoop.mapreduce.TaskType.MAP;
  case REDUCE:
    return org.apache.hadoop.mapreduce.TaskType.REDUCE;
  default:
    throw new YarnRuntimeException("Unrecognized task type: " + taskType);
  }
}

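6. TaskAttemptImpl

TaskAttemptImpl is the implementation of a task attempt. Because failed tasks are retried, each run of a task in a container is called a task attempt. When a task attempt succeeds, the corresponding task is marked as successful too.

TaskAttemptImpl maintains the state machine of the task attempt. Its states are NEW, UNASSIGNED, ASSIGNED, RUNNING, SUCCESS_FINISHING_CONTAINER, FAIL_FINISHING_CONTAINER, COMMIT_PENDING, SUCCESS_CONTAINER_CLEANUP, FAIL_CONTAINER_CLEANUP, KILL_CONTAINER_CLEANUP, FAIL_TASK_CLEANUP, KILL_TASK_CLEANUP, SUCCEEDED, FAILED, and KILLED.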

private static final StateMachineFactory
    <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
    stateMachineFactory
    = new StateMachineFactory
        <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
        (TaskAttemptStateInternal.NEW)

    // Transitions from the NEW state.
    .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
        TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))

    // Transitions from the UNASSIGNED state.
    .addTransition(TaskAttemptStateInternal.UNASSIGNED,
        TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
        new ContainerAssignedTransition())

    // Transitions from the ASSIGNED state.
    // Transitions from RUNNING state.
    // Transitions from SUCCESS_FINISHING_CONTAINER state
    // Transitions from COMMIT_PENDING state
    // Transitions from SUCCESS_CONTAINER_CLEANUP state
    // kill and cleanup the container
    // Transitions from FAIL_CONTAINER_CLEANUP state.
    // Transitions from KILL_CONTAINER_CLEANUP
    // Transitions from FAIL_TASK_CLEANUP
    // run the task cleanup
    // Transitions from KILL_TASK_CLEANUP
    // Transitions from SUCCEEDED
    // Transitions from FAILED state
    // Transitions from KILLED state

    // create the topology tables
    .installTopology();

As you can see, the TaskAttemptEventType.TA_SCHEDULE event is handled by RequestContainerTransition.

static class RequestContainerTransition implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
  private final boolean rescheduled;

  public RequestContainerTransition(boolean rescheduled) {
    this.rescheduled = rescheduled;
  }

  @SuppressWarnings("unchecked")
  @Override
  public void transition(TaskAttemptImpl taskAttempt,
      TaskAttemptEvent event) {
    //Tell any speculator that we are requesting a container
    taskAttempt.eventHandler.handle(
        new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
    //Request the container
    if (rescheduled) {
      taskAttempt.eventHandler.handle(
          ContainerRequestEvent.createContainerRequestEventForFailedContainer(
              taskAttempt.attemptId, taskAttempt.resourceCapability));
    } else {
      //Send the ContainerAllocator.EventType.CONTAINER_REQ event
      taskAttempt.eventHandler.handle(new ContainerRequestEvent(
          taskAttempt.attemptId, taskAttempt.resourceCapability,
          taskAttempt.dataLocalHosts.toArray(
              new String[taskAttempt.dataLocalHosts.size()]),
          taskAttempt.dataLocalRacks.toArray(
              new String[taskAttempt.dataLocalRacks.size()])));
    }
  }
}

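7. LocalContainerAllocator

LocalContainerAllocator allocates containers locally. It does not allocate a real container; instead it sends an assigned event for every request. It is the handler of the ContainerAllocator.EventType.CONTAINER_REQ event in uber mode. For a normal job the same event goes to RMContainerAllocator, which requests containers from the ResourceManager, and the task attempt then receives the same TA_ASSIGNED event.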

public void handle(ContainerAllocatorEvent event) {
  if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
    LOG.info("Processing the event " + event.toString());
    //Assign the same container ID as the AM
    ContainerId cID =
        ContainerId.newContainerId(getContext().getApplicationAttemptId(),
            this.containerId.getContainerId());
    Container container = recordFactory.newRecordInstance(Container.class);
    container.setId(cID);
    NodeId nodeId = NodeId.newInstance(this.nmHost, this.nmPort);
    container.setResource(Resource.newInstance(0, 0));
    container.setNodeId(nodeId);
    container.setContainerToken(null);
    container.setNodeHttpAddress(this.nmHost + ":" + this.nmHttpPort);
    //Send the container-assigned event to the task attempt
    if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
      JobCounterUpdateEvent jce =
          new JobCounterUpdateEvent(event.getAttemptID().getTaskId()
              .getJobId());
      // TODO Setting OTHER_LOCAL_MAP for now.
      jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
      eventHandler.handle(jce);
    }
    //This sends the TaskAttemptEventType.TA_ASSIGNED event
    eventHandler.handle(new TaskAttemptContainerAssignedEvent(
        event.getAttemptID(), container, applicationACLs));
  }
}

8. Back to TaskAttemptImpl

Step 6 already showed the handler registered for the TaskAttemptEventType.TA_ASSIGNED event: new ContainerAssignedTransition().

private static class ContainerAssignedTransition implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
  @SuppressWarnings({ "unchecked" })
  @Override
  public void transition(final TaskAttemptImpl taskAttempt,
      TaskAttemptEvent event) {
    final TaskAttemptContainerAssignedEvent cEvent =
        (TaskAttemptContainerAssignedEvent) event;
    Container container = cEvent.getContainer();
    taskAttempt.container = container;
    //This is the real Task
    taskAttempt.remoteTask = taskAttempt.createRemoteTask();
    taskAttempt.jvmID =
        new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
            taskAttempt.remoteTask.isMapTask(),
            taskAttempt.container.getId().getContainerId());
    taskAttempt.taskAttemptListener.registerPendingTask(
        taskAttempt.remoteTask, taskAttempt.jvmID);

    taskAttempt.computeRackAndLocality();

    //Launch the container
    //Create the container object to be launched for the given task attempt
    ContainerLaunchContext launchContext = createContainerLaunchContext(
        cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
        taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
        taskAttempt.taskAttemptListener, taskAttempt.credentials);
    taskAttempt.eventHandler
        .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
            launchContext, container, taskAttempt.remoteTask));

    // Tell the speculator that our container need has been satisfied
    taskAttempt.eventHandler.handle(
        new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
  }
}

static ContainerLaunchContext createContainerLaunchContext(
    Map<ApplicationAccessType, String> applicationACLs,
    Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask,
    final org.apache.hadoop.mapred.JobID oldJobId,
    WrappedJvmID jvmID,
    TaskAttemptListener taskAttemptListener,
    Credentials credentials) {

  synchronized (commonContainerSpecLock) {
    if (commonContainerSpec == null) {
      commonContainerSpec = createCommonContainerLaunchContext(
          applicationACLs, conf, jobToken, oldJobId, credentials);
    }
  }

  //Fill in the per-container fields that the common spec lacks
  boolean userClassesTakesPrecedence =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

  //Set up the environment by cloning the common environment.
  Map<String, String> env = commonContainerSpec.getEnvironment();
  Map<String, String> myEnv = new HashMap<String, String>(env.size());
  myEnv.putAll(env);
  if (userClassesTakesPrecedence) {
    myEnv.put(Environment.CLASSPATH_PREPEND_DISTCACHE.name(), "true");
  }
  MapReduceChildJVM.setVMEnv(myEnv, remoteTask);

  //Set up the launch command. This calls MapReduceChildJVM.getVMCommand()
  List<String> commands = MapReduceChildJVM.getVMCommand(
      taskAttemptListener.getAddress(), remoteTask, jvmID);

  //Duplicate the ByteBuffers so that multiple containers can access them.
  Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
  for (Entry<String, ByteBuffer> entry : commonContainerSpec
      .getServiceData().entrySet()) {
    myServiceData.put(entry.getKey(), entry.getValue().duplicate());
  }

  //Construct the actual container
  ContainerLaunchContext container = ContainerLaunchContext.newInstance(
      commonContainerSpec.getLocalResources(), myEnv, commands,
      myServiceData, commonContainerSpec.getTokens().duplicate(),
      applicationACLs);

  return container;
}
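createContainerLaunchContext() calls duplicate() on each service-data buffer and on the tokens buffer instead of handing the same instance to every container. A duplicate shares the underlying bytes but has its own position and limit, so reading one copy does not disturb another. The sketch below shows this with java.nio only; the class name and the sample bytes are illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferDuplicateSketch {
    // Reads all remaining bytes, which moves the buffer's position to its limit.
    static String drain(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap("tokens".getBytes(StandardCharsets.UTF_8));
        ByteBuffer forContainer1 = shared.duplicate();
        ByteBuffer forContainer2 = shared.duplicate();

        System.out.println(drain(forContainer1));      // tokens
        System.out.println(forContainer1.remaining()); // 0: this copy is consumed
        System.out.println(forContainer2.remaining()); // 6: the other copy is untouched
        System.out.println(shared.remaining());        // 6: so is the shared original
    }
}
```

Without duplicate(), the first reader would leave the shared buffer positioned at its end and later readers would see no data.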

9. MapReduceChildJVM

This is where YarnChild is set as the main class of the launch command, so that the container started from it runs a MapTask or ReduceTask.

public static List<String> getVMCommand(
    InetSocketAddress taskAttemptListenerAddr, Task task,
    JVMId jvmID) {

  TaskAttemptID attemptID = task.getTaskID();
  JobConf conf = task.conf;

  Vector<String> vargs = new Vector<String>(8);

  vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
      + "/bin/java");

  // Add child (task) java-vm options.
  //
  // The following symbols if present in mapred.{map|reduce}.child.java.opts
  // value are replaced:
  // + @taskid@ is interpolated with value of TaskID.
  // Other occurrences of @ will not be altered.
  //
  // Example with multiple arguments and substitutions, showing
  // jvm GC logging, and start of a passwordless JVM JMX agent so can
  // connect with jconsole and the likes to watch child memory, threads
  // and get thread dumps.
  //
  //  <property>
  //    <name>mapred.map.child.java.opts</name>
  //    <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
  //           -Dcom.sun.management.jmxremote.authenticate=false \
  //           -Dcom.sun.management.jmxremote.ssl=false \
  //    </value>
  //  </property>
  //
  //  <property>
  //    <name>mapred.reduce.child.java.opts</name>
  //    <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
  //           -Dcom.sun.management.jmxremote.authenticate=false \
  //           -Dcom.sun.management.jmxremote.ssl=false \
  //    </value>
  //  </property>
  //
  String javaOpts = getChildJavaOpts(conf, task.isMapTask());
  javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
  String [] javaOptsSplit = javaOpts.split(" ");
  for (int i = 0; i < javaOptsSplit.length; i++) {
    vargs.add(javaOptsSplit[i]);
  }

  Path childTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
  vargs.add("-Djava.io.tmpdir=" + childTmpDir);
  MRApps.addLog4jSystemProperties(task, vargs, conf);

  if (conf.getProfileEnabled()) {
    if (conf.getProfileTaskRange(task.isMapTask()
                                 ).isIncluded(task.getPartition())) {
      final String profileParams = conf.get(task.isMapTask()
          ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
          : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
      vargs.add(String.format(profileParams,
          getTaskLogFile(TaskLog.LogName.PROFILE)));
    }
  }

  // Add main class and its arguments
  vargs.add(YarnChild.class.getName());  // main of Child
  // pass TaskAttemptListener's address
  vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
  vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
  vargs.add(attemptID.toString());                      // pass task identifier

  // Finally add the jvmID
  vargs.add(String.valueOf(jvmID.getId()));
  vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
  vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));

  // Final commmand
  StringBuilder mergedCommand = new StringBuilder();
  for (CharSequence str : vargs) {
    mergedCommand.append(str).append(" ");
  }
  Vector<String> vargsFinal = new Vector<String>(1);
  vargsFinal.add(mergedCommand.toString());
  return vargsFinal;
}
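To make the shape of the resulting command line concrete, here is a plain-Java sketch that assembles a comparable string without any Hadoop classes. It is an approximation under stated assumptions: ChildCommandSketch and its parameters are hypothetical, all sample values (paths, host, port, attempt id) are made up, the temp-dir, log4j, and profiling options are left out, and the arguments are joined with single spaces whereas the real method appends a space after every argument.

```java
import java.util.ArrayList;
import java.util.List;

class ChildCommandSketch {
    // Builds a one-element list holding the merged command line, like getVMCommand() does.
    static List<String> buildCommand(String javaHome, String javaOpts, String attemptId,
                                     String host, int port, long jvmId, String logDir) {
        List<String> vargs = new ArrayList<>();
        vargs.add(javaHome + "/bin/java");
        // @taskid@ inside the JVM options is replaced by the attempt id
        for (String opt : javaOpts.replace("@taskid@", attemptId).split(" ")) {
            vargs.add(opt);
        }
        vargs.add("org.apache.hadoop.mapred.YarnChild"); // main class of the child JVM
        vargs.add(host);                                 // task attempt listener address
        vargs.add(Integer.toString(port));
        vargs.add(attemptId);
        vargs.add(String.valueOf(jvmId));
        vargs.add("1>" + logDir + "/stdout");            // shell redirections for the logs
        vargs.add("2>" + logDir + "/stderr");

        List<String> merged = new ArrayList<>(1);
        merged.add(String.join(" ", vargs));
        return merged;
    }

    public static void main(String[] args) {
        // All argument values below are invented for illustration.
        System.out.println(buildCommand("/opt/jdk", "-Xmx200m -Xloggc:/tmp/@taskid@.gc",
            "attempt_x", "10.0.0.1", 45454, 2L, "/logs").get(0));
    }
}
```

YarnChild therefore receives four arguments in order: the listener host, the listener port, the task attempt id, and the JVM id.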

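V. Summary

1. MRAppMaster starts

2. The job is initialized and started

3. The job's state transitions are handled

4. The tasks are started

5. The task events are handled

6. The task attempts are started

7. The task attempt events are handled

8. A container is requested while the task attempt's TaskAttemptEventType.TA_SCHEDULE event is handled

9. A java command with YarnChild as its main class launches the container that runs a MapTask or ReduceTask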

