第一章、基础概念

目前的主流大数据技术:Hadoop、Spark、Flink、Beam.

Hadoop

Alt text

MapReduce

像写单机任务一样.

使用前提:任务必须满足可以分而治之.
Alt text

YARN

Hadoop2.0出现.

统一调配资源,实现底层数据无缝共享.
Alt text

Spark

全能选手.
Alt text

Hadoop的缺点

Alt text

有些不能转成Map和Reduce的业务无法使用Hadoop.
Alt text

需要反复迭代,磁盘IO开销大.
Alt text
Alt text
Alt text

磁盘IO,任务之间的衔接延迟高.

衔接延迟:3个Map任务全部结束才能执行REduce任务.
Alt text
因此难以胜任多阶段的复杂任务.

Spark的优势

Spark本质上也属于MapReduce,但是不再局限于Map和Reduce这两个操作,还可以执行其它操作,比如groupby,filter,join等等,因此具有更强大的表达能力.

Alt text

Spark可以将数据读到内存中进行操作,提升了迭代时的运行效率.
Alt text
Alt text

Spark使用了有向无环图的调度机制,使得相关的操作可以优化,从而很多操作可以进行流水线化处理.
Alt text
Alt text

对比Hadoop和Spark的读取和查询:Hadoop每次都要读写磁盘,而Spark可以直接在内存中完成,所以性能更好.
Alt text
Alt text

Spark和Hadoop的关系

Spark只是用来计算的,而存储仍然靠Hadoop中的HDFS。因此,Spark并不是取代Hadoop,而是取代Hadoop中的计算框架MapReduce.
Alt text
Alt text

与Spark是竞争关系.
Alt text
Alt text
Alt text

Beam

一种编程接口,就可以翻译成不同(Hadoop, Spark)框架,想一统天下,但未果.
Alt text

第二章、Spark的设计与运行原理

Alt text

Spark的特点

运行速度快.
Alt text
Alt text
Alt text

容易使用.
Alt text

通用性.
Alt text

运行模式多样.
Alt text
Alt text
Alt text

对比Spark与Hadoop

Alt text

Hadoop:
Alt text
Alt text
Alt text

Spark:
Alt text
Alt text

Spark生态系统

Alt text

需要生态系统的原因:
Alt text
Alt text
Alt text

Spark的生态系统:
Alt text
Alt text

Alt text
Alt text
Alt text

Spark的基本概念

Alt text

RDD:数据可多可少,少时可以放一台机器,多了可以存放在很多台机器的内存中.
Alt text

DAG:反映RDD之间的依赖关系
Alt text

Executor:
Alt text
Alt text
Alt text

Task:
Alt text

Job:提交Application后,会被分解成多个作业.
Alt text

Stage: 作业的基本调度单位,每个作业会被分解成多组任务,每组任务的集合就叫做Stage.
Alt text

以上概念之间的关系如下:
Alt text

Spark的架构设计

Alt text
Alt text

执行过程如下.

开始执行:
Alt text

执行完毕:
Alt text
or
Alt text

Spark运行的基本流程

Alt text
Driver节点创建一个指挥官,即SparkContext.

申请资源:
Alt text

分配资源:
Alt text

执行:
Alt text
Alt text
Alt text
Alt text

分发任务:
Alt text
遵循”计算向数据靠拢”的原则.
Alt text

执行完任务,返回结果:
Alt text

RDD运行原理: 概念、操作和特性

在MapReduce中含有大量的序列化/反序列化过程,开销大
Alt text

RDD的提供了抽象的数据结构,将一系列行为表达为转换处理:
Alt text

不同的RDD转换之间形成依赖关系:
Alt text

借助有向无环图表达依赖关系,避免了数据落磁盘,而是马上可以给另一个操作作为输入:
Alt text

RDD可以存在单个机器上:
Alt text

如果RDD很大,也可以分布式存储:
Alt text

