No-SQL数据库中的事务性设计—VLCP中的ObjectDB简要介绍

etfilm 8年前
   <p><img src="https://simg.open-open.com/show/b44ce9e372cb7255c2ca70b9610c7fbb.png"></p>    <p>之前的一篇文章 <a href="/misc/goto?guid=4959725676388322695" rel="nofollow,noindex">No-SQL数据库中的事务性设计</a> 中,我们简单介绍了一种在No-SQL数据库中实现事务性的方法,这种方法也是VLCP内置的ObjectDB模块的基本原理。ObjectDB是VLCP的核心模块之一,它将一切配置存储、状态返回、分布式协调、事务性的问题都统一到了同一组数据库风格的接口上,同时,它可以对下对接实现了相应接口的任意存储(目前支持Redis和ZooKeeper),只需要实现相对简单的一组无状态接口即可。</p>    <p>VLCP是一个开源的基于协程模型的异步IO框架, 它是一个开源SDN控制器,不过同时也可以用于快速的Web开发,尤其适合编写微服务,协程对于高并发和协调一致性的良好支持、ObjectDB对于数据一致性的高度抽象,再加上配置子系统、“搭积木”式的模块管理、以及一些称手的小工具,可以让你在几个小时之内完成一个简单系统的开发测试部署工作。题外话,前段时间刚刚为公司内部的kibana写了一个提供权限控制的拦截器,通过介入HTTP请求的方式控制不同用户能够看到的页面内容,包含了用户权限控制的API——使用VLCP框架,仅仅用了5个小时的时间就完成了开发、测试到上线,甚至还自带了无状态、高可用的时髦能力。</p>    <p>VLCP的项目地址是 <a href="/misc/goto?guid=4959725676484490928" rel="nofollow,noindex"> https:// github.com/hubo1016/vlc p </a></p>    <h2>ObjectDB的设计目标与保证</h2>    <p>ObjectDB的接口,在上一篇文章提到的WALK和MUPDATE的基础上,做了一部分改进,最重要的一点是实现了数据的 <strong>主动推送</strong> 机制: <strong>允许用户获取一组数据,之后保持对这些key上的数据的监听,随时得到最新的值,并在数据发生变化时收到推送通知。</strong> 这个功能的灵感来源于OVSDB,不过保持更新的方法上,ObjectDB要简单得多,而且ObjectDB充分利用了VLCP的协程设计,对编程更加友好。</p>    <p>ObjectDB也扩展了对事务一致性的保证,将这个保证扩展到了整个程序中的不同协程,以及数据推送上:</p>    <ol>     <li>在整个程序中,任意时间点上,所有协程对同一个key获取到的数据都是完全相同的同一份;这意味着即使是不同批次WALK(MGET)得到的数据,也可以随时保持事务一致性。</li>     <li>同一个MUPDATE事务写入的多个key,总是被同时推送到任意一个客户端上,并被一致性地更新,因此客户端以任意方式获得的数据,都永远不会看到它们出现事务不一致的情况。当等待更新通知时,如果多个key是在同一个事务中被更新的,可以保证等待这个通知的协程只会收到一次通知。</li>    </ol>    <p>这个保证,加上前一篇文章中论述的事务性的保证,加在一起构成了非常强的一致性保证,实际上可以以任意的方式来使用和修改数据,而不用担心出现任何形式的Race Condition,也不需要增加复杂的锁机制。我将这种一致性称为 <strong>视图一致性</strong> 。后端存储中存储的数据的量可能很大,任意时刻,我们都可以选择监控这个后端存储中的一个子集的数据,这些数据以事务一致的方式展现在整个程序中的所有协程面前,并且随时跟随其他客户端的写入而更新,同时这个选定的子集也可以随着我们要求的变化而随时扩展或缩小。对于许多应用来说,这是梦寐以求的理想模型。</p>    <p>设计这样严格的指标,主要是因为VLCP的设计目标:SDN控制器。基于OpenFlow的SDN控制器,需要随时随着网络拓扑的变化,将实时的流表下发到OpenFlow交换机中,实时性、一致性、正确性、稳定性都是最基本的要求,同时,也要求一定程度的扩展性,保证性能随着网络拓扑复杂性的增加而不急剧下降(这就要求视图可以只选择需要的子集)。</p>    <h2>ObjectDB的接口简介</h2>    <p>在VLCP中,与其他服务一样,ObjectDB以一个独立模块的方式发布(参考 <a href="/misc/goto?guid=4959725676557475118" rel="nofollow,noindex"> https:// zhuanlan.zhihu.com/p/21 813920?refer=sdnnetwork </a> ),调用ObjectDB的接口也使用统一的ModuleAPI的风格。也就是说,在任意协程中调用ObjectDB的API,都是通过callAPI的方式:</p>    <pre>  <code class="language-python">for m in callAPI(container, 'objectdb', method_name, method_params):      yield m  result = container.retvalue</code></pre>    <p>关于VLCP的协程编程模型可以参考(VLCP协程框架简介)</p>    <p><strong>存储类型</strong></p>    <p>首先是ObjectDB中存储的数据类型的约定,ObjectDB通常使用JSON序列化的方式存储数据(也可以通过配置选择改用pickle),为了方便起见,所有存储的对象都是DataObject类的一个子类,类似于下面的方式定义:</p>    <pre>  <code class="language-python">from vlcp.utils.dataobject import DataObject    class PhysicalNetwork(DataObject):      _prefix = 'viperflow.physicalnetwork'      _indices = ("id",)</code></pre>    <p>它需要声明_prefix和_indices两个属性。ObjectDB本质上来说仍然是KVDB,每个数据都会关联到一个唯一的key上,这个key的构造方式是_prefix + _indices中的属性值,比如说上面定义的结构,如果有某个PhysicalNetwork对象,它的id属性的值是my_vlan_network,那么这个对象在存储的时候就会自动选择viperflow.physicalnetwork.my_vlan_network作为自己的key,以后获取的时候也需要用这个key来获取。除此以外的属性可以任意赋值,只要这些属性值可以正确被JSON序列化就可以,包括字符串、数字、列表、字典(只能以字符串为key)、元组(会转换为列表)以及定义了jsonencode/jsondecode方法的其他对象(这是VLCP规定的一种简单的JSON序列化协议)</p>    <p>基本上来说,我们定义的类型可以存进任意的数据,只要组织成合适的形式就可以,也很容易在以后进行扩展。</p>    <p><strong>事务性的级联查询——walk方法</strong></p>    <p>ObjectDB基本的查询方法是walk方法,这是个很有特色的方法,与基于SQL的数据库截然不同,很适合查询有关联性的多个key。它的接口声明是这样的:</p>    <pre>  <code class="language-python">def walk(self, keys, walkerdict, requestid, nostale = False):          "Recursively retrieve keys with customized functions. walkerdict is a dictionary key->walker(key, obj, walk, save)."</code></pre>    <p>它是我们上一篇文章中论述的WALK原语的一种贴近实际应用的变种。比起上一篇文章中输入单一的walker,walker接受keys、values作为参数,这里的walk方法,接受一个字典,其中指定对于每个key使用哪个walker。这样的好处是我们可以针对不同的数据类型写不同的walker,然后在一个walk过程中将多个walker组合起来。同时,将大的walker拆分成比较小的walker,也可以提高执行效率。keys是要初始获取的列表,它可以比walkerdict中指定的key更多,这样如果我们确信walker会在后续过程中使用某个key,就可以提前将它获取回来,提高执行效率。</p>    <p>字典中传入的walker应当是以下形式的函数:</p>    <pre>  <code class="language-python">def my_walker(key, value, walk, save):      ...</code></pre>    <p>key和value是初始的key和对应的(DataObject类型的)值。walk和save是两个从外部传入的函数,调用walk方法可以获取某个新的key的值,而调用save方法可以告知系统某个key是需要返回的。当调用walk方法时,有可能会抛出KeyError,表示目前这个key的值还没有取回,一般我们需要捕获这个异常,暂时停止进一步的walk过程。典型的写法如下:</p>    <pre>  <code class="language-python">def _walk_lognet(self, key, value, walk, save):          save(key)          if value is None:              return          if self._parent.prepush:              # Acquire all logical ports              try:                  netmap = walk(LogicalNetworkMap.default_key(value.id))              except KeyError:                  pass              else:                  save(netmap.getkey())                  for logport in netmap.ports.dataset():                      try:                          p = walk(logport.getkey())                      except KeyError:                          pass                      else:                          #if p is not None and hasattr(p, 'mac_address') and hasattr(p, 'ip_address'):                          save(logport.getkey())</code></pre>    <p>注意到我们可以充分利用Python的语言特性,这个walker可以是某个类的方法,也可以是一个闭包,其中可以使用外部的参数值。一般我们在walk外面使用try结构,处理掉KeyError,当没有返回KeyError的时候,使用save来保存成功获取的值。在walk的同时,可以通过获取到的对象的值来进行筛选,只返回需要的部分。这样就可以实现很复杂的级联查询。</p>    <p>我们常见的带有JOIN和WHERE的SQL语句可以很容易转换成walker,并且更容易理解。比如说:</p>    <pre>  <code class="language-python">SELECT * FROM student JOIN grade ON student.grade = grade.id  WHERE student.name = "LiLei"</code></pre>    <p>我们在关系型数据库中有grade和student两个表,在student的grade字段上建立外键。在ObjectDB中,这种查询可以以更自由的方式进行:</p>    <pre>  <code class="language-python">class Student(DataObject):      _prefix = "school.student"      _indices = ("name",)    class Grade(DataObject):      _prefix = "school.grade"      _indices = ("id",)    def query_student(name, container, requestid):      def _walker(key, value, walk, save):          # Start with Student          if value is None:              # Student is not found              return          save(key)          if hasattr(value, 'grade'):              # Query grade              try:                  grade = walk(Grade.default_key(value.grade))              except KeyError:                  # Not retrieved, will automatically retry                  pass              else:                  save(grade.getkey())      student_key = Student.default_key(name)      for m in callAPI(container, 'objectdb', 'walk',              {'keys': (student_key,),               'walkerdict': {student_key: _walker},               'requestid': requestid}):          yield m      # Return values are: (saved_keys, saved_values)      # There is no guarentees for the order      keys, values = container.retvalue      if not keys:          raise ValueError('Student is not found')      elif len(keys) == 1:          return (values[0],)      elif values[0].isinstance(Student):  # Notice, not isinstance(values[0], Student)          return (values[0], values[1])      else:          return (values[1], values[0])</code></pre>    <p>当然,对于这种比较简单的外键,后面我们会提到一个更简单的方案。</p>    <p>requestid参数与监听功能有关。它是一个由调用方提供的值,必须是不可变的,可以是字符串、数字、元组甚至对象。当walk成功返回的时候,所有返回的key会被加入到监控列表中,并标记由requestid监控。当这个数据不再需要监听最新值的时候,可以通过unwatch方法取消监控。通常,我们会使用一个和当前功能有关的字符串(比如模块名 + 方法名),加上一个递增的计数器,构成一个元组,来生成一个唯一的requestid,这样比uuid快一些。</p>    <p>注意walker函数可能被多次调用,最终结果是由最后一次调用的结果来决定的。一般我们需要保证walker函数的幂等性——使用相同的数据进行多次调用,产生的结果是相同的。如果某次调用中walker抛出了异常,整个walk方法会失败,并且抛出相应的异常——当出现问题的时候,通过这种方法中断一个查询也是很不错的。</p>    <p>另外,如果walk返回了None,或者value的初始值就是None,表示当前数据库中不存在这个key。不存在的key也同样可以通过save()返回并监控,在这种情况下,我们可以等待这个key被创建出来,并且得到相应的更新通知。</p>    <p>nostale方法控制当数据库连接中断时,是否允许返回已经缓存的结果(而非最新结果)。默认情况下,当数据库连接不可用时,接口会返回当前已经缓存了的内容,来最大程度维持系统运行——但这也可能让系统因为过期的数据而产生不正常的行为。使用nostale参数要求接口在没有最新数据可用的情况下抛出异常。</p>    <p><strong>监控的取消——unwatch/munwatch</strong></p>    <pre>  <code class="language-python">def unwatch(self, key, requestid):          "Cancel management of a key"        def munwatch(self, keys, requestid):          "Cancel management of keys"</code></pre>    <p>这两个接口很简单直接,通过keys和requestid,指定取消某一组keys的监控。unwatch是munwatch的单个key的版本。</p>    <p>有的时候我们希望用完之后无论如何都取消监听,防止在出现异常的时候永远监听下去,可以用watch_context帮助方法:</p>    <pre>  <code class="language-python">from vlcp.utils.dataobject import watch_context    with watch_context(keys, values, requestid, container):      ...</code></pre>    <p>当以任意方式离开watch_context的范围(正常,或者抛出异常),会自动启动新的协程并且调用munwatch来正确取消对这些key的监听。</p>    <p><strong>简便方法——get,mget,getonce,mgetonce,watch,mwatch</strong></p>    <p>有时候我们需求比较简单,仅仅是获取一组key的值,并不需要walk进行级联查询,虽然这也可以通过一个立即save()的walk方法实现,但更简单的还是直接调用这组简便方法。所有m开头的接口接受多个key的元组作为参数,而非m开头的接受单个key。</p>    <pre>  <code class="language-python">def mget(self, keys, requestid, nostale = False):          "Get multiple objects and manage them. Return references to the objects."        def get(self, key, requestid, nostale = False):          "Get an object from specified key, and manage the object. Return a reference to the object or None if not exists."        def mgetonce(self, keys, nostale = False):          "Get multiple objects, return copies of them. Referenced objects are not retrieved."        def getonce(self, key, nostale = False):          "Get a object without manage it. Return a copy of the object, or None if not exists. Referenced objects are not retrieved."        def watch(self, key, requestid, nostale = False):          "Try to find an object and return a reference. Use reference.isdeleted() to test whether the object exists. "\          "Use reference.wait(container) to wait for the object to be existed."        def mwatch(self, keys, requestid, nostale = False):          "Try to return all the references, see watch()"</code></pre>    <p>参数的意义与walk中相似。它们之间有一些细微的差异,watch/mwatch与walk的特性最为接近,相当于对其中每个key执行立即save()的操作,如果这个key对应的值不存在,会返回一个空的数据引用(ReferenceObject,后面会提到),可以通过这个引用接收key的更新通知,在key被创建出来时收到通知。get/mget在key对应数据存在时与watch的行为一致,但如果key对应的值不存在,会返回None,并且不会监控相应的值。getonce/mgetonce有比较不一样的特性,它会立即返回这组key当前值的一个副本,并不会监控相应的值,这个副本也不会随着视图进行更新,这在需要快速测试某些key的当前值、或者调试时有用。</p>    <p><strong>检查监控列表——watchlist</strong></p>    <pre>  <code class="language-python">def watchlist(self, requestid = None):          "Return a dictionary whose keys are database keys, and values are lists of request ids. Optionally filtered by request id"</code></pre>    <p>通常用于调试,可以用来观察当前监控的key的列表,以及它们分别被哪些requestid保留着。VLCP中所有的ModuleAPI,都可以通过vlcp.service.manage.webapi.WebAPI模块导出到Web接口中,这样需要的时候可以随时查看当前ObjectDB的工作状态,比如假如在8181端口上开启了WebAPI:</p>    <pre>  <code class="language-python">curl http://localhost:8181/objectdb/watchlist | python -m json.tool</code></pre>    <p><strong>写入数据——transact</strong></p>    <pre>  <code class="language-python">def transact(self, keys, updater, withtime = False):          "Try to update keys in a transact, with an updater(keys, values), which returns (updated_keys, updated_values). "\          "The updater may be called more than once. If withtime = True, the updater should take three parameters: "\          "(keys, values, timestamp) with timestamp as the server time"</code></pre>    <p>基本上来说是上一篇文章中的MUPDATE原语的翻版。它需要传入一个updater函数作为参数,这个函数描述了需要进行的事务操作,它接受keys,values两个参数,如果withtime=True,还接受一个额外的timestamp参数,代表来自于数据库服务器的时间戳,单位是微秒(对ZooKeeper来说实际精度只有毫秒)。使用timestamp可以保证在任意客户端上使用的时间都是连续的数据库服务器时间,防止因为客户端本地时间的差异而导致顺序问题。通常会选择使用timestamp来实现超时逻辑:在写入数据时,同时在某个字段中记录超时时间;在读取数据时,如果发现超时,则忽略,同时删除已经超时的数据。再配合定时器不断更新超时时间,可以实现一个可靠的发现(Discover)协议。</p>    <p>一个常见的updater写法的例子:</p>    <pre>  <code class="language-python"># Using network driver together with IPAM driver      rets = []      def _ipam_stage(keys, values, timestamp):          reservepool = values[0]          removed_keys = self._remove_staled_pools(reservepool, timestamp)          poolids = [poolid for poolid, (cidr, _) in reservepool.reserved_pools.items()                     if cidr == request_cidr]          if not poolids:              raise ValueError('Pool %r is not reserved by VLCP IPAM plugin' % (request_cidr,))          docker_ipam_poolid = poolids[0]          rets[:] = [docker_ipam_poolid]          removed_keys.append(IPAMReserveMarker.default_key(reservepool.reserved_pools[docker_ipam_poolid][0]))          del reservepool.reserved_pools[docker_ipam_poolid]          return ((keys[0],) + tuple(removed_keys), (reservepool,) + (None,) * len(removed_keys))      for m in callAPI(self, 'objectdb', 'transact', {'keys': (IPAMPoolReserve.default_key(),),                                                      'updater': _ipam_stage,                                                      'withtime': True}):          yield m      self.retvalue = (rets[0],)</code></pre>    <p>它的参数和返回值都包含(keys, values)的键值对,这个结构是比较适合级联的,可以在已有的updater外层包上新的逻辑,添加新的key和value,增加额外的功能,这样可以很容易将多个updater合并成为一个transact过程。updater传入的keys的顺序与transact方法接收到的keys的顺序相同,而values的顺序与keys一一对应。updater返回的keys的列表与传入的keys的列表不需要有关联,既可以修改传入的keys,也可以直接修改没有传入的keys。如果updater对某个key返回None作为相应的value,表示从数据库中删除这个key。</p>    <p>updater可以很容易地替代关系型数据库中的BEGIN/COMMIT之间的多条语句,以非常自由、一致、Pythonic的方式进行事务操作。它的局限性是必须提前指定需要进行update的key的列表,不过这可以通过上一篇文章中提到的WRITEWALK的逻辑,用walk配合update来完成(目前还没有相应的writewalk接口,以后可能会增加)。updater一般通过闭包的形式提供,它也可以使用可变对象或者Python3中的nonlocal变量额外返回一些值(比如上面例子中的rets列表就是用来返回值的)</p>    <p>一次update会自动创建一次事务写入,并且推送这些key的更新到已经监听了这些key的客户端上,从而将自己的更改推送到所有的其他客户端的视图中。如果在updater中抛出异常,这个异常会中断事务的执行,这种情况下不会写入任何内容,异常会抛出到transact方法的调用方,通过这种方式可以安全地中断一个事务的执行,比如发现不满足事务的执行条件的情况(相当于T-SQL中的Rollback)。</p>    <p><strong>等待更新——DataObjectUpdateEvent</strong></p>    <p>当ObjectDB第一次获取某个key、或者收到某个key的更新通知、或者从数据库连接断开的状态下恢复时,ObjectDB会在完成对这个key的获取后,发送DataObjectUpdateEvent,通过接收并处理这个Event,其他协程可以等待某个或者某些特定key的更新。一般来说不需要直接处理这个事件,而是通过以下三个方法之一进行处理:</p>    <pre>  <code class="language-python">for m in refobj.wait(container):      yield m  for m in refobj.waitif(container, expr):      yield m  for m in refobj.waitif(container, expr, nextchange = True):      yield m  for m in multiwaitif(references, container, expr):      yield m  for m in multiwaitif(references, container, expr, nextchange = True):      yield m</code></pre>    <p>wait()和waitif()方法可以等待单个的对象的状态变化。wait()方法等待一个被删除的对象重新创建起来。waitif()方法通过传入一个expr函数,来指定等待的条件:expr是一个接受单个参数的函数,通常是个lambda表达式,它的第一个参数接受更新之后的新的值,如果expr(refobj)返回真值则停止等待,否则继续等待;如果使用lambda x: x.isdeleted(),就可以等待一个对象直到它被删除。</p>    <p>multiwaitif是waitif的多个key的版本,它的第一个参数是需要同时等待的多个对象,expr是接受两个参数的函数:</p>    <pre>  <code class="language-python">def my_expr(references, updated_refs):      ...</code></pre>    <p>第一个参数是references列表,可以用来获取最新的值;第二个是这一次更新的值的列表,可以用来判断哪些值发生了更新,哪些值没有发生更新。</p>    <p>nextchange参数控制等待的行为,如果nextchange = True,则忽略当前状态,至少等待到下一次变化再返回。这种情况下可以用waitif(container, lambda x: True, True)来等待任意的变化。</p>    <h2>技术细节</h2>    <p><img src="https://simg.open-open.com/show/425cb32685a3fe072f345b9b71f54a78.png"> ObjectDB模块使用上图的结构进行搭建。对外,它提供了一组ModuleAPI;而内部来说,它也同样依赖更为底层的模块,这些模块中,负责KVStorage的模块提供了最基础的MGET和MUPDATE方法(参考上篇文章) ,它可以是RedisDB与ZooKeeperDB中的一个;Notifier提供了一个订阅/发布接口,可以让ObjectDB将数据更新的消息推送到其他客户端,底层实现目前是Redis的Pub/Sub或ZooKeeper的WATCH机制,但也可以选择其他实现,从逻辑上来说,KVStorage和Notifier可以无关。</p>    <p><strong>写入过程</strong></p>    <p>写入的过程基本遵循前一篇文章中的方法,区别在于在MUPDATE成功执行完成之后,会通过Notifier模块执行一个PUBLISH过程。这个PUBLISH过程会将这一次更新的所有的key的列表,打包成一个消息,推送给其他客户端;于是收到消息的客户端会同时收到这一批写入的所有key的列表,保证使用一个MGET将它们取回,从而保证取回数据的视图一致性。</p>    <p>当数据库中有非常多的key的时候,我们通常不希望收到所有的通知,而是只收到与我们监控的key有关的通知,这样在共用一套数据库的客户端很多的时候,不至于所有的写入都发送都其他客户端上,引起性能下降。为了实现这个目标,我们会将这一次更新的keys列表发送到每个key命名的channel上,这样只有订阅了这个key的客户端才会收到通知,多个key的情况下可能会收到一次更新的多个通知,会有一个去重复的机制。但是,如果一次更新的key过多,由于要发送的数据量是key * key的量级,会造成严重的性能问题,因此当一次更新的key过多时,会选择发送到一个特殊的全局channel上,一次通知所有的客户端,这也是一种权衡。</p>    <p>注意到PUBLISH在MUPDATE成功之后执行,这意味着如果更新通知到达,我们一定可以获取到至少是这一次更新时的数据,但是,我们也可能:直接获取到了更加新的数据;或者,在更新通知到达之前,就获取到了这一次更新的数据。后续读取过程中会充分考虑到这些因素。</p>    <p><strong>读取过程</strong></p>    <p>读取的过程与前一篇文章中描述的方法略有区别,由于数据更新推送的需求,ObjectDB中有一层有状态的逻辑。具体来说,所有取回的数据,会在当前进程中保存一个全局唯一的副本,这个副本叫做数据镜像,它是实际的后端存储中相应数据的实时镜像。实际返回给调用方的则是指向这个数据的引用(通过ReferenceObject类提供,其中通过__getattr__方法将属性的获取重定向到原始的DataObject中)。</p>    <p>当有新的walk方法调用,或者有来自Notifier的更新通知时,ObjectDB会开启一个新的更新循环,在循环结束之前,新的更新通知、新的walk方法都会统一由这个循环进行执行。在这个过程中,新循环会将获取到的新数据,暂存在一个临时数据层中,暂时不更新到数据镜像中,直到循环执行完毕,再将临时数据层中的数据更新到当前数据镜像层中,形成一个新的视图一致的数据镜像,同时返回之前查询的结果。</p>    <p>具体来说,循环中每次获取回来的最新数据,与之前保存的临时数据,也分到不同的层中,如下图所示:</p>    <p><img src="https://simg.open-open.com/show/0dac31a4ec72548edbe3580c628af7ed.png"> <strong>数据空间划分</strong></p>    <p>在整个循环过程中,由于walker可能不能一次成功,同时也可能有新的请求、新的更新加入,因此每次获取到的数据的列表不完全一致。最简单的保证视图一致性的方法是每次都将当前临时数据层中所有的key,再加上新的需要获取的key,全部通过一个MGET获取回来,但这样当同时执行的walker很多时,每次需要重新获取的key的数量就会很多,对性能有负面影响;变通的方法是,对于每个独立的walker,我们都让它在某个部分一致性的子集中执行即可。</p>    <p>上图中,我们将最新更新到的数据,与之前更新的数据+已经缓存的数据镜像中的数据,拆分成两个数据空间。如果最新获取到的数据中包含了某个key,我们就在现有数据中屏蔽这个key,不允许walker访问。执行walker时,我们会根据第一个key的所在空间,来确定walker需要执行的空间:</p>    <p>如果当前walker使用的是旧数据,则让walker执行过程中,只允许完全使用旧数据,如果试图取回新获取的数据,则失败;</p>    <p>如果当前walker使用的是新数据,则让walker执行过程中,只允许使用上一次取回的数据,如果试图使用此范围外的数据,则失败。</p>    <p>一旦某个walker失败,我们就将walker在执行过程中用到的所有的key,都加入到下一次要取回的列表中。这样一旦某个walker执行中遇到了需要更新的数据,它就会一直在最新数据空间中执行,直到成功一次为止。</p>    <p>这样我们可以保证按我们的顺序获取数据时,walker永远都在一致的数据中执行。</p>    <p><strong>完整逻辑</strong></p>    <p>整个循环的逻辑可以归纳如下:</p>    <ol>     <li>如果有新请求,将新的walker加入到walkers集合;</li>     <li>当前正在等待处理的更新通知 加入到 update_list集合;</li>     <li>对update_list中所有的key订阅更新通知(在获取值之前订阅更新通知,这样可以保证不遗漏后续更新)</li>     <li>对update_list中所有的key,执行MGET,取回一致的结果;</li>     <li>对walkers中所有的walker,依起始的key所在空间选择执行的数据空间,然后依次执行,如果walker请求了不存在于当前数据空间中的key(包括尚未取回,和不在当前数据空间的情况),将这个walker所有用到的key加入update_list集合</li>     <li>将MGET取回的结果更新到update_result(临时数据层)中</li>     <li>如果update_list集合为空,且没有正在等待处理的更新通知,转到8,否则回到1</li>     <li>将update_result更新到managed_result(数据镜像层)中,并且依照managed_result中的数据,依次返回给每个walk请求;对update_result中所有的key,发出数据已变更的通知,并且将所有的通知标记为同一个批次,这样多个key收到的更新通知会被去重。</li>    </ol>    <p>这个逻辑的实现非常依赖于VLCP的协程特性,通过协程调度可以高效地让消息在不同协程之间传递,而不依赖各种各样的锁,这也是ObjectDB得以成立的一个重要原因。</p>    <h2>额外功能</h2>    <p>为了使用方便,ObjectDB在上一篇文章的基础上,增加了许多额外的辅助功能,这些功能内部都是依赖上篇文章中的WALK/MUPDATE逻辑进行的,使用这些功能可以大大简化常规的数据操作,降低walker/updater编写的复杂性</p>    <p><strong>自动引用、ReferenceObject与键值的分拆</strong></p>    <p>前面我们举过一个Student与Grade的关联查询的例子。在许多场景中,我们存储的数据对象都以外键的形式关联到了其他的key,如果每次都需要手写walker来取回这些key是很无趣的,幸运的是ObjectDB提供了一种机制来快速创建外键,并且自动将外键指向的对象取回。</p>    <p>ReferenceObject是ObjectDB使用的内部类型之一,它代表一个对实际DataObject的引用。实际上ObjectDB几乎所有接口,除了getonce返回DataObject的副本以外,其他都返回ReferenceObject。wait(), waitif()也是ReferenceObject的接口。除了这些接口外,对ReferenceObject的属性的获取都会被转到相应的DataObject中。</p>    <p>除了作为ObjectDB的返回值,ReferenceObject的另一项作用是可以保存在某个DataObject中,表示一个当前DataObject的外键。当我们用getonce以外的任意方法(get, mget, walk)返回这个key对应的值的时候,如果这个值包含了ReferenceObject,这些ReferenceObject指向的值也会被自动取回,并且监听;如果包含了ReferenceObject的key停止监听,则这些被自动引用的key也会停止监听。这样就大大简化了外键的使用。</p>    <p>回到前面的Student和Grade的例子中,现在我们可以简单这样做:</p>    <pre>  <code class="language-python">from vlcp.utils.dataobject import updater, set_new    def create_student(name, gradeid, container):      @updater      def _updater(student, grade):          student = set_new(student, Student.create_instance(name))          student.grade = grade.create_reference()          return (student,)      for m in callAPI(container, 'objectdb', 'transact',                      {'keys': (Student.default_key(name),                                Grade.default_key(gradeid)),                       'updater': _updater}):          yield m    def get_student(name, container, requestid):      for m in callAPI(container, 'objectdb', 'get',                       {'key': Student.default_key(name),                        'requestid': requestid}):          yield m      student = container.retvalue      print "Student %r is in grade %r" % (student.name, student.grade.id)</code></pre>    <p>updater注解是书写updater的一个帮助方法,它可以将keys, values形式传入的数据,通过*values的形式传递给参数列表,这样就提高了代码可读性。不过使用这种形式书写的updater必须从左到右写入结果,返回的值的列表会从左到右依次匹配原始的key。set_new会判断原来的值,如果非空则立即报错,否则替换成新的值,在想要创建新的值又要避免因为主键重复而覆盖旧的值的时候很方便。</p>    <p>取回的Student对象中,Student的grade是个ReferenceObject,这个ReferenceObject指向的Grade对象会自动被取回,其中的属性也可以直接通过student.grade.id的方式进行访问。如果取回的对象还有其他ReferenceObject,这些对象也可以自动再次取回,这样很容易取回一整个级联的结果。</p>    <p>内部实现上,ObjectDB会自动为每个key添加一个取回所有Reference的walker,因此一致性的保证与直接使用walk方法是一样的。</p>    <p>对于非常大、包含非常多属性的键值来说,维护起来是比较困难的,每次修改都需要重新读取再重新写入整个值,而且只能整体监听,不能只监听一部分属性。这时候将键值拆分成多个键会是个好主意。使用ReferenceObject的方法,甚至可以在不影响使用的情况下,将一部分键值拆分到独立的key当中,实现多个key之间的共享,或者独立的更新通知。</p>    <p><strong>自动删除关联键</strong></p>    <p>这也是传统的关系型数据库中的一个功能,当有外键关联时,如果删除外键相应的行,则自动删除关联行。在上一节中我们也提到了分拆键值的问题,但是如果分拆出去的键值不能随着主要的键值删除而删除,我们就可能在数据库中残留数据。好在ObjectDB提供了同等的自动删除键值的功能:</p>    <pre>  <code class="language-python">class LogicalPortVXLANInfo(DataObject):      _prefix = 'viperflow.logicalportvxlaninfo'      _indices = ('id',)      def __init__(self, prefix=None, deleted=False):          super(LogicalPortVXLANInfo, self).__init__(prefix=prefix, deleted=deleted)          self.endpoints = []    LogicalPort._register_auto_remove('LogicalPortVXLANInfo', lambda x: [LogicalPortVXLANInfo.default_key(x.id)])</code></pre>    <p>使用_register_auto_remove方法,可以在已经定义的DataObject类型上,添加一个附加的自动删除功能。它有两个参数,第一个是这个自动删除过程的名称,可以用来调试,相同名称只会保留一个;第二个参数是一个函数(lambda表达式),它接受已经定义的旧类型的对象作为参数,然后返回当这个对象删除时,需要一起删除的key的列表。这些会被一起删除的key如果也注册了auto_remove功能,会进一步触发,从而一次删除更多的key。当然,这些删除动作全部都是在一个事务中完成的,如果某一步出错,整个删除过程都会回滚,因此要么都删除,要么都不删除。也不用担心自动删除会有循环引用的问题,可以放心让两个key互相关联为自动删除的关系,保证其中一个删除时另一个一定跟着删除。</p>    <p>从内部实现上来说,这是通过在用户传入的updater外侧级联系统的updater来实现的。</p>    <p><strong>WeakReference与DataObjectSet</strong></p>    <p>WeakReference对象不带有自动引用的功能,它可以用来单纯保存一个key,但不自动取回,这样就防止了取回的对象数量太多的问题。可以使用walk方法来取回需要的key。</p>    <p>DataObjectSet是一个预定义的用来保存WeakReference的集合类型,可以在其中添加或者删除WeakReference,起到保存对象集合的目的。可以用find方法按照保存对象的主键(_indices)进行查找或者筛选。</p>    <p><strong>自动的反向索引</strong></p>    <p>这是一个非常重要的功能。除了使用外键进行索引以外,我们还经常需要按照对象的属性值来反查对象,为了加速这个查找过程,经常需要对对象的属性创建索引,这个索引叫做反向索引(也叫倒排索引)。一般对于No-SQL来说,创建这种索引是需要代码实现的,但对于ObjectDB来说,只需要简单声明就可以了:</p>    <pre>  <code class="language-python">class LogicalPort(DataObject):      _prefix = 'viperflow.logicalport'      _indices = ("id",)      _unique_keys = (('_mac_address_index', ('network', 'mac_address')),                      ('_ip_address_index', ('network', 'ip_address')))</code></pre>    <p>这个声明的方式与SQL中创建索引的方式极其相似,首先起一个索引的名称(不能和同一对象下的其他索引重复),然后声明需要使用的属性值。创建自动索引的时候,会自动形成这样一个key:</p>    <p>indices . DataObject的_prefix . 索引名称 . 属性值拼接成字符串</p>    <p>比如对于network = some_network, mac_address = 02:00:01:02:03:04的某个LogicalPort对象,它的_mac_address_index的索引就是:</p>    <pre>  <code class="language-python">indices.viperflow++logicalport._mac_address_index.some_network++02:00:01:02:03:04</code></pre>    <p>中间的++是字符转义的结果。实际上我们不用关心这些细节,简单用</p>    <pre>  <code class="language-python">mac_key = LogicalPort.unique_key('_ip_address_index', 'some_network', '02:00:01:02:03:04')</code></pre>    <p>就可以生成这个key,用于walk方法。返回对象是UniqueKeyReference类型,它的ref属性是原始DataObject的引用。</p>    <p>除了用于查找以外,unique key的另一重作用跟关系数据库中一样,如果发生了键值重复的情况,会阻止transact写入并且返回键值重复的错误,这样就可以很容易防止MAC地址重复、IP地址重复之类的情况发生。</p>    <p>允许重复的key可以使用_multi_keys来声明, 与_unique_keys类似,不过允许键值重复。multi_key()方法可以返回相应的索引的key,对应的值是MultiKeyReference类型,它的set属性是相应属性值的所有的对象的WeakReference构成的DataObjectSet。</p>    <p>每一个索引依不同属性值创建不同的key,如果想要按属性值的范围查找,比如说查找IP地址在192.168.1.100到192.168.1.200之间的对象,要怎么办呢?反向索引本身也有一个集合用来保存不同属性值的索引的WeakReference,可以用</p>    <pre>  <code class="language-python">UniqueKeyReference.get_keyset_from_key(mac_key)  MultiKeyReference.get_keyset_from_key(multi_key)</code></pre>    <p>返回这个集合的key,它对应的是UniqueKeySet/MultiKeySet对象,它的set属性是UniqueKeyReference/MultiKeyReference对象的WeakReference组成的DataObjectSet。</p>    <p>所有这些自动创建的索引,会随着DataObject的创建而创建,随着DataObject的删除而删除,可以像普通的key一样get、walk或者监听。内部实现上,它是通过在用户传入的updater外层,级联一层系统的updater来实现的。</p>    <p><strong>转化为可视结果</strong></p>    <p>使用vlcp.utils.dataobject.dump方法,可以简单地将DataObject对象转化为字典形式的描述,这个形式可以进一步转化为JSON,写入日志,或者使用pformat格式化成较为容易阅读的形式。</p>    <h2>ObjectDB与React模型、GraphQL等新技术</h2>    <p>一个稳定、可靠的系统,最佳的一种设计方式是这样的:</p>    <p>有一个单一、一致的状态 ;</p>    <p>所有其他输出都是这个状态的函数,状态变化则输出变化,状态相同则输出相同。</p>    <p>这样的系统可以很轻易地实现高可扩展性、高一致性,以及无状态、随时从任意状态中恢复等优良特性。但是实现上的要求比较高,比起传统的workflow的模型,系统需要能够感知状态的变化,跟随状态变化而进行改变。这种模型我称之为React模型。流行的ReactJS就是个不错的例子。</p>    <p>我们可以看到ObjectDB的推送模型就非常适合这种React模型。实际上,VLCP的SDN控制器逻辑基本就是基于React模型的,这样我们可以很容易保证系统从任意状态中恢复。</p>    <p>另一项新技术GraphQL,是一种REST API的替代,它可以在WebService接口上使用关联查询。我们可以看到,它几乎就可以对应ObjectDB的walk方法,如果使用ObjectDB的walk来实现,GraphQL的查询结果将可以保证事务一致性。当然并不是说就一定应当这么实现,因为还有性能上的考虑。</p>    <h2>结论</h2>    <p>VLCP中的ObjectDB是一个重磅级的设计,它完美满足了SDN开发的要求,由它可以开发出一个完全去中心、完全高可用、完全可扩展的系统,与它提供的实时性、一致性相比,现有其他SDN的定时抓取新数据之类的策略就像是个玩具,这也是VLCP作为产品级SDN控制器的牢不可破的一层保障。除了SDN以外,其他任何需要比较强的一致性,或者哪怕只是你懒得处理一致性问题、又懒得写SQL、懒得维护关系型数据库、懒得为每次数据结构变动写升级脚本的情况下,这都会是你一个应当考虑的选择,它会带给你非常多的惊喜。在分布一致性的系统中,需要进行分布式锁、节点发现、主节点竞选之类的过程的时候,也可以很容易派上用场。当底层使用ZooKeeper作为存储的时候,整个系统是高可用的,不会出现任何的单点故障,而且任意一点都可以随时断开或重启,并在重新连接或重新启动之后恢复。</p>    <p>ObjectDB也有它的缺点,与其他数据库方案相比,它的单机读写性能受到Python实现(单核心)的限制,比MySQL等数据库要低,多个事务要使用相同的key时,也会像其他数据库一样的性能下降。但在集群中,不相关的key的写入不会占用相同的资源,总的性能还是比较高的。它比较适合系统配置或者少量的协调一致性这样的并不频繁写入、但对一致性要求比较高的情形。由于数据镜像与主动推送机制的存在,当多个协程读取相同的数据的时候,缓存会生效,因此读取的性能比较好,适合写入少而读取多的系统。对于频繁写入但很少读取的系统则不是特别适用。</p>    <p> </p>    <p>来自:https://zhuanlan.zhihu.com/p/23747209</p>    <p> </p>