性能调优方面
- 网络带宽优化
- 数据压缩:在MapReduce任务的输出端启用数据压缩,如使用Snappy、Gzip等压缩算法。例如,Snappy具有较高的压缩速度和适中的压缩比,能有效减少数据在网络上传输的大小。在Hadoop配置文件(如mapred - site.xml)中设置
mapreduce.output.fileoutputformat.compress=true
,并指定压缩格式mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
。
- 网络拓扑感知:让MapReduce任务调度器感知网络拓扑结构,尽量将数据本地化处理。比如,在YARN调度器中配置相关参数,使任务优先在存储数据的节点上运行,减少跨网络节点的数据传输。可以通过在YARN配置文件(yarn - site.xml)中设置
yarn.resourcemanager.nodemanager - hostname - over - ip - check=false
,并结合网络拓扑脚本配置,让YARN更好地感知节点位置关系。
- I/O读写优化
- 优化HBase表设计:
- 合理设置列族:减少列族数量,避免过多列族带来的I/O开销。例如,对于经常一起查询的数据放在同一个列族中。假设业务场景中用户的基本信息和登录信息经常一起查询,可将它们放在一个列族里。
- 预分区:根据数据分布特征进行预分区,避免数据热点。比如按时间范围、地域等进行预分区。如果数据按时间顺序写入,可以按时间区间(如每月、每季度)进行预分区,在创建HBase表时使用
create 'table_name', {NAME => 'cf', SPLITS => ['2023 - 01 - 01', '2023 - 04 - 01', '2023 - 07 - 01']}
这样的语句。
- MapReduce读写配置:
- 调整Map和Reduce任务数:根据数据量和集群资源合理设置Map和Reduce任务数。例如,数据量较大时适当增加Map任务数,以充分利用集群资源并行处理数据。可以通过设置
mapreduce.job.maps
和mapreduce.job.reduces
参数来调整任务数,如mapreduce.job.maps = 100
(根据实际数据量和节点资源确定合适值)。
- 使用本地缓存:对于MapReduce任务中需要频繁读取的小文件,可以将其缓存到本地节点,减少HDFS的I/O压力。比如在MapReduce代码中使用
DistributedCache.addCacheFile(new URI("hdfs://path/to/file"), context.getConfiguration());
将文件缓存到本地,然后在Map任务中通过File localFile = new File(context.getLocalCacheFiles()[0].getPath());
获取本地文件。
复杂业务场景下数据流向方案设计
- 实时数据采集:使用Flume、Kafka等工具进行实时数据采集。例如,通过Flume从多个数据源(如日志文件、数据库)收集数据,并将其发送到Kafka主题。在Flume配置文件中设置数据源(如
a1.sources = r1
,a1.sources.r1.type = exec
,a1.sources.r1.command = tail -F /var/log/app.log
),设置Kafka为目的地(如a1.sinks = k1
,a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
,a1.sinks.k1.kafka.topic = real - time - data - topic
)。
- 实时聚合:使用Spark Streaming或Storm等实时计算框架从Kafka中消费数据并进行实时聚合。以Spark Streaming为例,代码如下:
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val conf = new SparkConf().setAppName("RealTimeAggregation").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "kafka - server:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group1",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("real - time - data - topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val aggregatedData = stream.map(_.value()).map { data =>
// 解析数据并进行聚合计算
val fields = data.split(",")
(fields(0), fields(1).toInt)
}.reduceByKey(_ + _)
aggregatedData.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val connection = HConnectionManager.createConnection(HBaseConfiguration.create())
val table = connection.getTable(TableName.valueOf("aggregated - data - table"))
partition.foreach { case (key, value) =>
val put = new Put(Bytes.toBytes(key))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sum"), Bytes.toBytes(value))
table.put(put)
}
table.close()
connection.close()
}
}
ssc.start()
ssc.awaitTermination()
- 写入HBase特定分区:在写入HBase时,根据业务规则确定分区键。例如,如果按地域进行分区,在实时聚合后的数据中,以地域字段作为HBase表的行键前缀,这样可以将数据写入到特定分区。在上述代码中,
key
可以设计为以地域字段开头,如region1:20230801
,HBase会根据行键的字典序将数据分配到相应的分区。