Flink 原理与实现:架构和拓扑概览

iohd0529 8年前
   <p>要了解一个系统,一般都是从架构开始。我们关心的问题是:系统部署成功后各个节点都启动了哪些服务,各个服务之间又是怎么交互和协调的。下方是 Flink 集群启动后架构图。</p>    <p><img src="https://simg.open-open.com/show/d02573c08b8ab595e476f66fd36176dd.png"></p>    <p>当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。</p>    <ul>     <li><strong>Client</strong> 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回。</li>     <li><strong>JobManager</strong> 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。</li>     <li><strong>TaskManager</strong> 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。</li>    </ul>    <p>可以看到 Flink 的任务调度是多线程模型,并且不同Job/Task混合在一个 TaskManager 进程中。虽然这种方式可以有效提高 CPU 利用率,但是个人不太喜欢这种设计,因为不仅缺乏资源隔离机制,同时也不方便调试。类似 Storm 的进程模型,一个JVM 中只跑该 Job 的 Tasks 实际应用中更为合理。</p>    <h2>Job 例子</h2>    <p>本文所示例子为 flink-1.0.x 版本</p>    <p>我们使用 Flink 自带的 examples 包中的 SocketTextStreamWordCount ,这是一个从 socket 流中统计单词出现次数的例子。</p>    <ul>     <li> <p>首先,使用 <strong>netcat</strong> 启动本地服务器:</p> <pre>  <code class="language-java">$ nc -l 9000  </code></pre> </li>    </ul>    <ul>     <li> <p>然后提交 Flink 程序</p> <pre>  <code class="language-java">$ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \    --hostname 10.218.130.9 \    --port 9000  </code></pre> </li>    </ul>    <p>在netcat端输入单词并监控 taskmanager 的输出可以看到单词统计的结果。</p>    <p>SocketTextStreamWordCount 的具体代码如下:</p>    <pre>  <code class="language-java">public static void main(String[] args) throws Exception {    // 检查输入    final ParameterTool params = ParameterTool.fromArgs(args);    ...      // set up the execution environment    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();      // get input data    DataStream<String> text =        env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);      DataStream<Tuple2<String, Integer>> counts =        // split up the lines in pairs (2-tuples) containing: (word,1)        text.flatMap(new Tokenizer())            // group by the tuple field "0" and sum up tuple field "1"            .keyBy(0)            .sum(1);    counts.print();        // execute program    env.execute("WordCount from SocketTextStream Example");  }  </code></pre>    <p>我们将最后一行代码 env.execute 替换成 System.out.println(env.getExecutionPlan()); 并在本地运行该代码(并发度设为2),可以得到该拓扑的逻辑执行计划图的 JSON 串,将该 JSON 串粘贴到 <a href="/misc/goto?guid=4959672255688468263" rel="nofollow,noindex">http://flink.apache.org/visualizer/</a> 中,能可视化该执行图。</p>    <p><img src="https://simg.open-open.com/show/acf42f8508bfc12031e878d964946f50.png"></p>    <p>但这并不是最终在 Flink 中运行的执行图,只是一个表示拓扑节点关系的计划图,在 Flink 中对应了 SteramGraph。另外,提交拓扑后(并发度设为2)还能在 UI 中看到另一张执行计划图,如下所示,该图对应了 Flink 中的 JobGraph。</p>    <p><img src="https://simg.open-open.com/show/0baa50a5279da2fa23bfc0c6a44d3198.png"></p>    <h2>Graph</h2>    <p>看起来有点乱,怎么有这么多不一样的图。实际上,还有更多的图。Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。</p>    <ul>     <li><strong>StreamGraph:</strong> 是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。</li>     <li><strong>JobGraph:</strong> StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。</li>     <li><strong>ExecutionGraph:</strong> JobManager 根据 JobGraph 生成的分布式执行图,是调度层最核心的数据结构。</li>     <li><strong>物理执行图:</strong> JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。</li>    </ul>    <p>例如上文中的 2个并发度的 SocketTextStreamWordCount 四层执行图的演变过程如下图所示(点击查看大图):</p>    <p><img src="https://simg.open-open.com/show/84b55b01e1dc812ae8d9f83776a10f69.png"></p>    <p>这里对一些名词进行简单的解释。</p>    <ul>     <li><strong>StreamGraph:</strong> 根据用户通过 Stream API 编写的代码生成的最初的图。      <ul>       <li>StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。</li>       <li>StreamEdge:表示连接两个StreamNode的边。</li>      </ul> </li>     <li><strong>JobGraph:</strong> StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。      <ul>       <li>JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。</li>       <li>IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集。producer是JobVertex,consumer是JobEdge。</li>       <li>JobEdge:代表了job graph中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。</li>      </ul> </li>     <li><strong>ExecutionGraph:</strong> JobManager 根据 JobGraph 生成的分布式执行图,是调度层最核心的数据结构。      <ul>       <li>ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有和并发度一样多的 ExecutionVertex。</li>       <li>ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。</li>       <li>IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。每一个IntermediateResult有与下游ExecutionJobVertex相同并发数的IntermediateResultPartition。</li>       <li>IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。</li>       <li>ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一个。</li>       <li>Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。</li>      </ul> </li>     <li><strong>物理执行图:</strong> JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。      <ul>       <li>Task:Execution被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。</li>       <li>ResultPartition:代表由一个Task的生成的数据,和ExecutionGraph中的IntermediateResultPartition一一对应。</li>       <li>ResultSubpartition:是ResultPartition的一个子分区。每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定。</li>       <li>InputGate:代表Task的输入封装,和JobGraph中JobEdge一一对应。每个InputGate消费了一个或多个的ResultPartition。</li>       <li>InputChannel:每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。</li>      </ul> </li>    </ul>    <p>后续的文章,将会详细介绍 Flink 是如何生成这些执行图的。主要有以下内容:</p>    <ol>     <li>如何生成 StreamGraph</li>     <li>如何生成 JobGraph</li>     <li>如何生成 ExecutionGraph</li>     <li>如何进行调度(如何生成物理执行图)</li>    </ol>    <p>来自: <a href="/misc/goto?guid=4959672255773423020" rel="nofollow">http://wuchong.me/blog/2016/05/03/flink-internals-overview/</a> </p>