-
RDDS查看全部
-
RDDs之SparkContext sc查看全部
-
RDD之Driver program查看全部
-
Spark
Spark简介
Spark是什么:
Spark是一个快速且通用的集群计算平台
Spark的特点
Spark是快速的
Spark扩充了流行的MapReduce计算模型
Spark是基于内存的计算
Spark是通用的
Spark的设计容纳了其他分布式系统拥有的功能,批处理,迭代式计算,交互查询和流处理等。
优点:降低了维护成本
Spark是高度开放的
Spark提供了Python, Java, Scala, SQL的API和丰富的内置库。
Spark和其他的大数据工具整合的很好,包括Hadoop, Kafka等
Spark历史
诞生于2009年,加州大学伯克利分校RAD实验室的一个研究项目,最初是基于Hadoop MapReduce
发现MapReduce在迭代式计算和交互式上抵消,引入内存存储
2010年3月份Spark开源
2011年AMP实验室在Spark上开发高级组件,像Spark Streaming
2013年转移到了Apache下,不久便成为顶级项目了。
Spark组件:Spark包含多个紧密集成的组件
Spark Core:
包含Spark的基本功能,包含任务调度,内存管理,容错机制等。
内部定义了RDDs(弹性分布式数据集)
提供了很多API来创建和操作这些RDDs。
应用场景,为其他组件提供底层的服务。
Spark SQL:
是Spark处理结构化数据的库,就像Hive SQL,MySQL一样。
应用场景,企业中用来做报表统计。
Spark Streaming:
是实施数据流处理的组件,类似Storm。
Spark Streaming提供了API来操作实施流数据。
应用场景,企业中用来从Kafka接受数据做实时统计。
Mlib:
一个包含通用机器学习功能的包,Machine learning lib.
包含分类,聚类,回归等,还包括模型评估,和数据导入。
Milb提供的上面这些方法,都支持集群上的横向扩展。
应用场景,机器学习。
Graphx:
是处理图的库(例如,社交网络图),并进行图的并行计算。
像Spark Streaming, Spark SQL一样, 它也集成了RDD API。
它提供了各种图的操作,和常用的图算法,例如PangeRank算法。
应用场景,图计算
Cluster Managers:
就是集群管理,Spark自带一个集群管理是单独调度器。
常见集群管理包括Hadoop YARN, Apache Mesos。
紧密集成的优点:
Spark底层优化了,基于Spark底层的组件,也得到了相应的优化。
紧密集成,节省了各个组件组合使用时的部署,测试等时间。
向Spark增加新的组件时,其他组件,可立刻享用新组件的功能。
Spark与Hadoop的比较
Hadoop应用场景
离线处理
对时效性要求不高
Spark应用场景
时效性要求高的场景
机器学习等领域
比较
Doug Cutting(Hadoop之父)的观点
这是生态系统,每个组件都有其作用,各善其职即可。
Spark不具有HDFS的存储能力,要借助HDFS等持久化数据。
大数据将会孕育出更多的新技术。
Spark安装
Spark运行环境
Spark 是scala写的,运行在JVM上,所以运行环境Java7+
如果使用Python API,需要安装Python 2.6+或者Python3.4+。
Spark 1.6.2 -Scala 2.10 Spark 2.0.0 -Scala 2.11
Spark下载:
下载地址:http://spark.apache.org/downloads.html
搭Spark不需要Hadoop,如有hadoop集群,可下载相应的版本解压。
Spark目录
bin:包含用来和Spark交互的可执行未见,如Spark shell。
core, streaming, python, ...包含主要组件的源代码。
examples包含一些单机的Spark jobs,可以单机运行的例子。
Spark的Shell
Spark的shell使你能够处理分布在集群上的数据。
Spark把数据加载到节点的内存中,因此分布式处理可在秒级完成。
快速使迭代式计算,实时查询,分析一般能够在shell中完成。
Spark提供了Python shells和Scala shells。
Python Shell:
bin/pyspark
Scala Shell
bin/spark-shell
```
var lines = sc.textFile("../testfile/helloSpark")
lines.conut()
lines.first()
修改日志级别log4j.rootCategory=WARN, console
```
Spark开发环境的搭建
Scala安装:
IDEA下载
插件安装:Scala
搭建开发环境常遇到的问题
网络问题,导致sbt插件下载失败,解决方法,找一个好的网络环境。
版本匹配问题:Scala2.10.5, jdk1.8
IntelliJ IDEA 常用设置
开发第一个Spark程序
配置ssh无密登陆:
ssh-keygen
.ssh目录下cat xxx_rsa.pub > authorized_keys
chmod 600 authorized_keys
WordCount:
创建一个Spark Context
加载数据
把每一行分割成单词
转换成pairs并且计数
RDDs介绍
Driver program:
包含程序的main()方法,RDDs的定义和操作。
它管理很多节点,我们称作executors。
SparkContext:
Driver programs通过SparkContext对象访问Spark。
SparkContext对象代表和一个集群的连接。
在Shell中SparkContext自动创建好了,就是sc。
RDDs:
Resilient distributed datasets(弹性分布式数据集,简称RDDs)
这些RDDs,并行的分布在整个集群中。
RDDs是Spark分发数据和计算的基础抽象类。
一个RDD是一个不可改变的分布式集合对象。
Spark中,所有的计算都是通过RDDs的创建,转换,操作完成的。
一个RDD内部有很多partition(分片)组成的。
分片:
每个分片包括一部分数据,partitions可在集群不同节点上计算。
分片是Spark并行处理的单元,Spark顺序的,并行的处理分片。
RDDs的创建方法:
把一个存在的集合传给SparkContext的parallelize()方法,测试用
val rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
第1个参数:待并行化处理的集合
第2个参数:分区个数
加载外部数据集
val rddText = sc.textFile("helloSpark.txt")
Scala的基础知识
Scala的变量声明
创建变量是val/var
val:变量值是不可修改的,类似java final。
var:变量值定义完是可以修改的。
Scala的匿名函数和类型推断
lines.filter(line => line.contains("world")),定义一个匿名函数,接受line.
使用line这个Strig类型的变量上的contains方法,并且返回结果
line不需要制定类型,会自动推断
RDD基本操作 Transformation
Transformations介绍:
转换
从之前的RDD构建一个新的RDD,像map() 和 filter()。
逐元素Transformation
map(): map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD。
filter(): filter()接受函数,返回只包含满足filter()函数的元素的新RDD。
flatMap(): flatMap()对每个输入元素,输出多个输出元素。flat压扁的意思,将RDD中的元素压扁后返回一个新的RDD。
集合运算
RDDs支持数学集合的计算,例如并集,交集计算。
RDD基本操作 Action
Action介绍:在RDD上计算出来一个结果。把结果返回给driver program或保存在文件系统,count(), save()
collect(): 返回RDD的所有元素。
count(): 计数。
countByValue(): 返回一个map表示唯一元素出现的个数。
take(num): 返回几个元素。
top(num): 返回前几个元素。
takeOrdered(num)(ordering): 返回基于提供的排序算法的前几个元素。
takeSample(withReplacement, num, [seed]): 取样例。
reduce(fun): 合并RDD中元素。
fold(zero)(fun): 与reduce()相似提供zero value。
aggregate(zeroValue)(seqOp, combOp): 与fold()相似,返回不同类型。
foreach(fun): 对RDD的每个元素作用函数,什么也不返回
RDDs的特性
RDDs的血统关系图:
Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图
Spark使用血统关系图来计算每个RDD的需求和恢复丢失的数据
延迟计算(Lazy Evaluation):
Spark对RDDs的计算是,他们第一次使用action操作的时候。
这种方式在处理大数据的时候特别有用,可以减少数据的传输。
Spark内部记录metadata表明transformations操作已经被响应了。
加载数据也是延迟计算,数据只有在必要的时候,才会被加载进去。
RDD.persist(): 持久化
默认每次在RDDs上面进行action操作时,Spark都重新计算RDDs,如果像重复利用一个RDD,可以使用RDD.persist()。
unpersist()方法从缓存中移除。
例子RDD.persist()
KeyValue对RDDs
创建KeyValue对RDDs
使用map()函数,返回key/value对,例如,包含数行数据的RDD,把每行数据的第一个单词作为keys
KeyValue对RDDs的Transformations
reduceByKey(fun): 把相同Key的结合。
groupByKey(): 把相同的key的values分组。
combineByKey(): 把相同key的结合,使用不同的返回类型。
mapValues(fun): 函数作用于pairRDD的每个元素,key不变
flatMapValues(fun): 符号化的时候使用
keys: 仅返回keys
values: 仅返回values
sortByKey(): 按照key排序的RDD
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
最常用的基于key的聚合函数,返回的类型可以与输入类型不一样,许多基于key的聚合函数都用到了它,像groupByKey()
原理,便利partition中的元素,元素的key,要么之前见过的,要么不是。如果是新元素,使用我们提供的createCombiner()函数,如果是这个partition中已经存在的key,就会使用mergeValue()函数,合计每个partition的结果的时候,使用mergeCombiners()函数
Spark基础课程总结
Spark介绍。
Spark安装,开发环境搭建,WordConut程序开发和运行。
RDDs介绍,Transformations,Actions
RDDs的特性,KeyValue对RDDs
后续课程:
Spark架构
Spark运行过程
Spark程序部署
查看全部 -
reduce查看全部
-
Prest查看全部
-
开发第一个Spark程序放到集群上运行
查看全部 -
scala基础知识
变量声明:
创建变量必须使用val /var
区别:val:变量值不可修改,一旦分配不能重新指向别的值
var:分配后,可以指向类型相同的值
查看全部 -
rdds创建方法
查看全部 -
分片:
每个分片包含一部分数据,partitions可在集群不同节点上计算。
分片是spark并行处理的单元,spark顺序的、并行的处理分片
查看全部 -
一个RDD是一个不可改变的分布式集合对象。
Spark中,所有的计算都是通过RDDs的创建、转换和操作完成的。
一个RDD内部由很多partitions(分片)组成
查看全部 -
RDDs介绍:
RDDs: Resilient distributed datasets(弹性分布式数据集,简称RDDs)
RDDs是Spark分发数据和计算的基础抽象类
查看全部 -
spark context:
Driver programs 通过sparkContext对象访问spark
SparkContext对象代表和一个集群的连接
在shell中sparkContext自动创建好了,就是sc
查看全部 -
Driver program:
包含程序的main()方法,RDDs的定义和操作。
它管理很多节点,我们称作executors
查看全部 -
环境搭建,spark和Scala版本匹配
查看全部
举报