面试题答案
一键面试- 设计思路:
- 由于HBase表数据模式是复杂嵌套结构,需要一种能处理这种结构的输入格式。HBase自带的
TableInputFormat
主要适用于简单的HBase数据读取。对于复杂嵌套结构,我们可以继承TableInputFormat
并重写一些方法来适应数据结构。 - 要考虑如何将复杂嵌套的数据解析为Map函数能理解的键值对形式。通常,键可以设计为能唯一标识数据的信息,如行键;值则是解析后的复杂嵌套数据对象。
- 由于HBase表数据模式是复杂嵌套结构,需要一种能处理这种结构的输入格式。HBase自带的
- 关键步骤:
- 继承TableInputFormat:
import org.apache.hadoop.hbase.mapreduce.TableInputFormat; public class CustomHBaseInputFormat extends TableInputFormat { // 后续重写方法 }
- 重写getSplits方法:
- 该方法用于将HBase表数据划分为多个分片(splits),以便在多个Map任务间并行处理。
- 根据HBase表的Region分布等信息来合理划分。例如:
@Override public List<InputSplit> getSplits(JobContext job) throws IOException { Configuration conf = job.getConfiguration(); byte[] startRow = TableInputFormatBase.getStartRow(conf); byte[] endRow = TableInputFormatBase.getEndRow(conf); // 获取HBase表的RegionLocator RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(conf.get(TableInputFormat.INPUT_TABLE))); List<HRegionLocation> regionLocations = regionLocator.getAllRegionLocations(); List<InputSplit> splits = new ArrayList<>(); for (HRegionLocation regionLocation : regionLocations) { byte[] regionStartKey = regionLocation.getRegionInfo().getStartKey(); byte[] regionEndKey = regionLocation.getRegionInfo().getEndKey(); // 根据实际需求,判断是否与startRow和endRow有交集等,来构建InputSplit if ((startRow == null || Bytes.compareTo(startRow, regionEndKey) <= 0) && (endRow == null || Bytes.compareTo(endRow, regionStartKey) > 0)) { HTableSplit split = new HTableSplit(regionLocation.getRegionInfo(), regionLocation.getServerName()); splits.add(split); } } return splits; }
- 重写createRecordReader方法:
- 此方法用于创建一个
RecordReader
,负责从输入分片中读取数据并转换为键值对。 - 由于数据是复杂嵌套结构,在
RecordReader
的nextKeyValue
方法中,需要自定义逻辑来解析复杂嵌套数据。例如:
@Override public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { HTableSplit hTableSplit = (HTableSplit) split; Configuration conf = context.getConfiguration(); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf(conf.get(TableInputFormat.INPUT_TABLE))); return new CustomRecordReader(table, hTableSplit.getStartRow(), hTableSplit.getEndRow()); } class CustomRecordReader extends RecordReader<ImmutableBytesWritable, Result> { private Table table; private byte[] startRow; private byte[] endRow; private ResultScanner scanner; private Result currentResult; public CustomRecordReader(Table table, byte[] startRow, byte[] endRow) { this.table = table; this.startRow = startRow; this.endRow = endRow; } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { Scan scan = new Scan(startRow, endRow); scanner = table.getScanner(scan); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { currentResult = scanner.next(); return currentResult != null; } @Override public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { return new ImmutableBytesWritable(currentResult.getRow()); } @Override public Result getCurrentValue() throws IOException, InterruptedException { return currentResult; } @Override public float getProgress() throws IOException, InterruptedException { // 可根据已读取行数和总行数估算进度 return 0; } @Override public void close() throws IOException { scanner.close(); table.close(); } }
- 此方法用于创建一个
- 在MapReduce作业中使用自定义InputFormat:
import org.apache.hadoop.mapreduce.Job; public class HBaseMapReduceJob { public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); Job job = Job.getInstance(conf, "HBase MapReduce with Custom InputFormat"); job.setJarByClass(HBaseMapReduceJob.class); job.setMapperClass(MyMapper.class); job.setInputFormatClass(CustomHBaseInputFormat.class); // 设置其他作业相关参数,如输出格式等 System.exit(job.waitForCompletion(true)? 0 : 1); } }
- 继承TableInputFormat:
在上述步骤中,通过继承TableInputFormat
并合理重写关键方法,设计出适合复杂嵌套结构HBase数据的输入格式,能够有效地读取数据并传递给Map函数进行处理。