-
KeyValue对RDDS 创建KeyValue对RDDS: 使用map()函数,返回key/value对 eg: 包含数行数据的RDD,把每一行数据的第一个单词作为keys; val rdd=sc.textFile("/home/1707498/YC_test/words.txt") rdd.foreach(println) var rdd2=rdd.map(line=>(line.spilt(" ")(0),line)) 每一行的第一个座位key,整行数据座位value KeyValue对Rdds的Transformation eg:val rdd=sc.parallelize(Array((1,2),(3,4),(3,6))) rdd.reduceByKey(func): 把相同的key结合 eg:rdd.reduceByKey((x,y)=>x+y) 结果:{1,2),(3,10)} rdd.groupByKeyByKey(func):把相同key的values分组 eg:rdd.groupByKeyByKey((x,y)=>x+y) 结果:{(1,[2]),(3,[4,6])} rdd.mapValues(func):函数作用于pairRDD的每个元素,key不变 eg:rdd.mapValues(x=>x+1) 结果:{(1,3),(3,5),(3,7)} rdd.flatMapValues(func):符号化的时候使用; eg:rdd.flatMapValues(x=>x to 5) 结果:{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} keys():仅返回keys values():仅返回values sortByKey():按照key排序的RDD *重要 combinByKey(createCombiner,mergeValue,mergeCombiners,partitioner):把相同的key结合,使用不同的返回类型 最常用的聚合函数,返回的类型可以与输入类型不一样,许多; 遍历partition中的元素,元素的key,要么之前见过的,要么不是(rdd中很多个分区组成); 如果是新元素key,使用我们提供的createCombiner()函数(相当于初始化) 如果是这个partition中已经存在的key,就会使用mergeValue()函数(相当于整合) 合计每个partition的结果的时候,使用mergeCombiners()函数 eg:求平均值 val scores=sc.parallelize(Array(("jake",80.0),("jake",90.0),("jake",90.0),("mary",89.0),("mary",40.0),("mary",90.0)) scores.foreach(println) val score2=scores.combinByKey(x=>(1,x),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2))) x=>(1,x) 想要求平均值,需要知道科目的总和,科目的个数,每遍历一个新key记1 c1:(Int,Double) int表示累计科目数,double累计分数 c1._1+1,c1._2+newScore c1._1取的第一个值,遇到新的key就加1 c1._2+newScore 分数累加 c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2) 每一个分区中汇总的科目数、分数汇总 val score2=scores.combinByKey(x=>(1,x),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2))) val average=score2.map{case(name,(num,score))=>(name,score/num)} case(name,(num,score)) 判断传递过来的类型是否正确 name,score/num) 正确的话便执行求均值
查看全部 -
errorsRDD和waringsRDD都是inputRDD经过filter操作后生成的新的RDD,这两个RDD经过union操作后生成新的badLinesRDD,这样一步一步组成血统关系图
延迟计算
Spark对DDS的计算是在第一次使用action操作的时候才使用;
这种方式在处理大数据的时候特别有用,可以减少数据的传输(因为在第一次ation时才使用);
Spark内部记录metadata,表名transformations操作已经被响应了
加载数据也是延迟计算,数据只有在必要的时候,才会被加载进去(只有使用的时候才会被加载进去)
RDD.persist():
默认每次在RDDS上面进行action操作时,Spark都重新计算RDDS;
如果想重复利用一个RDDD,可以使用RDD.persist()(如还想使用上述union的BadLinesRDD会从inputRDD开始action一遍,RDD.persist则无需重复上诉过程)
unpersist()方法从缓存中移除;
查看全部 -
RDDS介绍
SparkContext:代表和集群的连接
Driver Program 通过SparkContext对象访问Spark;
代表和一个集群的连接
在shell中SparkContext自动创建好了,就是sc
RDDS:
eg:var lines=sc.textfile("/rrr.txt")
弹性分布式数据集(把yyy.txt的数据加载到弹性分布式数据集lines中)
并行分布在整个集群中(如果有500G的数据,分成5块放在不同的集群中);
是Spark分发数据和计算的基础抽象类;
是一个不可改变的弹性分布式数据集(增删改后相当于生成了一个新的RDD)
Spark中所有的计算都是通过RDDs创建、转换操作完成的;
一个RDDS的内部有很多分片组成的(如果有500G的数据,分成5块放在不同的集群中,每台100G)
分片:
每个分片包括一部分数据,partions在在集群不同节点上计算;
分片是spark并行处理的单元,Spark是顺序的、并行的处理分片
RDDS的创建方法:
把一个存在的集合传给SparkContext的parallelize()方法;
eg: val rdd =sc.parallelize(Array(1,2,2,4),4)
第一个参数:待并行化处理的集合,第二个参数:分区个数
rdd.foreach(print) 遍历
rdd.foreach(println) 换行,多次打印顺序不一致,因为分成4个分片,先取到那个分片是随机的;
加载外部数据集(很多方法,也可以加载集群上的数据);
eg: val rddText=sc.textTextFile("yyy.txt")
Scala的变量声明:
创建变量的时候,必须使用val或者var;
val:变量值不可修改,一旦分配不能重新指向别的值
var:分配后,可以指向类型相同的值;
Scala的匿名解析和类型解析:
eg: lines.filter(line => line.contains("world"))
定义一个匿名函数,接收一个参数line指向包含‘word’的行
使用line这个String类型变量上的contains方法,并且返回结果;
line的类型不需要指定,能够推断出来
查看全部 -
20180924:讲的非常一般,看了十分钟,不想继续了,先放着吧。查看全部
-
Spark组件:
Spark core(其余组件军继承了RDD API):
包含Spark基本功能,任务调度,内存管理,容错机制等;
内部定义了RDDs(弹性分布式数据集);
提供了很多APIs来创建这些RDDs;
应用场景,为其他组件提供底层服务;
Spark SQl:
是Spark处理结构化数据的库,类似hive sql,mysql;
应用场景:企业中做报表统计
Spark Streaming:
是实时数据流处理组件,类似Storm;
Spark Streaming提供了API来操作实时流数据;
应用场景,企业中用来从kafka接收数据作实时统计
Mlib:
一个包含通用机器学习功能的包,Machine learnimg lib;
包含分类,聚类,回归等,还包括模型评估和数据导入;
MLIB提供以上的方法都支持集群上的横向扩展(python 单机,);
应用场景:机器 学习
Graphx:
是处理图的库(如社交网络图),并进行图的并行计算;
提供了各种图的操作,和常用的图算法(如PangeRank);
应用场景:图计算;
Cluster Managers:
集群管理,Spark自带一个集群 管理是单独调度器;
长健集群关键包括hadoop yarn,Apache Mesos;
紧密集成的优点:
Spark底层优化了,基于Spark底层的组件也得到了相应的优化;
紧密集成,节省和各个组件组合使用时的部署,测试等时间;
向Spark增加新的组件时,其他组件可立刻享用新组件的功能;
查看全部 -
Spark
快速且通用的集群计算平台
特点:
快速:
扩充了流行的MR计算模型(如MR是分/时计算,那么Spark就是秒/分及计算)
基于内存的计算(计算过程难免产生中间结果,中中间结果放在内存比硬盘快很多)
通用:
容纳了其他分布式系统拥有的功能
包括批处理(如Hadoop),迭代式计算(其他机器学习系统),交互式查询(如hive)和流处理(如stom)
优点:大量降低了维护成本;
高度开放:
提供了python、java、scala、sql的api和丰富的内置库;
和其他的大数据工具整合的很好,包括hadoop、kafka等等
查看全部 -
存下学习查看全部
-
存下学习查看全部
-
把数据加载到节点的内存中,使得分布式处理在秒级完成
查看全部 -
1.spark持久化的集中方式
查看全部 -
spark大纲
查看全部 -
创建keyvalue查看全部
-
persist缓存级别补充查看全部
-
persist缓存级别查看全部
-
persist函数,使得可以重复利用rdd查看全部
举报