Pyleus 介绍:使用纯 Python 的构建 Storm 拓扑的开源框架
大声的宣誓,我们喜欢python,现在使用python的做web开发的人有相当大的比例,在大数据的行业中,python也是相当热门。
Pylenus 是一个新的开源框架,这个框架的目标是完成一些和其他框架一样的
Storm (事件处理器)
例如Hadoop:让开发者全部使用python完成快速的开发迭代,使用大量的时间去解决商业之间的联系问题,使用少量的时间去构建基础的平台。
首先一个简短的介绍在Storm来自这个网站,Apache Storm 是一个免费得开源的分布式的即时的开源计算系统,Storm可以很简单的可靠的无限的处理数据流在大量的数据,还可以进行数据实时的批处理就像hadoop一样。
一个Pyleus拓扑包括,最少情况下,一个YAML文件描述的拓扑结构,声明每个组件以及它们之间的顺序。pyleus的命令行工具包含一个内建的Storm JAR,这个JAR可以提交任何Storm簇。
当涉及到海量数据处理演示时,“字数统计”是最典型的。由于Storm是用来操作“无限量数据流”,我们不能计算每个单次的总数,因为输入流可以无限地继续下去。相反,我们的拓扑结构将记录每个次的递增数,来记录每一次我们看到它的值。
所以该如何使用Pyleus建立一个单词计数的Storm拓扑呢?你所需要的是一个pyleus_topology.yaml和一些Python组件。
作为简单的演示,你需要知道Storm的三个核心概念:
-
tuple是Storm拓扑中的数据单元,流入和流出Storm组件。
-
Spouts是将tuple导入拓扑的组件. 通常,spout消耗来自外部源的数据,如Kafka或Kinesis,然后将记录标记元组。
-
Bolts订阅一个或多个其他spouts和bolts的输出流,做一些处理,然后标记为自己的元组。
这种拓扑有三个组成部分:一个spout发出随机行的“lorem ipsum”文本,一个bolt将行拆分为单词,bolt完成计数并记录的相同的单词出现的次数。
pyleus_topology.yaml word_count/ __init__.py line_spout.py split_words.py count_words.py
下面是pyleus_topology.yaml的内容:
name: word_count topology: - spout: name: line-spout module: word_count.line_spout - bolt: name: split-words module: word_count.split_words groupings: - shuffle_grouping: line-spout - bolt: name: count-words module: word_count.count_words groupings: - fields_grouping: component: split-words fields: - word
spout的配置是自解释的,但bolts必须注明它所订阅的元组流。拆词的bolt绑定到shuffle_grouping——这意味着从line-spout发出的tuples应为split-words所有实例情况下的均匀和随机分布,比如可以为一,五,或五十。
count-words,还是使用fields_grouping在 ‘word(词)’ 域。这迫使所有元组发出split-words与‘word(词)’一致的count-words实例。 这允许代码word_count.count_words去做一个假设,它能“see(看)”到所有在同样的处理中的一样的单词。
word_count/line_spout.py:
import random from pyleus.storm import Spout LINES = """ Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur pharetra ante eget nunc blandit vestibulum. Curabitur tempus mi ... vitae cursus leo, a congue justo. """.strip().split('\n') class LineSpout(Spout): OUTPUT_FIELDS = ["line"] def next_tuple(self): line = random.choice(LINES) tup = (line,) self.emit(tup) if __name__ == '__main__': LineSpout().run()
word_count/count_words.py:
from collections import defaultdict from pyleus.storm import SimpleBolt class CountWordsBolt(SimpleBolt): def initialize(self): self.words = defaultdict(int) def process_tuple(self, tup): word, = tup.values self.words[word] += 1 msg = "'{0}' has been seen {1} times\n".format(word, self.words[word]) with open("/tmp/word_counts.txt", 'a') as f: f.write(msg) if __name__ == '__main__': CountWordsBolt().run()
word_count/split_words.py 留给读者作为练习。 (或者, 你可以在GitHub上查看所有的例子 full example)
现在,在pyleus目录下运行将会产生一个文件word_count.jar。你可以使用pyleus submit来提交,或者你也可以使用pyleus local来在本地运行测试。
对于Pyleus topology(拓扑)来说代码是非常简单的,但是有一个非常有意思的特征值得我们关注的是它自己内部对virtualenv(https://virtualenv.readthedocs.org/)的集成。简单的包含一个requirements.txt文件和你自己的pyleus_topology.yaml,pyleus build将会产生一个virtualenv使得你的代码在这个JAR包中使用以及嵌入。你甚至可以重用Pyleus相关的组建包,并且在pyleus_topology.yaml里面直接关联他们接可以了!
在Yelp的团队已经开发出一种Pyleus端口为了从一个内部资源源消费数据,并且为其建立了一个Python包。现在,公司内其他人可以添加一行代码到他们的 requirements.txt 和在他们的 pyleus_topology.yaml 中使用端口就无需编写一行代码了。
Pyleus现在是beta版软件,但feedback和pull requests都很愉快地接受了。
开始使用Pyleus,用 pip install pyleus 安装它,然后查看源码在GitHub的上更多示例。