Elasticsearch 并发修改乐观锁
edph2008
9年前
来自: http://blog.csdn.net//jiao_fuyou/article/details/50482117
版本控制的一个例子
curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}' { "_index": "test", "_type": "test", "_id": "1", "_version": 1, "created": true } curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}' { "_index": "test", "_type": "test", "_id": "1", "_version": 2, "created": true } curl -XPOST http://localhost:9200/test/test/1?version=1 -d '{"msg": "test"}' { "error": "RemoteTransportException[[elasticsearch_226][inet[/172.16.18.226:9300]][indices:data/write/index]]; nested: VersionConflictEngineException[[test][0] [test][1]: version conflict, current [4], provided [1]]; ", "status": 409 }
提示:版本冲突
使用外部系统提供的版本号
Elasticsearch 的乐观锁,可以使用外部系统提供的版本号:
curl -XPOST http://localhost:9200/test/test/1?version=10&version_type=external -d '{"msg": "test"}' { "_index": "test", "_type": "test", "_id": "1", "_version": 10, "created": true } curl -XPOST http://localhost:9200/test/test/1?version=9&version_type=external -d '{"msg": "test"}' { "error": "VersionConflictEngineException[[test][0] [test][1]: version conflict, current [10], provided [9]]", "status": 409 }
这时Elasticsearch将只检查提供的版本是否比当前保存在索引中的版本大(大多少不重要),如果是成功,否则失败。
Elasticsearch内部版本号
在Elasticsearch中,更新请求实际上是分为两个阶段,获取文档,修改文档,然后保存文档。
那么当两个更新请求同时要修改文档的时候,系统乐观的认为不会有两个并发请求对一个系统操作。
文档原本的版本为1,请求A获取了version为1的文档,请求B也获取了version为1的文档,然后请求A修改完文档后,并且先执行了保存操作,这个时候,系统中的文档version变为了2。
这个时候,B再执行保存操作的时候,告诉系统我要修改version为1的文档。系统就会抛出一个错误,说文档版本不匹配。然后这个错误由应用程序自己来进行控制。
这种机制在请求量大的时候会比悲观锁机制好。但是缺点是需要程序处理版本冲突错误,可能一般的方法是封装更新操作,并且设置重复重试次数。
注意这里的更新指的是:
curl -XPOST http://localhost:9200/test/test/1/_update -d '{"msg": "test"}'
而不是:
curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'
这个是create,这种在ES内部没有乐观锁的概念,是直接覆盖,这种想解决数据被覆盖,就要显示的带着version
下面的程序,并发6个进程同时批量更新一个文档,在结果能看到有的请求出错了,返回版本冲突。
<?php require_once dirname(__FILE__).'/../shjf/vendor/autoload.php'; $client = new Elasticsearch\Client(['hosts' => ['172.16.18.226:9200']]); for($i=0;$i<5;$i++){ if (pcntl_fork() == 0) break; } $params=[ 'index' => 'test', 'type' => 'test', 'body' => [], ]; for($i=0;$i<1;$i++){ $params['body'][] = ['update' => ['_id' => 111]]; $params['body'][] = ["doc" => ["id" => $i]]; } try { $result = $client->bulk($params); var_dump($result) . "\n"; } catch(Exception $e) { echo $e->getMessage() . "\n"; $info = $client->transport->getLastConnection()->getLastRequestInfo(); print_r($info); exit; } echo "end\n"; ?> { "took":1, "errors":true, "items":[ {"update": { "_index":"test", "_type":"test", "_id":"111", "status":409, "error":"VersionConflictEngineException[[test][0] [test][111]: version conflict, current [3], provided [2]]"} } ] }
参考资料:
- Elasticsearch官网:
- 《Elasticsearch权威指南》处理冲突