数据输入转换和输出

Kettle包含大量组件,但逐一介绍这些组件其实并不是我们学习的重点,我们通常只需要掌握Kettle中最核心的组件,即数据输入组件、数据转换组件和数据输出组件,通过组装这些组件我们可以实现一个完整的ETL流程。这篇笔记我们以一个完整例子的形式介绍Kettle的基本使用方法,这个例子中将涉及从多个数据源抽取数据,对数据进行合并和加工,以及最终将数据写入目标数据库,借助这个例子我们可以实现一个最基础的ETL流程。

数据源和表结构介绍

假设我们有这样两个数据源:

  • 电子商城系统(netstore):该系统中我们有订单表t_order,订单表中有订单编号、下单的客户ID、订单金额等信息
  • 客户管理系统(crm):该系统中我们有客户表t_customer,客户表中有客户的基础信息,包括客户的生日,但该字段是可选的,它可能为NULL

数据源表结构如下:

CREATE TABLE `t_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `order_code` varchar(45) NOT NULL COMMENT '订单编号',
  `customer_id` bigint(20) NOT NULL COMMENT '客户ID',
  `total_price` decimal(19,2) NOT NULL COMMENT '订单金额',
  `create_time` datetime NOT NULL COMMENT '下单时间',
  PRIMARY KEY (`id`)
) COMMENT='订单表';
CREATE TABLE `t_customer` (
  `customer_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '客户ID',
  `name` varchar(45) NOT NULL COMMENT '客户姓名',
  `birthday` datetime DEFAULT NULL COMMENT '客户生日',
  PRIMARY KEY (`customer_id`)
) COMMENT='客户表';

需求:我们需要分析客户年龄和下单金额之间的关系,但商城系统中并没有存储任何客户年龄相关的信息,这些信息存储在客户管理系统中,此时我们就必须进行跨数据库的表连接操作,然后将结果记录存储到新的数据库中,此外我们还要筛掉生日字段为NULL的客户数据记录,它是无效数据。根据这些需求,Kettle很适合完成这个任务。

结果表结构如下:

CREATE TABLE `t_order_analytics` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `order_code` varchar(45) NOT NULL COMMENT '订单编码',
  `total_price` decimal(19,2) NOT NULL COMMENT '订单金额',
  `customer_birthday` datetime NOT NULL COMMENT '客户生日',
  PRIMARY KEY (`id`)
) COMMENT='数据汇总分析表';

创建数据库连接

具体配置Kettle的ETL流程前,我们需要先配置数据库连接,我们可以在Spoon设计器的左侧找到Database connections菜单,我们可以在其中新增数据库连接。

在连接配置界面,我们输入数据库服务器的主机、端口、数据库名、用户名和密码等信息,此外Kettle的数据库连接也支持连接池,不过默认没有开启,对于较大量数据的ETL流程我们可能需要使用连接池,这部分可以在Pooling选项卡配置。

创建好连接后,我们可以在其上右键选择Share,这样连接信息在其它Job或Transformation中都可被共享使用,我们编辑不同的.kjb.ktr时就不必每个组件内都重复创建连接了。

此外还要注意,Kettle中默认没有集成数据库驱动,上面例子中我们使用了MySQL数据库,因此还需要将对应的驱动包放在Kettle的lib目录中,否则Kettle是无法成功连接数据库的,我这里使用的是MySQL5.7和mysql-connector-java-8.0.29.jar,驱动包可以在Maven中央仓库找到。

创建Job

我们这个ETL流程最外层的Job中其实没有特别的内容,它包含Start开始组件、调用Transformation的组件、以及Success成功结束组件,不过实际开发中情况通常要复杂很多,我们的Job内可能嵌套了许多子Job和Transformation,并且可能包含复杂的数据流逻辑、错误处理等流程。

Transformation组件中我们配置了要执行的.ktr文件,注意我们使用了${Internal.Entry.Current.Directory}这个变量,它可以理解为当前文件的文件系统路径,这样我们在执行Job时,无论子组件放在哪个基础路径下,它都能正确加载执行。

注意:Kettle中,输入框右侧如果有一个S图标,表示该位置可以使用变量,变量统一使用${}格式拼接。

创建Transformation

我们继续编辑Transformation,并设计如下的流程。我们分别从两个数据源表t_ordert_customer使用Table input组件抽取数据,然后使用Merge join组件进行表连接,随后使用Filter rows组件将生日字段为空的数据记录筛掉,最后使用Table output组件将数据写入目标数据源表。

表输入组件中,我们配置了数据库连接和具体执行的SQL语句。注意表输入组件的SQL语句不要使用分号结尾(我就遇到过分号引起Oracle报ORA-00911:非法字符),且只能存在一条查询语句,否则可能出现奇怪的错误。此外,由于我们使用了Merge join组件,它要求两个被合并的数据流必须是用连接键排序的,否则无法正确连接,因此我们这里直接使用order by语句让数据库按顺序返回。

表连接组件中,我们指定了连接的两个源组件,连接类型使用内连接INNER,连接键使用客户ID。

数据过滤组件中,我们配置了一个条件,它在birthday不为NULL时返回True,此时会执行Table output组件。

表输出组件中,我们配置了数据源连接、目标表,以及数据流中字段和目标表的映射关系。其中我们勾选了Truncate table,这表示我们的更新是全量的,每次执行数据插入前都会清空该表;此外我们还勾选了Specify database fields,勾选此选项后我们才可以在Database fields选项卡中配置字段映射,如果不勾选,我们就要保证数据流中的数据和表字段名完全一致。

执行Job

我们点击Run按钮即可执行Job,在执行过程中,我们可以看到每个组件的执行情况和对应日志信息。

执行成功结果如上图。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。