数据库分区分片框架

jopen 11年前

序言

一直在做企业应用,目前要做一些互联网应用,当然只是应用是放在互联网的,数据量距离真正的互联网应用还是有相当大的差距的。但是不可避免的,在数据库出现瓶颈的情况还是有的,现在做互联网上的应用,当然也要未雨绸缪,要考虑数据量大的时候的解决方案。

这个目前开源的商用的也都有不少解决方案,一来,做技术的都有这么个臭毛病,即使是使用别人的方案,自己也要搞清楚内部的一些实现机制,这样才会有真正的体会,否则去评估一个方案的时候,就只能盲人摸象了。

为此,构建一个验证型的分布式数据库框架,来解决数据库的垂直与水平扩展方面的问题,由于是验证性开发,所以,思考不完善的地方肯定存在,欢迎批评指正。

提升数据库处理能力方案

读写分离方案

海量数据的存储及访问,通过对数据库进行读写分离,来提升数据的处理能力。读写分离它的方案特点是数据库产生多个副本,数据库的写操作都集中到一个数据库上,而一些读的操作呢,可以分解到其它数据库上。这样,只要付出数据复制的成本,就可以使得数据库的处理压力分解到多个数据库上,从而大大提升数据处理能力。

  • 优点:由于所有的数据库副本,都有数据的全拷贝,因此所有的数据库特性都可以实现,部分机器当机不影响系统的使用。
  • 缺点:数据的复制同步是一个问题,要么采用数据库自身的复制方案,要么自行实现数据复制方案。需要考虑数据的迟滞性,一致性方面的问题。

数据分区方案

原来所有的数据都是在一个数据库上的,网络IO及文件IO都集中在一个数据库上的,因此CPU、内存、文件IO、网络IO都可能会成为系统瓶颈。而分区的方案就是把某一个或某几张相关的表的数据放在一个独立的数据库上,这样就可以把CPU、内存、文件IO、网络IO分解到多个机器中,从而提升系统处理能力。

  • 优点:不存在数据库副本复制,性能更高。
  • 缺点:分区策略必须经过充分考虑,避免多个分区之间的数据存在关联关系,每个分区都是单点,如果某个分区宕机,就会影响到系统的使用。

数据分表方案

不管是上面的读写分离方案还是数据分区方案,当数据量大到一定程度的时候,都会导致处理性能的不足,这个时候就没有办法了,只能进行分表处理。也就是把数据库当中数据根据按照分库原则分到多个数据表当中,这样,就可以把大表变成多个小表,不同的分表中数据不重复,从而提高处理效率。

  • 优点:数据不存在多个副本,不必进行数据复制,性能更高。
  • 缺点:分表之间的数据很少进行集合运算;分表都是单点,如果某个分表宕机,如果使用的数据不在此分表,不影响使用。

分表也有两种方案:

1. 同库分表:所有的分表都在一个数据库中,由于数据库中表名不能重复,因此需要把数据表名起成不同的名字。

  • 优点:由于都在一个数据库中,公共表,不必进行复制,处理更简单
  • 缺点:由于还在一个数据库中,CPU、内存、文件IO、网络IO等瓶颈还是无法解决,只能降低单表中的数据记录数。表名不一致,会导后续的处理复杂。

2. 不同库分表:由于分表在不同的数据库中,这个时候就可以使用同样的表名。

  • 优点:CPU、内存、文件IO、网络IO等瓶颈可以得到有效解决,表名相同,处理起来相对简单
  • 缺点:公共表由于在所有的分表都要使用,因此要进行复制、同步。

混合方案

通过上面的描述,我们理解了读写分离,数据分区,数据分表三个解决方案,实际上都各有优点,也各有缺 ,因此,实践当中,会把三种方案混合使用。由于数据不是一天长大的,实际上,在刚开始的时候,可能只采用其中一种方案,随着应用的复杂,数据量的增长,会逐步采用多个方案混合的方案。以提升处理能力,避免单点。

实现路线分析

正所谓条条大路通罗马,解决这个问题的方案也有多种,但究其深源,都可以归到两种方案之上,一种是对用户透明的方案,即用户只用像普通的JDBC数据源一样访问即可,由框架解决所有的数据访问问题。另外一种是应用层解决,具体一般是在Dao层进行封装。

