~>_<~

Spark 简介

Apache Spark 是专为⼤规模数据处理⽽设计的快速通⽤的计算引擎。Spark是UC Berkeley AMP lab (加州⼤学伯克利分校AMP实验室)开源的类Hadoop MapReduce的通⽤并⾏框架,Spark拥有Hadoop MapReduce所具有并⾏计算的优点;但不同于MapReduce的是-Job中间输出结果可以保存在内存中,从⽽不再需要读写HDFS,因此Spark能更好地适⽤于数据挖掘与机器学习等需要迭代的MapReduce的算法。
img

Spark 发展历史

对于⼀个具有相当技术⻔槛与复杂度的平台,Spark从诞⽣到正式版本的成熟,经历的时间如此之短,让⼈感到惊诧。2009年,Spark诞⽣于伯克利⼤学AMPLab,最开初属于伯克利⼤学的研究性项⽬。它于2010年正式开源,并于2013年成为了Aparch基⾦项⽬,并于2014年成为Aparch基⾦的顶级项⽬,整个过程不到五年时间。Apache Spark是专为⼤规模数据处理⽽设计的快速通⽤的计算引擎 ,现在形成⼀个⾼速发展应⽤⼴泛的⽣态系统。
img
Spark提供的基于RDD的⼀体化解决⽅案,将MapReduce、Streaming、SQL、MachineLearning、Graph Processing等模型统⼀到⼀个平台下,并以⼀致的API公开,并提供相同的部署⽅案,使得Spark的⼯程应⽤领域变得更加⼴泛。Spark的特⾊在于它⾸先为⼤数据应⽤提供了⼀个统⼀的平台。从数据处理层⾯看,模型可以分为批处理、交互式、流处理等多种⽅式;⽽从⼤数据平台⽽⾔,已有成熟的Hadoop、Cassandra、Mesos以及其他云的供应商。

Spark & hadoop 关系

Spark是对Map Reduce计算模型的改进,也可以理解为没有HDFS和MapReduce,就没有现在的Spark。 Spark可以使⽤YARN作为它的集群管理器,并且可以处理HDFS的数据。这对于已经部署Hadoop集群的⽤户特别重要,毕竟不需要做任何的数据迁移就可以使⽤Spark的强⼤处理能⼒。
img

Spark 计算&Hadoop MapReduce

Hadoop中Map Redcue由Map和Reduce两个阶段,并通过shuffle将两个阶段连接起来的。
img
但是套⽤Map Reduce模型解决问题,不得不将问题分解为若⼲个有依赖关系的⼦问题,每个⼦问题对应⼀个Map Reduce作业,最终所有这些作业形成⼀个DAG。Spark是通⽤的DAG框架,可以将多个有依赖关系的作业转换为⼀个⼤的DAG。
img
Spark核⼼思想是将Map和Reduce两个操作进⼀步拆分为多个元操作,这些元操作可以灵活组合,产⽣新的操作,并经过⼀些控制程序组装后形成⼀个⼤的DAG作业。由于Hadoop有多个MapReduce作业组成,每个作业都会从HDFS上读取⼀次数据和写⼀次数据(默认写三份),即使这些MapReduce作业产⽣的数据是中间数据也需要写HDFS。这种表达作业依赖关系的⽅式⽐较低效,会浪费⼤量不必要的磁盘和⽹络IO,根本原因是作业之间产⽣的数据不是直接流动的,⽽是借助HDFS作为共享数据存储系统。但是在Spark中,使⽤内存(内存不够使⽤本地磁盘)替代了使⽤HDFS存储中间结果。对于迭代运算效率更⾼。
img

注意:中间结果是指reduce操作后⽣成的结果,所以在⽐较Spark和Hadoop的计算模型的时候⼀般对⽐的是多个阶段的计算。

Spark 架构介绍

