`
universsky
  • 浏览: 92334 次
文章分类
社区版块
存档分类
最新评论

Map/Reduce hadoop 细节

 
阅读更多

Map/Reduce hadoop 细节

原文:http://www.cnblogs.com/duguguiyu/archive/2009/02/28/1400278.html

分布式计算(Map/Reduce,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce框架所设计的分布式框架。在Hadoop中,分布式文件系统,很大程度上,是为各种分布式计算需求所服务的我们说分布式文件系统就是加了分布式的文件系统,类似的定义推广到分布式计算上,我们可以将其视为增加了分布式支持的计算函数从计算的角度上看,Map/Reduce框架接受各种格式的键值对文件作为输入,读取计算后,最终生成自定义格式的输出文件。而从分布式的角度上看,分布式计算的输入文件往往规模巨大,且分布在多个机器上,单机计算完全不可支撑且效率低下,因此Map/Reduce框架需要提供一套机制,将此计算扩展到无限规模的机器集群上进行。依照这样的定义,我们对整个Map/Reduce的理解,也可以分别沿着这两个流程去看。。。

Map/Reduce框架中,每一次计算请求,被称为作业。在分布式计算Map/Reduce框架中,为了完成这个作业,它进行两步走的战略,首先是将其拆分成若干个Map任务分配到不同的机器上去执行,每一个Map任务拿输入文件的一部分作为自己的输入,经过一些计算,生成某种格式的中间文件,这种格式,与最终所需的文件格式完全一致,但是仅仅包含一部分数据。因此,等到所有Map任务完成后,它会进入下一个步骤,用以合并这些中间文件获得最后的输出文件。此时,系统会生成若干个Reduce任务,同样也是分配到不同的机器去执行,它的目标,就是将若干个Map任务生成的中间文件为汇总到最后的输出文件中去。当然,这个汇总不总会像1 + 1 = 2那么直接了当,这也就是Reduce任务的价值所在。经过如上步骤,最终,作业完成,所需的目标文件生成。整个算法的关键,就在于增加了一个中间文件生成的流程,大大提高了灵活性,使其分布式扩展性得到了保证。。。

I. 术语对照

和分布式文件系统一样,GoogleHadoop....我,各执一种方式表述统一概念,为了保证其统一性,特有下表。。。

文中翻译

Hadoop术语

Google术语

相关解释

作业

Job

Job

用户的每一个计算请求,就称为一个作业。

作业服务器

JobTracker

Master

用户提交作业的服务器,同时,它还负责各个作业任务的分配,管理所有的任务服务器。

任务服务器

TaskTracker

Worker

任劳任怨的工蜂,负责执行具体的任务。

任务

Task

Task

每一个作业,都需要拆分开了,交由多个服务器来完成,拆分出来的执行单位,就称为任务。

备份任务

Speculative Task

Buckup Task

每一个任务,都有可能执行失败或者缓慢,为了降低为此付出的代价,系统会未雨绸缪的实现在另外的任务服务器上执行同样一个任务,这就是备份任务。

II. 基本架构

与分布式文件系统类似,Map/Reduce的集群,也由三类服务器构成。其中作业服务器,在Hadoop中称为Job Tracker,在Google论文中称为Master。前者告诉我们,作业服务器是负责管理运行在此框架下所有作业的,后者告诉我们,它也是为各个作业分配任务的核心。HDFS的主控服务器类似,它也是作为单点存在的,简化了负责的同步流程。具体的负责执行用户定义操作的,是任务服务器,每一个作业被拆分成很多的任务,包括Map任务Reduce任务等,任务是具体执行的基本单元,它们都需要分配到合适任务服务器上去执行,任务服务器一边执行一边向作业服务器汇报各个任务的状态,以此来帮助作业服务器了解作业执行的整体情况,分配新的任务等等。。。

除了作业的管理者执行者,还需要有一个任务的提交者,这就是客户端。与分布式文件系统一样,客户端也不是一个单独的进程,而是一组API,用户需要自定义好自己需要的内容,经由客户端相关的代码,将作业及其相关内容和配置,提交到作业服务器去,并时刻监控执行的状况。。。

同作为Hadoop的实现,与HDFS的通信机制相同,Hadoop Map/Reduce也是用了协议接口来进行服务器间的交流。实现者作为RPC服务器,调用者经由RPC的代理进行调用,如此,完成大部分的通信,具体服务器的架构,和其中运行的各个协议状况,参见下图。从图中可以看到,与HDFS相比,相关的协议少了几个,客户端与任务服务器,任务服务器之间,都不再有直接通信关系。这并不意味着客户端就不需要了解具体任务的执行状况,也不意味着,任务服务器之间不需要了解别家任务执行的情形,只不过,由于整个集群各机器的联系比HDFS复杂的多,直接通信过于的难以维系,所以,都统一由作业服务器整理转发。另外,从这幅图可以看到,任务服务器不是一个人在战斗,它会像孙悟空一样招出一群宝宝帮助其具体执行任务。这样做的好处,个人觉得,应该有安全性方面的考虑,毕竟,任务的代码是用户提交的,数据也是用户指定的,这质量自然良莠不齐,万一碰上个搞破坏的,把整个任务服务器进程搞死了,就因小失大了。因此,放在单独的地盘进行,爱咋咋地,也算是权责明确了。。。

与分布式文件系统相比,Map/Reduce框架的还有一个特点,就是可定制性强。文件系统中很多的算法,都是很固定和直观的,不会由于所存储的内容不同而有太多的变化。而作为通用的计算框架,需要面对的问题则要复杂很多,在各种不同的问题、不同的输入、不同的需求之间,很难有一种包治百病的药能够一招鲜吃遍天。作为Map/Reduce框架而言,一方面要尽可能的抽取出公共的一些需求,实现出来。更重要的,需要提供良好的可扩展机制,满足用户自定义各种算法的需求。Hadoop是由Java来实现的,因此通过反射来实现自定义的扩展,显得比较小菜一碟了。JobConf类中,定义了大量的接口,这基本上是Hadoop Map/Reduce框架所有可定制内容的一次集中展示。在JobConf中,有大量set接口接受一个Class<? extends xxx>的参数,通常它都有一个默认实现的类,用户如果不满意,则可自定义实现。。。

III. 计算流程

如果一切都按部就班的进行,那么整个作业的计算流程,应该是作业的提交-> Map任务的分配和执行-> Reduce任务的分配和执行-> 作业的完成。而在每个任务的执行中,又包含输入的准备-> 算法的执行-> 输出的生成,三个子步骤。沿着这个流程,我们可以很快的整理清晰整个Map/Reduce框架下作业的执行。。。

1、作业的提交

个作业,在提交之前,需要把所有应该配置的东西都配置好,因为一旦提交到了作业服务器上,就陷入了完全自动化的流程,用户除了观望,最多也就能起一个监督作用,惩治一些不好好工作的任务。。。

基本上,用户在提交代码阶段,需要做的工作主要是这样的:

首先,书写好所有自定的代码,最起码,需要有MapReduce的执行代码。Hadoop中,Map需要派生自Mapper<K1, V1, K2, V2>接口Reduce需要派生自Reducer<K2, V2, K3, V3>接口。这里都是用的泛型,用以支持不同的键值类型。这两个接口都仅有一个方法,一个是map,一个是reduce,这两个方法都直接受四个参数,前两个是输入的相关的数据结构,第三个是作为输出相关的数据结构,最后一个,是一个Reporter类的实例,实现的时候可以利用它来统计一些计数。除了这两个接口,还有大量可以派生的接口,比如分割的Partitioner<K2, V2>接口。。。

然后,需要书写好主函数的代码,其中最主要的内容就是实例化一个JobConf类的对象,然后调用其丰富的setXXX接口,设定好所需的内容,包括输入输出的文件路径,MapReduce的类,甚至包括读取写入文件所需的格式支持类,等等。。。

最后,调用JobClientrunJob方法,提交此JobConf对象runJob方法会先行调用到JobSubmissionProtocol接口所定义的submitJob方法,将此作业,提交给作业服务。接着,runJob开始循环,不停的调用JobSubmissionProtocolgetTaskCompletionEvents方法,获得TaskCompletionEvent类的对象实例,了解此作业各任务的执行状况。。。

2Map任务的分配

当一个作业提交到了作业服务器上,作业服务器会生成若干个Map任务,每一个Map任务,负责将一部分的输入转换成格式与最终格式相同的中间文件。通常一个作业的输入都是基于分布式文件系统的文件(当然在单机环境下,文件系统单机的也可以...),因为,它可以很天然的和分布式的计算产生联系。而对于一个Map任务而言,它的输入往往是输入文件的一个数据块,或者是数据块的一部分,但通常,不跨数据块。因为,一旦跨了数据块,就可能涉及到多个服务器,带来了不必要的复杂性。

当一个作业,从客户端提交到了作业服务器上,作业服务器会生成一个JobInProgress对象,作为与之对应的标识,用于管理。作业被拆分成若干个Map任务后,会预先挂在作业服务器上的任务服务器拓扑树。这是依照分布式文件数据块的位置来划分的,比如一Map任务需要用某个数据块,这个数据块有三份备份,那么,在这三台服务器上都会挂上此任务,可以视为是一个预分配。

关于任务管理和分配的大部分的真实功能和逻辑的实现,JobInProgress则依托JobInProgressListenerTaskScheduler的子类。TaskScheduler,顾名思义是用于任务分配的策略类(为了简化描述,用它代指所有TaskScheduler的子类...)。它会掌握好所有作业的任务信息,其assignTasks函数,接受一个TaskTrackerStatus作为参数,依照此任务服务器的状态和现有的任务状况,为其分配新的任务。而为了掌握所有作业相关任务的状况,TaskScheduler会将若干个JobInProgressListener注册到JobTracker中去,当有新的作业到达、移除或更新的时候,JobTracker会告知给所有的JobInProgressListener,以便它们做出相应的处理。

任务分配是一个重要的环节,所谓任务分配,就是将合适作业的合适任务分配到合适的服务器上。不难看出,里面蕴含了两个步骤,先是选择作业,然后是在此作业中选择任务。和所有分配工作一样,任务分配也是一个复杂的活。不良好的任务分配,可能会导致网络流量增加、某些任务服务器负载过重效率下降,等等。不仅如此,任务分配还是一个无一致模式的问题,不同的业务背景,可能需要不同的算法才能满足需求。因此,在Hadoop中,有很多TaskScheduler的子类,像FacebookYahoo,都为其贡献出了自家用的算法。在Hadoop中,默认的任务分配器,是JobQueueTaskScheduler。它选择作业的基本次序是:Map Clean Up TaskMap任务服务器的清理任务,用于清理相关的过期的文件和环境...-> Map Setup TaskMap任务服务器的安装任务,负责配置好相关的环境...-> Map Tasks -> Reduce Clean Up Task -> Reduce Setup Task -> Reduce Tasks。在这个前提下,具体到Map任务的分配上来。当一个任务服务器工作的游刃有余,期待获得新的任务的时候,JobQueueTaskScheduler会按照各个作业的优先级,从最高优先级的作业始分配。每分配一个,还会为其留出余量,已被不时之需。举一个例子:系统目前有优先级321的三个作业,每个作业都有一个可分配的Map任务,一个任务服务器来申请新的任务,它还有能力承载3个任务的执行,JobQueueTaskScheduler会先从优先级3的作业上取一个任务分配给它,然后再留出一个1任务的余量。此时,系统只能在将优先级2作业的任务分配给此服务器,而不能分配优先级1的任务。这样的策略,基本思路就是一切为高优先级的作业服务,优先分配不说,分配了好保留有余力以备不时之需,如此优待,足以让高优先级的作业喜极而泣,让低优先级的作业感慨既生瑜何生亮,甚至是活活饿死。。。

确定了从哪个作业提取任务后,具体的分配算法,经过一系列的调用,最后实际是由JobInProgressfindNewMapTask函数完成的。它的算法很简单,就是尽全力为此服务器非配且尽可能好的分配任务也就是说,只要还有可分配的任务,就一定会分给它,而不考虑后来者。作业服务器会从离它最近的服务器开始,看上面是否还挂着未分配的任务(预分配上的),从近到远,如果所有的任务都分配了,那么看有没有开启多次执行,如果开启,考虑把未完成的任务再分配一次(后面有地方详述...)。。。

对于作业服务器来说,把一个任务分配出去了,并不意味着它就彻底解放,可以对此任务可以不管不顾了。因为任务可以在任务服务器上执行失败,可能执行缓慢,这都需要作业服务器帮助它们再来一次。因此在Task中,记录有一个TaskAttemptID,对于任务服务器而言,它们每次跑的,其实都只是一个Attempt而已,Reduce任务只需要采信一个的输出,其他都算白忙乎了。。。

3Map任务的执行

HDFS类似,任务服务器是通过心跳消息,向作业服务器汇报此时此刻其上各个任务执行的状况,并向作业服务器申请新的任务的。具体实现,是TaskTracker调用InterTrackerProtocol协议的heartbeat方法来做的。这个方法接受一个TaskTrackerStatus对象作为参数,它描述了此时此任务服务器的状态。当其有余力接受新的任务的时候,它还会传入acceptNewTaskstrue的参数,表示希望作业服务器委以重任。JobTracker接收到相关的参数后,经过处理,会返回一个HeartbeatResponse对象。这个对象中,定义了一组TaskTrackerAction,用于指导任务服务器进行下一步的工作。系统中已定义的了一堆其TaskTrackerAction的子类,有的对携带的参数进行了扩充,有的只是标明了下ID,具体不详写了,一看便知。

TaskTracker收到的TaskTrackerAction中,包含了LaunchTaskAction,它会开始执行所分配的新的任务。在TaskTracker中,有一个TaskTracker.TaskLauncher线程(确切的说是两个,一个等Map任务,一个等Reduce任务),它们在痴痴的守候着新任务的来到。一旦等到了,会最终调用到TaskcreateRunner方法,构造出一个TaskRunner对象,新建一个线程来执行。对于一个Map任务,它对应的RunnerTaskRunner的子类MapTaskRunner不过,核心部分都在TaskRunner的实现内。TaskRunner会先将所需的文件全部下载并拆包好,并记录到一个全局缓存中,这是一个全局的目录,可以供所有此作业的所有任务使用。它会用一些软链接,将一些文件名链接到这个缓存中来。然后,根据不同的参数,配置出一个JVM执行的环境,这个环境JvmEnv类的对象对应。

接着,TaskRunner会调用JvmManagerlaunchJvm法,提交给JvmManager处理。JvmManager用于管理该TaskTracker上所有运行的Task子进程。在目前的实现中,尝试的是池化的方式。有若干个固定的槽,如果槽没有满,那么就启动新的子进程,否则,就寻找idle的进程,如果是同Job的直接放进去,否则杀死这个进程,用一个新的进程代替。每一个进程都是由JvmRunner来管理的,它也是位于单独线程中的。但是从实现上看,这个机制好像没有部署开,子进程是死循环等待,而不会阻塞在父进程的相关线程上,父线程的变量一直都没有个调整,一旦分配,始终都处在繁忙的状况了。

真实的执行载体,是Child,它包含一个main函数,进程执行,会将相关参数传进来,它会拆解这些参数,并且构造出相关的Task实例,调用其run函数进行执行。每一个子进程,可以执行指定个数量的Task,这就是上面所说的池化的配置。但是,这套机制在我看来,并没有运行起来,每个进程其实都没有机会不死而执行新的任务,只是傻傻的等待进程池满,而被一刀毙命。也许是我老眼昏花,没看出其中实现的端倪。。。

分享到:
评论

相关推荐

    Windows平台下Hadoop的Map/Reduce开发

    讲述了Windows平台的Hadoop安装... 最后,以最简单的求和为例,剖析Hadoop的Map/Reduce工作机制,对于初学Hadoop及Map/Reduce的读者有很大的帮助。相信通过最简单的求和为例,读者可步入Hadoop的Map/Reduce开发者行列。

    hadoop中map/reduce

    hadoop中map/reduce自学资料合集

    hadoop之map/reduce

    hadoop开发文档

    基于Map/Reduce的分布式搜索引擎研究

    【摘要】在对Map/Reduce算法进行分析的基础上,利用开源Hadoop软件设计出高容错高性能的分布式搜索引擎,以面对搜索引擎对海量数据的处理和存储问题。

    远程调用执行Hadoop Map/Reduce

    NULL 博文链接:https://sgq0085.iteye.com/blog/1879442

    基于Map_Reduce的分布式搜索引擎研究

    在对Map/Reduce算法进行分析的基础上,利用开源Hadoop软件设计出高容错高性能的分布式搜索引擎,以面对搜索引擎对海量数据的处理和存储问题

    Hadoop Map Reduce教程

    Hadoop Map Reduce教程,介绍hadoop map/reduce框架的各个方面

    基于Map_Reduce的并行关联分析方法

    本文在研究BIRCH算法、规则关联算法、Hadoop的map/reduce机制的基础上,提 出了一种基于map/reduce的应用于网络安全事件分析的并行关联方法。一方面,通过对BIRCH 算法的改进,在BIRCH的分层次思想中引入预定义的...

    最高气温 map reduce hadoop 实例

    自己的第一个hadoop 实例,好高兴分享一下。 运行命令hadoop jar ‘/home/hadoop/downloas/max.jar’ upload.MaxTemperature

    eclipse-hadoop3x-master.rar

    Map:俗点说就是直接把数据打散,一份数据把它切分成多份小的数据进行处理,这个过程可以称之为Map。 Reduce:有打散当然要有聚合,把处理完的数据再重新合成一个,这个过程称之为Reduce。 这两个操作实际上就是...

    大数据云计算技术系列 hadoop搭建与eclipse开发环境设置-已验证通过(共13页).pdf

    重启eclipse,打开windows-&gt;open perspective-&gt;other-&gt;map/reduce 可以看到map/reduce开发视图。 1.2 设置连接参数 打开windows-&gt;show view-&gt;other-&gt; map/reduce Locations视图,在点击大象后弹出的对话框(General ...

    hadoop-eclipse-plugins

    eclipse中使用Hadoop Map/Reduce插件进行map/reduce的开发

    基于Hadoop 平台的数据分析方案的设计

    面对互联网上的海量数据,...析,给出相应Map/Reduce 程序的设计思路和实例,并提出Map/Reduce 分布式程序的部分设 计和性能优化方法,实验结果表明,本文提出的这些方法能简化Map/Reduce 程序设计、有 效提高程序性能

    Hadoop下的分布式搜索引擎

    了Map/Reduce模型的开源实现版本——Hadoop分布 式处理平台,在此基础上将搜索引擎的爬行器、索引器和 查询器三个功能模块按照Map/Reduce模型进行设计, 充分利用Hadoop的集群拓扑特性,实现了搜索引擎的分 布式处理...

    基于Java和mapreduce实现的贝叶斯文本分类器设计.zip

    本项目为一个Hadoop课程设计,使用Java语言和map/reduce实现贝叶斯文本分类器。项目的具体内容如下:1:用MapReduce算法实现贝叶斯分类器的训练过程,并输出训练模型; 2:用输出的模型对测试集文档进行分类测试。...

    基于Hadoop的数据分析.doc

    " " " "(3)设置Map/Reduce location,选择Map/Reduce locations,new hadoop " "location,将其中的内容设置成下图所示的内容: " " " "设置Advanced parameters中的tmp文件夹位置为/usr/local/hadoop/tmp,...

    Hadoop集群安装

    /opt/hadoop$ scp -r /opt/hadoop/* 主機二:/opt/hadoop/ step 8. 格式化HDFS 以上我們已經安裝及設定好 Hadoop 的叢集環境,接著讓我們來啟動 Hadoop ,首先還是先格式化hdfs,在"主機一" 上操作 • /opt/hadoop$ ...

    hadoop-eclipse-plugin-2.7.3.jar

    放到eclipse的plugins目录下。重启eclipse ...配置 Map/Reduce Master和DFS Mastrer,Host和Port配置成hdfs-site.xml与core-site.xml的设置一致即可。  如果连接成功,会出现hdfs上面的文件夹

    hadoop 1.2.1 api 最新chm 伪中文版

    Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 一个Map/Reduce 作业(job) 通常会把输入的...

    分布式计算(MapReduce).docx

    这是一篇关于分布式计算的毕业设计论文,内容摘要:分布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce框架所设计的分布式框架。在Hadoop中,分布式文件系统,很大程度上,是为各种...

Global site tag (gtag.js) - Google Analytics