JDBC层方案

  • 优点:开发人员使用非常方便,开发工作量比较小;可以实现数据库无关。
  • 缺点:框架实现难度比较大,性能不一定能做到最优。

同样是JDBC方案,也有两种解决方案,一种是有代理模式,一种是无代理模式。

有代理模式,有一台专门的代理服务器,来接收用户请求,然后发送请求给数据库集群中的数据,并对数据进行汇集后再提交给请求方。

无代理模式,就是说没有代理服务器,集群框架直接部署在应用访问端。

有代理模式,能够提供的功能更强大,甚至可买提供中间库进行数据处理,无代理模式处理性能较强有代理模式少一次网络访问,相对来说性能更好,但是功能性不如有代理模式。

DAO层方案

  • 优点:开发人员自由度非常大,性能调优更精准。
  • 缺点:开发人员在一定程度上受影响,与具体的Dao技术实现相关,较难做到数据库无关。

由于需要对SQL脚本进行判断,然后进行路由,因此DAO层优化方案一般都是选用iBatis或Spring Jdbc Template等方案进行封装,而对于Hibernate等高度封装的OR映射方案,实现起来就非常困难了。

需求

需求决定了后续的解决方案及问题领域:

  • 采用JDBC层解决方案:对于最终用户来说,要完全透明
  • 采用无代理解决方案:数据库集群框架代码直接放在应用层
  • 支持读写分离、分区、分表三种方式及其混合使用方式:三种方式可以混用可以提供极大的灵活性及对未来的扩展性
  • 需要提供灵活的分区及分表规则支持
  • 对于读写分离的方案,需要提供灵活的路由规则,比如:平均路由规则、加权路由规则,可以提供写库的备用服务器,即主写入服务器当机之后,即可写入备用服务器当中。
  • 支持高性能分布式主键生成器
  • 有良好的集群事务功能
  • 可以通过扩展点来对框架进行扩展,以便于处理分区、分表相关的操作。
  • 支持各种类型支持JDBC驱动的数据库
  • 支持异构数据库集群
  • 支持count、sum、avg、min、max等统计函数
  • 支持排序
  • 支持光标移动
  • 支持结果集合并

明确不支持的内容或限定条件:

  • 不支持分区之间的联合查询
  • 主键不支持自增长型,必须调用分布式主键生成器来生成
  • 对于与分区分表相关的处理,如果框架没有实现,则需要根据框架接口扩展自行实现

结构设计

框架采用三层设计:最上层是Cluster,一个Cluster相当于我们常规的一个数据库;一个Cluster当中可以包含一到多个Partition,也就是分区;而一个Partition中可以包含一到多个Shard,也就是分片。

所以一个就形成了一个树状结构,通过Cluster->Partion->Shard就构成了整个数据库集群。但是对于开发人员来说,实际上并不知道这个内部结构,他只是连接上了一个JDBC数据源,然后做它应该做的事情就可以了。

Cluster

以完整的形态对外提供服务,它封装了Cluster当中所有Partition及其Shard的访问。把它打开是一个数据库集群,对于使用者来说是一个完整的数据库。

属性名

类型

说明

id

String

集群标识

userName

String

连接集群时的用户名

Password

String

连接集群时的密码

dataSources

List<DataSourceConfig>

集群中需要访问的数据源列表

partitions

List<Partition>;

集群中包含的分区列表

Partition

分区,分区有两种模式,一种是主从模式,用于做读写分离;另外一种模式是分片模式,也就是说把一个表中的数据分解到多个表中。一个分区只能是其中的一种模式。但是一个Cluster可以包含多个分区,不同的分区可以是不同的模式。

属性名

类型

说明

id

String

分区标识

mode

int

分区类型,可以是主从,也可以是分表

Password

String

连接集群时的密码

shards

List<Shard>

分区中包含的分片列表

partitionRules

List<PartitionRule>

分区规则,当进行处理的时候,路由到哪个分区执行

Shard

Shard与一个物理的数据源相关联。

属性名

类型

说明

id

String

分区标识

dataSourceId

String

实际访问的数据库配置ID

readWeight

int

读权重,仅用于主从读写分离模式

writeWeight

int

写权重,仅用于主从读写分离模式

shardRules

List<ShardRule>

分片规则,当进行处理的时候,路由到哪个分片执行,仅用于分模式