Spark的整体流程为:Client提交应⽤,Master找到⼀个Worker启动Driver,Driver向Master或者资源管理器申请资源,之后将应⽤转化为RDD Graph,再由DAGScheduler将RDD Graph转化为Stage的有向⽆环图提交给TaskScheduler,由TaskScheduler提交任务给Executor执⾏。在任务执⾏过程中,其他组件协同⼯作,确保整个应⽤顺利执⾏。
img
Spark集群部署后,需要在主节点和从节点分别启动Master进程和Worker进程,对整个集群进⾏控制。在⼀个Spark应⽤的执⾏过程中,Driver和Worker是两个重要⻆⾊。Driver程序是应⽤逻辑执⾏的起点,负责作业的调度,即Task任务的分发,⽽多个Worker⽤来管理计算节点和创建Executor并⾏处理任务。在执⾏阶段,Driver会将Task和Task所依赖的file和jar包序列化后传递给对应的Worker机器,同时Executor对相应数据分区的任务进⾏处理。

基本组件介绍

ClusterManager: 在Standalone模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器。
Worker: 从节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。
Driver: 运⾏Application的main()函数并创建SparkContext。
RDD: Spark的基本运算单元,⼀组RDD可形成执⾏的有向⽆环图RDD Graph。
DAGScheduler: 实现将Spark作业分解成⼀到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后⽣成相应的Task Set放到TaskScheduler中。
TaskScheduler: 将任务(Task)分发给Executor执⾏。
Stag: ⼀个Spark作业⼀般包含⼀到多个Stage。
Task: ⼀个Stage包含⼀到多个Task,通过多个Task实现并⾏运⾏的功能。

Spark 安装部署

1. 安装 CentOS-6.5 64bit 环境,关闭防⽕墙

1
2
3
4
5
[root@CentOS ~]# service iptables stop
iptables: Setting chains to policy ACCEPT: filter [ OK ]
iptables: Flushing firewall rules: [ OK ]
iptables: Unloading modules: [ OK ]
[root@CentOS ~]# chkconfig iptables off

2. 安装配置 jdk-8u171-linux-x64.rpm 配置 JAVA_HOME 环境变量(/root/.bashrc)

1
2
3
4
5
6
7
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export CLASSPATH
export PATH
[root@CentOS ~]# source .bashrc

3. 配置 IP 和主机名映射关系

1
2
3
4
5
6
[root@CentOS ~]# vi /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4
localhost4.localdomain4
::1 localhost localhost.localdomain localhost6
localhost6.localdomain6
192.168.80.128 CentOS

4. 下载 Spark 安装包spark-2.3.0-bin-hadoop2.6.tgz解压

1
[root@CentOS ~]# tar -zxf spark-2.3.0-bin-hadoop2.6.tgz -C /usr

Spark 测试

1. 本地测试 Spark

1
2
[root@CentOS spark-2.3.0-bin-hadoop2.6]# ./bin/spark-shell --master
local[5]

2. 启动 Spark 集群

1
2
3
4
5
[root@CentOS spark-2.3.0-bin-hadoop2.6]# ./sbin/start-master.sh 【启动
Master】
[root@CentOS spark-2.3.0-bin-hadoop2.6]# ./sbin/start-slave.sh --cores 2 --memory 512m spark://CentOS:7077 【启动work节点】
[root@CentOS spark-2.3.0-bin-hadoop2.6]# ./bin/spark-shell --master
spark://CentOS:7077 --executor-memory 512M --total-executor-cores 1

RDD算⼦

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象。RDD是Spark的最基本抽象,是对分布式内存的抽象使⽤,实现了以操作本地集合的⽅式来操作分布式数据集的抽象实现。
RDD是Spark最核⼼的东⻄,它表示已被分区、不可变的并能够被并⾏操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下⼀个操作可以直接从内存中输⼊,省去了MapReduce⼤量的磁盘IO操作。这对于迭代运算⽐较常⻅的机器学习算法, 交互式数据挖掘来说,效率提升⽐较⼤。

RDD 特点

RDD计算流程

Spark的输⼊、运⾏转换、输出。在运⾏转换中通过算⼦对RDD进⾏转换。算⼦是RDD中定义的函数,可以对RDD中的数据进⾏转换和操作。
img
1、输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数据空间,会转化为Spark中的数据块,通过BlockManager进行管理。
2、运行:在Spark数据输入形成RDD后,便可以通过变换算子fliter等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
3、输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中(collect输出到Scala集合,count返回Scala Int型数据)。

RDD编程模型

