MST
星途 面试题库

面试题:HBase上MapReduce前期准备之数据格式与模式

假设你要在HBase表上执行MapReduce任务,HBase表的数据模式(schema)是复杂嵌套结构,在MapReduce前期准备阶段,如何设计合适的输入格式(InputFormat)来有效地读取HBase数据并传递给Map函数进行处理?请阐述设计思路和关键步骤。
45.0万 热度难度
数据库Hbase

知识考点

AI 面试

面试题答案

一键面试
  1. 设计思路
    • 由于HBase表数据模式是复杂嵌套结构,需要一种能处理这种结构的输入格式。HBase自带的TableInputFormat主要适用于简单的HBase数据读取。对于复杂嵌套结构,我们可以继承TableInputFormat并重写一些方法来适应数据结构。
    • 要考虑如何将复杂嵌套的数据解析为Map函数能理解的键值对形式。通常,键可以设计为能唯一标识数据的信息,如行键;值则是解析后的复杂嵌套数据对象。
  2. 关键步骤
    • 继承TableInputFormat
      import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
      public class CustomHBaseInputFormat extends TableInputFormat {
          // 后续重写方法
      }
      
    • 重写getSplits方法
      • 该方法用于将HBase表数据划分为多个分片(splits),以便在多个Map任务间并行处理。
      • 根据HBase表的Region分布等信息来合理划分。例如:
      @Override
      public List<InputSplit> getSplits(JobContext job) throws IOException {
          Configuration conf = job.getConfiguration();
          byte[] startRow = TableInputFormatBase.getStartRow(conf);
          byte[] endRow = TableInputFormatBase.getEndRow(conf);
          // 获取HBase表的RegionLocator
          RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(conf.get(TableInputFormat.INPUT_TABLE)));
          List<HRegionLocation> regionLocations = regionLocator.getAllRegionLocations();
          List<InputSplit> splits = new ArrayList<>();
          for (HRegionLocation regionLocation : regionLocations) {
              byte[] regionStartKey = regionLocation.getRegionInfo().getStartKey();
              byte[] regionEndKey = regionLocation.getRegionInfo().getEndKey();
              // 根据实际需求,判断是否与startRow和endRow有交集等,来构建InputSplit
              if ((startRow == null || Bytes.compareTo(startRow, regionEndKey) <= 0) &&
                      (endRow == null || Bytes.compareTo(endRow, regionStartKey) > 0)) {
                  HTableSplit split = new HTableSplit(regionLocation.getRegionInfo(), regionLocation.getServerName());
                  splits.add(split);
              }
          }
          return splits;
      }
      
    • 重写createRecordReader方法
      • 此方法用于创建一个RecordReader,负责从输入分片中读取数据并转换为键值对。
      • 由于数据是复杂嵌套结构,在RecordReadernextKeyValue方法中,需要自定义逻辑来解析复杂嵌套数据。例如:
      @Override
      public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
          HTableSplit hTableSplit = (HTableSplit) split;
          Configuration conf = context.getConfiguration();
          Connection connection = ConnectionFactory.createConnection(conf);
          Table table = connection.getTable(TableName.valueOf(conf.get(TableInputFormat.INPUT_TABLE)));
          return new CustomRecordReader(table, hTableSplit.getStartRow(), hTableSplit.getEndRow());
      }
      class CustomRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
          private Table table;
          private byte[] startRow;
          private byte[] endRow;
          private ResultScanner scanner;
          private Result currentResult;
          public CustomRecordReader(Table table, byte[] startRow, byte[] endRow) {
              this.table = table;
              this.startRow = startRow;
              this.endRow = endRow;
          }
          @Override
          public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
              Scan scan = new Scan(startRow, endRow);
              scanner = table.getScanner(scan);
          }
          @Override
          public boolean nextKeyValue() throws IOException, InterruptedException {
              currentResult = scanner.next();
              return currentResult != null;
          }
          @Override
          public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
              return new ImmutableBytesWritable(currentResult.getRow());
          }
          @Override
          public Result getCurrentValue() throws IOException, InterruptedException {
              return currentResult;
          }
          @Override
          public float getProgress() throws IOException, InterruptedException {
              // 可根据已读取行数和总行数估算进度
              return 0;
          }
          @Override
          public void close() throws IOException {
              scanner.close();
              table.close();
          }
      }
      
    • 在MapReduce作业中使用自定义InputFormat
      import org.apache.hadoop.mapreduce.Job;
      public class HBaseMapReduceJob {
          public static void main(String[] args) throws Exception {
              Configuration conf = HBaseConfiguration.create();
              Job job = Job.getInstance(conf, "HBase MapReduce with Custom InputFormat");
              job.setJarByClass(HBaseMapReduceJob.class);
              job.setMapperClass(MyMapper.class);
              job.setInputFormatClass(CustomHBaseInputFormat.class);
              // 设置其他作业相关参数,如输出格式等
              System.exit(job.waitForCompletion(true)? 0 : 1);
          }
      }
      

在上述步骤中,通过继承TableInputFormat并合理重写关键方法,设计出适合复杂嵌套结构HBase数据的输入格式,能够有效地读取数据并传递给Map函数进行处理。