RDD本质上是一个只读的分区记录集合,不允许被改来改去:
Alt text

如果要改,需要生成新的RDD:
Alt text

比如,给每个学生加5分:
Alt text

当然,加5分后得到的新的RDD也是只读的.

RDD的操作类型如下:
Alt text

它们都是粗粒度的,只能对整体操作:
Alt text
Alt text
Alt text

尽管如此受限,但Spark提供了丰富的转换操作,使得Spark几乎具有其它框架的所有操作:
Alt text
Alt text

Spark的执行过程:
Alt text

Alt text

注意,Spark中有一个惰性机制,即:前面的各种转换操作,并不是说立即执行,而是只有在触发动作操作后才从头开始执行,有点像tf的静态图机制.
Alt text
Alt text

RDD具有高效运算能力的3个原因.
Alt text
Alt text
天然的容错性:
Alt text

RDD的运行原理:RDD之间的依赖关系

Alt text
Alt text

是否包含Shuffle操作是区分宽依赖和窄依赖的依据.

所谓Shuffle,中文是洗牌,下面的交叉线就是一种Shuffle:
Alt text

如果发生了Shuffle,就是宽依赖,否则就是窄依赖.
Alt text

具体来讲,窄依赖指的是以下两种情况:

1.一个父RDD对应一个子RDD
Alt text

  1. 多个父RDD对应一个子RDD
    Alt text

举几个栗子:
Alt text
Alt text

而宽依赖指的是”一个父RDD对应一个RDD的多个分区”:
Alt text

举几个栗子:
Alt text
Alt text

RDD的运行原理:阶段的划分以及RDD的运行过程

Alt text

因为宽依赖不能进行流水线优化,所以才把宽依赖分成不同的阶段(Stage).

宽依赖之所以不能进行流水线优化,跟Spark的优化原理有关.

RDD的一次转换,就是一次fork和join的过程:fork负责将每个分区送到不同机器上并行执行,再提供join把每台机器上的结果组合起来得到结果.
Alt text

fork&join操作一般是多次的:
Alt text

举个栗子:
全校学生从北京飞到厦门,一种方案如下:
Alt text

这个方案中,在上海的join操作存在无意义的等待,因此可以优化如下:不在上海停
Alt text

这样时间从6小时变成了5小时,优化后的方案其实就对应了窄依赖,即:可以进行流水线优化.
Alt text

如果现在条件变了,要求在上海之后,改为班长带1班飞…,那么各个同学就要从以男女为单位改成以班级为单位,于是发生了数据Shuffle,而只要哦发生Shuffle就一定会写磁盘,因此就无法进行流水线优化,这就对应了宽依赖:
Alt text

以上就解释了为什么宽依赖为什么需要划分成多个阶段(Stage)进行处理.

那如何划分成多个阶段呢?
Alt text
即:窄依赖可单独作为一个阶段,宽依赖划分成多个不同阶段(每个阶段是窄依赖的).

一个划分的栗子:
Alt text

每一个阶段内部都没有Shuffle,都是窄依赖,可以流水线化处理,而阶段与阶段之间存在Shuffle,因而存在等待.

总结一下RDD的整个运行过程:

把写的代码交给框架,框架会转成DAG图,然后分成不同阶段,向右走,向上走.
Alt text

Spark的部署和应用方式

可单机也可集群.
Alt text

3种集群模式:
Alt text
Alt text
Alt text

第三章、环境配置

在pyspark中运行代码

Alt text

Alt text
Alt text
Alt text

Alt text

举个栗子:
Alt text

启动时只写pyspark,则默认进入local[*]模式:
Alt text

Alt text

Alt text

开发Spark独立应用程序

一个简单的栗子:
Alt text

写完代码后,可以直接使用python运行:
Alt text

也可以使用spark-submit提交应用程序:
Alt text
Alt text

一个调试小技巧:
Alt text

Spark集群环境搭建

配置主节点:
Alt text
Alt text
Alt text

