Source code analysis of the excellent open-source project kombu: registry and entrypoint
EstBrumfiel
8 years ago
<p>On a few public occasions I have talked about the Python developers I consider outstanding. The authors of Flask and Requests go without saying. What the 21st century lacks most is ideas, and they not only had them but also realized them in a very elegant way. I also named Ask Solem, the author of Celery. That is not because Celery is famous and so its main author must be good. My admiration for Ask came entirely from reading the source code of Celery and its dependencies.</p>
<p>Engineers with years of backend experience probably know that the techniques Celery builds on are widely used across the industry. Two things partly explain its popularity: some people adopt it blindly without digging into the technology, and it appeared very early. Setting those aside, I believe Celery's popularity also owes a lot to its well-designed internals.</p>
<p>In the next few articles I will analyze some design and implementation choices in Kombu, the messaging library Celery uses. The goal is to help you understand this excellent project better and learn practices for building extensible software from it.</p>
<h3>What is Kombu?</h3>
<p>When a project grows more and more complex, it is worth keeping only the core and splitting everything else into separate projects. This reduces future maintenance and development costs. Flask and IPython both did this.</p>
<p>Kombu is a library that wraps messaging behind a unified interface. Celery first supported only RabbitMQ, which speaks the AMQP protocol. Celery then had to support more and more message brokers that do not speak AMQP. It needed something that unifies how all of these brokers are handled; you can even think of it as making them "pretend to support AMQP". Kombu's original implementation was called carrot, which became Kombu after a refactor.</p>
<h3>registry</h3>
<p>A registry is about "registration": components are added on demand. The pattern is used in the Python standard library and in several well-known open-source projects. Let's first look at a <a href="/misc/goto?guid=4959742673430297267" rel="nofollow,noindex">use case</a> from Django. To save space, I have left out the other methods of the CheckRegistry class:</p>
<pre> <code class="language-python">### source code start
from itertools import chain


class CheckRegistry:

    def __init__(self):
        self.registered_checks = []
        self.deployment_checks = []

    def register(self, check=None, *tags, **kwargs):
        kwargs.setdefault('deploy', False)

        def inner(check):
            # Attach the tags to the check and store it in the right list.
            check.tags = tags
            if kwargs['deploy']:
                if check not in self.deployment_checks:
                    self.deployment_checks.append(check)
            elif check not in self.registered_checks:
                self.registered_checks.append(check)
            return check

        if callable(check):
            # Used as a bare decorator: @register
            return inner(check)
        else:
            # Used as @register('tag', ...): the first argument is a tag.
            if check:
                tags += (check, )
            return inner

    def tag_exists(self, tag, include_deployment_checks=False):
        return tag in self.tags_available(include_deployment_checks)

    def tags_available(self, deployment_checks=False):
        return set(chain(*[
            check.tags for check in self.get_checks(deployment_checks)
            if hasattr(check, 'tags')
        ]))

    def get_checks(self, include_deployment_checks=False):
        checks = list(self.registered_checks)
        if include_deployment_checks:
            checks.extend(self.deployment_checks)
        return checks


registry = CheckRegistry()
register = registry.register
tag_exists = registry.tag_exists
### source code end


@register('mytag', 'another_tag')
def my_check(apps, **kwargs):
    pass


print(tag_exists('another_tag'))
print(tag_exists('not_exists_tag'))
</code></pre>
<p>Every call to registry.register adds a check together with its tags at runtime. At the end, the line register = registry.register creates a shorter module-level alias. The output is:</p>
<pre> <code class="language-python">❯ python django_example.py
True
False
</code></pre>
<p>Kombu implements serialization and deserialization of messages and supports several serialization schemes at the same time, such as pickle, json, yaml and msgpack. You may never have written an extensible project like this before. In that case your first idea might be to wrap loads and dumps for each scheme and then use one big if/elif/else to decide which one runs.</p>
<p>So how does kombu do it? Here is a simplified version of its implementation:</p>
<pre> <code class="language-python">from collections import namedtuple

codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))


class SerializerNotInstalled(Exception):
    pass


class SerializerRegistry(object):

    def __init__(self):
        self._encoders = {}    # serializer name -> codec
        self._decoders = {}    # content type -> decoder
        self._default_encode = None
        self._default_content_type = None
        self._default_content_encoding = None

    def register(self, name, encoder, decoder, content_type,
                 content_encoding='utf-8'):
        if encoder:
            self._encoders[name] = codec(
                content_type, content_encoding, encoder,
            )
        if decoder:
            self._decoders[content_type] = decoder

    def _set_default_serializer(self, name):
        try:
            (self._default_content_type, self._default_content_encoding,
             self._default_encode) = self._encoders[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(name))

    def dumps(self, data, serializer=None):
        if serializer and not self._encoders.get(serializer):
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(serializer))
        # Python 3: str here plays the role of Python 2's unicode.
        if not serializer and isinstance(data, str):
            payload = data.encode('utf-8')
            return 'text/plain', 'utf-8', payload
        if serializer:
            content_type, content_encoding, encoder = \
                self._encoders[serializer]
        else:
            encoder = self._default_encode
            content_type = self._default_content_type
            content_encoding = self._default_content_encoding
        payload = encoder(data)
        return content_type, content_encoding, payload

    def loads(self, data, content_type, content_encoding):
        content_type = (content_type if content_type
                        else 'application/data')
        content_encoding = (content_encoding or 'utf-8').lower()
        if data:
            decode = self._decoders.get(content_type)
            if decode:
                return decode(data)
        return data


registry = SerializerRegistry()
dumps = registry.dumps
loads = registry.loads
register = registry.register
</code></pre>
<p>Kombu also implements unregister, which I will skip for reasons of space. Now, to add yaml support, all we need is a function like this:</p>
<pre> <code class="language-python">def register_yaml():
    """Register the yaml serializer, or a stub if PyYAML is missing."""
    try:
        import yaml
        registry.register('yaml', yaml.safe_dump, yaml.safe_load,
                          content_type='application/x-yaml',
                          content_encoding='utf-8')
    except ImportError:

        def not_available(*args, **kwargs):
            """Raise SerializerNotInstalled.

            Used in case a client receives a yaml message, but yaml
            isn't installed.
            """
            raise SerializerNotInstalled(
                'No decoder installed for YAML. Install the PyYAML library')
        registry.register('yaml', None, not_available, 'application/x-yaml')


register_yaml()
</code></pre>
<p>That is all it takes to support yaml. To make yaml the default serializer, run:</p>
<pre> <code class="language-python">registry._set_default_serializer('yaml')
</code></pre>
<p>This is very easy to extend. If one day I want to drop pickle (because of its security problems), I can simply comment out the corresponding registration function. Let's try a small example:</p>
<pre> <code class="language-python">yaml_data = """\
float: 3.1415926500000002
int: 10
list: [george, jerry, elaine, cosmo]
string: The quick brown fox jumps over the lazy dog
unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog"
"""

content_type, content_encoding, payload = dumps(yaml_data, serializer='yaml')
print(content_type, content_encoding)

assert loads(payload, content_type=content_type,
             content_encoding=content_encoding) == yaml_data
</code></pre>
<p>The output is:</p>
<pre> <code class="language-python">❯ python kombu_example.py
application/x-yaml utf-8
</code></pre>
<h3>entrypoint</h3>
<p>In my book I showed how to build a simple plugin system with pkg_resources.iter_entry_points. Note that pkg_resources ships with setuptools, not with the standard library. Kombu uses the same technique; its serialization module ends with these lines:</p>
<pre> <code class="language-python">from pkg_resources import iter_entry_points

# Register every serializer that installed packages declare
# under the 'kombu.serializers' entry point group.
for ep in iter_entry_points('kombu.serializers'):
    args = ep.load()
    register(ep.name, *args)
</code></pre>
<p>What does this do? pkg_resources is a module for package discovery and resource access. We can write separate kombu extensions and declare the matching entry_points in each extension's setup.py. Once the extension is installed, the code above finds it automatically and registers it. That is an extension system. Flake8 is the best example of this style of plugin mechanism.</p>
<p>There are not many kombu extensions, so I will use kombu-fernet-serializers as the example. First, its setup.py:</p>
<pre> <code class="language-python">...
entry_points={
    'kombu.serializers': [
        'fernet_json = kombu_fernet.serializers.json:register_args',
        'fernet_yaml = kombu_fernet.serializers.yaml:register_args',
        'fernet_pickle = kombu_fernet.serializers.pickle:register_args',
        'fernet_msgpack = kombu_fernet.serializers.msgpack:register_args',
    ]
}
...
</code></pre>
<p>Notice that the entry point group is kombu.serializers. Once the package is installed, four new serialization schemes become available. Here is the implementation of fernet_json:</p>
<pre> <code class="language-python">import anyjson as _json

from . import fernet_encode, fernet_decode

MIMETYPE = 'application/x-fernet-json'

# The tuple is unpacked into register(ep.name, *args) by kombu.
register_args = (
    fernet_encode(_json.dumps),
    fernet_decode(_json.loads),
    MIMETYPE,
    'utf-8',
)
</code></pre>
<p>fernet_yaml handles the missing-PyYAML case at module level instead of inside a function, as register_yaml above does. The two approaches reach the same result:</p>
<pre> <code class="language-python">from kombu.exceptions import SerializerNotInstalled

from . import fernet_encode, fernet_decode

try:
    import yaml
except ImportError:

    def not_available(*args, **kwargs):
        """In case a client receives a yaml message, but yaml isn't installed."""
        raise SerializerNotInstalled(
            'No decoder installed for YAML. Install the PyYAML library')
    yaml_encoder = not_available
    yaml_decoder = None
else:
    yaml_encoder = yaml.safe_dump
    yaml_decoder = yaml.safe_load

MIMETYPE = 'application/x-fernet-yaml'

register_args = (
    fernet_encode(yaml_encoder),
    fernet_decode(yaml_decoder) if yaml_decoder else None,
    MIMETYPE,
    'utf-8',
)
</code></pre>
<p>We don't actually need to know how fernet_encode and fernet_decode apply symmetric encryption to the message. The point is to see how elegant this way of adding extensions is.</p>
<p>Source: http://www.dongwm.com/archives/优秀开源项目kombu源码分析之registry/</p>
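<p>The part of Django's CheckRegistry.register that is easiest to miss is the callable(check) test. It lets the same method work both as a bare @register decorator and as @register('tag', ...). Below is a minimal sketch of just that trick, not Django's code. TagRegistry and its methods are hypothetical names, and deployment checks are left out:</p>
<pre> <code class="language-python">class TagRegistry:
    """Hypothetical, stripped-down version of the CheckRegistry idea."""

    def __init__(self):
        self.checks = []

    def register(self, check=None, *tags):
        if check is not None and not callable(check):
            # Called as @register('tag', ...): the first positional
            # argument is a tag, not the function being decorated.
            tags = (check,) + tags

        def inner(func):
            # Attach the tags and remember the function once.
            func.tags = tags
            if func not in self.checks:
                self.checks.append(func)
            return func

        if callable(check):
            # Called as a bare decorator: @register
            return inner(check)
        return inner

    def tag_exists(self, tag):
        return any(tag in func.tags for func in self.checks)


registry = TagRegistry()
register = registry.register


@register
def untagged_check():
    pass


@register('database', 'models')
def tagged_check():
    pass


print(registry.tag_exists('models'))   # True
print(registry.tag_exists('cache'))    # False
print(untagged_check.tags)             # ()
</code></pre>
<p>Returning either the decorated function or the inner decorator is what keeps call sites short, at the cost of one type check on the first argument.</p>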
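<p>Here is a minimal, self-contained sketch of the same registry idea using only the standard library (json and pickle). It makes the contrast with a big if/elif/else chain concrete. Encoders are looked up by name and decoders by content type, so adding or removing a format never touches dumps or loads. MiniRegistry and its unregister method are hypothetical illustrations. The article notes that kombu has its own unregister; that code is not reproduced here:</p>
<pre> <code class="language-python">import json
import pickle
from collections import namedtuple

codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))


class SerializerNotInstalled(Exception):
    pass


class MiniRegistry:
    """Hypothetical sketch: dictionaries replace an if/elif/else chain."""

    def __init__(self):
        self._encoders = {}   # serializer name -> codec
        self._decoders = {}   # content type -> decoder

    def register(self, name, encoder, decoder, content_type,
                 content_encoding='utf-8'):
        self._encoders[name] = codec(content_type, content_encoding, encoder)
        self._decoders[content_type] = decoder

    def unregister(self, name):
        # Remove both directions so the format is fully disabled.
        try:
            content_type = self._encoders.pop(name).content_type
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(name)) from None
        self._decoders.pop(content_type, None)

    def dumps(self, data, serializer):
        try:
            content_type, content_encoding, encoder = self._encoders[serializer]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(serializer)) from None
        return content_type, content_encoding, encoder(data)

    def loads(self, payload, content_type):
        decoder = self._decoders.get(content_type)
        if decoder is None:
            raise SerializerNotInstalled(
                'No decoder installed for {0}'.format(content_type))
        return decoder(payload)


registry = MiniRegistry()
registry.register('json', json.dumps, json.loads, 'application/json')
registry.register('pickle', pickle.dumps, pickle.loads,
                  'application/x-python-serialize', 'binary')

ctype, encoding, payload = registry.dumps({'a': [1, 2]}, 'json')
print(ctype, payload)                    # application/json {"a": [1, 2]}
print(registry.loads(payload, ctype))    # {'a': [1, 2]}

# Dropping pickle (for example, for security reasons) is a single call.
registry.unregister('pickle')
</code></pre>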
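<p>pkg_resources belongs to setuptools and is deprecated in recent setuptools releases. Since Python 3.8, importlib.metadata.entry_points in the standard library does the same job. Below is a sketch of kombu's iter_entry_points loop rewritten on top of it. It assumes Python 3.10+, which added the group= keyword. discover and load_serializer_plugins are hypothetical helpers. The loader accepts any iterable of objects with .name and .load(), so it can be exercised with a fake entry point instead of a real installed package such as kombu-fernet-serializers:</p>
<pre> <code class="language-python">from importlib.metadata import entry_points
from types import SimpleNamespace


def discover(group='kombu.serializers'):
    # Python 3.10+: select the installed entry points in one group.
    return entry_points(group=group)


def load_serializer_plugins(register, eps):
    """Register every plugin, mirroring kombu's iter_entry_points loop.

    Each entry point is expected to load a tuple of
    (encoder, decoder, content_type, content_encoding),
    like register_args in kombu-fernet-serializers.
    """
    loaded = []
    for ep in eps:
        args = ep.load()
        register(ep.name, *args)
        loaded.append(ep.name)
    return loaded


# Hypothetical stand-in for registry.register that only records calls.
registered = {}


def register(name, encoder, decoder, content_type, content_encoding='utf-8'):
    registered[name] = (content_type, content_encoding)


# A fake entry point standing in for an installed plugin package.
fake_ep = SimpleNamespace(
    name='upper_text',
    load=lambda: (str.upper, str.lower, 'text/x-upper', 'utf-8'),
)
print(load_serializer_plugins(register, [fake_ep]))  # ['upper_text']
print(registered)  # {'upper_text': ('text/x-upper', 'utf-8')}
</code></pre>
<p>In a real project you would call load_serializer_plugins(registry.register, discover()). What it finds depends entirely on which plugin packages declare that entry point group.</p>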