tableMappings

List<TableMapping>;

表名映射列表,仅用于同库不同表名分表模式

分布式主键接口

/**    * 分布式Key获取器    *    * @param <T>    */    public interface ClusterKeyGenerator<T> {    T getKey(String tableName);    }

主键接口可以用来生成各种主键类型,如:字符串、整型、长整型,入口参数必须是表名,框架已经实现了字符串、整型、长整型的分布式高效主键生成器,当然,也可以自行实现。

集群管理器

public interface ClusterManager {    /**    * 返回是否是分片语句    *    * @param partition    * @param sql    * @return    */    boolean isShardSql(Partition partition, String sql);    /**    * 添加语句处理器    *    * @param statementProcessor    */    void addStatementProcessor(StatementProcessor statementProcessor);    /**    * 返回语句处理器列表    *    * @return    */    List<StatementProcessor> getStatementProcessorList();    /**    * 给某个集群的数据表产生主键    *    * @param cluster    * @param tableName    * @param <T>    * @return    */    <T> T getPrimaryKey(Cluster cluster, String tableName);    /**    * 返回SQL对应的Statement    *    * @param sql    * @return    */    Statement getSqlStatement(String sql);    /**    * 添加集群    *    * @param cluster    */    void addCluster(Cluster cluster);    /**    * 获取集群    *    * @param clusterId    * @return    */    Cluster getCluster(String clusterId);    /**    * 返回某个分区与sql是否匹配    *    * @param partition    * @param sql    * @return    */    boolean isMatch(Partition partition, String sql);    /**    * 返回某个分片是否匹配    *    * @param shard    * @param sql    * @return    */    boolean isMatch(Partition partition, Shard shard, String sql);    /**    * 返回分片执行语句    *    * @param partition    * @param shard    * @param sql    * @return    */    String getSql(Partition partition, Shard shard, String sql);    /**    * 获取匹配的分区<br>    *    * @param clusterId    * @param sql    * @return    */    Collection<Partition> getPartitions(String clusterId, String sql);    /**    * 获取匹配的首个分区    *    * @param clusterId    * @param sql    * @return    */    Partition getPartition(String clusterId, String sql);    /**    * 获取匹配的首个分区    *    * @param cluster    * @param sql    * @return    */    Partition getPartition(Cluster cluster, String sql);    /**    * 获取匹配的分区    *    * @param cluster    * @param sql    * @return    */    List<Partition> getPartitions(Cluster cluster, String sql);    /**    * 获取匹配的分片    *    * @param partition    * @param sql    * @return    */    List<Shard> getShards(Partition partition, String sql);    /**    * 返回分片均衡器    *    * @return    */    ShardBalance getShardBalance();    /**    * 设置分片均衡器    *    * @param balance    */    void setShardBalance(ShardBalance balance);    }

分区规则

/**    * 分区规则接口<br>    * 规则参数在实现类中定义    *    */    public interface PartitionRule {    /**    * 返回是否命中,如果有多个命中,则只用第一个进行处理    *    * @param sql    * @return    */    boolean isMatch(String sql);    }

框架自带了常用分区规则,但是也可以根据情况自已扩展。

分片规则

/**    * 分片规则    *    */    public interface ShardRule {    /**    * 返回是否属于当前分片处理    *    * @param sql    * @return    */    boolean isMatch(Partition partition, String sql);    }

框架自带了常用的分片规则,但是也可以根据情况自已扩展

语句处理器

/**    * 用于对SQL进行特殊处理并进行结果合并等<br>    * <p/>    * 比如sql语句是select count(*) from abc<br>    * 则会到所有的shard执行,并对结果相加后返回    *    */    public interface StatementProcessor {    /**    * 返回是否由此SQL处理器进行处理    *    * @param sql    * @return    */    boolean isMatch(String sql);    /**    * 返回处理器转换过之后的SQL    *    * @param sql    * @return    */    String getSql(String sql);    /**    * 对结果进行合并    *    * @param results    * @return    * @throws SQLException    */    ResultSet combineResult(List<ResultSet> results) throws SQLException;    }

在分片之后,许多时候单纯查找回的数据已经是不正确的了,这个时候就需要进行二次处理才能保证返回的结果是正确的。

比如:用户输入的SQL语句是:Select count(*) from aaa

