封面画师:画师JW     封面ID:78362042

HBase高级

架构原理

HBase详细架构图

HBase依赖于HDFS与Zookeeper。HBase在启动时需要启动的两大进程——HMaster、HRegionServer。HMaster管理一些DDL的操作,HRegionServer则是管理一些DML的操作,同时HMaster也会对HRegionServer进行管理。HLog,即:预写日志,用于灾难恢复使用,在Region Server宕机后,可以从log中回滚还没有持久化的数据。一个HRegionServer上,可以有多个Region;而一个表中,也可能存在多个Region。一个Region中又有多个Store(列族中可以有多个Store),Store的存储是隔离的,放在不同的文件夹。

一个列族对应多个Store,那么这些Store一定不会在同一个Region中。一个Region里,不同的Store一定不是同一个列族的。(绕口令?)

(在HBase中,列相当于是数据的一部分,因为你在插入时指定了列,而建表时只指定了列族,而不是列。)

每个Store中包含与其对应的MemStore以及一个或多个StoreFile(StoreFile是实际数据存储文件HFile的轻量级封装,HFile也是一种文件格式,HFile是存在DataNode中的,但因为它是HBase的组成部分,因此将其归于HBase中)。MemStore是在内存中的,保存了修改的数据,MemStore中的数据写到文件中就是StoreFile。HLog会实时写入DFS Client,用于数据恢复,同时HFile也会写入DFS Client,然后经过DFS Client的作用,将数据储存在HDFS中的DataNode。

Zookeeper会分担HMaster的一部分工作,对客户端来说,将表数据的操作分担给Zookeeper,注意,是表数据,而不是表。

参考链接:详解HBase架构原理HBase原理深入解析(一)----HBase架构总览

写流程

参考链接:

有态度的HBase/Spark/BigData

HBase架构详解和数据的读写流程

HBase读写数据流程


HBase是一个读比写慢的框架。

HBase写流程
  1. Client先访问Zookeeper,获取hbase:meta表位于哪个Region Server。(meta表中存储着表的位置信息,但并不是说对A表就只有一条信息,如果A数据太大而被划分成多个Region,那么meta表中就会存储A表的多个信息,这些多个信息是以RowKey进行区分。)
  2. 访问对应的Region Server,获取hbase:meta表,根据请求的namespace:table/rowkey,查询出目标数据位于哪个Region Server中的那个Region中。并将该table的region信息以及meta表的位置信息缓存在客户端的meta cache中,方便下次访问;
  3. 与目标Region Server进行通讯;
  4. 将数据顺序写入(追加)到WAL;
  5. 将数据写入对应的Memstore,数据会在Memstore中进行排序;
  6. 向客户端发送ack;
  7. 等达到Memstore的**刷写(Flush)**时机后,将数据写入HFile。

我们可以说HBase是先将数据写入WAL,在写入Memstore,这种说法是一种不严谨的说法,但并不是错的,实际的操作顺序是:

  1. Hbase做写操作时,先记录在本地的WAL中,但是不同步到HDFS
  2. 把数据写入Memstore
  3. 开始将WAL同步到HDFS
  4. 最后如果WAL同步成功则结束,如果同步失败则回滚Memstore

MemStore Flush

参考链接:

HBase Flush 解析

HBase参数配置及说明

MemStore Flush

不同的store位于不同的列族,因此经过flush后,在HDFS中也是位于不同的文件夹。

hbase.regionserver.global.memstore.size:

RegionServer中有全局MenStore的大小,超过该大小就会触发flush到磁盘的操作,默认是堆大小的40%,而且RegionServer级别的Flush会阻塞客户端读写。

hbase.regionserver.global.memstore.size.lower.limit:

在刷写时,有一个安全设置,有时候集群的“写负载”非常高,写入量一直超过了flush的量,这时,我们就希望MemStore不要超过一定的安全设置。在这种情况下,写操作将被一直阻塞到一个MemStore可被管理的大小。这个大小在默认情况下是 堆大小 * 0.4 * 0.95,即:堆大小的38%。换句话说,也就是当RegionServer级别的flush操作(Flush顺序是按照Memstore由大到小执行的)发生后,会阻塞客户端读写,一直阻塞到整个RegionServer级别的MemStore大小为堆大小的38%。

