MongoDB分片存储的集群架构实现
如果需要存储大量数据,或者系统的读写吞吐量很大的时候,单个server就很难满足需求了。这个时候我们可以使用MongoDB的分片机制来解决这些问题。
分片的基本概念
分片(sharding)是一种水平扩展(horizontal scaling)的方式,把一个大的数据集分散到多个片服务器上,所有的片服务器将组成一个逻辑上的数据库来存储这个大的数据集。分片对用户(应用层)是透明的,用户不会知道数据很被存放到哪个片服务器上。
这种方式有两个好处:
- 分片之后,每个片服务器处理的请求数量将远远小于不分片的情况,这样就可以水平扩展整个分片集群的存储量和吞吐量
- 分片之后每个片服务器需要存储、维护的数据量将大大减少
分片以及分片集群
下面的图来自MongoDB文档,图片大概展示了分片的概念。1TB的数据被分配到四个片服务器上,四个片服务器组成了逻辑Collection I。
在MongoDB中,我们可以搭建分片集群(如下图)来实现数据分片的功能。分片集群中有三个基本部分组成:
- Mongos(路由服务器):是MongoDB中的路由进程,它将路由所有来自客户端的请求,分发到每个分片server;然后它将聚合各个分片server的结果,返回个客户端
- config server(配置服务器):该服务器存储了集群的配置信息:数据和片的对应关系。路由器将根据数据和分片的mapping,把数据操作分配到特定的分片上
- shard server(片服务器):用来保存子集合的数据容器。片服务器可以是单个的mongod服务器,也可以是一个副本集
片键
当设置分片时,需要从集合里面选择一个或几个键,把选择出来的键作为数据拆分的依据,这个键叫做片键或复合片键。
选好片键后,MongoDB将不允许插入没有片键的文档,但是允许不同文档的片键类型不一样。
块(chunk)
在一个shard server内部,MongoDB还是会把数据分为chunks,每个chunk代表这个shard server内部一部分数据。chunk的产生,会有以下两个用途:
- Splitting:
- 当一个chunk的大小超过配置中的chunk size时,MongDB的后台进程会把这个chunk切分成更小的chunk,从而避免chunk过大的情况
- Balancing:
- 在MongoDB中,balancer是一个后台进程,负责chunk的迁移,从而均衡各个shard server的负载 </ul> </li> </ul>
- 启动分片中需要的MongDB实例,在这里为了简单,片服务器使用单个MongoDB实例,而不是副本集
启动配置服务器
mongod.exe --dbpath="C:\mongodb\db\config" --port 11110
启动路由服务器
mongos.exe --port=11111 --configdb=127.0.0.1:11110
启动片服务器
mongod.exe --dbpath="c:\mongodb\db\shard0" --port=22220 mongod.exe --dbpath="c:\mongodb\db\shard1" --port=22221
- 添加片服务器
通过MongoDB shell连接到mongos实例,并且通过以下命令添加
mongo.exe 127.0.0.1:11111 use admin db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) db.runCommand({"addshard":"127.0.0.1:22221",allowLocal:true})
- 启动分片,并且设置片键
mongos> use admin switched to db admin mongos> db.runCommand({"enablesharding":"test"}) { "ok" : 1 } mongos> db.runCommand({"shardcollection":"test.student","key":{"sid":1}}) { "collectionsharded" : "test.student", "ok" : 1 } mongos>
- 分片集群中所有的片服务器
- 分片集群中所有的数据库,以及哪些数据库已经启动分片
- 已分片数据库的主片和片键
- chunks表:通过这张表可以看到分片集群中的数据块
- databases表:表里显示了所有在分片集群中存储的数据库,同时可以看到那些数据库是启动分片的,并且指示出主分片
- settings表:这张表包含分片集群的配置项,其中块大小(chunk size)就是通过这个文件设置
- shards表:这张表包含了分片集群中的所有片
- 分片集群已经加入片服务器”127.0.0.1:22222″
- 分片集群中原有的需要分片的数据库(test)的数据被Balancing到新的片服务器上
- 新加入的片服务器上的数据库(school)没有开启分片
- 数据量很大单个存储服务器无法满需需求
- 系统中活动数据量的大小很快将要超过系统RAM的最大容量
- 系统有大量的读写请求,单个单个存储服务器不足以支持
分片集群的搭建
搭建
下面我们开始单键一个分片集群,来通过一些实践体验一下分片集群。
查看分片集群状态
通过下面的命令可以查看分片集群的状态,可以得到以下信息:
mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" }, { "_id" : "shard0001", "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0000", "host" : "127.0.0.1:22220" } { "_id" : "shard0001", "host" : "127.0.0.1:22221" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0000" } test.student shard key: { "sid" : 1 } chunks: shard0000 1 { "name" : { "$minKey" : 1 } } -->> { " sid " : { "$maxKey" : 1 } } on : shard0000 Timestamp(1, 0) mongos>
删除分片
在上面的输出中,可以看到以下输出语句。在所有的片服务器中,都会有一个主片,这个片的删除需要特殊的命令。同时,删除分片可能需要一定的时间,因为MongoDB会把要删除分片上的数据移动到其他的分片上。
{ "_id" : "test", "partitioned" : true, "primary" : "shard0000" }
首先,从简单的开始,先删除非主片shard0001,从输出可以看到,shard0001处于draining状态,表示数据正在转移,当需要迁移的数据量很大的时候,可能就需要等待一些时间
mongos> db.runCommand({"removeshard":"127.0.0.1:22221"}) { "msg" : "draining started successfully", "state" : "started", "shard" : "shard0001", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" }, { "_id" : "shard0001", "draining" : true, "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos>
再次使用删除操作,可以看到shard0001已经被删除
mongos> db.runCommand({"removeshard":"127.0.0.1:22221"}) { "msg" : "removeshard completed successfully", "state" : "completed", "shard" : "shard0001", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" } ], "ok" : 1 } mongos>
下面重新添加刚才删除的分片,这次尝试删除主分片,会遇到以下错误
mongos> db.runCommand({"removeshard":"127.0.0.1:22220"}) { "msg" : "draining started successfully", "state" : "started", "shard" : "shard0000", "note" : "you need to drop or movePrimary these databases", "dbsToMove" : [ "test" ], "ok" : 1 } mongos>
这时,使用moveprimary命令把shard0001设置为主分片
mongos> db.runCommand({"moveprimary":"test","to":"127.0.0.1:22221"}) { "primary " : "shard0001:127.0.0.1:22221", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0000", "draining" : true, "host" : "127.0.0.1:22220" } { "_id" : "shard0001", "host" : "127.0.0.1:22221" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0001" } test.student shard key: { " sid " : 1 } chunks: shard0001 1 { "name" : { "$minKey" : 1 } } -->> { " sid " : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 0) mongos>
然后就可以直接删除shard0000了
mongos> db.runCommand({"removeshard":"127.0.0.1:22220"}) { "msg" : "removeshard completed successfully", "state" : "completed", "shard" : "shard0000", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0001", "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos>
最后重新添加刚才删除的分片,将遇到以下错误,提示test数据库已经存在
mongos> db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) { "ok" : 0, "errmsg" : "can't add shard 127.0.0.1:22220 because a local database 'test' exists in another shard0001:127.0.0.1:22221" } mongos>
这时,就需要通过MongoDB shell连接到这个实例,删除test数据库。然后重新添加。
C:\mongodb\bin>mongo.exe --port=22220 MongoDB shell version: 2.4.6 connecting to: 127.0.0.1:22220/test > use test switched to db test > db.dropDatabase() { "dropped" : "test", "ok" : 1 } >
mongos> db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) { "shardAdded" : "shard0002", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0001", "host" : "127.0.0.1:22221" }, { "_id" : "shard0002", "host" : "127.0.0.1:22220" } ], "ok" : 1 } mongos>
在这个部分中可以看到,通过上面三个步骤,一个简单的分片集群就建立起来了。下面我们开始使用这个分片集群。
分片集群的管理
首先切换到config数据库,看看都有哪些表
mongos> use config switched to db config mongos> show collections changelog chunks collections databases lockpings locks mongos settings shards system.indexes tags version mongos>
然后看看下面几张表中的数据
mongos> db.chunks.find() { "_id" : "test.student-sid_MinKey", "lastmod" : Timestamp(1, 0), "lastmodEpoch" : ObjectId("548c02d79355a9edbf5f0193"), "ns" : "test.studen t", "min" : { "sid" : { "$minKey" : 1 } }, "max" : { "sid" : { "$maxKey" : 1 } }, "shard" : "shard0002" } mongos> db.databases.find() { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos> db.settings.find() { "_id" : "chunksize", "value" : 1 } mongos> db.shards.find() { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } mongos>
默认的chunk size是64MB,为了方便看到下面平衡(Balancing)的过程,我们重设chunk size为1MB
mongos> use config switched to db config mongos> db.settings.save({ _id:"chunksize", value: 1}) mongos> db.settings.find() { "_id" : "chunksize", "value" : 1 } mongos>
分片集群的使用
基本测试
基于前面创建的分片集群,我们进行一些测试,首先插入10W条文档,然后查询分片集群的状态。从下面的输出中可以看到,10W条文档一共使用了10个chunk,2个chunk在shard0002上,8个chunk在shard0001上。
mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0002 2 shard0001 8 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0002 Timestamp(2, 1) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(1, 3) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0001 Timestamp(2, 2) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0001 Timestamp(2, 4) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0001 Timestamp(2, 6) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0001 Timestamp(2, 8) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0001 Timestamp(2, 10) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(2, 12) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos>
经过一段时间后,再次查询分片集群的状态,可以看到,经过Balancing过程,所有的chunk基本要均匀的分布到了两个片服务器中。
MongoDB这种平衡的能力能够对各个片服务器进行负载均衡,但是如果数据特别大话则可能会非常的慢,因为数据迁移了会很大。
mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0002 5 shard0001 5 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0002 Timestamp(2, 1) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(1, 3) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0001 Timestamp(5, 1) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0001 Timestamp(2, 10) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(2, 12) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos>
当然,也可以通过下面命令查看某个collection的数据在分片中的分布情况
mongos> db.student.getShardDistribution() Shard shard0001 at 127.0.0.1:22221 data : 4.28MiB docs : 51087 chunks : 5 estimated data per chunk : 878KiB estimated docs per chunk : 10217 Shard shard0002 at 127.0.0.1:22220 data : 4.1MiB docs : 48913 chunks : 5 estimated data per chunk : 840KiB estimated docs per chunk : 9782 Totals data : 8.39MiB docs : 100000 chunks : 10 Shard shard0001 contains 51.08% data, 51.08% docs in cluster, avg obj size on shard : 88B Shard shard0002 contains 48.91% data, 48.91% docs in cluster, avg obj size on shard : 88B mongos>
对现有数据库分片
现在假设我们有一个”127.0.0.1:22222″的MongDB实例,我们想对上面已有的数据进行分片。
首先插入测试数据
use school for(var i=0;i<100000;i++){ var randAge = parseInt(5*Math.random()) + 20; var gender = (randAge%2)?"Male":"Female"; db.student.insert({"sid":i, "name":"Will"+i, "gender": gender, "age": randAge}); }
然后,在现有的分片集群中加入”127.0.0.1:22222″作为一个片服务器,从下面的输出可以看到:
mongos> use admin switched to db admin mongos> db.runCommand({"addshard":"127.0.0.1:22222",allowLocal:true}) { "shardAdded" : "shard0003", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } { "_id" : "shard0003", "host" : "127.0.0.1:22222" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0003 3 shard0002 4 shard0001 3 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0003 Timestamp(7, 0) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(7, 1) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0003 Timestamp(6, 0) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0003 Timestamp(8, 0) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(8, 1) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } { "_id" : "school", "partitioned" : false, "primary" : "shard0003" } mongos>
当我们使用以下命令开启school的分片时,会得到一个错误,提示我们sid没有索引。
根据MongDB文档,所有分片的集合在片键上都必须建索引,这是MongoDB自动执行的。所以,对已有的数据库加入分片集群中的时候,可能就需要手动建立索引了。
根据这一点我们也可以得到,片键最好是查询中经常用到的字段,因为如果选择某个字段作为片键但是基本不在这个字段做查询那么等于浪费了一个索引,而增加一个索引总是会使得插入操作变慢。
mongos> db.runCommand({"enablesharding":"school"}) { "ok" : 1 } mongos> db.runCommand({"shardcollection":"school.student","key":{"sid":1}}) { "proposedKey" : { "sid" : 1 }, "curIndexes" : [ { "v" : 1, "key" : { "_id" : 1 }, "ns" : "school.student", "name" : "_id_" } ], "ok" : 0, "errmsg" : "please create an index that starts with the shard key before sharding." }
回到刚才的错误,我们现在手动建立一个sid的索引,然后重新设置片键,这次成功了,并且school中的数据已经被Balancing到各个片服务器上。
mongos> use school switched to db school mongos> db.student.getIndexes() [ { "v" : 1, "key" : { "_id" : 1 }, "ns" : "school.student", "name" : "_id_" } ] mongos> db.student.ensureIndex({"sid":1}) mongos> use admin switched to db admin mongos> db.runCommand({"shardcollection":"school.student","key":{"sid":1}}) { "collectionsharded" : "school.student", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } { "_id" : "shard0003", "host" : "127.0.0.1:22222" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0003 3 shard0002 4 shard0001 3 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0003 Timestamp(7, 0) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(7, 1) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0003 Timestamp(6, 0) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0003 Timestamp(8, 0) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(8, 1) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } { "_id" : "school", "partitioned" : true, "primary" : "shard0003" } school.student shard key: { "sid" : 1 } chunks: shard0001 6 shard0002 5 shard0003 6 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 5957 } on : shard0001 Timestamp(2, 0) { "sid" : 5957 } -->> { "sid" : 11915 } on : shard0002 Timestamp(3, 0) { "sid" : 11915 } -->> { "sid" : 17873 } on : shard0001 Timestamp(4, 0) { "sid" : 17873 } -->> { "sid" : 23831 } on : shard0002 Timestamp(5, 0) { "sid" : 23831 } -->> { "sid" : 29789 } on : shard0001 Timestamp(6, 0) { "sid" : 29789 } -->> { "sid" : 35747 } on : shard0002 Timestamp(7, 0) { "sid" : 35747 } -->> { "sid" : 41705 } on : shard0001 Timestamp(8, 0) { "sid" : 41705 } -->> { "sid" : 47663 } on : shard0002 Timestamp(9, 0) { "sid" : 47663 } -->> { "sid" : 53621 } on : shard0001 Timestamp(10, 0) { "sid" : 53621 } -->> { "sid" : 59579 } on : shard0002 Timestamp(11, 0) { "sid" : 59579 } -->> { "sid" : 65537 } on : shard0001 Timestamp(12, 0) { "sid" : 65537 } -->> { "sid" : 71495 } on : shard0003 Timestamp(12, 1) { "sid" : 71495 } -->> { "sid" : 77453 } on : shard0003 Timestamp(1, 12) { "sid" : 77453 } -->> { "sid" : 83411 } on : shard0003 Timestamp(1, 13) { "sid" : 83411 } -->> { "sid" : 89369 } on : shard0003 Timestamp(1, 14) { "sid" : 89369 } -->> { "sid" : 95327 } on : shard0003 Timestamp(1, 15) { "sid" : 95327 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0003 Timestamp(1, 16) mongos>
总结
通过本篇文章了解了分片的基本概念,同时搭建了一个简单的分片集群,进行了一些基本测试。
对于什么时候要使用分片这个问题,可以主要有下面几个参考点:
来源:田小计划