失效链接处理 |
Spark性能优化 PDF 下载
本站整理下载:
相关截图:
主要内容:
Spark performance optimization
一般在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。资源参数设置过小,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。
我们以pyspark开发的代码为例子来说明一下。
运行pyspark程序可以使用终端命令模式,也是就是在Linux终端输入pyspark,然后复制粘贴代码,也可以spark-submit命令行或像Hive一样用yarn调度运行。
# -*-coding:utf-8-*-
from pyspark.sql import HiveContext, SparkSession
# 初始化SparkContext,同时启用Hive支持,
# 将终端命令行的测试模式下输出字段的最大长度设置为100个字符
spark = SparkSession.builder.appName("name").config(
"spark.debug.maxToStringFields", 100).enableHiveSupport().getOrCreate()
# 初始化HiveContext
hive = HiveContext(spark.sparkContext)
# 启用SparkSQL的表连接支持
spark.conf.set("spark.sql.crossJoin.enabled", "true")
# 读取parquet文件数据的代码
# Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,AWS中也使用
# parquet文件数据存储在AWS S3上
# AWS使用S3作为数据存储的服务,S3 全名是 Simple Storage Service,也就是简便的存储服务
df1 = spark.read.load(
path='<parquet文件路径>',
format='parquet', header=True)
# 读取CSV文件数据的代码
# 这边以CSV文件作为手工交换文件的标准,
# 主要的原因是因为csv的格式简单,它的数字类型数据是字符串存储的,所以它的精度也可以保证
df2 = spark.read.load(
path='<csv文件路径>',
format='csv', header=True)
# 读取Hive表或视图数据的代码
df3 = hive.sql("""
select
*
from <数据库名>.<表名>""")
# 对于多次使用的小表数据集进行数据的内存缓存(第一条Spark优化策略),
# 这样的话,之后pyspark代码在多次调用数据的时候Spark就不会重复地读取相同的文件数据了
df4 = spark.read.load(
path='<parquet文件路径>',
format='parquet', header=True).cache()
# 将刚才得到的数据集命名,以便放入SparkSQL编写查询语句
df1.createOrReplaceTempView("DF1")
df2.createOrReplaceTempView("DF2")
df3.createOrReplaceTempView("DF3")
df4.createOrReplaceTempView("DF4")
# 创建SparkSQL数据集的代码
# 如果数据量比较多,而且业务逻辑复杂的话,可以将数据临时缓存在存储服务/磁盘上,
# 从而避免之后pyspark代码在使用SparkSQL调用这里的SparkSQL数据集的时候,
# 这里的SparkSQL数据集被重复运行计算逻辑,从而节约计算资源(第二条Spark优化策略)
df5 = spark.sql("""
SELECT
...
from DF1 AS D1
LEFT JOIN DF2 AS D2
ON ...
LEFT JOIN DF4 AS D4
ON ...
WHERE ...
""").persist()
# 由于count是Action算子,会触发spark-submit事件,让之前的persist()缓存操作即刻生效,
# 不使用count()操作,persist()缓存操作会在下一个Action算子处或程序结束处生效
df5.count()
df5.createOrReplaceTempView("DF5")
# 创建SparkSQL数据集的代码
df6 = spark.sql("""
SELECT
...
from DF5 AS D5
LEFT JOIN DF3 AS D3
ON ...
LEFT JOIN DF4 AS D4
ON ...
WHERE ...
""")
# 写入结果数据集到parquet文件
df6.write.parquet(
path='<parquet文件路径2>',
mode="overwrite")
# 释放磁盘缓存
df5.unpersist()
# sparkContext停止
spark.stop()
|