hbase.regionserver.optionalcacheflushinterval:

内存中的文件在自动刷新之前能够存活的最长时间,默认是1h。当内存中最后一条数据的存活时间达到1h时,将被flush。

hbase.hregion.memstore.flush.size:

单个Region里单个MemStore的缓存大小,超过后那么整个HRegion就会被flush,默认128M。

读流程

HBase读流程

HBase读数据时,并不是按照内存、缓存、磁盘的顺序进行读取的,而是一起读取的。

在实际情况中,我们在查询(读取)数据时,返回的是时间戳大的数据,那如果按照内存MemStore、缓存Block Cache、磁盘StoreFile的顺序进行假设读取呢?我们假定磁盘中有一条数据A,其时间戳为a,这时我们向HBase插入数据B,并令时间戳为b,且a > b(AA、BB是同一个单元格不同版本的数据)。由于HBase还没有将B数据flush到磁盘中,此时数据B存在内存中。如果按照内存、缓存、磁盘的顺序进行查询(读取),此时返回给我们的数据是B,即:时间戳小的数据,这显然与我们的实际情况不符,也验证了HBase读数据时,不是按照内存、缓存、磁盘的顺序进行读取的。

HBase在读取数据时,Block Cache的使用情况:假设磁盘中保存了数据AA,其时间戳为aa,这时候我们查询数据AA,AA就会被写入Block Cache。然后我们向HBase中插入数据BB,其时间戳为bb,显然:bb > aa,然后我们手动将数据BB flush 到磁盘中(AA、BB是同一个单元格不同版本的数据)。然后我们查询这个单元格,HBase在进行数据读取时,会读取Block Cache中的数据AA,磁盘中的数据BB,而不会读取磁盘中的数据AA(因为在Block Cache读取数据AA,所以就不再磁盘中读取数据AA了)。

StoreFile Compaction

由于MemStore每次刷写都会生成一个新的HFile,且同一字段的不同版本(TimeStamp)和不同类型(Put/Delete)有可能会分布在不同的HFile中,因此查询时需要遍历所有的HFile。为了减少HFile的个数,以及清理掉过期和删除的数据,会进行StoreFile Compaction。

Compaction分为两种,分别是Minor Compaction(小合并)Major Compaction(大合并)。MinorCompaction会将临近的若干个较小的HFile合并成一个较大的HFile,但不会清理过期和删除的数据。Major Compaction会将一个Store下的所有HFile合并成一个大的HFile,并且会清理掉过期和删除的数据

大合并和小合并都是在Region的内部进行的,并不涉及到Region之间的合并。merge_region是指将两个Region合并为一个Region。

StoreFile Compaction

在HBase Shell中也有关于Compact的命令:compact、compact_rs、major_compact、merge_region。

默认文件超过3个,执行compact触发大合并(作用等于major_compact)。

为了保证数据的一致性,合并生成新文件后并不会立即删除旧文件,而是会等一会再删除。

HBase合并Shell命令:hbase compact


hbase.hregion.majorcompaction:

一个Region进行Major Compaction合并的周期,在这个点的时候,这个Region下的所有HFile都会进行合并。默认时间是7天。这个操作十分消耗资源,在生产环境中我们常将其设置为0以关闭这个操作,而在应用空闲的时间手动触发。

同时还有一个抖动比例的设置,但在生产环境中我们都会将其关闭,因此不再叙述。

hbase.hregion.compationThreshold:

一个Store里面允许存的HFile的个数,超过这个个数会被写到新的一个HFile里面,也就是每个Region的每个列族对应的MemStore在flush为HFile的时候默认情况下达到3个HFile的时候(大于等于)就会对这些文件进行合并重写为一个新的文件,设置个数越大可以减少触发合并的时候,但是每次合并的时间就会越长。

