编程技术分享平台

网站首页 > 技术教程 正文

数据开发必经之路-数据倾斜(数据倾斜如何解决)

xnh888 2024-10-31 15:49:07 技术教程 37 ℃ 0 评论

前言

数据倾斜是数据开发中最常见的问题,同时也是面试中必问的一道题。那么何为数据倾斜?什么时候会出现数据倾斜?以及如何解决呢?

何为数据倾斜:数据倾斜其本质就是数据分配不均匀,部分任务处理大量的数据量导致整体job的执行时间拉长。

什么时候出现数据倾斜:无论是spark,还是mapreduce,数据倾斜大部分都是出现在shuffle阶段,也就是所谓的洗牌,由于使用的洗牌策略不一样,那么数据划分也就不一样,一般常用的也就是hash算法了。

基于上面两个问题的解答,对于数据倾斜的解决方案其本质就是如何把数据分配均匀。

笔者认为根据优化策略可以分为业务层面的优化和技术层面的优化

首先业务层面的优化就是结合实际的业务场景和数据特性进行优化,而技术层面的优化其本质就是对存储和计算两大组件的优化,然后根据不同的技术(hive,spark)使用不同的参数或者函数方法。

接下来将分别对这两种策略进行详细讲解

技术层面优化

Hive

这里讨论的是仍然是以mapreduce为底层引擎,hive on tez这种模式不做讲解,其优化思想还有参数大多数都是一样的。

1.参数优化

这里给出一些关于数据倾斜相关的参数配置,一般只能起到缓解的作用,不能完全解决倾斜问题。其中有些优化参数暂时未涉及(如map端,reduce端,jvm,压缩等有优化点)

