非死book官方详解:使用Apache Spark进行大型语言模型训练
v9z1n9e0vl
8年前
<p>Apache Spark 是用于大规模数据处理的快速和通用引擎,它运行在 Hadoop,Mesos,可以离线或云端运行,具有高速、可扩展等特点。近年来,在 IBM 等大公司和众多社区贡献者的推动下,Spark 得到了越来越多的应用。今天,非死book 团队也展示了他们使用 Apache Spark 进行大型语言模型训练的方法。</p> <p>如何处理大规模数据是 非死book 基础设施团队面临的核心问题。随着软件技术的发展,我们面临着越来越高的硬件需求,为了满足需要,我们必须在开源架构上设计并构建新的系统。</p> <p>考虑到我们的需求,我们决定使用 Apache Spark,一个快速发展的开源数据处理平台,它可以自由扩展,支持用户自定义应用。</p> <p>几个月前,我们分享了一个支持 Spark 声明(SQL)的的例子。在本文中,我们将简要介绍如何使用 Spark 重新设计一个大型、复杂(100 余级)的管道,而这个管道最初是使用 HQL 在 Hive 上编写的。在此之中,我们会介绍如何控制数据分布,避免数据偏移,并实现对特定应用程序的优化,以构建高性能及可靠的数据管道。与原来的 HQL 查询集相比,这种新的基于 Spark 的管道是模块化的,高度可读且易于维护的。除了质量提升之外,我们还观察到它的资源使用和数据登录时间也有减少。</p> <p><strong>使用案例:N-gram 语言模型训练</strong></p> <p>自然语言处理是涉及计算机和人类语言之间相互作用的人工智能领域。计算机可以对语言进行建模,此类模型可用于检测和纠正拼写错误。N-gram 语言模型是其中使用最广泛的语言建模方法。N-gram 通常以 N-x 方式呈现,其中前 N-1 个字作为历史,基于 N-1 的历史来预测下一个字。例如,「你能来这里吗(Can you please come here)」包含 5 个单词,是一个 5-gram。它的历史是「你能来吗(Can you please come)」基于这个历史,N-gram 语言模型可以计算出单词「这里。(here.)」的条件概率。</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/9e215bd57843244b1a14c478f3cb6bae.jpg"></p> <p>大规模、高阶的 N-gram 语言模型(例如 N = 5)已经被证明在许多应用中非常有效,例如自动语音识别和机器翻译。在 非死book 中,它被用于为上传到时间线的视频自动生成字幕,探测可能低质量的地址标签(如「家,温暖的家」,「Apt#00,Fake lane,Foo City」)。</p> <p>用大数据集训练的语言模型与用较小数据集训练的语言模型相比,前者通常具有更高的准确性。覆盖罕见单词(或 N-gram)充分实例的可能性会随着数据集体量的增大而增加。对于具有较大数据集的训练任务,分布式计算框架(如 MapReduce)通常具有更好的可扩展性,可进行并行化模型训练。</p> <p><strong>早期解决方案</strong></p> <p>我们最初开发了一个基于 Hive 的解决方案来生成 N-gram 语言模型。N-gram 计数由最后两个字的历史记录分割,使用基于 C ++的 TRANSFORM 来判断局部语言模型,并将它们保存在 Hive 中。单独的子模型建立在不同的数据源上,每个都由 Hive 查询触发。随后,每个子模型被插值算法计算权重,最后所有子模型被组合输出。以下是管道的概述:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/5e20edbc7419638c91dfb0c7b83be0a6.jpg"></p> <p>基于 Hive 的解决方案在构建语言模型中获得了一定程度的成功:当使用几百万 N-gram 训练时,我们能用它轻松地构建 5-gram 语言模型。然而一旦我们试图增加训练数据集的大小,运行管道的端到端时间就会达到不可接受的程度。</p> <p>Hive 提供了一个基于 SQL 的引擎,可以轻松地编写查询,这些查询会自动转换为 MapReduce 作业。对于训练语言模型而言,将计算表示为 SQL 查询是不自然的,原因如下:</p> <ul> <li> <p>管道代码,包括每个子模型训练的几个 SQL 查询。这些查询大部分是相似的,只有细微的差别。为模型训练而编写新的管道会导致这些 SQL 查询重复。</p> </li> <li> <p>当越来越多的子句被添加到查询中时,系统会越来越难以理解查询的意图。</p> </li> <li> <p>更改查询的一部分需要重新运行整个管道,以确保不会导致回归。无法测试隔离变化使得开发周期变长。</p> </li> </ul> <p>作为替代方法,编写 Hadoop 作业在表达计算方面为开发人员提供了更多的自由,但这也需要更多的时间,需要我们具有 Hadoop 的专业知识。</p> <p><strong>基于 Spark 的解决方案</strong></p> <p>Spark 自带特定领域语言(DSL),使得编写自定义应用程序比 SQL 查询作业更加容易。通过 DSL,你可以控制较低级别的操作(例如,当数据被洗牌时),并且可以访问中间数据。这有助于实现复杂的算法,达到更高的效率和稳定性。它还允许用户能以模块化的方式编写管道,而不是使用一个单一的 SQL 字符串,这提高了管道的可读性,可维护性和可测试性。所有这些优势吸引我们引入了 Spark。</p> <p>在 Scala 或 Java 中重现 C ++的逻辑——语言模型训练算法的实现——会是巨量的工作,因此我们决定不更改该部分。和 Hive 一样,Spark 支持运行自定义用户代码,这使得调用相同的 C ++二进制文件变得容易。它允许开发者平滑过渡,因此我们不必同时维护两个版本的 C ++逻辑,而且迁移对用户是透明的。我们使用 Spark 提供的 RDD 接口,没有使用 Spark SQL,因为前者可以控制中间数据的分区并直接管理分片生成。Spark 的 pipe()运算符用于调用二进制文件。</p> <p>在更高层上,管道的设计保持不变。我们继续使用 Hive 表作为应用程序的初始输入和最终输出。中间输出被写入集群节点上的本地硬盘中。整个应用程序大约有 1,000 行的 Scala 代码,并且可以在 Spark 上执行时生成 100 多个阶段(这取决于训练数据源的数量)。</p> <p><strong>可扩展性挑战</strong></p> <p>当我们使用更大的训练数据集来测试 Spark 方案时,我们遇到了可扩展性的挑战。在本节中,我们首先介绍数据分布要求(平滑和分割),然后是它带来的挑战和我们的解决方案。</p> <p><strong>平滑</strong></p> <p>N-gram 模型是根据训练数据中的 N-gram 出现计数来估算的。由于在训练数据中有可能缺少 N-gram,这种方式可能很难推广到未见的数据中。为了解决这个问题,我们使用了许多平滑方法以减少观察到的 N-gram 计数以提升未见的 N-gram 概率,并使用较低阶模型来让较高阶模型平滑。由于平滑,对于具有历史 h 的 N-gram,需要具有相同历史的所有 N-gram 计数和具有作为 h 的后缀的历史的所有较低级 N-gram 来估算其概率。例如,对于三元组「how are you,」,其中「how are」是历史,「you」是要预测的词,为了估计 P(you|how are),我们需要「how are*」,「are*」和所有 unigram(单字 N-gram)的计数,其中*是表示词汇表中任何单词的通配符。经常会出现 N-gram(例如,「how are*」)导致处理时的数据发生偏移。</p> <p>分片</p> <p>通过分布式计算框架,我们可以将 N-gram 计数分割成多片,以便由多个并行机器进行处理。基于 N-gram 历史的最后 k 个单词的分片方式可以保证比 k 更长的 N-gram 在所有片段之间被平衡。这需要在所有分片上共享所有长度为 k 的 N-gram 计数。我们把所有这些短 N-gram 放在一个叫做「0-shard」的特殊分片中。例如,如果 k 是 2,那么从训练数据中提取的所有单字母和双字母会被组合在同一个分片(0- shard)中,并且所有进行模型训练的服务器都可以访问。</p> <p><strong>问题:数据扭曲(Data skew)</strong></p> <p>在基于 Hive 的管道中,我们使用两个单词的历史分片(two word history sharding) 方式进行模型训练。两词历史分片意味着,共享相同集合的最高有效两词历史(最靠近正被预测的词)的所有 N-gram 计数会被分布到同一节点用于处理。与单字历史相比,两字分片通常具有更平衡的数据分布,除了所有节点必须共享存储在 0-shard 中的平滑算法所需的单字和双字统计。下图说明了具有单字和两字历史的分片分布之间的比较。</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/0ce75d17875a2cd085734e2f3270d626.jpg"></p> <p>对于大型数据集而言,两字历史分割会生成巨大的 0-shard。必须向所有节点散布 0-shard 以缩短总计算时间。同时,这种情况还存在潜在的不稳定性,因为很难预测它的内存需求,一旦启动作业,它可能在运行中耗尽内存。虽然我们可以提前分配更多内存,但仍然不能保证 100%的稳定性,而且这会导致集群内存利用率降低,因为并不是所有实例都需要比历史均值更多的内存。</p> <p>当我们尝试使用 Spark 后,作业可以在低负载状况下运行。但是对于更大的数据集,我们观察到了以下几个问题:</p> <ul> <li> <p>由于执行器长时间没有接收到 heartbeat,驱动程序将执行器标记为「lost」</p> </li> <li> <p>执行器 OOM</p> </li> <li> <p>频繁的执行器 GC</p> </li> <li> <p>随机服务 OOM</p> </li> <li> <p>Spark 的 block 存在 2GB 的限制</p> </li> </ul> <p>所有这些问题的根本原因可以归结于数据扭曲。我们想要实现分片的均衡分布,但是两词的历史分片和单词历史分片都不能带来均衡。因此,我们提出了一种混合方法:渐进式分片和动态调整分片大小。</p> <p><strong>解决方案:渐进式分片(Progressive sharding)</strong></p> <p>渐进式分片用迭代的方法来解决数据扭曲(skew)问题。在第一次迭代时,我们首先进行单个字的分片,在这一步的分片中只需要对所有分片(shard)的一元语言模型计数进行分割。一元语言模型计数远少于二元语言模型计数。通常情况下这种处理是可以完成预期作用的,但不包含分片极其大的情况。例如,对应于「how to ...」的分片将会被扭曲。为了解决这个问题,我们核查每个分片的尺寸然后仅处理小于某一阈值的分片。</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/23f2c09a67e6c946bb4e86ba17cd7350.jpg"></p> <p>在第二次迭代时,我们使用二个字的分割,即根据二个字的历史来完成对 N-gram 的分布。在这个阶段,我们只需要向二元语言模型(不包括已在第一次迭代中处理的二元语言模型)共享 N-gram 的计数。这些二元语言模型计数的数目远少于整个的二元语言模型计数数目,因此处理起来也更快。正如上面所说,我们依然核查每个分片的尺寸然后仅处理小于某一阈值的分片。所剩下的分片将会在下一次的迭代中通过三个字历史来处理。在大多数情况下,三次迭代已足以满足非常大数据集的需要。</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/f0cc04396d2098d4798112be2004d24b.jpg"></p> <p><strong>动态调整分片尺寸</strong></p> <p>在第一次迭代里,我们用了一个足够大的预设数,从而使得大部分的生成分片尺寸很小。每一个分片是由单个 Spark 所完成。在这次迭代中 0-shard 的分片是非常小的尺寸,有很多小分片并不会影响处理效率。在后面的迭代中,分片的数目将由 N—gram 的未处理部分所自动产生。</p> <p>这些方案能够成功实现归功于 Spark DSL 的灵活性。通过 Hive,开发者们不需要支配这些低级别的运算。</p> <p><strong>针对训练模型的通用库</strong></p> <p>根据不同应用环境,怎样使用语言模型呢?每一种应用可能需要不同的数据和配置,因此不同的管道也应运而生。在 Hive 解决方案中,管道的 SQL 部分应用之间是相似的,但在几个地方有不同的单元。相较于重复每一个管道的代码,我们开发了一种可以调用不同管道和不同数据源及配置的普适的 Spark 应用。</p> <p>基于 Spark 解决问题时,我们也可以自动地在输入配置的基础上,优化应用程序运行的工作流程中的步骤。比如说,如果用户没有明确指出使用熵修剪算法,那么应用程序将会跳过模型重新评估。如果用户在配置中明确指定了计数截止,那么应用程序将会瓦解许多低计数的 N-grams 并以通配符占位符来减少存储。这些优化组合节省了计算资源,同时可以在更短的时间内产生训练好的模型。</p> <p><strong>Spark 管道与 Hive 管道性能的比较</strong></p> <p>我们利用以下性能指标比较 Spark 管道与 Hive 管道:</p> <ul> <li> <p>CPU 时间:这是从操作系统的角度衡量 CPU 的使用。比如,如果你在 32 核机上使用所有 CPU 的 50%,以每 10 秒处理一个进程,那么你的 CPU 时间将是 32*0.5*160CPU 秒。</p> </li> <li> <p>CPU 保留时间:这是从资源管理框架的角度衡量 CPU 的保留。举个例子,如果我们在 32 核机上保留 10 秒来运行一个任务,那么 CPU 的保存时间是 32*10=320CPU 秒。CPU 时间与 CPU 保留时间的比例反映出我们是如何实现在集群上保留 CPU 资源的。当精确度达到要求时,相较于 CPU 时间,在运行相同的工作负载的条件下,保存时间可以作为一个更好的测量尺度去比较引擎的执行。比如,如果一个运程需要 1CPU 秒去运行但是必须保存 100CPU 秒,那么在完成同样的工作量时,按照这种度量标准,它就比一个需要 10CPU 秒运行且保存 10CPU 秒的运程效率低。</p> </li> <li> <p>延迟/屏蔽时间:从结束到结束的工作时间</p> </li> </ul> <p>以下图表总结了 Spark 和 Hive 工作的性能比较结果。注意 Spark 的管道不是 Hive 管道 1:1 的转化。它有许多有助于实现更好的可测量性和执行的定制和优化。</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/57824e35a9a58e02ac7b842fcf4ccc13.jpg"></p> <p>基于 Spark 的管道可以不费力地多次处理输入数据,甚至输入数据量高于 Hive 的巅峰处理量。例如,我们训练一个较大的语言模型,它可以在几小时内生成一个包含 192 亿 N-grams 的语言模型。能够用更多的数据并更快地运行试验训练的能力可以促使产生更高质量的模型。正如我们在我们自己的试验中观察到的,大规模语言模型通常会在相关的应用中得到更好的结果。</p> <p><strong>总结</strong></p> <p>Spark 的灵活性可以从以下方面为我们提供帮助:</p> <ul> <li> <p>用模块化的方式表达应用逻辑,相较于整体的 SQL 字符串,拥有更强的可读性和可持续性。</p> </li> <li> <p>在计算的任何阶段都可以对数据实现自定义处理(例如,分区,重洗)</p> </li> <li> <p>高性能的计算机引擎可以节省计算资源和试验时间</p> </li> <li> <p>拥有输入更大规模数据的扩展能力可以训练出高质量的语言模型</p> </li> <li> <p>建立一个通用的应用,可以用于在不同的产品上生成语言模型。</p> </li> <li> <p>由于支持运行用户二进制文件(如 Hive's TRANSFORM)和与Hive数据交互的兼容性,我们可以从早期的解决方案实行改进。</p> </li> </ul> <p>非死book 对加入 Spark 开源社区表示兴奋,并将共同协作致力于开发出 Spark 的全部潜能。</p> <p> </p> <p>来自:http://www.jiqizhixin.com/article/2252</p> <p> </p>