读写拓展

我们可以Kill 掉 HMaster的端口,然后在一张表内进行插入数据、查询数据等操作,这时候并不会报错且操作成功;但如果我们创建一张表,就会显示“无法从Zookeeper中获取Master”的错误,告诉我们这样的操作无法实现。我们可以得出:元数据的操作需要有HMaster的存在,而表内数据的操作并不需要。

通过读写的流程我们能看到,在读写时没有HMaster的参与,但是都有Zookeeper的参与。同时,在上一段中我们也说了Master的获取需要依赖Zookeeper,根据这两条,我们在写客户端代码时,只需要Zookeeper的地址就可以了。

虽然表内数据的读写没有HMaster的参与,但并不是就能将HMaster关闭。长期在HMaster不存在的情况下进行读写会导致集群不健康。当Region需要切分时,需要依靠Master修改元数据,但由于没有Master的存在导致不能进行切分。就算能够进行切分,但切分成功后需要将切分后的Region调度到其他节点,调度时需要依靠Master而导致无法进行调度(正是因为文件太大才进行切分,结果切分了又无法调度到其他节点,那么这样的切分又有什么用呢?)。

数据真正删除的时间

  • Flush时
  • 合并时

Flush真正删除的数据:

同一个内存(MemStore)中,过时的数据、被删除标记标记上的数据。不能真正删除跨越了多个文件的数据,只能真正删除在同一内存。比如,我在磁盘中存有一条张三的信息,这时候我将内存中的李四(张三与李四同表、同列族、同列,仅时间戳存在差异)Flush到磁盘,磁盘中张三的数据并不会被真正删除。

Compact(major)真正删除的数据:

即:大合并时真正删除的数据。大合并是将磁盘中的多个文件写入内存,然后再rewrite回磁盘合并成一个文件,在这种情况下,上述列举的张三、李四就会被真正删除一个(相当于它俩在内存中见面了,所以会被真正删除一个)。


DeleteColumn标记真正删除的时机:不会在Flush时删除,会在大合并时删除。 为什么?

我们假设在一单元格内有一数据 A,时间戳为 a,然后我们将其Flush(刷洗)进硬盘。这时候,我们在向这单元格插入数据 B,时间戳为 b,显然:b > a。这时候我们对内存中的数据B进行deleteall操作,然后会产生一个DeleteColumn标记。如果我们进行版本查询,我们依然能够查看到数据 B,也能看到DeleteColumn标记。

然后我们对这个表进行flush(合并内存中的数据),再进行版本查询,发现数据B已经不见,而DeleteColumn标记依燃存在。最后我们对表进行大合并,发现DeleteColumn标记和数据A都被真正删除。这证明了我们的观点,那么HBase为什么要这么做呢?

最开始我们已将数据 A 放进磁盘,然后我们在同一单元格插入了数据 B ,这时候按照我们的逻辑,数据 A 应该被真正删除,但是按照HBase的逻辑,数据 A 并不会被真正删除,数据A依然存在磁盘当中,只是HBase返回了时间戳大的数据 B ,给用户的感觉好像是已经真正删除了数据 A 。再然后,我们删除(不是真正的删除,只是执行了delete操作)了数据B(由于数据B至今未被Flush,因此它还在内存中)并对表进行Flush。由于我们删除了数据B,按照我们的逻辑,因为数据A被数据B覆盖,数据B又被我们删除,用户应该查询不到这个单元格的数据,而真实情况也是这样。

但如果DeleteColumn标记在FLush时被删除,用户在查询表时就会看到数据A,这显然是不符合常情的。因此我们需要在进行大合并时删除DeleteColumn标记,因为只有大合并时会将磁盘中的数据写会磁盘,然后进行比较,真正删除数据A与DeleteColumn标记。


如果我们设置了多个版本,在进行真正删除的时候,会保留最大的几个版本。

Split流程