来看一段代码:textFile算子从HDFS读取日志文件,返回“file”(RDD);filter算子筛出带 “ERROR” 的行,赋给 “errors”(新RDD);cache算子把它缓存下来以备未来使用;count算子返回 “errors” 的行数。RDD看起来与Scala集合类型 没有太大差别,但它们的数据和运行模型大相迥异。

1
2
3
4
var file= sc.textFile("hdfs://CentOS:9000/demo/src/access.log")
var errors = file.filter(_.contains("ERROR"))
errors.cache()
errors.count()

上⾯代码给出了RDD数据模型,并将上例中⽤到的四个算⼦映射到四种算⼦类型。Spark程序⼯作在两个空间中:Spark RDD空间和Scala原⽣数据空间。在原⽣数据空间⾥,数据表现为标量(scalar,即Scala基本类型,⽤橘⾊⼩⽅块表示)、集合类型(蓝⾊虚线 框)和持久存储(红⾊圆柱)。
img

RDD运算逻辑

在Spark应用中,整个执行流程在逻辑上运算之间会形成有向无环图。Action算子触发之后会将所有累积的算子形成一个有向无环图,然后由调度器调度该图上的任务进行运算。Spark的调度方式与MapReduce有所不同。Spark根据RDD之间不同的依赖关系切分形成不同的阶段(Stage),一个阶段包含一系列函数进行流水线执行。图中的A、B、C、D、E、F、G,分别代表不同的RDD,RDD内的一个方框代表一个数据块(分区)。数据从HDFS输入Spark,形成RDD A和RDD C、RDD E,RDD C上执行map操作,转换为RDD D,RDD E和 RDD D 合并形成 RDD F,RDD B和RDD F进行join操作转换为G,而在B到G的过程中又会进行Shuffle。最后RDD G通过函数saveAsSequenceFile输出保存到HDFS中。
img

RDD依赖关系(narrow | wide)

RDD间依赖分为 窄依赖 (narrow dependencies) 和宽依赖 (wide dependencies) 。窄依赖是指父 RDD 的每个分区都只被子RDD的一个分区所使用,例如map、filter。相应的,那么宽依赖就是指父 RDD 的分区被多个子 RDD 的分区所依赖,例如groupByKey、reduceByKey等操作。如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。
img
窄依赖支持在一个结点上管道化执行。例如基于一对一的关系,可以在 filter 之后执行map 。其次,窄依赖支持更高效的故障还原。因为对于窄依赖,只有丢失的父 RDD 的分区需要重新计算。而对于宽依赖,一个结点的故障可能导致来自所有父 RDD 的分区丢失,因此就需要完全重新执行。因此对于宽依赖,Spark 会在持有各个父分区的结点上,将中间数据持久化来简化故障还原,就像 MapReduce 会持久化 map 的输出一样。Spark状态切换是遇到宽依赖就会生成一个State。

RDD操作

1)创建RDD

1
2
3
4
5
scala> sc.makeRDD(List(1,2,3,4,5,6))
scala> sc.makeRDD(List(1,2,3,4,5,6))
scala> sc.makeRDD(Array(1,2,3,4,5,6))
scala> sc.parallelize(Array(1,2,3,45),10)
scala> sc.textFile("/root/hello.txt")

2)RDD算⼦(转换算⼦、Action算⼦)
转换算⼦
img
Action算⼦
img

map、flatMap、filter⽤法

1
2
3
4
5
6
7
8
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 =sc.parallelize(List(5,6,4,7,3,8)).map(_*2).sortBy(x=>x,true)
val rdd3 = rdd2.filter(_>5)
val rdd4 sc.parallelize(List(5,6,4,7,3,8)).map(_*2).sortBy(x=>x+"",true)
val rdd5 =
sc.parallelize(List(5,6,4,1,10)).map(_*2).sortBy(x=>x.toString,true)
val rdd6 = sc.parallelize(Array("a b c", "d e f", "h i j"))
rdd6.flatMap(_.split(' ')).collect

union求并集(类型要⼀致)

1
2
3
4
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect

intersection求交集

