spark知识点总结

标签: spark

spark介绍:

spark是一种轻量快速的分布式的计算框架。并不提供存储数据能力。

spark数据源:可以是HDFS,本地文件系统,kafka等数据源。

Spark处理后的数据存储目的地:HDFS,本地文件系统,Hbase,关系型数据库等。

Spark即可以用于离线批处理,还可以用于实时处理计算,机器学习。

spark引入了缓存机制并且充分的应用了这一特性,所以Spark是一种高度依赖内存的计算框架。

cache() 使用默认存储级别调用persist() 是一样的。

persist()

 

基于非循环的数据流模型



RDD介绍:

一种分布式的内存抽象称为弹性分布式数据集(Resilient Distributed Dataset,RDD ),弹性指的是RDD可以灵活的调整分区数。

特点:1.有分区机制,可以分布式处理数据。

            2.有容错机制,RDD的数据如果丢失,可以恢复。

创建RDD的两种途径:

           1.将普通的集合类型(Array,List)转化为RDD.

            例如:val data = Array(1, 2, 3, 4, 5)               
                       val r1 = sc.parallelize(data)   //无分区   

                       val r2 = sc.parallelize(data,2) //两分区,分区目的为了分布式。

                        val rdd = sc.makeRDD(List(1,3,5,7,9))  

                         val rdd = sc.makeRDD(List(1,3,5,7,9),2)   

           2.从文件系统(本地文件系统,HDFS等)读取文件,把文件数据转变为RDD

           例如:val distFile = sc.textFile("data.txt") //无分区 

                      val distFile = sc.textFile("data.txt",2) //两分区,分区目的为了分布式

           查看RDD:

                    rdd.collect

                    收集rdd中的数据组成Array返回,此方法将会把分布式存储的rdd中的数据集中到一台机器中组建Array。

                    在生产环境下一定要慎用这个方法,容易内存溢出。

RDD操作:

1.Transformation(变换)操作属于操作(算子)不会真正触发RDD的处理计算,每当使用一次都会生成一个新的RDD.

2.Actions(执行)操作才会真正触发计算。

Transformations

Transformation

Meaning

map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.

参数是函数,函数应用于RDD每一个元素,返回值是新的RDD

案例展示:

map 将函数应用到rdd的每个元素中

val rdd = sc.makeRDD(List(1,3,5,7,9))

rdd.map(_*10)

 

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

扁平化map对RDD每个元素转换, 然后再扁平化处理

案例展示:

flatMap 扁平map处理

val rdd = sc.makeRDD(List("hello world","hello count","world spark"),2)

 

rdd.map(_.split{" "})//Array(Array(hello, world), Array(hello, count), Array(world, spark))

 

rdd.flatMap(_.split{" "})//Array[String] = Array(hello, world, hello, count, world, spark)

//Array[String] = Array(hello, world, hello, count, world, spark)

 

注:mapflatMap有何不同?

map: 对RDD每个元素转换

flatMap: 对RDD每个元素转换, 然后再扁平化(即去除集合)

 

所以,一般我们在读取数据源后,第一步执行的操作是flatMap

 

filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD

案例展示:

filter 用来从rdd中过滤掉不符合条件的数据

val rdd = sc.makeRDD(List(1,3,5,7,9));

rdd.filter(_<5);

 

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。

案例展示:

val rdd3 = rdd1.mapPartitions{ x => {

val result = List[Int]()

var i = 0

while(x.hasNext){

i += x.next()

}

result.::(i).iterator

}}

 

scala>rdd3.collect

 

 

补充:此方法可以用于某些场景的调优,比如将数据存储数据库,

如果用map方法来存,有一条数据就会建立和销毁一次连接,性能较低
所以此时可以用mapPartitions代替map

 

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

 

函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。

案例展示:

var rdd1 = sc.makeRDD(1 to 5,2)

 

var rdd2 = rdd1.mapPartitionsWithIndex{

(index,iter) => {

var result = List[String]()

var i = 0

while(iter.hasNext){

i += iter.next()

}

result.::(index + "|" + i).iterator

}

}

 

 