配置从节点:
Alt text
slave01和slave02是(从)主机的具体名称.

Alt text

第一行完成Spark和Hdoop的挂接

第二行说明Hadoop相关配置信息的目录

第三行设置Spark管家节点(主节点)的ip地址(事先配置好的)

完成以上操作后:
Alt text
Alt text

开始启动,先启动Hadoop, 后启动Spark:
Alt text
Alt text

在浏览器输入访问地址即可进入:
Alt text

使用完毕,关闭:
Alt text

##在集群上运行Spark应用程序

Alt text

  • 使用Standalone集群管理器

启动&运行:
Alt text

运行pyspark:
Alt text

查看运行信息:
Alt text

  • 使用Hadoop YARN集群管理器

启动&运行:
Alt text

运行pyspark:

Alt text

查看运行信息:
Alt text
Alt text

查看整个集群的运行信息:
Alt text

第四章、RDD编程

Alt text
RDD编程,指的是对Spark Core进行编程.

由于Spark Core就是对RDD进行不断地转换,因此被称为RDD编程.

RDD编程基础如下:
Alt text

接下来各个击破.

RDD创建

主要有以下两种RDD创建方式:
Alt text

  • 从本地加载数据

Alt text
Alt text

举个栗子:
Alt text
sc是SparkContext对象,位于Master节点中.

在交互式环境中,系统已经默认创建好了一个SparkContext 对象,而单独一个py文件中,则需要自行导入.

第一行将本地的word.txt从磁盘加载进来,生成一个RDD(抽象结构).

Alt text

  • 从分布式文件系统HDFS中加载数据

Alt text

Alt text

  • 通过并行集合(数组)创建RDD

Alt text
Alt text
Alt text

RDD操作:转换操作

Alt text

所有这些转换操作构成了一张有向无环图DAG:
Alt text

转化操作并不会发生真正的计算:
Alt text

只有遇到动作操作时,才会发生真正的从头到尾的计算:
Alt text

这被称为RDD转换操作的惰性机制.

常用的RDD转换操作如下:
Alt text

  • filter操作

Alt text
Alt text

  • map操作

Alt text
Alt text

Alt text
Alt text

  • flatMap

Alt text
Alt text

  • groupByKey

Alt text
Alt text

  • reduceByKey

对所有key相同的键值对做聚合操作.
Alt text
Alt text
Alt text

RDD操作:行动操作

Alt text
Alt text

其中reduce的执行过程如下:
Alt text

Alt text

RDD持久化

看个栗子:
Alt text

由于每遇到一个行动操作,就会从头到尾执行一次,所以在运行第3行和第4行时,都分别执行了第1行的操作,造成了重复生成相同的rdd.

所以可以将rdd持久化,即缓存下来:
Alt text

常用持久化方法如下:
Alt text
Alt text
Alt text

当不再需要持久化存储时,可以手动移除:
Alt text

看个栗子:
Alt text

RDD分区

分区的作用
增加并行度
Alt text

减少通信开销
Alt text
Alt text
Alt text

分区的原则
Alt text

不同模式下的分区数:
Alt text
Alt text
Alt text

而对于Standalone和YARN模式,分区数=max(集群中所有CPU核心数目的总和, 2).

设置分区的方法:
Alt text
Alt text
Alt text

自定义分区的方法:
Alt text

举个栗子:
Alt text

Alt text
Alt text

之所以弄成键值对(值全为1)的形式,是因为partitionBY只能操作键值对.

最后保存的路径仅仅传入了一个目录,是因为分区后会形成10个分区,每个分区一个文件,所有在传入目录下会新生成共10个文件.

写完代码,就可以运行了:
Alt text
Alt text

##一个综合实例:词频统计
Alt text
实现代码如下:
Alt text

具体解释:
Alt text
Alt text
Alt text
Alt text

以上是在单机上进行的,当数据分布在多台机器上时,需要多机并行执行.