1
2
3
val rdd1 = sc.parallelize(List(("zs", 1), ("ls", 2), ("ww", 3)))
val rdd2 = sc.parallelize(List(("zs", 9), ("win7", 8), ("zl", 7)))
rdd1.intersection(rdd2).collect

join操作

1
2
3
4
5
val rdd1 = sc.parallelize(List(("zs", 1), ("ls", 2), ("ww", 3)))
val rdd2 = sc.parallelize(List(("zs", 9), ("win7", 8), ("zl", 7)))
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect

groupByKey

1
2
3
4
val rdd1 = sc.parallelize(List(("zs", 1), ("ls", 2), ("ww", 3)))
val rdd2 = sc.parallelize(List(("zs", 9), ("win7", 8), ("zl", 7)))
val rdd3 = rdd1 union rdd2
rdd3.groupByKey().map(x => (x._1,x._2.sum))

WordCount

1
2
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).groupByKey.map(t=>(t._1, t._2.sum)).collect

cogroup(相同key聚集在⼀起)

1
2
3
4
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3),("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))

笛卡尔积

1
2
3
val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2)

Spark 实战案例

例⼦1,统计字符

1
2
3
4
scala> sc.wholeTextFiles("/root/demo/src").map(x => for(i <-
x._2.split("\n"))yield i).flatMap(x=>x).map(x => for(i <- x.split("
"))yield (i,1) ).flatMap(x =>
x).reduceByKey(_+_).sortBy(_._2,false).collect

例⼦2,美国 1880 - 2014 年新⽣婴⼉数据统计

下载数据
https://catalog.data.gov/dataset/baby-names-from-social-security-card-applications-national-level-data
img
代码模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.baizhi.demo
import org.apache.spark.{SparkConf, SparkContext}
object BabyCount {
def main(args: Array[String]): Unit = {
var conf=new SparkConf().setAppName("BabyCount")
var sc=new SparkContext(conf)
sc.wholeTextFiles("/root/demo/src",40)
.map((x)=> for (i <- x._2.split("\r\n")) yield(x._1.substring(20,
24), i)).flatMap(x => x).map((x) => (x._1, x._2.split(",")
(3).toInt*x._2.split(",")(2).toDouble)).reduceByKey((x,y)=> (x+y),1)
.map(x => (x._1+"\t"+x._2)).saveAsTextFile("/root/res")
sc.stop()
}
}

提交任务

1
2
3
[root@CentOS spark-2.3.0-bin-hadoop2.6]# ./bin/spark-submit --class
com.baizhi.demo.BabyCount --executor-memory 512m --total-executor-cores 1
--master spark://CentOS:7077 /root/babycount-1.0.jar

例⼦3,统计⽤户订单数据

模拟数据
img
代码模型

1
2
3
4
5
6
7
[root@CentOS spark-2.3.0-bin-hadoop2.6]# ./bin/spark-shell --master
spark://CentOS:7077 --executor-memory 512M --total-executor-cores 1

scala> sc.wholeTextFiles("/root/demo/src",40).map((x)=> for (i <-
x._2.split("\n")) yield (i)).flatMap(x => x).map(x => (x.split(" ")(4) ,
x.split(" ")(2).toDouble * x.split(" ")
(3).toInt)).reduceByKey(_+_,1).sortBy(_._2,true).collect

IDEA开发 Spark
1) 导⼊依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.1</scala.version>
<spark.version>2.3.0</spark.version>
<hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependen
cies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

2)设置IDEA的Scala的编译版本为scala-2.11版本
img

3) 编写wordcount代码实现计算

1
2
3
4
5
6
var conf=new SparkConf().setAppName("WordsCount");
var sc=new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" "))
.map((_,1)).reduceByKey(_+_).sortBy(_._2,false)
.map(x=>(x._1+"\t"+x._2)).saveAsTextFile(args(1))
sc.stop()

4)调⽤spark-submit提交任务

1
2
[root@CentOS spark-2.3.0-bin-hadoop2.6]# ./bin/spark-submit --master
spark://CentOS:7077 --class com.baizhi.demo.WordCount --executor-memory 512m --total-executor-cores 1 /root/bb-1.0.jar /root/hello.txt /root/res

5)访问主⻚⾯查看计算结果;
img