默认情况下,每个表起初都只有一个Region,随着数据的不断写入,Region会自动进行拆分(按RowKey进行拆分)。刚拆分时,两个子Region都位于当前的Region Server,但处于负载均衡的考虑,HMaster有可能有可能会将某个Region转移给其他的Region Server。

Split时机

  1. 当一个Region中的某个Store下所有StoreFile的总大小超过hbase.hregion.max.filesize时,该Region就会进行拆分(HBase 0.94之前的版本)。

    hbase.hregion.max.filesize:HStoreFile最大的大小,当某个region的某个列族超过这个大小(默认10G)时会进行Region拆分。

  2. 当一个Region中的某个Store下所有StoreFile的总大小超过Min(R2 * “hbase.hregion.memstore.flush.size”, “hbase.hregion.max.filesize”)时,该Region就会进行拆分,其中R为当前Region Server中属于该Table的Region个数(HBase 0.94之后的版本)。

这样的自动切分会产生HBase的数据热点问题,因此我们在生产环境中建表时会进行预分区。

Split自动切分流程


官方建议在建表时不使用多个列族,原因:

如果我们使用了多个列族(假设有列族i1,i2,i3),然后在插入数据时,向i1中put了大量的数据,而i2,i3中仅仅只有几个数据。这时如果触发了全局的Flush时,由于i2,i3中也有几条数据而也会形成一个文件(这是一个小Store),但是i1会形成一个较大的文件(大Store)。如果列族过多,且发生了这种情况,那么就会出现多个小文件,为了避免这种情况的发生,官方做出了这样的建议。

但是我们在实际生产时,也可以创建多个列族,但是要尽量保证每个列族中的数据量同步增长。

API操作

常用 API

导入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.2</version>
</dependency>

