面试题答案
一键面试数据倾斜对MapReduce数据流向优化算法的影响
- 数据分布不均:导致大量数据集中在少数几个Reducer上,其他Reducer处理的数据量极少,造成负载不均衡。例如,某个特定的Key出现频率极高,使得所有具有该Key的数据都被发送到同一个Reducer,而其他Reducer处于空闲状态。
- 性能瓶颈:数据倾斜会严重拖慢整个MapReduce任务的执行速度。因为只有等所有Reducer都完成任务,整个MapReduce作业才算完成,少数Reducer处理大量数据会成为性能瓶颈,延长作业运行时间。
- 资源浪费:由于大部分Reducer处理数据量少,计算资源闲置,而少数几个处理大量数据的Reducer资源紧张,造成集群资源利用效率低下。
解决数据倾斜的策略和方法
- 预聚合
- 原理:在Map阶段对数据进行初步聚合,减少进入Reducer的数据量。
- 示例:假设处理的是销售记录,以地区和产品ID为Key,销售额为Value。在Map阶段,按地区和产品ID对销售额进行初步累加,减少相同Key的数据量。
- 加盐处理
- 原理:给Key添加随机前缀,打散原本集中的Key。例如,对于高频Key "product1",添加0 - 9的随机前缀,变成 "0_product1", "1_product1" 等,使得这些数据分散到不同Reducer。
- 示例:在Map阶段,对于高频Key,生成新的Key,让数据均匀分布到Reducer。
- 自定义Partitioner
- 原理:根据业务逻辑自定义数据分配到Reducer的规则,而不是使用默认的HashPartitioner。例如,根据数据的时间范围、地区等属性分配到不同Reducer,避免数据集中在某些Reducer。
- 示例:根据数据的某个属性(如日期),设计Partitioner逻辑,将不同日期的数据分配到不同Reducer。
Java代码示例 - 以预聚合为例
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class PreAggregationExample {
public static class PreAggregationMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class PreAggregationReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Pre - Aggregation Example");
job.setJarByClass(PreAggregationExample.class);
job.setMapperClass(PreAggregationMapper.class);
job.setCombinerClass(PreAggregationReducer.class);
job.setReducerClass(PreAggregationReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
在上述代码中,通过设置CombinerClass(这里和Reducer逻辑相同)实现了预聚合。在Map阶段输出后,Combiner先对本地数据进行聚合,减少了传输到Reducer的数据量,从而缓解数据倾斜问题。