此时需要把Hadoop和Spark同时部署在同一个集群中,具体地,将集群中的某个节点既作为HDFS的存储节点,同时也作为Saprk的工作节点.
Alt text
Alt text

键值对RDD的创建

键值对RDD: RDD中的每一个元素都是一个键值对.

Alt text
Alt text

键值对RDD的转换操作

Alt text

  • reduceByKey

对所有key相同的键值对做聚合操作.
Alt text

  • groupByKey

对具有相同key的键值对进行分组,不进行聚合计算.
Alt text
Alt text

对比reduceByKey和groupByKey:
Alt text
Alt text

  • keys ,values

把Pair(键值对)RDD 中的key(value)回形成一个新的RDD.
Alt text
Alt text

  • sortByKey

Alt text

对比sortByKey和sortBy:

前者只能根据key进行排序,而后者更灵活.
Alt text
Alt text

  • mapValues

对键值对RDD中的每个value都应用一个函数,key不会发生变化.

Alt text

  • join

Alt text

一个综合实例

Alt text
Alt text

文件数据读写

本地文件的读写:
Alt text
Alt text
Alt text

分布式文件的读写:
Alt text
Alt text

HBase数据读写:HBase简介

Alt text
Alt text

HBase是Hadoop家族的一个组件,是一个分布式数据库,是构建在HDFS基础上的,也就是说,HBase是一个分布式数据库,但它的数据被保存在了底层的HDFS中.

Alt text
Alt text

Alt text
Alt text
Alt text

在关系型数据库中,给定行和列(2维)就能够定位:
Alt text

而在HBase中,则需要4维才能定位:
Alt text

Alt text
Alt text

HBase数据读写:创建HBase表

安装HBase,然后先启动Hadoop,再启动HBase:
Alt text
Alt text

假设要创建的student表如下:
Alt text

开始创建:
Alt text

关系型数据可以一列直接写入,但HBase只能一个单元格一个单元格的写:
Alt text
Alt text

HBase数据读写:配置Spark

Alt text
Alt text
Alt text
Alt text

HBase数据读写:编写程序读取HBase数据

Alt text
Alt text

count统计总共有多少个键值对(行键:行键对应的值).

Alt text
Alt text

HBase数据读写:编写程序向HBase写入数据

Alt text
Alt text
Alt text
Alt text

案例1:求top值

Alt text

代码:
Alt text

逐行解释:
Alt text
Alt text
之所以弄成键值对,是因为接下来的SortByKey排序是针对于键值对进行的;repartition(1)将分区弄成1个,保证全局有序:
Alt text
Alt text

案例2:文件排序

Alt text

代码:
Alt text

逐行解释:
Alt text
Alt text
Alt text
Alt text

案例3:二次排序

Alt text
Alt text

思路:用一个辅助的SecondSortKey,排好序后再丢掉它,只保留value:
Alt text
具体思路如下:
Alt text

代码:
Alt text
Alt text

逐行解释:
Alt text
Alt text
Alt text

第五章、SparkSQL

SparkSQL简介

在Hadoop平台上的Hive本身不存储数据,而是使用底层HDFS中存储的数据,因此,Hive可以看作是一个编程接口:
Alt text

在Spark平台上,直接引进了Hive,称之为Shark:
Alt text

对比而言,在Hadoop中,Hive将SQL语句转换成底层的MapReduce程序,而Shark将SQL转换成底层的Spark程序。

Shark既有优点也有缺点。

优点:
Alt text

缺点:
Alt text
Alt text

2014年6月1日,停止了对Shark的开发。

此后有了两个分支:
Alt text

SparkSQL出现了。

Alt text
Alt text
Alt text

推出SparkSQL的原因:
Alt text

DataFrame概述

Alt text

SparkCore的数据抽象是RDD,SparkSQL的数据抽象是DataFrame.
Alt text

DataFrame的创建

Alt text
Alt text

