Pyleus 介绍:使用纯 Python 的构建 Storm 拓扑的开源框架

jopen 10年前

大声的宣誓,我们喜欢python,现在使用python的做web开发的人有相当大的比例,在大数据的行业中,python也是相当热门。

Pylenus 是一个新的开源框架,这个框架的目标是完成一些和其他框架一样的

Storm (事件处理器)

例如Hadoop:让开发者全部使用python完成快速的开发迭代,使用大量的时间去解决商业之间的联系问题,使用少量的时间去构建基础的平台。

首先一个简短的介绍在Storm来自这个网站,Apache Storm 是一个免费得开源的分布式的即时的开源计算系统,Storm可以很简单的可靠的无限的处理数据流在大量的数据,还可以进行数据实时的批处理就像hadoop一样。

一个Pyleus拓扑包括,最少情况下,一个YAML文件描述的拓扑结构,声明每个组件以及它们之间的顺序。pyleus的命令行工具包含一个内建的Storm JAR,这个JAR可以提交任何Storm簇。

当涉及到海量数据处理演示时,“字数统计”是最典型的。由于Storm是用来操作“无限量数据流”,我们不能计算每个单次的总数,因为输入流可以无限地继续下去。相反,我们的拓扑结构将记录每个次的递增数,来记录每一次我们看到它的值。

所以该如何使用Pyleus建立一个单词计数的Storm拓扑呢?你所需要的是一个pyleus_topology.yaml和一些Python组件。

作为简单的演示,你需要知道Storm的三个核心概念:

  • tuple是Storm拓扑中的数据单元,流入和流出Storm组件。

  • Spouts是将tuple导入拓扑的组件. 通常,spout消耗来自外部源的数据,如KafkaKinesis,然后将记录标记元组。

  • Bolts订阅一个或多个其他spoutsbolts的输出流,做一些处理,然后标记为自己的元组。

这种拓扑有三个组成部分:一个spout发出随机行的“lorem ipsum”文本,一个bolt将行拆分为单词,bolt完成计数并记录的相同的单词出现的次数。

pyleus_topology.yaml  word_count/      __init__.py      line_spout.py      split_words.py      count_words.py

下面是pyleus_topology.yaml的内容:

name: word_count    topology:        - spout:          name: line-spout          module: word_count.line_spout        - bolt:          name: split-words          module: word_count.split_words          groupings:              - shuffle_grouping: line-spout        - bolt:          name: count-words          module: word_count.count_words          groupings:              - fields_grouping:                  component: split-words                  fields:                      - word

spout的配置是自解释的,但bolts必须注明它所订阅的元组流。拆词的bolt绑定到shuffle_grouping——这意味着从line-spout发出的tuples应为split-words所有实例情况下的均匀和随机分布,比如可以为一,五,或五十。

count-words,还是使用fields_grouping在 ‘word(词)’ 域。这迫使所有元组发出split-words与‘word(词)’一致的count-words实例。 这允许代码word_count.count_words去做一个假设,它能“see(看)”到所有在同样的处理中的一样的单词。 

word_count/line_spout.py:

import random    from pyleus.storm import Spout    LINES = """  Lorem ipsum dolor sit amet, consectetur  adipiscing elit. Curabitur pharetra ante eget  nunc blandit vestibulum. Curabitur tempus mi  ...  vitae cursus leo, a congue justo.  """.strip().split('\n')      class LineSpout(Spout):        OUTPUT_FIELDS = ["line"]        def next_tuple(self):          line = random.choice(LINES)          tup = (line,)          self.emit(tup)      if __name__ == '__main__':      LineSpout().run()

word_count/count_words.py:

from collections import defaultdict    from pyleus.storm import SimpleBolt      class CountWordsBolt(SimpleBolt):        def initialize(self):          self.words = defaultdict(int)        def process_tuple(self, tup):          word, = tup.values            self.words[word] += 1            msg = "'{0}' has been seen {1} times\n".format(word, self.words[word])          with open("/tmp/word_counts.txt", 'a') as f:              f.write(msg)      if __name__ == '__main__':      CountWordsBolt().run()

word_count/split_words.py 留给读者作为练习。 (或者, 你可以在GitHub上查看所有的例子 full example)

现在,在pyleus目录下运行将会产生一个文件word_count.jar。你可以使用pyleus submit来提交,或者你也可以使用pyleus local来在本地运行测试。

对于Pyleus topology(拓扑)来说代码是非常简单的,但是有一个非常有意思的特征值得我们关注的是它自己内部对virtualenv(https://virtualenv.readthedocs.org/)的集成。简单的包含一个requirements.txt文件和你自己的pyleus_topology.yaml,pyleus build将会产生一个virtualenv使得你的代码在这个JAR包中使用以及嵌入。你甚至可以重用Pyleus相关的组建包,并且在pyleus_topology.yaml里面直接关联他们接可以了!

在Yelp的团队已经开发出一种Pyleus端口为了从一个内部资源源消费数据,并且为其建立了一个Python包。现在,公司内其他人可以添加一行代码到他们的 requirements.txt 和在他们的  pyleus_topology.yaml 中使用端口就无需编写一行代码了。

Pyleus现在是beta版软件,但feedbackpull requests都很愉快地接受了。

开始使用Pyleus,用 pip install pyleus 安装它,然后查看源码在GitHub的上更多示例。