这个时候就会用分片指令到各个分片去查找并返回结果,默认的处理结果是简单合并结果集的方式。这个时候,如果有5个分片,会返回5条记录给最终用户,这当然不是他想要的结果。这个时候就是语句处理器大显身手的时候了,他可以偷梁换柱,也可以改头换面,通过它的处理,就可以返回正确的结果了。

JDBC层封装

当然,要想外部程序用JDBC的方式进行访问,就得做JDBC层的实现。这个部分做了大量的处理,使得即高效又与用户期望的方式相匹配。

可以说上面所有的准备都是为了这一层做准备的,毕竟最终要落到真正的数据库访问上。由于接口就是标准的JDBC接口,因此就不再详述。

事务问题

在分区或分表模式中,由于写操作会被分解到不同的物理数据库上去,这就会导致出现事务问题。因此框架内部集成了JTA,使得事务保持一致。

代码实现

没什么好说的,噼里啪啦,噼里啪啦,一阵乱响,代码就绪了,下面看看测试场景。

测试用例

JDBC方式访问集群

Class.forName("org.tinygroup.dbcluster.jdbc.TinyDriver");    Connection conn = DriverManager.getConnection("jdbc:dbcluster://cluster1", "username", "password");    Statement stmt = conn.createStatement();    stmt.execute(“select * from aaa”);

代码解释:

上面完全是按照JDBC的方式访问数据库的,url必须以“jdbc:dbcluster://”开始,后面跟着的是集群的ID名称,上面示例中就是“cluster1”;用户名、密码必须与集群中配置的相一致。接下来就与普通的jdbc数据源没有任何区别了。

同库分表

创建测试表

在同一个数据库中创建同样结构的表,比如:

CREATE TABLE `aaa0` (    `id` int(11) NOT NULL,    `aaa` varchar(45) DEFAULT NULL,    PRIMARY KEY (`id`)    );    CREATE TABLE `aaa1` (    `id` int(11) NOT NULL,    `aaa` varchar(45) DEFAULT NULL,    PRIMARY KEY (`id`)    );    CREATE TABLE `aaa2` (    `id` int(11) NOT NULL,    `aaa` varchar(45) DEFAULT NULL,    PRIMARY KEY (`id`)    );

测试代码:

public static void main(String[] args) throws Throwable {    Class.forName("org.tinygroup.dbcluster.jdbc.TinyDriver");    Connection conn = DriverManager.getConnection("jdbc:dbcluster://cluster1", "username", "password");    Statement stmt = conn.createStatement();    String sql;    //插入100条数据    for (int i = 0; i < 100; i++) {    sql = "insert into aaa(id,aaa) values (" + clusterManager.getPrimaryKey(cluster, "aaa") + ",'ppp')";    boolean result = stmt.execute(sql);    }    }

运行结果:

Using shard:shard1 to execute sql:insert into aaa(id,aaa) values (1,'ppp')    Using shard:shard2 to execute sql:insert into aaa(id,aaa) values (2,'ppp')    Using shard:shard0 to execute sql:insert into aaa(id,aaa) values (3,'ppp')    Using shard:shard1 to execute sql:insert into aaa(id,aaa) values (4,'ppp')    Using shard:shard2 to execute sql:insert into aaa(id,aaa) values (5,'ppp')    Using shard:shard0 to execute sql:insert into aaa(id,aaa) values (6,'ppp')    …….

可以看出,插入的数据确实分到了三个分片中。

再用Select语句查找插入的数据:

public static void main(String[] args) throws Throwable {    Class.forName("org.tinygroup.dbcluster.jdbc.TinyDriver");    Connection conn =    DriverManager.getConnection("jdbc:dbcluster://cluster1", "username", "password");    Statement stmt = conn.createStatement();    String sql = "select * from aaa order by id";    ResultSet resultSet = stmt.executeQuery(sql);    while (resultSet.next()) {    System.out.printf(" id: %d, aaa: %s \n", resultSet.getInt(1), resultSet.getString(2));    }    }

运行结果如下:

Using shard:shard0 to execute sql:select * from aaa order by id    Using shard:shard1 to execute sql:select * from aaa order by id    Using shard:shard2 to execute sql:select * from aaa order by id    id: 1, aaa: ppp    id: 2, aaa: ppp    id: 3, aaa: ppp    id: 4, aaa: ppp    id: 5, aaa: ppp    id: 6, aaa: ppp    ……

从上面的结果可以看到,明显已经合并了结果并且是按顺序显示的

接下来,把测试的数据删除掉:

public static void main(String[] args) throws Throwable {    Class.forName("org.tinygroup.dbcluster.jdbc.TinyDriver");    Connection conn = DriverManager.getConnection("jdbc:dbcluster://cluster1", " username ", "password");    Statement stmt = conn.createStatement();    String sql = "delete from aaa";    stmt.execute(sql);    }

运行结果如下:

Using shard:shard0 to execute sql:delete from aaa    Using shard:shard1 to execute sql:delete from aaa    Using shard:shard2 to execute sql:delete from aaa

再去数据库中查看,数据确实已经被删除。

不同库分表

在不同的数据库中创建同样结构的表,比如:

CREATE TABLE test0. aaa (    `id` int(11) NOT NULL,    `aaa` varchar(45) DEFAULT NULL,    PRIMARY KEY (`id`)    );    CREATE TABLE test1. aaa(    `id` int(11) NOT NULL,    `aaa` varchar(45) DEFAULT NULL,    PRIMARY KEY (`id`)    );    CREATE TABLE test2. aaa(    `id` int(11) NOT NULL,    `aaa` varchar(45) DEFAULT NULL,    PRIMARY KEY (`id`)    );

测试用例同同库分表,结果测试同样OK。

读写分离

插入与删除等比较简单,就不再展示了,下面看看读指令的执行过程。

public static void main(String[] args) throws Throwable {    Class.forName("org.tinygroup.dbcluster.jdbc.TinyDriver");    Connection conn =    DriverManager.getConnection("jdbc:dbcluster://cluster1", "username", "password");    Statement stmt = conn.createStatement();    for (int i = 1; i <= 100; i++) {    boolean result = stmt.execute(“select * from aaa”);    }    }

运行结果:

Using shard:shard3 to execute sql:select * from aaa    Using shard:shard2 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard2 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard1 to execute sql:select * from aaa    Using shard:shard3 to execute sql:select * from aaa

可以看到,读的SQL已经由三个分片进行了均衡执行。

记录集遍历

对于ResultSet的遍历,也有良好的支持,对于各种移动光标的方法都有支持,而且支持排序的移动,同时对于性能也有良好支持,性能接近于单表操作。

下面展示一下绝对定位:

Class.forName("org.tinygroup.dbcluster.jdbc.TinyDriver");    Connection conn = DriverManager.getConnection(    "jdbc:dbcluster://cluster1", "luog", "123456");    Statement stmt = conn.createStatement();    String sql = "select * from aaa order by id";    ResultSet resultSet = stmt.executeQuery(sql);    resultSet.absolute(10);    System.out.printf(" id: %d, aaa: %s \n", resultSet.getInt(1),    resultSet.getString(2));    while (resultSet.next()) {    System.out.printf(" id: %d, aaa: %s \n", resultSet.getInt(1),    resultSet.getString(2));    }

运行结果:

Using shard:shard0 to execute sql:select * from aaa order by id    Using shard:shard1 to execute sql:select * from aaa order by id    Using shard:shard2 to execute sql:select * from aaa order by id    id: 10, aaa: ppp    id: 11, aaa: ppp    id: 12, aaa: ppp    id: 13, aaa: ppp    id: 14, aaa: ppp    id: 15, aaa: ppp    id: 16, aaa: ppp    id: 17, aaa: ppp    id: 18, aaa: ppp    id: 19, aaa: ppp    …….

可以看到确实是从第10条开始显示。

总结

分区分片通用解决方案,确实有相当的通用性,支持各种数据库,提供了非常大的灵活性,支持多种集群单独或混合使用的场景,同时还可以保持数据访问的事务一致性,为用户的访问提供与JDBC一样的用户接口,这也会大大降低开发人员的开发难度。基本上(违反需求中指定的限制条件的除外)可以做到原有业务代码透明访问,降低了系统的迁移成本。同时它在性能方面也是非常杰出的,与原生的JDBC驱动程序相比,性能没有显著降低。当然它的配置也是非常简单的,学习成本非常低。由于做在JDBC层,因此可以对Hibernate,iBatis等各种框架有良好支持。