案例展示:

val rdd = sc.makeRDD(List(1,2,3,4,5),2);

rdd.mapPartitionsWithIndex((index,iter)=>{

var list = List[String]()

while(iter.hasNext){

if(index==0)

list = list :+ (iter.next + "a")

else {

list = list :+ (iter.next + "b")

}

}

list.iterator

});

 

 

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

案例展示:

union 并集 -- 也可以用++实现

val rdd1 = sc.makeRDD(List(1,3,5));

val rdd2 = sc.makeRDD(List(2,4,6,8));

val rdd = rdd1.union(rdd2);

val rdd = rdd1 ++ rdd2;

 

 

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

案例展示:

intersection 交集

val rdd1 = sc.makeRDD(List(1,3,5,7));

val rdd2 = sc.makeRDD(List(5,7,9,11));

val rdd = rdd1.intersection(rdd2);

 

 

subtract

案例展示:

subtract 差集

val rdd1 = sc.makeRDD(List(1,3,5,7,9));

val rdd2 = sc.makeRDD(List(5,7,9,11,13));

val rdd =  rdd1.subtract(rdd2);

distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.

没有参数,将RDD里的元素进行去重操作

案例展示:

val rdd = sc.makeRDD(List(1,3,5,7,9,3,7,10,23,7));

rdd.distinct

 

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 

Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 

Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

 

案例展示:

scala>val rdd = sc.parallelize(List(("cat",2), ("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2);

scala>rdd.groupByKey()

注:groupByKey对于数据格式是有要求的,即操作的元素必须是一个二元tuple

tuple._1 是key, tuple._2是value

比如下面这种数据格式

sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)就不符合要求

以及这种:

sc.parallelize(List(("cat",2,1), ("dog",5,1),("cat",4,1),("dog",3,2),("cat",6,2),("dog",3,4),("cat",9,4),("dog",1,4)),2);

 

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

案例展示:

scala>var rdd = sc.makeRDD( List( ("hello",1),("spark",1),("hello",1),("world",1) ) )

rdd.reduceByKey(_+_);

 

注:reduceByKey操作的数据格式必须是一个二元tuple

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

 

使用方法及案例展示:

aggregateByKey(zeroValue)(func1,func2)

 

scala> val rdd = sc.parallelize(List(("cat",2),("dog",5),("cat",4),

("dog",3),("cat",6),("dog",3),

("cat",9),("dog",1)),2);

查看分区结果:

partition:[0]

(cat,2)

(dog,5)

(cat,4)

(dog,3)

cat   2,4=6

dog 5,3=8

 

 

partition:[1]

(cat,6)

(dog,3)

(cat,9)

(dog,1)

cat 6,9=15

dog 3,1=4

 

cat 6,15=90

dog 8,4=32

 

 

scala> rdd.aggregateByKey(0)( _+_  ,  _+_);

 

 

scala> rdd.aggregateByKey(0)(_+_,_*_);

 

  • zeroValue表示初始值,初始值会参与func1的计算
  • 在分区内,按key分组,把每组的值进行fun1的计算
  • 再将每个分区每组的计算结果按fun2进行计算

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

案例展示:

val d2 = sc.parallelize(Array(("cc",32),("bb",32),("cc",22),("aa",18),("bb",6),("dd",16),("ee",104),("cc",1),("ff",13),("gg",68),("bb",44))) 

 

d2.sortByKey(true).collect

 

 

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoinrightOuterJoin, and fullOuterJoin.

案例展示:

val rdd1 = sc.makeRDD(List(("cat",1),("dog",2)))

val rdd2 = sc.makeRDD(List(("cat",3),("dog",4),("tiger",9)))

rdd1.join(rdd2);

 

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

参数是RDD,求两个RDD的笛卡儿积

案例展示:

cartesian 笛卡尔积

val rdd1 = sc.makeRDD(List(1,2,3))

val rdd2 = sc.makeRDD(List("a","b"))

rdd1.cartesian(rdd2);

 

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

 

coalesce(n,true/false) 扩大或缩小分区

 

案例展示:

val rdd = sc.makeRDD(List(1,2,3,4,5),2)

rdd.coalesce(3,true);//如果是扩大分区 需要传入一个true 表示要重新shuffle

rdd.coalesce(2);//如果是缩小分区 默认就是false 不需要明确的传入

 

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartition(n) 等价于上面的coalesce

 

Actions

Action

Meaning

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

并行整合所有RDD数据,例如求和操作

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

返回RDD所有元素,将rdd分布式存储在集群中不同分区的数据 获取到一起组成一个数组返回

要注意 这个方法将会把所有数据收集到一个机器内,容易造成内存的溢出 在生产环境下千万慎用

 

count()

Return the number of elements in the dataset.

统计RDD里元素个数

案例展示:

val rdd = sc.makeRDD(List(1,2,3,4,5),2)

rdd.count

 

first()

Return the first element of the dataset (similar to take(1)).

take(n)

Return an array with the first n elements of the dataset.

案例展示:

take 获取前几个数据

val rdd = sc.makeRDD(List(52,31,22,43,14,35))

rdd.take(2)

takeOrdered(n[ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

案例展示:

takeOrdered(n) 先将rdd中的数据进行升序排序 然后取前n个

val rdd = sc.makeRDD(List(52,31,22,43,14,35))

rdd.takeOrdered(3)

 

top(n)

top(n) 先将rdd中的数据进行降序排序 然后取前n个

val rdd = sc.makeRDD(List(52,31,22,43,14,35))

rdd.top(3)

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

案例示例:

saveAsTextFile 按照文本方式保存分区数据

val rdd = sc.makeRDD(List(1,2,3,4,5),2);

rdd.saveAsTextFile("/root/work/aaa")

 

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. 

Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

 

DAG概念:

Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。

经典小案例:单词统计

1: val file=sc.textFile("hdfs://hadoop01:9000/hello1.txt")

2: val counts = file.flatMap(line => line.split(" "))

3:            .map(word => (word, 1))

4:            .reduceByKey(_ + _)

5: counts.saveAsTextFile("hdfs://...")

file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。

五行代码的具体实现:

1)行1:sc是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口,会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

sc.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD。

3)行3:将第2步生成的MapPartittionsRDD再次经过map将每个单词word转为(word,1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

5)行5:向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。

RDD的依赖关系:

RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

1. 窄依赖:父分区与子分区是一对一关系,就是把父分区从一个形式转变为另一个形式。例如:map filter,flatMap

                   窄依赖没有shuffle过程,DAG中存在多个连续的窄依赖,整合到一起执行,这种优化方式称为流水线优化。

2.宽依赖,父分区与子分区是一对多关系

                 可以认为按照某种条件产生了分组操作,父分区的数据分发到多个子分区。例如:groupByKey, ReduceByKey等

                 spark在宽依赖操作会进行shuffle操作发生磁盘I/O,会减低性能。

                 产生shuffle时,会将产生的中间结果落地,避免当子分区数据丢失时可能导致重新计算所有父分区的情况

Spark DAG的生成与DAG的Stage(阶段)的划分和task的概念

Spark在执行任务(job)时,首先会根据依赖关系,将DAG划分为不同的阶段(Stage)。

DAG:有向无环图,当一整条RDD的依赖关系形成之后,就形成了一个DAG。一般来说,一个DAG,最后都至少会触发一个Action操作,触发执行。一个Action对应一个Job任务。

Stage:一个DAG会根据RDD之间的依赖关系进行Stage划分,流程是:以Action为基准,向前回溯,遇到宽依赖,就形成一个Stage。遇到窄依赖,则执行流水线优化(将多个连续的窄依赖放到一起执行)

task:任务。一个分区对应一个task。可以这样理解:一个Stage是一组Task的集合

版权声明:本文为qq_36250202原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_36250202/article/details/95308955

智能推荐

Python 中各种imread函数的区别与联系

  原博客:https://blog.csdn.net/renelian1572/article/details/78761278 最近一直在用python做图像处理相关的东西,被各种imread函数搞得很头疼,因此今天决定将这些imread总结一下,以免以后因此犯些愚蠢的错误。如果你正好也对此感到困惑可以看下这篇总结。当然,要了解具体的细节,还是应该 read the fuc...

用栈判断一个字符串是否平衡

注: (1)本文定义:左符号:‘(’、‘[’、‘{’…… 右符号:‘)’、‘]’、‘}’……. (2)所谓的字符串的符号平衡,是指字符串中的左符号与右符号对应且相等,如字符串中的如‘(&r...

JAVA环境变量配置

位置 计算机->属性->高级系统设置->环境变量 方式一 用户变量新建path 系统变量新建classpath 方式二 系统变量 新建JAVA_HOME,值为JDK路径 编辑path,前加 方式三 用户变量新建JAVA_HOME 此路径含lib、bin、jre等文件夹。后运行tomcat,eclipse等需此变量,故最好设。 用户变量编辑Path,前加 系统可在任何路径识别jav...

常用的伪类选择器

CSS选择器众多 CSS选择器及权重计算 最常用的莫过于类选择器,其它的相对用的就不会那么多了,当然属性选择器和为类选择器用的也会比较多,这里我们就常用的伪类选择器来讲一讲。 什么是伪类选择器? CSS伪类是用来添加一些选择器的特殊效果。 常用的为类选择器 状态伪类 我们中最常见的为类选择器就是a标签(链接)上的为类选择器。 当我们使用它们的时候,需要遵循一定的顺序问题,否则将可能出现bug 注意...

ButterKnife的使用介绍及原理探究(六)

前面分析了ButterKnife的源码,了解其实现原理,那么就将原理运用于实践吧。 github地址:       点击打开链接 一、自定义注解 这里为了便于理解,只提供BindView注解。 二、添加注解处理器 添加ViewInjectProcessor注解处理器,看代码, 这里分别实现了init、getSupportedAnnotationTypes、g...

猜你喜欢

1.写一个程序,提示输入两个字符串,然后进行比较,输出较小的字符串。考试复习题库1|要求:只能使用单字符比较操作。

1.写一个程序,提示输入两个字符串,然后进行比较,输出较小的字符串。 要求只能使用单字符比较操作。 参考代码: 实验结果截图:...

小demo:slideDown()实现二级菜单栏下拉效果

效果如下,鼠标经过显示隐藏的二级菜单栏 但是这样的时候会存在一个问题,就是鼠标快速不停移入移出会导致二级菜单栏闪屏现象,一般需要使用stop()来清除事件  ...

基于docker环境的mysql主从复制

1、安装docker 可以参考之前的博客,之前写过了~ 2、拉取mysql镜像 3、创建mysql01和mysql02实例 主: 从: 4、进入容器修改配置 1)修改主数据库配置 进入主数据库容器 切换到 etc/mysql/目录下 查看可以看到my.cnf文件,使用vim编辑器打开,但是需要提前安装 安装vim命令: 安装成功后,修改my.cnf文件 新增配置后的my.cnf: binlog 日...

机器学习算法之决策树

原文:http://www.jianshu.com/p/6eecdeee5012 决策树是一种简单高效并且具有强解释性的模型,广泛应用于数据分析领域。其本质是一颗由多个判断节点组成的树,如: 决策树 在使用模型进行预测时,根据输入参数依次在各个判断节点进行判断游走,最后到叶子节点即为预测结果。 如何构造决策树 决策树算法的核心是通过对数据的学习,选定判断节点,构造一颗合适的决策树。 假设我们从用户...

Netty实现一个简单的RPC框架

微服务 微服务通讯 API构建需要考虑的因素 通讯协议 文本协议或者二进制协议 支持的调用方式:单向、双向、Streaming API定义与声明 API容错、可伸缩性 RPC框架 REST http://www.ics.uci.edu/~fielding/pubs/dissertation/rest_arch_style.htm REST即Representational State Transf...