打破数据孤岛,使用 Amazon Redshift 流媒体和 Amazon MSK 流式传输您的 C
- 40
解决数据孤岛问题:利用 Amazon Redshift 和 Amazon MSK 实现 CDC 数据流
由 Umesh Chaudhari 和 Vishal Khatri 撰写于 2023 年 12 月 13 日,发布于 Amazon 管理的 Apache Kafka 流媒体Amazon MSK、Amazon Redshift、客户解决方案、技术如何做
永久链接评论区分享
关键要点
数据在时间流逝中会失去价值,实时分析业务交易显得越来越重要。传统的批处理方法在数据移动中添加了延迟,而基于变更数据捕获CDC的方法能够实时捕获数据变化。利用 Amazon Redshift 流媒体,可以将 CDC 数据从多个数据源汇总至数据仓库中,以减少延迟并提升分析效率。本文展示了如何通过 Amazon MSK 和 Redshift 流媒体进行数据复制和聚合。数据随着时间的推移而失去其价值。我们了解到,客户希望能够实时分析业务交易。传统上,客户通常采用基于批处理的数据移动方式,这种方式的批次加载可能每天运行一到数次。而批处理方法会引入数据移动的延迟,从而降低数据用于分析的价值。基于变更数据捕获CDC的方法成为了替代批处理的选项。CDC可以捕获源数据库中的数据变化如插入、更新和删除,并不断将这些变化更新到目标数据库。如此一来,当CDC频率较高时,源数据库能够快速变化,而目标数据库通常是数据仓库则需要几乎实时地反映这些变化。
随着数据爆炸式增长,组织内数据系统的数量也在增加。数据孤岛使得数据在不同源上存在,这使得分析变得困难。
为了获得更深刻和更丰富的见解,您可以将不同数据孤岛中的所有变化集中到一个地方,如数据仓库。这篇文章展示了如何利用流媒体导入数据到 Amazon Redshift。
Redshift 流媒体导入 提供低延迟、高吞吐量的数据导入,使客户能够在几秒钟内推出洞察,而非几分钟。该过程简单易用,能够直接将流数据从 Amazon Kinesis Data Streams 和 Amazon 管理的 Streaming for KafkaAmazon MSK导入数据仓库,而无需借助 Amazon Simple Storage ServiceAmazon S3进行中转。使用 SQL 语句,您可以创建物化视图。在此之后,利用物化视图刷新功能,您可以每秒导入数百兆的数据。
解决方案概述
在本文中,我们创建了一个低延迟的数据复制功能,将 Amazon Aurora MySQL 中的数据传输至 Amazon Redshift 数据仓库,使用来自 Amazon MSK 的 Redshift 流媒体导入。利用 Amazon MSK,我们安全地通过一个完全管理、可高可用的 Apache Kafka 服务进行数据流。 Apache Kafka 是一个开源的分布式事件流平台,数千家公司使用它来构建高性能的数据管道、流分析、数据集成以及使命关键的应用程序。我们在 Amazon MSK 中存储 CDC 事件一段时间,这使得这些事件能够传递到其他目标,例如 Amazon S3 数据湖。
我们在 Amazon MSK Connect 上部署了 Debezium MySQL 源 Kafka 连接器。 Amazon MSK Connect 使得在 Apache Kafka 集群与外部系统如数据库、文件系统和搜索索引之间移动数据的连接器部署、监测和自动扩展变得简单。Amazon MSK Connect 完全兼容 Apache Kafka Connect,使您可以在没有代码变更的情况下轻松迁移您的 Apache Kafka Connect 应用。
本解决方案基于 Amazon Aurora MySQL 中的示例数据库 salesdb。该数据库的用户可以执行行级的 INSERT、UPDATE 和 DELETE 操作,以便在示例 salesdb 数据库中生成变更事件。Debezium MySQL 源 Kafka 连接器读取这些变更事件并将其输出到 Amazon MSK 的 Kafka 主题。然后,Amazon Redshift 利用其流媒体功能从 Amazon MSK 的 Kafka 主题中读取消息,并通过物化视图存储这些消息,在消息到达时进行处理。
您可以通过查看这个示例 这里 了解 CDC 如何生成创建事件。在我们的解决方案中,我们使用了 OP 字段这是一种强制性的字符串,描述了何种操作触发连接器生成该事件。在本示例中,c 表示该操作创建了一个行。OP 字段的有效值包括:
c = 创建u = 更新d = 删除r = 读取仅适用于快照下图展示了该解决方案的架构:
解决方案的工作流包含以下步骤:
Amazon Aurora MySQL 拥有一个二进制日志即 binlog,记录所有操作INSERT、UPDATE、DELETE,以提交到数据库的顺序进行记录。Amazon MSK Connect 运行名为 Debezium 的 MySQL 源 Kafka 连接器,读取 binlog,为行级的 INSERT、UPDATE 和 DELETE 操作生成变更事件,并将这些变更事件发出到 Amazon MSK 的 Kafka 主题中。配置了 Amazon Redshift 的集群作为数据流的消费者,可以从 Amazon MSK 的 Kafka 主题读取消息。Amazon Redshift 中的物化视图为读取流数据的落地区域,随着数据的到达不断进行处理。物化视图刷新时,Amazon Redshift 计算节点会将一组 Kafka 分区分配给计算切片。每个切片从分配的分区中消耗数据,直到视图与 Kafka 主题的最后偏移达到一致。随后的物化视图刷新将从前一次刷新的最后偏移读取数据,直到与主题数据达到一致。在 Amazon Redshift 中,我们创建了存储过程以处理 CDC 记录并更新目标表。前提条件
本文假设您在环境中有一个运行中的 Amazon MSK Connect 堆栈,包含以下组成部分:
托管的 Aurora MySQL 数据库。在本文中,使用示例数据库 salesdb。在 Amazon MSK Connect 上运行的 Debezium MySQL 连接器,它连接到您 Amazon 虚拟私有云Amazon VPC中的 Amazon MSK。Amazon MSK 集群。如果您没有 Amazon MSK Connect 堆栈,请按照 MSK Connect 实验室设置 中的说明进行操作,并 确认您的源连接器 是否能够将数据更改复制到 Amazon MSK 主题。
您应该在与 Amazon MSK 集群相同的 VPC 中配置 Amazon Redshift 集群。如果尚未部署,可以根据 此处 的步骤进行操作。
我们使用 AWS 身份与访问管理AWS IAM进行身份验证,以便在 Amazon MSK 和 Amazon Redshift 集群之间建立通信。请确保您创建了一个 AWS IAM 角色,并确保拥有允许您的 Amazon Redshift 集群承担该角色的信任策略。有关如何配置 AWS IAM 角色的信任策略的信息,请参见 授权 Amazon Redshift 代表您访问其他 AWS 服务。创建后,该角色应具有如下 AWS IAM 策略,以便能够与 Amazon MSK 集群进行通信。
json{ Version 20121017 Statement [ { Sid MSKIAMpolicy Effect Allow Action [ kafkaclusterReadData kafkaclusterDescribeTopic kafkaclusterConnect ] Resource [ arnawskafka0123456789cluster/xxx/xxx arnawskafka0123456789topic/// ] } { Sid MSKPolicy Effect Allow Action [ kafkaGetBootstrapBrokers ] Resource arnawskafka0123456789cluster/xxx/xxx } ]}请替换上面示例策略中包含的 xxx 的 ARN,使其对应于您的 Amazon MSK 集群的 ARN。
此外,确认 Amazon Redshift 集群能够访问 Amazon MSK 集群。在 Amazon Redshift 集群的安全组中,添加允许 MSK 安全组访问端口 9098 的入站规则。要了解如何管理 Redshift 集群的安全组,请参见 管理集群的 VPC 安全组。在 Amazon MSK 集群的安全组中,添加允许端口 9098 访问您 Amazon Redshift 集群领导者 IP 地址的入站规则,如下图所示。您可以在 AWS 管理控制台 的属性选项卡中找到 Amazon Redshift 集群的领导者节点的 IP 地址。操作步骤
在 AWS 管理控制台中导航到 Amazon Redshift 服务,然后通过执行以下步骤设置针对 Amazon MSK 的 Amazon Redshift 流媒体导入:
启用 casesensitiveidentifier 为 true 如果您使用的是 Amazon Redshift 集群的默认参数组,则无法将 enablecasesensitiveidentifier 设置为 true。您可以创建一个新的 参数组 并将 enablecasesensitiveidentifier 设置为 true,并将其附加到 Amazon Redshift 集群。修改参数值后,您必须重启与修改过的参数组关联的所有集群。这可能会需要几分钟的时间来重启 Amazon Redshift 集群。该 配置 选项决定数据库、表和列的名称标识符是否区分大小写。完成后,请打开新的 Amazon Redshift 查询编辑器 V2,以便我们所做的配置更改生效,然后继续下一步。
创建一个外部架构以映射到流媒体数据源。sql CREATE EXTERNAL SCHEMA MySchema FROM MSK IAMROLE arnawsiamYourRolerole/mskredshiftstreaming AUTHENTICATION IAM CLUSTERARN arnawskafkauseast12073196cluster/MSKClustermskconnectlab/849b47a065f2439eb1811038ea9d449310 // 替换最后部分为您的集群 ARN,这里仅作示例。//
完成后,确认您是否可以看到基于 MSK 主题创建的以下表:
创建一个引用外部架构的物化视图。sql CREATE MATERIALIZED VIEW customerdebezium AUTO REFRESH YES AS SELECT jsonparse(kafkavalue) as payload from devmyschemasalesdbsalesdbCUSTOMER // 用您在步骤 2 中为外部架构指定的名称替换 myschema //
现在,您可以使用以下命令查询新创建的物化视图 customerdebezium。
sql SELECT FROM devpubliccustomerdebezium order by refreshtime desc
tk加速器免费下载检查物化视图是否成功填充 CDC 记录
刷新物化视图可选。此步骤是可选的,因为在创建物化视图时我们已经指定了 AUTO REFRESH AS YES。sql REFRESH MATERIALIZED VIEW devpubliccustomerdebezium
注意:如果物化视图设置为自动刷新的,意味着如果您没有立即看到记录,则需等待几秒钟后再重新运行选择语句。Amazon Redshift 流媒体导入视图也提供手动刷新的选项,让您能够手动刷新对象。您可以使用以下查询立即将流媒体数据拉入 Redshift 对象。

sql SELECT FROM