参数参数值描述hive.map.aggrtruemap端聚合,相当于Combiner,其思想主要是分发到reduce的数据量减少hive.groupby.skewindatatrue设置为true时会生成两个mr job.在第一个job中,Map端输出的结果会随机分布到reduce中,每个reduce做部分聚合,并返回结果,其目的是将相同的key有可能分发到不同的reduce中,起到负载均衡.第二个mr job再根据预处理的结果按照key分布到reduce中(这一步的目的是可以保证相同的key最终会被分配到同一个reduce中)hive.auto.convert.jointrue是否将common join(reduce端join)转换成map joinhive.mapjoin.smalltable.filesize25000000判断为小表的输入文件大小阈值,默认25Mhive.groupby.mapaggr.checkinterval100000在Map端进行聚合操作的条目数目hive.mapjoin.cache.numrows25000缓存build table的多少行数据到内存hive.optimize.skewjointrue当开启该选项时,在join过程中Hive会将计数超过阈值hive.skewjoin.key(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果hive.merge.mapfilestrue合并小文件,减少对应的map数hive.skewjoin.key100000判断数据倾斜的阈值,如果在join中发现同样的key超过该值,则认为是该key是倾斜key

2.Sql优化

数据倾斜出现的原因一方面是数据特性,另一方面是人为导致的,sql开发较粗糙导致的(占主要部分)。这里给出几个常见的sql倾斜的场景以及解决思路

2.1 Join优化

hive根据join key进行划分并发生shuffle,所以选用的key尽量分布均匀。目前出现的场景无非就是大表和小表关联,小表和小表关联(一般不会出现倾斜),大表和大表关联,这里针对这几种情况分别进行讲解

2.1.1 大表join小表-MapJoin

这里的大小表是相对来说的,如果一个A表数据量有1亿,而B表有1千万,那么B表就是一个小表。当B表中的key分布比较集中,那么在进行shuffle的时候就会有一个或者某几个reduce上的数据量较高于平均值,也就更容易出现倾斜。针对这种场景一般通过mapjoin的方式来解决。

MapJoin的原理就是把小表全部加载到内存中(注意如果内存无法存储1千万的数据,需要对内存进行调节),在map端进行join,这样就不会有shuffle阶段了。

 1--原始sql
 2select 
 3  lnc.request_url,
 4  count(uuid) as pv
 5from wedw_dwd.log_ng_channel lnc
 6join 
 7(
 8  select 
 9     request_url,
10     visit_time,
11     uuid
12  from wedw_dwd.track_beacon_log
13)t
14on lnc.request_url = t.request_url
15group by lnc.request_url
16
17--mapjoin 
18select /*+ MAPJOIN(lnc) */  
19  lnc.request_url,
20  count(uuid) as pv
21from wedw_dwd.log_ng_channel lnc
22join 
23(
24  select 
25     request_url,
26     visit_time,
27     uuid
28  from wedw_dwd.track_beacon_log
29)t
30on lnc.request_url = t.request_url
31group by lnc.request_url

2.1.2 大表join大表-Skewjoin

当两个表都非常大,无法直接加载到内存中,那么这个时候就需要评估join key的分布是否均匀。

情况一:当key分布均匀,那么这个时候一般就不是倾斜的范畴了,需要考虑增加reduce数量等其他调优手段了

情况二:当key分布不均匀,如果只是某几个key数据量比较大,那么就需要把这几个key单独拿出来进行计算;如果大部分key的数据量都很大,那么这个时候就需要进行增加随机前缀的方式了,也就是二次聚合的思想。

  1--参数调节
  2set hive.optimize.skewjoin=true;
  3set hive.skewjoin.key=100000;
  4set hive.optimize.skewjoin.compiletime=true;
  5
  6-- 某几个key的数据量比较大,需要单独进行计算
  7select 
  8  lnc.request_url,
  9  count(uuid) as pv
 10from wedw_dwd.log_ng_channel lnc
 11join 
 12(
 13  select 
 14     request_url,
 15     visit_time,
 16     uuid
 17  from wedw_dwd.track_beacon_log
 18  where request_url!='www.baidu.com'
 19)t
 20on lnc.request_url = t.request_url
 21group by lnc.request_url
 22
 23union all 
 24
 25select 
 26  lnc.request_url,
 27  count(uuid) as pv
 28from wedw_dwd.log_ng_channel lnc
 29join 
 30(
 31  select 
 32     request_url,
 33     visit_time,
 34     uuid
 35  from wedw_dwd.track_beacon_log
 36  where request_url='www.baidu.com'
 37)t
 38on lnc.request_url = t.request_url
 39group by lnc.request_url
 40
 41--大部分key的数据量都比较大,采用随机前缀的方式,右表的数据量同样也需要进行扩充
 42select 
 43  split(request_url,'&')[1] as request_url,
 44  sum(cnt) as cnt
 45from 
 46(
 47  select 
 48    t1.request_url,
 49    count(uuid) as cnt
 50  from 
 51  (
 52    select 
 53       concat(cast(round(rand()*10) as int),'&',request_url) as request_url
 54    from wedw_dwd.log_ng_channel
 55  )t1
 56  left join 
 57  (  -- 扩充10倍
 58    select 
 59      concat('1&',request_url) as request_url,
 60      uuid
 61    from wedw_dwd.track_beacon_log
 62    union all 
 63    select 
 64      concat('2&',request_url) as request_url,
 65      uuid
 66    from wedw_dwd.track_beacon_log
 67    union all 
 68    select 
 69      concat('3&',request_url) as request_url,
 70      uuid
 71    from wedw_dwd.track_beacon_log
 72    union all 
 73    select 
 74      concat('4&',request_url) as request_url,
 75      uuid
 76    from wedw_dwd.track_beacon_log
 77    union all 
 78    select 
 79      concat('5&',request_url) as request_url,
 80      uuid
 81    from wedw_dwd.track_beacon_log
 82    union all 
 83    select 
 84      concat('6&',request_url) as request_url,
 85      uuid
 86    from wedw_dwd.track_beacon_log
 87    union all 
 88    select 
 89      concat('7&',request_url) as request_url,
 90      uuid
 91    from wedw_dwd.track_beacon_log
 92    union all 
 93    select 
 94      concat('8&',request_url) as request_url,
 95      uuid
 96    from wedw_dwd.track_beacon_log
 97    union all 
 98    select 
 99      concat('9&',request_url) as request_url,
100      uuid
101    from wedw_dwd.track_beacon_log
102    union all 
103    select 
104      concat('10&',request_url) as request_url,
105      uuid
106    from wedw_dwd.track_beacon_log
107  )t2
108  on t1.request_url = t2.request_url
109  group by t1.request_url
110)t
111group by split(request_url,'&')[1]

2.2 distinct 优化

distinct操作也会经历shuffle阶段,通常会和group by 进行结合使用,也是数据倾斜的高频操作。通常对于需要distinct的操作,我们可以换种思路来解决,即先进行group by后再进行后续的操作。例子如下:

 1--原始sql
 2select 
 3   request_url,
 4   count(distinct uuid)
 5from wedw_dwd.log_ng_channel
 6group by request_url
 7
 8--上面的sql可以改写为
 9select 
10  request_url,
11  sum(1) as cnt
12from 
13(
14  select 
15  request_url
16  ,uuid
17from wedw_dwd.log_ng_channel
18group by request_url,uuid
19)t
20group by request_url

2.3 过滤/拆分

过滤:通常在进行统计的时候,表中总会有很多脏数据或者空数据,当实际的需求中并不关心这些脏数据或者空数据,那么我们可以先进行过滤,然后进行后续的操作。通过减少数据量来避免数据倾斜

拆分:这里和上面讲述到的SkewJoin优化思想很类似。

情况一:例如表中有很多NULL值,在整个key分布中占比最高,但是实际需求还不能对这些null值进行过滤,那么对需要单独把这些null值拿出来计算,或者以随机数进行填充

情况二:例如当表中的大部分key占比都比较高,那么这个时候就需要对这些key增加随机前缀,使得reduce分布均匀

 1--过滤
 2select 
 3  request_url,
 4  count(1) as cnt
 5from wedw_dwd.log_ng_channel
 6where request_url is not null and length(request_url)>0 and to_date(visit_time)>='2020-10-01'
 7group by request_url
 8
 9--拆分
10select 
11  request_url,
12  count(1) as cnt
13from wedw_dwd.log_ng_channel
14where request_url is not null and length(request_url)>0
15group by request_url
16union all 
17select 
18  request_url,
19  count(1) as cnt
20from wedw_dwd.log_ng_channel
21where request_url is  null 
22group by request_url

Spark

针对spark,当某一个或几个task处理时间较长且处理数据量很大,那么就是倾斜的问题了,对于spark的数据倾斜,和Hive的解决思路是一样的,但是Hive通常是以sql为主,而Spark是对rdd的操作,所以优化细节还是有些区别的。其实无论是spark还是hive,数据倾斜的问题无非就是数据特性(本身分布就不均匀/数据量本身就比较大)或者是后续人为开发编写逻辑导致的。

1.排查数据源

在spark中,Stage的划分是通过shuffle算子为界限的,同一个Stage的不同partition可以并行处理,不同Stage之间只能串行处理。一个Stage的整体耗时是由最慢的task来决定的,针对同一个Stage内的不同task,排除每个计算能力差异的前提下,处理时长是由每个task所处理的数据量来决定的,而Stage的数据源主要分为两大类:

获取上一个stage的shuffle数据

直接对接数据源,如kafka,hdfs,local filesystem

如果对接kafka,则需要结合kafka监控来排查分区数据分布是否均匀,如果某一个分区的消息比其他分区都要多,那么这个时候就要对分区分配策略进行调整;或者分区数比较少,则需要增大partition数量;

如果对接hdfs且不可分割的文件,每个文件对应一个partition,这个时候就要看每个文件的数据量是否比较均匀了(注意这里不能仅看文件大小,如果是压缩文件,需要看数据量)。

对于hdfs可切分文件,每个split大小由以下算法决定。其中goalSize等于所有文件总大小除以minPartitions。而blockSize,如果是HDFS文件,由文件本身的block大小决定;如果是Linux本地文件,且使用本地模式,由fs.local.block.size决定。

1protected long computeSplitSize(long goalSize, long minSize, long blockSize) { 2 return Math.max(minSize, Math.min(goalSize, blockSize)); 3}

一般情况下每个split的大小相当于一个block的大小,通常不会出现倾斜。如果有的话则调整对于的参数即可。

接下来对含有shuffle阶段的现象进行排查定位。

总结:尽量使用可切分的文件,源头增加并行度,避免倾斜

2.定位导致倾斜的代码

根据第一步排查数据源分布,如果是均匀的,那么出现倾斜就有可能是人为开发导致的,这时候就需要定位到具体代码导致的数据倾斜(一般找发生shuffle算子,如distinct,groupByKey,join,repartition,reduceByKey,cogroup等算子)

2.1.查看task运行时间和处理数据量

这里可以通过spark web ui界面来查看每个task的处理时长和处理的数据量

2.2.推断倾斜代码

基于spark web ui查看到每个task的处理情况,那么可以查看到该task是处于哪个Stage阶段,然后在代码中查找会出现shuffle的算子,以此来定位具体是哪段代码引起的数据倾斜。这里给出wordcount的例子来简单说明一下

1val conf = new SparkConf()
2val sc = new SparkContext(conf)
3val lines = sc.textFile("hdfs://project/log/test/word.txt")
4val words = lines.flatMap(_.split(","))
5val pairs = words.map((_, 1))
6val wordCounts = pairs.reduceByKey(_ + _)
7wordCounts.collect().foreach(println(_))

通过上面的代码可以看到只有reduceByKey是会经历shuffle阶段的,因此只有这里才会有倾斜的可能

3.解决倾斜

查阅各种资料,网上列举出了8种解决方案,笔者这里进行了分类汇总,尽量把每种方案都融合进来了

3.1 调整并行度

Spark Shuffle默认使用的是HashPartitioner进行数据分区,当执行shuffle read 的时候,根据spark.sql.shuffle.partitions参数来决定read task数量,默认值是200.当有大量的不同key被分配到同一个task的时候,可能会导致该Task所处理的数据远大于其他task.

因此调整并行度的本质就是将使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,起到缓解倾斜的作用。具体的调整方式可以通过上述参数指定并行度,或者在使用shuffle算子的时候指定参数值。

注意:这种优化方式对同一个key有大数据量的场景不适用,且只能起到缓解倾斜的作用

如:

 1SparkSession sparkSession = SparkSession.builder()
 2                .appName("wordcount")
 3                .master("local")
 4                .getOrCreate();
 5
 6JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
 7JavaRDD<String> javaRDD = javaSparkContext.textFile("hdfs://bigdatatest-1:8020/tmp/test.txt");
 8javaRDD.mapToPair(x -> new Tuple2<>(x.toLowerCase() , 1))
 9                .reduceByKey((a, b) -> a + b,1000) /**这里的1000就是并行度partition的设置*/
10                .collect()
11                .forEach(new Consumer<Tuple2<String, Integer>>() {
12                    @Override
13                    public void accept(Tuple2<String, Integer> stringIntegerTuple2) {
14                        System.out.println("The Key:"+stringIntegerTuple2._1+" Value:"+stringIntegerTuple2._2.toString());
15                    }

3.2 自定义partitioner

根据上面提到的,spark默认使用的是hash partition,有些时候调整并行度仍然不能有效解决数据倾斜问题,那么这个时候就需要结合实际的数据特性来自定义分区.其主要目的还是将不同的key尽量分配到不同的task上,这种方式对同一个key有大数据量的场景并不适用

 1/**
 2*自定义partition
 3*/
 4public class DefinePartition extends Partitioner {
 5    public  DefinePartition(){}
 6
 7    @Override
 8    public int hashCode() {
 9        return super.hashCode();
10    }
11
12    @Override
13    public boolean equals(Object definePartition) {
14        DefinePartition definePartition1 = (DefinePartition) definePartition;
15        return this.numPartitions()==((DefinePartition) definePartition).numPartitions();
16    }
17
18    @Override
19    public int numPartitions() {
20        return 20;
21    }
22
23    @Override
24    public int getPartition(Object key) {
25        int Code  = 0;
26        try {
27            String host = new URL(key.toString()).getHost();
28            Code = host.hashCode()%numPartitions();
29            if(Code<0){
30                Code+=numPartitions();
31            }
32        } catch (MalformedURLException e) {
33            e.printStackTrace();
34        }
35
36        return Code;
37    }
38}

3.3 过滤数据

这里过滤数据有两种说法:

过滤无用数据:这里无用数据指的是针对特定的业务需求场景来说的,如空数据,对本次需求无影响的数据。因此在进行shuffle前可调用filter算子来过滤掉

过滤掉少部分数据量较多的key:这里所说的过滤并不是真正的过滤掉,而是通过抽样的方式统计出哪些key所占有的数据量较多,先提前提取出来进行单独计算。和hive的处理思想是一样的

注意:该种方式仅对于少部分key有倾斜的现象有效

3.4 避免使用shuffle类算子

相信大部分读者都知道在shuffle过程中会把多个节点上同一个key拉取到同一个节点上进行计算,这个时候就会涉及到磁盘io,网络传输,这也就是为什么shuffle效率较差的原因了。

因此在实际开发中,尽量规避使用shuffle类算子。例如不使用join算子,而是使用broadcast+map的方式来实现。

 1 List<Map<String, String>> collect = javaSparkContext.textFile("hdfs://bigdatatest-1:8020/tmp/test.txt")
 2                .mapPartitions(new FlatMapFunction<Iterator<String>, Map<String, String>>() {
 3                    @Override
 4                    public Iterator<Map<String, String>> call(Iterator<String> stringIterator) throws Exception {
 5                        List<Map<String, String>> list = new ArrayList<>();
 6                        HashMap<String, String> hashMap = Maps.newHashMap();
 7                        while (stringIterator.hasNext()) {
 8                            String str = stringIterator.next();
 9                            hashMap.put(str.split(",")[0], str.split(",")[1]);
10                        }
11                        list.add(hashMap);
12                        return list.iterator();
13                    }
14                }).collect();
15
16Broadcast<List<Map<String, String>>> listBroadcast = javaSparkContext.broadcast(collect);
17javaRDD.map(new Function<String, String>() {
18  @Override
19  public String call(String s) throws Exception {
20    Iterator<Map<String, String>> iterator = listBroadcast.getValue().iterator();
21    while (iterator.hasNext()) {
22      Map<String, String> stringMap = iterator.next();
23      if (stringMap.containsKey(s)) {
24        return stringMap.get(s);
25      }
26    }
27    return null;
28  }
29}).collect();

3.5 加盐操作(二次聚合/随机前缀+扩容)

笔者认为二次聚合的手段和随机前缀+扩容的方式其本质都是加盐操作,即对key进行加盐使得分配到不同的task上,然后再进行合并保证同一个key最终会聚合到一起。虽然两者思想一样,但是使用的场景还是有所区别的。

二阶段聚合:

第一次是局部聚合,先给每个key打上随机数

然后执行聚合操作,如reduceByKey,groupByKey,这个时候得到的结果相对原始数据集肯定是少了很多

然后再把key上的随机数给删除,保证原始数据集中的相同key可以被分配到同一个task上

再次进行聚合操作,得到最终结果

注意:二次聚合仅适用于聚合类的shuffle操作

随机前缀+扩容

该种优化手段可以参考前面hive的解决方案,其思想都是一样的。

注意:和二次聚合的场景不一致,这里是针对join类型的操作;无论是少数key倾斜还是大部分key倾斜都适用,但是需要对rdd进行扩容,需要均衡考虑内存资源

3.6 各种join转换

首先我们先来简单了解下spark的几种join实现,已经适用的场景。

Broadcast Hash Join: 适合一张积小的表和一张大表进行join,其原理将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,可以直接广播的场景,当然被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M,如果增加该参数值,请考虑driver端的内存避免oom,因为一般被广播的表需要collect到driver端,然后再分发到executor端。

Shuffle Hash Join: 适合一张小表和一张大表进行join,或者是两张小表之间的join,这里所说的小表要比broadcast hash join场景下的小表大些,且不适合broadcast方式。该join的原理是利用key相同必然分区相同的这个原理,两个表中,key相同的行都会被shuffle到同一个分区中,SparkSQL将较大表的join分而治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进行Hash Join,这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗

Sort Merge Join: 适合两张较大的表之间进行join。其原理首先将两张表按照join key进行了重新shuffle,保证join key值相同的记录会被分在相应的分区,分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接,因为两个序列都是有序的,从头遍历,碰到key相同的就输出;如果不同,左边小就继续取左边,反之取右边。其原则就是即用即取即丢

目前网上大部分资料都是针对reduce join转换为map join的解决方案,其原理就是上面第一种join方式,即broadcast+map的方式,笔者认为第二种join方式也就是对应的上面的加盐操作。基于上面三种join方式的简单讲述,读者可根据实际内存资源、带宽资源适量将参数spark.sql.autoBroadcastJoinThreshold调大,让更多join实际执行为broadcast hash join。

业务层面优化

通过上面的技术层面的倾斜解决思路讲解,其实已经把业务层面的相关优化给了出来。

结合实际业务需求过滤无用数据,尽量规避shuffle的发生。

逆向统计:比如说需求方想要统计某个页面的跳出率,正常逻辑的统计是:该页面下一页为空的UV/该页面UV,那么我们可以通过1-(该页面下一页不为空的UV/该页面UV)来得到跳出率,即所谓的逆向推导

打回需求:当然如果读者认为该需求不切合实际,且产出意义不大,是完全可以打回的,不过笔者给出的这种优化实属是下下策,作为一名技术人员,我们还是应该正向面对,彻底解决问题的

总结

如果读者能读到这里,首先非常感谢你的耐心阅读,本文基本上都是给出解决思路,实际案例较少。因为在实际场景中,可能并不是该文中的某一个方案能够彻底解决读者的问题,有时候需要结合多种其他的优化方式结合倾斜的解决思路才能够解决,笔者认为掌握一个问题的解决思路才是重要的。如果读者比较细心的话,可能会发现本文中出现比较多的字就是均匀,因此倾斜对应的反义词就是均匀,这也就是解决数据倾斜的主线,即一切解决方案都是围绕着均衡来展开的。


Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表