创建连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class HBaseApi {

private static Connection connection = null;
private static Admin admin = null;

static {
try {
//获取配置信息
Configuration configuration = HBaseConfiguration.create();
// 第二个参数为HBase服务器的ip地址,我在此处修改了开发机的host文件
configuration.set("hbase.zookeeper.quorum", "hbase");
//创建连接对象
connection = ConnectionFactory.createConnection(configuration);
//创建Admin对象
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
}

如果引入 Spring,我们通常编写一个配置类,然后将连接信息注入 Spring 中。如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class HBaseUtil {
private static Connection connection = null;
private static Admin admin = null;

public HBaseUtil(Configuration conf) {
try {
connection = ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
}

@Configuration
public class HBaseConfig {
@Bean
public HBaseUtil getHbaseService() {
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hbase");
return new HBaseUtil(conf);
}
}

关闭连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 关闭Admin和Connection
public static void close() {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

判断表是否存在

1
2
3
4
5
6
7
8
9
/**
* 判断表是否存在
* @param tableName
* @return
* @throws IOException
*/
public static boolean isTableExist(String tableName) throws IOException {
return admin.tableExists(TableName.valueOf(tableName));
}

创建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 创建表
* @param tableName 表名
* @param cfs 列族名
* @throws IOException
*/
public static void createTable(String tableName, String... cfs) throws IOException {
// 判断是否存在列族信息
if (cfs.length <= 0) {
System.out.println("请设置列族信息");
return;
}
// 判断表是否存在
if (isTableExist(tableName)) {
System.out.println(tableName + "表已存在");
return;
}
// 创建表描述器
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
//添加列族信息
for (String cf : cfs) {
//创建列族描述器
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
//添加具体的列族信息
hTableDescriptor.addFamily(hColumnDescriptor);
}
//创建表
admin.createTable(hTableDescriptor);
}

删除表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 删除表
* @param tableName 表名
* @throws IOException
*/
public static void dropTable(String tableName) throws IOException {
// 判断表是否存在
if (!isTableExist(tableName)) {
System.out.println(tableName + "表不存在!");
return;
}
// 使表处于不可用状态
admin.disableTable(TableName.valueOf(tableName));
// 删除表
admin.deleteTable(TableName.valueOf(tableName));
}

创建命名空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 创建命名空间
* @param ns namespace 命名空间
*/
public static void createNameSpace(String ns) {
//创建命名空间描述器
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build();
// 创建命名空间
try {
admin.createNamespace(namespaceDescriptor);
} catch (NamespaceExistException e) {
System.out.println(ns + "命名空间已存在!");
} catch (IOException e) {
e.printStackTrace();
}
//System.out.println("虽然命名空间存在,但还是能够运行到这~");
}

插入值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 向表插入数据
* @param tableName 表名
* @param rowKey 行键
* @param cf 列族
* @param cn 列名
* @param value 值
* @throws IOException
*/
public static void putData(String tableName, String rowKey,
String cf, String cn, String value) throws IOException {
if (isTableExist(tableName)) {
// 获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//创建Put对象
Put put = new Put(Bytes.toBytes(rowKey));
// 给Put对象赋值
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
// 插入数据
table.put(put);
table.close();
} else {
System.out.println(tableName + "表不存在!");
}
}

使用 get 方式获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 获取数据(get)
* @param tableName 表名
* @param rowKey 行键
* @param cf 列族
* @param cn 列名
* @return
* @throws IOException
*/
public static Result getData(String tableName, String rowKey, String cf, String cn) throws IOException {

// 获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//创建get对象
Get get = new Get(Bytes.toBytes(rowKey));
//指定获取的列族
//get.addFamily(Bytes.toBytes(cf));
// 指定列族中的列
get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//设置获取数据的版本数
get.setMaxVersions(5);
//获取数据
Result result = table.get(get);
//解析Result
/*for (Cell cell : result.rawCells()) {
//打印数据
System.out.println("CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
}*/
//关闭表连接
table.close();
return result;
}

使用 scan 获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 获取数据(scan)
* @param tableName 表名
* @param filterList 过滤器
* @param isPrint 是否打印
* @return
* @throws IOException
*/
public static ResultScanner scanTable(String tableName, FilterList filterList, boolean isPrint) throws IOException {
//获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//构建Scan对象 Scan范围扫描,左闭右开
// Scan scan = new Scan(Bytes.toBytes("1001"), Bytes.toBytes("1003"));
Scan scan = new Scan();
scan.setFilter(filterList);
//扫描表
ResultScanner resultScanner = table.getScanner(scan);
if (isPrint){
//解析ResultScanner
for (Result result : resultScanner) {
//解析result并打印
for (Cell cell : result.rawCells()) {
System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
}
table.close();
return resultScanner;
}

删除数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 删除数据
* @param tableName 表名
* @param rowKey 行键
* @param cf 列族
* @param cn 列名
* @throws IOException
*/
public static void deleteData(String tableName, String rowKey, String cf, String cn) throws IOException {
// 获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//构建删除对象
Delete delete = new Delete(Bytes.toBytes(rowKey));
//设置删除的列
// 注意 addColumn(慎用)与addColumns的区别
/**
* addColumn中指定时间戳,只会删除指定时间戳的数据
*
* */
//delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
//设置删除指定的列族
//delete.addFamily(Bytes.toBytes(cf));
//执行删除操作
table.delete(delete);
table.close();
}

过滤器

参考链接:【HBase】Java实现过滤器查询

删除表中数据

指定RowKey进行删除: 使用DeleteFamily标记,删除所有版本。如果传入了时间戳,就会删除小于等于指定时间戳的所有数据。

指定RowKey 与列族: 使用DeleteFamily标记,删除所有版本。如果传入了时间戳,就会删除小于等于指定时间戳的所有数据。

指定RowKey 、列族与列:

  • 使用addColumn方法(慎用):使用Delete标记,删除单个版本。如果不传入时间戳,则删除最大时间戳的数据;如果传入时间戳,只会删除指定时间戳的数据。在不同时间段(数据Flush前后)删除数据后,查询数据得到的结果不同。
  • 使用addColumns方法:使用DeleteColumn标记。如果传入了时间戳,就会删除小于等于指定时间戳的所有数据。