两种创建DataFrame的写法:
Alt text
Alt text

实例:
Alt text

读取上述json文件的代码:
Alt text

DataFrame保存

Alt text
Alt text

DataFrame的常用操作

首先读取文件得到DataFrame,然后执行printSchema()操作来打印模式信息,用select()选取某些列:
Alt text

执行常用操作:
Alt text

排序:
Alt text

将RDD格式数据转换成DataFrame的方法: 利用反射机制推断RDD模式

上栗子。

Alt text
Alt text

代码实现:
Alt text

逐行解释:
Alt text

这样得到了一个新的RDD,其中的每个元素都是Row()对象。

代码中的 .rdd将DataFrame转成RDD格式的数据。

将RDD格式数据转换成DataFrame的方法: 利用编程方式定义RDD模式

当无法提前获知数据结构时,采用编程方式定义RDD模式。

上栗子。
Alt text
Alt text

代码实现:
Alt text
Alt text
Alt text

MySQL数据库准备工作

Alt text
Alt text

下载相应驱动程序,以使得SparkSQL能够读取MySQL:
Alt text

将驱动程序放在jar文件夹,启动Spark.
Alt text

使用SparkSQL读写MySQL数据库

Alt text

第2个option中的spark指的是mysql数据库中的spark这个数据库,是一个数据库的名字;

第3个option中的student指的是要使用这个数据库下的名字为student的这张表。

Alt text
Alt text

上面生成了表头,接下来生成表的内容,然后将表头和表的内容结合起来,写入MySQL数据库中:
Alt text

部分代码解释如下:
Alt text

Alt text

第六章、Spark Streaming

流计算概述

Alt text

典型的数据类型可划分为两种:静态数据流数据

静态数据:
Alt text

流数据:
Alt text

比如:
Alt text

流数据的特征:
Alt text

流计算出现的原因:
Alt text

流计算:
Alt text

Alt text

鉴于此,流计算框架出现了,它需要满足如下特点:
Alt text

  • 高性能:每秒处理几十万条数据
  • 海量式:支持TB级、PB级的数据规模
  • 实时性:低延迟、达到秒级、毫秒级
  • 分布式:支持大数据基本架构,平滑扩展
  • 易用性:快速进行开发和部署
  • 可靠性:可靠地处理流数据

三类流计算框架和平台:
Alt text

Alt text
Alt text
Alt text

传统的数据处理流程:先把数据存起来,再有用户主动发起查询
Alt text

流计算数据处理流程:
Alt text

比如:
Alt text

数据实时计算:
Alt text

实时查询服务:
Alt text

传统的数据处理流程中,需要应护照互动发起查询,而在流数据处理系统中,数据是主动推送给用户的
Alt text

总结流处理系统和传统的数据处理系统的区别:
Alt text

Spark Streaming

Alt text

SparkStreaming并不是真正的流计算看框架,而是把连续的数据流切分成许多段,用快速的微小批处理来模拟流计算:
Alt text
Alt text

SparkStreaming的数据抽象是DStream:
Alt text

之前介绍的Spark Core和Spark SQL也有相应的数据抽象:
Alt text

对比SparkStreaming和真正的流计算框架Storm:
Alt text

SparkStreaming容错性强,兼容批量和实时数据处理。

Spark出现之前:
Alt text

Spark出现之后:
Alt text

DStream操作概述

DStream本质上就是一堆RDD的集合。

SparkStreaming的工作机制:
Alt text

receiver组件一直在监控有没有数据到达,这些数据的来源很多,比如套接字流,文件流,从Kafka中读取的输入流:
Alt text

receiver组件将接收到的数据送给流计算框架进行处理。

编写Spark Streaming程序固有的步骤:
Alt text
Alt text
Alt text
Alt text
Alt text

创建一个DStream对象:
Alt text

Alt text
Alt text

Spark Streaming基本数据源:文件流

  1. 在pyspark交互窗口中创建文件流。

