译文增量NetflixMaestroApacheIceberg(数据工作流增量更改回填)「增量数据处理」

译文增量NetflixMaestroApacheIceberg(数据工作流增量更改回填)

增量处理是一种处理工作流中新数据或更改数据的方法。
其主要优势在于,它仅增量处理新添加或更新到数据集的数据,而不是重新处理整个数据集。
这不仅降低了计算资源的成本,而且还显著缩短了执行时间。
当工作流执行持续时间较短时,发生故障和人工干预的可能性就会降低。
它还通过简化现有管道和解锁新模式来提高工程效率。
在这篇博文中,我们讨论了 Netflix 的工作流现状和挑战。
我们将展示如何使用 Netflix Maestro 和 Apache Iceberg 构建干净高效的增量处理解决方案 (IPS)。
IPS 为用户提供具有数据准确性、数据新鲜度和回填的增量处理支持,并解决了工作流中的许多挑战。
IPS 使用户能够以最少的更改继续使用数据处理模式。
介绍Netflix 依靠数据来推动其业务的各个阶段。
无论是分析 A/B 测试、优化工作室制作、训练算法、投资内容获取、检测安全漏洞还是优化支付,结构良好且准确的数据都是基础。
随着我们的业务在全球范围内扩展,对数据的需求不断增长,对可扩展低延迟增量处理的需求开始出现。
数据集所有者通常面临三个常见问题。
数据新鲜度:需要快速准确地处理来自 Iceberg 表的大型数据集,以生成洞察,从而更快地做出产品决策。
数据平台工具集提供的每小时处理语义以及有效的时间戳水印或数据信号如今可以满足许多用例,但对于低延迟批处理来说并不是最好的。
在 IPS 之前,数据平台没有一个解决方案可以作为单一的易于使用的产品来跟踪数据集的状态和进展。
这导致了一些内部解决方案的出现,例如Psyberg。
这些内部库通过捕获更改的分区来处理数据,这仅适用于特定用例。
此外,这些库与用户业务逻辑紧密耦合,这通常会产生更高的迁移成本、维护成本,并且需要与数据平台团队进行大量协调。
数据准确性:延迟到达的数据会导致过去处理的数据集不完整,从而导致不准确。
为了弥补这一点,ETL 工作流通常使用回顾窗口,并在此基础上重新处理特定时间窗口中的数据。
例如,一项作业将重新处理过去 3 天的汇总,因为它假设会有延迟到达的数据,但 3 天之前的数据不值得重新处理。
回填:回填数据集是大数据处理中的常见操作。
这需要在计划处理之前的历史时间段内重新填充数据。
回填的需要可能出于多种因素,例如 (1) 由于数据管道的业务逻辑发生变化而重新填充上游数据集,(2) 数据管道中的业务逻辑发生变化,(3) 创建了需要填充历史时间范围的新指标,(4) 发现历史数据缺失等。
目前,各个本地团队以不太理想且成本效率较低的方式来应对这些挑战,以满足需求,例如回顾:这是数据工程师用来解决数据准确性问题的一种通用且简单的方法。
用户配置工作流以读取窗口中的数据(例如过去 3 小时或 10 天)。
窗口是根据用户的领域知识设置的,以便用户有很高的信心相信迟到的数据将被包括在内或无关紧要(即数据到达得太晚而无用)。
它以时间和计算资源方面的高成本确保正确性。
Foreach 模式:用户使用Maestro foreach 支持构建回填工作流。
它非常适合回填单个工作流生成的数据。
如果管道有多个阶段或许多下游工作流,则用户必须为每个工作流手动创建回填工作流,这需要大量的手动工作。
本文介绍的增量处理解决方案 (IPS) 旨在解决上述问题。
设计目标是为增量处理提供一个简洁且易于采用的解决方案,以确保数据的新鲜度、数据的准确性,并提供轻松的回填支持。
数据新鲜度:通过状态跟踪功能,以微批方式(例如 15 分钟间隔)提供对工作流调度的支持数据准确性:提供处理所有延迟到达数据的支持,以实现业务所需的数据准确性,并在时间和成本效率方面显著提高性能回填:提供托管回填支持以构建、监控和验证回填,包括自动将更改从上游传播到下游工作流,从而大大提高工程效率(即,构建回填工作流需要几天或几周的工程工作,而托管回填只需单击一次即可完成)方法概述总体概念增量处理是一种批量处理数据的方法——但仅限于新数据或更改的数据。
为了支持增量处理,我们需要一种方法,不仅可以捕获增量数据更改,还可以跟踪其状态(即工作流是否处理了更改)。
它必须了解更改,并能够从源表中捕获更改,然后继续跟踪这些更改。
在这里,更改不仅仅意味着新数据本身。
例如,聚合目标表中的一行需要与聚合行关联的源表中的所有行。
此外,如果有多个源表,通常所有输入表中更改的数据范围的并集会给出完整的更改数据集。
因此,捕获的更改信息必须包括所有相关数据,包括源表中未更改的行。
由于前面提到的复杂性,更改跟踪不能简单地通过使用单个水印来实现。
IPS 必须以更细的粒度跟踪那些捕获的更改。
源表的更改可能会以各种方式影响目标表中的转换结果。
如果目标表中的一行源自源表中的一行,则新捕获的数据变化将成为工作流管道的完整输入数据集。
如果目标表中的一行来自源表中的多行,则捕获新数据只会告诉我们必须重新处理这些行。
但 ETL 所需的数据集超出了更改数据本身。
例如,基于帐户 ID 的聚合需要源表中有关帐户 ID 的所有行。
更改数据集将告诉我们哪些帐户 ID 已更改,然后用户业务逻辑需要加载在更改数据中找到的与这些帐户 ID 相关的所有数据。
如果目标表中的一行是基于更改的数据集之外的数据得出的,例如将源表与其他表联接,则新捕获的数据仍然有用,并且可以指示受影响的数据范围。
然后,工作流将根据该范围重新处理数据。
例如,假设我们有一个表,其中按天分区保存给定帐户的累计查看时间。
如果由于数据延迟而立即更新了 3 天前的查看时间,则必须重新计算该帐户接下来两天的查看时间。
在这种情况下,捕获的延迟到达数据将告诉我们重新计算的开始,这比通过猜测重新计算过去 X 天的所有内容要准确得多,其中 X 是由业务领域知识决定的截止回溯窗口。
一旦捕获了更改信息(数据或范围),工作流就必须以稍微复杂的方式将数据写入目标表,因为简单的INSERT OVERWRITE机制无法正常工作。
有两种替代方案:合并模式:在某些计算框架中,例如 Spark 3,它支持 MERGE INTO,以允许将新数据合并到现有数据集中。
这解决了增量处理的写入问题。
请注意,使用 MERGE INTO 时,可以安全地重新启动工作流/步骤,而不必担心插入重复数据。
追加模式:用户还可以使用仅追加写入(例如 INSERT INTO)将新数据添加到现有数据集。
处理完成后,追加数据将提交到表中。
如果用户想要重新运行或重建数据集,他们将运行回填工作流以完全覆盖目标数据集(例如 INSERT OVERWRITE)。
此外,IPS 在许多情况下自然会支持回填。
下游工作流(如果没有业务逻辑更改)将由回填导致的数据更改触发。
这使得回填数据可以在多阶段管道中自动传播。
请注意,本博客跳过了回填支持。
我们将在另一篇后续博客文章中讨论 IPS 回填支持。
Netflix MaestroMaestro是 Netflix 数据工作流编排平台,旨在满足 Netflix 当前和未来的需求。
它是一种通用工作流编排器,为 Netflix 的数据平台用户提供完全托管的工作流即服务 (WAAS)。
它为各种用例中的数千名用户提供服务,包括数据科学家、数据工程师、机器学习工程师、软件工程师、内容制作人和业务分析师。
Maestro 具有高度可扩展性和可扩展性,可支持现有和新用例,并为最终用户提供增强的可用性。
自上一篇关于Maestro的博客以来,我们已代表用户将所有工作流程迁移到 Maestro,且中断最少。
Maestro 已全面部署到生产中,1​00% 的工作负载都在其上运行。
IPS 以 Maestro 为基础,通过添加两个构建块(即新的触发机制和步骤作业类型)来扩展,从而实现所有工作流程的增量处理。
它无缝集成到整个 Maestro 生态系统中,且入门成本极低。
阿帕奇冰山Iceberg是一种用于大型分析表的高性能格式。
Iceberg 为大数据带来了 SQL 表的可靠性和简单性,同时使 Spark、Trino、Flink、Presto、Hive 和 Impala 等引擎能够同时安全地处理相同的表。
它支持富有表现力的 SQL、完整的架构演变、隐藏分区、数据压缩以及时间旅行和回滚。
在 IPS 中,我们利用 Apache Iceberg 提供的丰富功能来开发一种轻量级方法来捕获表更改。
增量变更捕获设计使用 Netflix Maestro 和 Apache Iceberg,我们创建了一种新颖的增量处理解决方案,它以超轻量级的方式提供增量变化(数据和范围)捕获,而无需复制任何数据。
在探索过程中,我们发现使用增量处理可以提高成本效率和工程生产率。
这是我们基于 Apache Iceberg 功能实现增量变更捕获的解决方案。
众所周知,冰山表包含带有一组元数据的快照列表。
快照包括对实际不可变数据文件的引用。
快照可以包含来自不同分区的数据文件。
上图显示,s0 包含 T1 时刻分区 P0 和 P1 的数据。
然后在 T2 时刻,新的快照 s1 被提交到表中,其中包含新数据文件列表,其中包括分区 P0 和 P1 的迟到数据以及 P2 的数据。
我们实施了一种轻量级方法来创建冰山表(称为 ICDC 表),该表具有自己的快照,但仅包含来自原始表的新数据文件引用,而无需复制数据文件。
它效率高,成本低。
然后工作流管道可以只加载 ICDC 表来处理分区 P0、P1、P2 中的更改数据,而无需重新处理 P0 和 P1 中未更改的数据。
同时,还会捕获指定数据字段的更改范围,因为冰山表元数据包含每个数据文件的每个数据字段的上限和下限信息。
此外,IPS 将跟踪每个工作流的数据文件粒度的变化。
这种轻量级方法与 Maestro 无缝集成,允许所有(数千名)调度程序用户在其数万个工作流程中使用这个新的构建块(即增量处理)。
每个使用 IPS 的工作流程都将注入一个表参数,即轻量级 ICDC 表的表名。
ICDC 表仅包含更改数据。
此外,如果工作流程需要更改范围,则会将参数列表注入用户工作流程以包含更改范围信息。
增量处理可以通过新的步骤作业类型 (ICDC) 和/或新的增量触发机制来启用。
用户可以将它们与所有现有的 Maestro 功能一起使用,例如 foreach 模式、基于有效时间戳水印的步骤依赖关系、写入-审核-发布模板化模式等。
主要优势通过这种设计,用户工作流可以非常轻松地采用增量处理。
用户业务逻辑也与 IPS 实现分离。
多阶段管道还可以将增量处理工作流与现有的常规工作流混合。
我们还发现,使用 IPS 后,可以通过删除处理回溯窗口复杂性的额外步骤或调用一些内部库来简化用户工作流。
将增量处理功能添加到 Netflix Maestro 中作为用户的新功能/构建块,将使用户能够以更高效的方式构建他们的工作流程,并以更简单的方式弥合差距,解决许多具有挑战性的问题(例如处理延迟到达的数据)。
新兴的增量处理模式在将用户管道引入 IPS 时,我们发现了一些增量处理模式:对捕获的增量变化数据进行增量处理,直接追加到目标表中这是简单的增量处理用例,其中更改数据包含数据处理所需的所有信息。
上游更改(通常来自单个源表)会传播到下游(通常是另一个目标表),工作流管道只需要处理更改数据(可能与其他维度表连接),然后合并到(通常是附加到)目标表中。
此模式将取代回顾窗口模式来处理延迟到达的数据。
用户工作流无需使用回顾窗口模式完全覆盖过去 X 天的数据,只需通过处理 ICDC 表将更改数据(包括延迟到达的数据)合并到目标表中即可。
使用捕获的增量变化数据作为行级过滤列表,以消除不必要的转换ETL 作业通常需要根据某些分组键聚合数据。
更改数据将揭示由于来自源表的新数据而需要重新聚合的所有分组键。
然后,ETL 作业可以使用 ICDC 作为过滤器,根据这些分组键将原始源表与 ICDC 表连接起来,以加快处理速度,从而能够计算更小的数据集。
业务转换逻辑没有变化,ETL 工作流也没有重新设计。
ETL 管道保留了批处理工作流的所有优势。
在业务逻辑中使用捕获的范围参数这种模式通常用于复杂的用例,例如连接多个表并执行复杂的处理。
在这种情况下,变化数据不能提供 ETL 工作流所需输入的全貌。
相反,变化数据指示给定输入表或通常是多个输入表中特定字段集(可能是分区键)的一系列更改的数据集。
然后,所有输入表的更改范围的并集给出工作流所需的完整更改数据集。
此外,通常必须覆盖整个数据范围,因为转换不是无状态的并且取决于先前范围的结果。
另一个示例是,必须基于分区中的整个数据集来更新目标表中的聚合记录或查询中的窗口函数(例如,计算整个分区的介质)。
基本上,从变化数据得出的范围指示要重新处理的数据集。
用例Netflix 的数据工作流通常必须处理延迟到达的数据,由于其简单且易于实现,通常使用回溯窗口模式来解决。
在回溯模式中,ETL 管道将始终使用源表中过去 X 个分区的数据,然后在每次运行中覆盖目标表。
这里,X 是由管道所有者根据其领域专业知识决定的数字。
缺点是计算和执行时间的成本。
它的成本通常比不考虑延迟到达数据的管道高出几乎 X 倍。
鉴于延迟到达的数据很稀疏,大多数处理都是在已经处理过的数据上完成的,这是不必要的。
另请注意,这种方法基于领域知识,有时会受到业务环境或数据工程师领域专业知识变化的影响。
在某些情况下,很难得出一个好的常数。
下面,我们将使用一个两阶段数据管道来说明如何使用 IPS 重建它以提高成本效率。
我们将观察到成本显著降低(> 80%),而业务逻辑几乎没有变化。
在此用例中,我们将回溯窗口大小 X 设置为 14 天,这在不同的实际管道中有所不同。
带有回顾窗口的原始数据管道playing_table:一个冰山表,保存了流媒体管道接收的用户设备的播放事件,这些数据比较稀疏,只有大约百分之几的数据是延迟到达的。
playing_daily_workflow:每日定时工作流,用于处理过去 X 天的 playing_table 数据,并将转换后的数据写入过去 X 天的目标表playing_daily_table:playback_daily_workflow 的目标表,过去 X 天内每天都会被覆盖playing_daily_agg_workflow:每日定时工作流,用于处理过去 X 天的 playing_daily_table 数据,并将过去 X 天的聚合数据写入目标表playing_daily_agg_table:playback_daily_agg_workflow 的目标表,在过去 14 天内每天都会被覆盖。
我们使用真实业务逻辑在示例数据集中运行此管道,以下是示例运行的平均执行结果第一阶段工作流程大约需要 7 个小时来处理playback_table数据第二阶段工作流程大约需要 3.5 小时来处理playback_daily_table数据具有增量处理功能的新数据管道使用 IPS,我们重写了管道,以尽可能避免重新处理数据。
新的管道如下所示。
阶段1:ips_playback_daily_workflow:它是playback_daily_workflow的更新版本。
然后,工作流 spark sql 作业将读取增量变化数据捕获 (ICDC) 冰山表(即playing_icdc_table),该表仅包含添加到 playing_table 中的新数据。
它包括延迟到达的数据,但不包含 playing_table 中任何未更改的数据。
业务逻辑会将INSERT OVERWRITE替换为MERGE INTO SQL 查询,然后将新数据合并到playback_daily_table中。
第二阶段:IPS 会捕获回放_daily_table 中已更改的数据,并将更改数据保存在 ICDC 源表 ( playback_daily_icdc_table ) 中。
因此,我们不需要在业务逻辑中硬编码回溯窗口。
如果回放_daily_table 中只有 Y 天的数据发生了更改,则它只需加载 Y 天的数据。
在ips_playback_daily_agg_workflow中,当天分区的业务逻辑将相同。
然后,​​我们需要更新业务逻辑,以处理延迟到达的数据,方法是将playback_daily表与playback_daily_icdc_table按照过去2到X天的聚合分组键进行JOIN,不包括当天(即第1天)由于迟到的数据比较稀疏,JOIN 会缩小playback_daily_table数据集,以便只处理其中的一小部分。
业务逻辑将使用MERGE INTO SQL 查询,然后将更改传播到下游目标表对于当天,业务逻辑将是相同的,并使用来自playback_daily_table的数据,然后使用INSERT OVERWRITE将结果写入目标表playback_daily_agg_table ,因为不需要与ICDC表连接。
通过这些小的改变,数据管道效率得到了很大的提高。
在我们的示例运行中,第一阶段工作流程仅需大约 30 分钟即可处理来自playback_table的X天变化数据。
第二阶段工作流大约需要 15 分钟通过与playback_daily_cdc_table数据连接来处理来自playback_daily_table的第2天到第X天的变化数据,并且再花费 15 分钟来处理当天(即第 1 天)playback_daily_table 的变化数据。
此处,原始管道和新管道中的 Spark 作业设置相同。
因此,总体而言,新的基于 IPS 的管道总体上需要大约10%的资源(以执行时间为衡量标准)才能完成。
期待我们将改进 IPS,以支持除仅附加情况之外的更复杂情况。
IPS 将能够跟踪表更改的进度,并支持多种 Iceberg 表更改类型(例如附加、覆盖等)。
我们还将在 IPS 中添加托管回填支持,以帮助用户构建、监控和验证回填。
我们正在将大数据编排提升到一个新的水平,并不断解决新的问题和挑战,敬请期待。
致谢感谢我们的产品经理Ashim Pokharel推动战略和需求。
我们还要感谢 Andy Chu、Kyoko Shimada、Abhinaya Shetty、Bharath Mummadisetty、John Zhuge、Rakesh Veeramacheneni 和 Netflix 的其他优秀同事在开发 IPS 时提出的建议和反馈。
我们还要感谢 Prashanth Ramdas、Eva Tse、Charles Smith 和 Netflix 工程组织的其他领导人对 IPS 架构和设计的建设性反馈和建议。
作者:Jun He, Yingyi Zhang, and Pawan Dixit出处:https://netflixtechblog.com/incremental-processing-using-netflix-maestro-and-apache-iceberg-b8ba072ddeeb

联系我们

在线咨询:点击这里给我发消息