2018年通信业务公司软件研究院MapReduce与Spark实战培训课件.pdf
下载文档
上传人:地**
编号:1266426
2024-12-16
48页
1.59MB
该文档所属资源包:
通信业务公司软件研究院大数据技术信息安全IT总体规划培训课件资料
1、MapReduceMapReduce与与SparkSpark实战实战 目录一MapReduce计算模型与工作机制二Spark计算模型与工作机制三MapReduce实战四Spark实战 MapReduce概念一种编程模型用于大规模数据集(大于1TB)的并行运算包含“Map(映射)”和“Reduce(归约)”便于编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上 MapReduce计算原理原理:利用一个输入Key/Value pair集合来产生一个输出的Key/Value pair集合Map函数:接受一个输入的Key/Value pair值,然后产生一个中间Key/Value p2、air值的集合。Reduce函数:接受一个中间Key值和相关的一个Value值的集合,合并这些Value值。Datastore1mapmapDatastore2Input key*value pairsInput key*value pairskey 1,Valueskey 2,Valueskey 3,Valueskey 1,Valueskey 2,Valueskey 3,Values=Barrier=:Aggregates intermediate values by output keyReduceReduceReducekey 1,Intermefiatevalueskey 2,Inte3、rmefiatevalueskey 3,Intermefiatevaluesfinal key 1valuesfinal key 1valuesfinal key 1values MapReduce1.0组成 Master 使用者发起作业 指派任务到Tasktrackers 作业分配、错误处理JobtrackerTasktrackers Workers 运行Map、Reduce任务 管理存储、汇报运算结果 MapReduce2.0组成 集群中所有资源的统一管理和分配,它接收来自各个节点(NodeManager)的资源汇报信息,并把这些信息按照一定的策略分配给各个应用程序Resource Man4、agerNode Manager 与ResourceManager保持通信,管理Container的生命周期、监控每一个Container的资源使用(内存、CPU等)情况、追踪节点健康状况 MapReduce2.0架构 Client 与MapReduceMapReduce1 1.0 0的ClientClient类似,用户通过ClientClient与YARNYARN交互,提交MapReduceMapReduce作业,查询作业运行状态,管理作业等。MRAppMaster 功能类似于 1.01.0中的JobTrackerJobTracker,但不负责资源管理;功能包括:任务划分、资源申请并将之二次5、分配给MapMap TaskTask和ReduceReduce TaskTask、任务状态监控和容错。MapReduce2.0架构 MapReduce2.0架构 MapReduce作业流程 MapReduce任务执行过程 MapReduce执行框架的组件和执行流程 目录一MapReduce计算模型与工作机制二Spark计算模型与工作机制三MapReduce实战四Spark实战 MapReduceMapReduce的局限性的局限性 Hadoop(MapReduce)极大的简化了大数据分析,但是,随着大数据需求和使用模式的扩大,MapReduce存在以下局限:更复杂的多重处理效率低(比如迭代计算,6、ML,Graph)不适合低延迟的交互式查询(比如ad-hoc query)不适合流式处理(点击日志分析)MapReduce作业结果需要到固化到硬盘上,由此产生数据备份、磁盘I/O和序列化等操作产生了大量的开销,也由于这一缺点,MapReduce计算模型的架构不适合于上述场景 Spark简介 Spark简介MapReduceImpalaPregelDremelGraphLabStormGiraphDrillTezS4批处理系统实时计算,流处理,图计算框架BSP能否有一种灵活的框架可同时进行批处理、流式计算、交互式计算等?能否有一种灵活的框架可同时进行批处理、流式计算、交互式计算等?框架多样化 S7、park简介统一的框架:批处理、流式计算、交互式计算、图计算统一的框架:批处理、流式计算、交互式计算、图计算 Spark简介 是什么是什么 Spark是基于内存计算的大数据并行计算框架;2009年诞生于加州大学伯克利分校AMP LAB,之后成为Apache社区的顶级开源项目;做什么做什么 数据处理数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。迭代计算迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。数据挖掘数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习8、算法。Spark简介 SparkSpark特点特点 轻:Spark核心代码只有3万行,代码量比MapReduce少25倍。Scala语言的简洁和丰富表达力 巧妙利用了Hadoop和Mesos的基础设施 快:Spark对小数据集可达到亚秒级的延迟,对大数据集的迭代机器学习、即席查询、图计算等应用,Spark版本比基于MR实现快10-100倍。内存计算、数据本地性和传输优化、调度优化 灵:Spark提供了不同层面的灵活性。Spark支持内存计算、迭代批量处理、即席查询、流处理和图计算等多种范式。巧:巧妙借力现有大数据组件。Spark借Hadoop之势,与Hadoop无缝结合;Spark简介Spar9、k 与其他框架对比Logistic regression Spark功能与架构|数据共享机制iter.1iter.2.InputHDFSreadHDFSwriteHDFSreadHDFSwriteInputquery 1query 2query 3result 1result 2result 3.HDFSread太慢,冗余读写、序列化、磁盘太慢,冗余读写、序列化、磁盘IOIOData Sharing in Data Sharing in MapReduceMapReduceData Sharing in SparkData Sharing in Spark1010-100 x100 x快于网络10、和磁盘快于网络和磁盘iter.1iter.2.InputDistributedmemoryInputquery 1query 2query 3.one-timeprocessing Spark功能与架构|计算模型 弹性分布式数据集(Resilient Distributed Datasets,RDD)集群分布式内存抽象,可以简单理解按分区分布在集群上的一个大的数组。只读的,可分区的分布式数据集 只能直接通过操作符来创建和处理 支持容错处理RDD的创建方式 从Hadoop文件系统输入创建 从父RDD转换得到新的RDD Spark功能与架构|计算模型 RDD 操作Transformation&Ac11、tion(变换和行动)Transformation操作是延迟计算的,一个RDD转换到另一个RDD的操作需要等到Action操作时才会触发。Action操作会触发Spark提交作业。Spark功能与架构|计算模型RDD操作示例 Spark功能与架构|计算模型RDD的依赖 窄依赖(Narrow Dependencies):子RDD的每一个分区依赖于父RDD的部分分区 宽依赖(Wide Dependencies):子RDD的分区依赖于父RDD的所有分区流水线操作流水线操作 窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。例如,逐个元素地执行map、然后filter操作 宽12、依赖则需要首先计算好所有父分区数据,然后在节点之间进行数据混合,这与MapReduce类似。容错机制的处理容错机制的处理 窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算 对于一个宽依赖关系的Lineage,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。窄依赖与宽依赖的区别 Spark功能与架构|计算模型 Spark功能与架构|容错机制 血统关系(Lineage)记录RDD是如何从其它RDD中演变过来的一系列操作 当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数13、据分区iter.1iter.2.InputDistributedmemoryInputquery 1query 2query 3.one-timeprocessing快的同时,也要保证系统鲁棒性 Spark功能与架构|容错机制 检查点(Checkpoint)RDD定期做checkpoint,失败后,从某个断点开始计算iter.1iter.2.InputDistributedmemoryInputquery 1query 2query 3.one-timeprocessing快的同时,也要保证系统鲁棒性 Spark功能与架构|架构 Spark功能与架构|任务调度RDD ObjectsDAGSch14、edulerTaskSchedulerWorkerrdd1.join(rdd2).groupBy().filter()build operator DAGsplit graph into stages of taskssubmit each stage as readylaunch tasks via cluster managerretry failed or straggling tasksexecute tasksstore and serve blocksDAGTaskSetClustermanagerBlock managerThreadsTask Spark功能与架构|任务调度按窄15、依赖划分Stage依据分区来划分Join操作重用依据缓存的数据joinuniongroupBymapStage 3Stage 1Stage 2A:B:C:D:E:F:G:=previously computed partitionTask 目录一MapReduce计算模型与工作机制二Spark计算模型与工作机制三MapReduce实战四Spark实战 MapReduce编程之wordcount 场景:有大量的文本文件,每个文件里存储了很多的单词 任务:统计每个单词出现的次数 应用:搜索引擎中,统计最流行的K个搜索词 统计搜索词频率,帮助优化搜索词提示 MapReduce编程之wordcount16、MapReduce输入输入输出输出 MapReduce编程之wordcountMapMapMap MapReduce编程之wordcountReduceReduceReduceReduce MapReduce编程之wordcountpublic void map(Object key,Text value,Context context)throws IOException,InterruptedException StringTokenizer itr=new StringTokenizer(value.toString();System.out.println(map line:+itr)17、;while(itr.hasMoreTokens()word.set(itr.nextToken();System.out.println(map word:+word);context.write(word,one);System.out.println(map word:+word+:one:+one);MapReduce编程之wordcountpublic void reduce(Text key,Iterable values,Context context)throws IOException,InterruptedException int sum=0;for(IntWritabl18、e val:values)sum+=val.get();result.set(sum);context.write(key,result);System.out.println(key+=+result);MapReduce编程之wordcountpublic static void main(String args)throws Exception Configuration conf=new Configuration();String otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs19、.length!=2)System.err.println(Usage:wordcount );System.exit(2);Job job=new Job(conf,word count);job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOut20、putValueClass(IntWritable.class);FileInputFormat.addInputPath(job,new Path(otherArgs0);FileOutputFormat.setOutputPath(job,new Path(otherArgs1);System.exit(job.waitForCompletion(true)?0:1);目录一MapReduce计算模型与工作机制二Spark计算模型与工作机制三MapReduce实战四Spark实战 Spark 编程|shell客户端Spark提供多种语言的API和客户端,包括R、SQL、Python、Sca21、la和Javabeeline:通过Thrift JDBC/ODBC server连接spark/hive,执行sql查询pyspark:python接口客户端Spark-shell:scala接口客户端sparkR:R语言接口客户端Spark-sql:sql查询客户端spark-submit:作业提交命令 Spark 编程|shell客户端 Spark-shell 交互式运行spark代码(scala交互式终端)可用于快速体验spark,查看程序运行结果等bin/spark-shellscala val data=Array(1,2,3,4,5)/产生datascala val distDat22、a=sc.parallelize(data)/生成RDDscala val result=distData.reduce(_+_)/求和 Spark 编程|作业提交 Spark 编程|基本流程返回结果保存到HDFS中,或直接打印出来在RDD之上进行转换和actionSpark提供了多种转换和action函数创建RDD可从集合或Hadoop数据集上创建创建SparkContext对象封装了spark执行环境信息 Spark 编程|scala编程 Spark 编程|Python编程 Spark 编程|java编程安装eclipse新建java工程导入依赖包编写spark程序打成jar包Spark-submit执行 Spark 编程|java编程THANKS