先创建一个文件夹:
Alt text

只要文件夹下有新文件生成,就会被监控到。

开始创建文件流(统计英文词频):
Alt text

其中ssc.textFileStream()定义了一个文件流输入源。

经过lines.flatMap后得到一个RDD,RDD中包含很多元素,每个元素是一个单词。

ssc.start()启动流计算:
Alt text

  1. 采用独立应用程序方式创建文件流

Alt text
Alt text

执行:
Alt text

Spark Streaming基本数据源:套接字流(使用NC程序产生数据)

Socket编程:
Alt text

流计算程序扮演着TCP客户端的角色。

Alt text
Alt text

两个参数,加一个默认的self参数,共3个参数。

现在来定义服务端,这里使用inux中的NC程序:
Alt text

有了服务端和客户端,就可以启动服务了:
Alt text

Spark Streaming 基本数据源:套接字流(使用Socket编程实现自定义数据)

Alt text
Alt text

server.accept()一直在等待客户端的连接。

Alt text

启动服务端后,它就会一直监听看有没有客户端向服务端的localhost:9999发起请求。

当客户端请求到来后,服务端就会响应请求(发送相应的数据给客户端)。
Alt text

Spark Streaming基本数据源:RDD队列流

Alt text

每隔一秒钟就创建一个RDD,将其塞入RDD队列中,每隔两秒对数据进行处理
Alt text

代码如下(统计1到1000内的数字模10的余数出现次数):
Alt text

for循环每隔一秒钟就给rddQueue塞入一个RDD,每个RDD都是1到1000的列表,分成10个分区。

Alt text

Spark Streaming高级数据源:Apache Kafka

  1. Kafka准备工作。

关于Kafka:
Alt text

Alt text

也就是说,在进行不同生态的数据和Hadoop之间的数据交换时,以前需要针对Hadoop的每一个组件开发相应的接口,现在可以只面向Kafka,后续操作交给Kafka就可以了,所以说Kafka作为一个信息传递的枢纽。

Kafka的组件:
Alt text
Alt text
Alt text
Alt text
Alt text
Alt text

一个完整的Kafka结构:
Alt text

Kafka配置&启动:

在安装完Kafka后:
Alt text
Alt text
Alt text
Alt text

产生的数据被存储在名称为wordsendertest的topic中,等待消费者取走。

Alt text

2.Spark准备工作。

Alt text
Alt text
Alt text
Alt text

开始编写流计算程序:
Alt text

需要传入的参数有zookeeper的地址zkQuorum以及主题的名称topic.

kvs是建立的Kafka数据源,里面是键值对,只取值,这里的值是一个一个的句子,存入lines.

开始执行:
Alt text

DStream的转换操作:无状态转换操作

Alt text
Alt text
Alt text
Alt text
Alt text
Alt text
Alt text

countByValue:
Alt text
Alt text
Alt text
Alt text
Alt text

Alt text

DStream的转换操作:有状态转换操作

滑动窗口转换操作

Alt text
Alt text
Alt text

30是滑动窗口长度,10是滑动时间间隔,即每个10秒滑动一次。

离开窗口的(蓝色),用x-y函数给移出去,新加入窗口的(橘色),用x+y函数给加进来,仍留在窗口内的保持不变,这样就极大的减少了计算量。

启动:
Alt text

updateStateByKey

Alt text

Alt text

Alt text

输出操作

把DStream输入到文本文件中:
Alt text
Alt text

把DStream写入到MySQL数据库中 :
Alt text
Alt text

以下是完整的代码文件:
Alt text
Alt text
Alt text
Alt text

在func中,将分区设置为3,避免打开过多的数据库链接,因为原本可能有许多分区;接着使用dbfunc对个分区中的每一个键值对(单词,词频)进行处理并保存(每个分区包含多个这样的键值对)。

在dbfunc中,输入的是一个分区中的所有键值对,然后利用doinsert将每一个键值对插入到数据库中。