Kettle使用
# 配置数据库
# 配置数据库连接
- 下载对应数据库的驱动 (JAVA依赖包),然后放到Kettle目录下的
./lib
目录中。- 可以去对应数据库的官网或者第三方仓库下载。
- 第三方仓库:
https://mvnrepository.com/
,搜索对应数据库名称,然后选择/J
结尾的进去,再选择对应数据库版本,然后在Files
那一栏单击jar
进行jar包下载。
- 作业或转换中的主对象树中,右键DB连接 -> 新建。
- 输入数据库信息后,单击测试进行测试连接,测试成功后单击确认即可。
# 共享数据库连接
数据库连接默认只对当前执行创建操作的作业或转换有效,所以我们一般会将建好的数据库连接进行共享,以方便在其他作业或者转换中使用。
# Transformation
# 输入类步骤
输入是步骤用于从外部数据源中读取数据,它用于完成ETL中数据的**提取(Extract)**工作。
# CSV文件输入
CSV文件是一个用逗号分隔的固定格式的文本文件,这种文件后缀名为.csv,可以用Excel或者文本编辑器打开。在企业里面一般最常见的ETL需求就是将csv文件转换为excel文件,如果用Kettle来做这个ETL工作,就需要用到CSV文件输入步骤。
- 步骤名称:同转换中步骤名不能重复。
- 文件名:选择对应的csv文件路径。
- 列分隔符:默认是逗号。
- 封闭符:表示行数据结束的符号。
- NIO缓存大小:文件如果行数过多,需要调整此参数。
- 包含列头行:表示文件中第一行是字段名称行,表头不进行读写。
- 行号字段:如果需要从某行开始读写,可在此输入行号。
- 并发运行:选择并发运行,可提高读写速度。
- 字段中有回车换行:不要选择,会将换行符做数据读取。
- 文件编码:如果预览数据出现乱码,可更换文件编码。
# 文本文件输入
提取服务器上的日志信息是公司里ETL开发很常见的操作,日志信息基本上都是文本类型,因此文本文件输入也是Kettle中常用的一个输入步骤。
文本文件输入步骤使用方法如下:
- 添加需要转换的文本文件。
- 按照日志文件格式,指定分隔符。
- 设置字段格式。
- 然后预览下记录,确认无误后单击确定即可。
# Excel输入
Excel输入也是很常用的输入步骤,一般企业里会用此控件对大量的Excel文件进行ETL操作。
Excel输入步骤使用方法如下:
- 按照源文件格式指定表格类型,然后选择并添加对应的Excel文件。
- 获取Excel的Sheet工作表。
获取字段,并设置字段格式。
然后预览下记录,确认无误后单击确定即可。
# JSON输入
如果我们需要将JSON作为输入步骤,则我们需要先了解JSON Path的用法,JSON Path使用用来匹配JSON中数据用的,格式一般分为点记法(Dot-Notation)和括号记法(Bracket-Notation),两种方式实际上和Python中对象取值和字典取值类似。
在Kittle中,JSON输入提供可视化选择字段生成JSON Path,可以方便进行字段选择。我们只需要在可视化选择的路径无法满足需求时,再手动编写JSON Path即可。
JSON Path操作符:
操作符 | 概述 |
---|---|
$ | 表示根节点 |
@ | 表示当前节点 |
.name 或 ['name'] | 表示取子节点 |
[number] 或 ['name'] | 取数组下标或对象子节点 |
[number<, ...>] 或 ['name'<, ...>] | 取多个数组下标或对象子节点 |
[start:end] | 数组切片 |
* | 表示通配符匹配节点 |
.. | 递归匹配键名 |
?(<expression>) | 过滤表达式操作,表达式结果必须是Boolean |
例如:
[ { "name": "Users", "data":[ { "name": "hana", "age": 17 }, { "name": "sakura", "age": 18 } ] } ] # 匹配hana的age,则可以使用以下JSON Path进行匹配: #点记法: $[0].data[0].age #括号记法: $[0]['data'][0]['age'] # 递归匹配所有键为name的值: $[0]..name $[0]..['name']
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
JSON输入步骤使用方法如下:
- 添加需要转换的JSON文件。
- 根据JSON Path获取到需要的字段,并设置字段格式。
- 然后预览下记录,确认无误后单击确定即可。
# 表输入
Kettle中的表输入步骤中的表,实际指的就是数据库表。Kettle可以连接市面上常见的各种数据库,比如Oracle、Mysql、SqlServer等。但在使用表输入之前,我们需要先配置好数据库连接,然后才能使用表输入步骤。
- 选择对应的数据库连接,然后单击【获取SQL查询语句】,它会生成查询所有字段的查询语句,如下图所示。另外也可以自己输入SQL查询语句。
- 然后预览下记录,确认无误后单击确定即可。
# 输出类步骤
输入主要用于将处理后的数据输出到目标数据源中,它用于完成ETL中数据的**装载(Load)**工作。
# 文本文件输出
文本文件输出步骤用于将数据输出成文本。
文本文件输出步骤使用方法如下:
- 设置要输出到的目录、文件名,以及扩展名,比如txt、csv等。
- 在内容框里设置合适的分隔符,比如分号,逗号,TAB等。
- 在字段选项卡中获取字段,并设置字段格式,确认无误后单击确定即可。
注意:如果输出的文本中有多余的空格,在字段页面单击一下最小宽度即可。
# Excel输出
Excel输出步骤用于将数据输出成Excel表格,Kettle中自带Excel输出
、Microsoft Excel输出
,我们一般用Microsoft Excel输出
即可。
- Excel输出只能输出xls文件,适用于Excel2003。
- Microsoft Excel输出可以输出xls和xlsx文件,适用于Excel2007及以后。
Excel输出步骤使用方法如下:
- 设置要输出到的目录、文件名,以及扩展名。
- 在字段选项卡中获取字段,并设置字段格式,确认无误后单击确定即可。
# SQL文件输出
SQL文件输出一般跟表输入做连接,他会将数据库表的表结构和数据以SQL文件的形式导出,可以达到数据备份的目的。
SQL文件输出步骤使用方法如下:
选择对应的数据库连接、目标表。
按需勾选增加
创建表语句
和每个语句另起一行
。设置要输出到的目录、文件名,扩展名默认为sql无需修改。
确认无误后单击确定即可。
# 表输出
表输出步骤可以直接将数据写入到数据库的表中。
- 数据库连接、目标模式、目标表:顾名思义选择对应连接的对应目标库表即可。
- 提交记录数量 :该配置表示单次事务最多插入的数据行,为0表示不使用事务批量提交。
- 裁剪表:勾选该配置后,在数据输出前Kettle会先执行truncate语句清空目标表中所有数据,然后才会进行输出,同等于全量同步。
- 忽略插入错误:也就是忽略错误的数据,只插入符合要求的数据。至于错误的数据我们可以通过定义错误处理,输出到其他文件。
- 可以右键表输出,然后选择定义错误处理,将错误的记录输出到指定文件中。
- 另外该功能和批量插入冲突,使用了批量插入就不能忽略插入错误,遇到错误会停下。
- 指定数据库字段:只有勾选了这个,才能对上一步的数据与表字段名进行关系映射。不勾选则会以上一步的字段名进行插入,此时如果两者字段名不匹配则会插入失败。
- 勾选后数据库字段选项卡将变为可用,其中的表字段指的是目标表的字段,流字段指的是上一步骤的字段。
- 表分区数据:表分区功能可以将表按照指定字段进行分区存储,比如将data表分为202201、202202等多个分区。
- 使用批量插入:默认是勾选的,可以提高数据插入效率。
- 表名定义在一个字段里:该功能可以将表按照指定字段进行分表存储,例如指定了性别字段,该字段有两个值man和woman,则表输出是会按照man表和woman表进行分表存储。
- 返回一个自动产生的关键字:该功能可以在插入行时,对指定的目标表关键字字段进行自增。
- 该字段通过自动产生的关键字的字段名称进行指定,指定后在数据库字段选项卡中就不用指定关键字字段的映射关系了。
- 如果上一步骤的数据流中没有关键字字段,我们就能使用该功能进行关键字自增。
# 更新
更新步骤主要用于对目标表已有的数据进行更新操作,但如果出现流数据比目标表数据更多的情况则会报错,所以一般用插入/更新会比较多些。
- 数据库连接、目标模式、目标表:顾名思义选择对应连接的对应目标库表即可。
- 提交记录数量 :该配置表示单次事务最多更新的数据行,为0表示不使用事务批量提交。
- 批量更新:批量更新可以提高更新效率。
- 跳过查询:跳过查询在更新时不会先对数据进行查询,可以提高更新效率。
- 需要注意的是,如果勾选了该功能、并且更新字段中包含修改时自动更新时间的字段,那么不管数据实际是否发生了变化,时间都会被更新。
- 批量更新&跳过查询:同时使用批量更新和跳过查询功能可以大幅提高数据的更新速度,但在某些数据库可能会出现报错,所以根据实际情况使用即可。
- 用来查询值的关键字:指定作为更新条件的目标表字段和流字段的关系,一般使用主键关系作为条件,它同等于UPDATE语句中的WHERE子句。
- 更新字段:指定要更新的目标表字段与其对应的流字段,它同等于UPDATE语句中的SET子句。
# 插入/更新
插入/更新步骤主要用于对目标表已有的数据进行更新操作和对目标表没有的数据进行插入操作。它在更新的基础上进行了优化,如果流数据比目标表数据更多,则会将多出的数据插入到目标表中。
- 数据库连接、目标模式、目标表:顾名思义选择对应连接的对应目标库表即可。
- 提交记录数量 :该配置表示单次事务最多插入或更新的数据行,为0表示不使用事务批量提交。
- 不执行任何更新:勾选该选项后只会插入新数据,不会更新已存在的数据。
- 用来查询值的关键字:指定作为更新条件的目标表字段和流字段的关系,一般使用主键关系作为条件,它同等于UPDATE语句中的WHERE子句。
- 更新字段:指定要更新的目标表字段与其对应的流字段,它同等于UPDATE语句中的SET子句。
# 删除
删除步骤用于删除目标表中指定条件的数据,还可以用于跟另外一个表数据做对比,然后进行去重的操作。
- 数据库连接、目标模式、目标表:顾名思义选择对应连接的对应目标库表即可。
- 提交记录数量 :该配置表示单次事务最多删除的数据行,为0表示不使用事务批量提交。
- 用来查询值的关键字:指定作为删除条件的目标表字段和流字段的关系,它同等于DELETE语句中的WHERE子句。
# 转换类步骤
输入步骤用于执行数据的转换、清洗、过滤等操作,它用于完成ETL中数据的**转换(Transform)**工作。Transform是整个ETL过程中工作量最大、耗费时间较久的一个环节,大概可以占到整个ETL流程的三分之二。
# Concat fields
Concat fields用于将多个字段进行拼接,形成一个新的字段。
# 值映射
值映射就是把字段的一个值映射成其他的值,比如将字符串"关闭"映射成数值0、字符串"启用"映射成数值1。
# 增加常量
增加常量就是在数据流中新增一列字段,该列的所有数据都是相同的值。
# 增加序列
增加序列是在数据流中新增一列字段,该列的值是一个递增序列,我们可以自定义指定递增步长。
# 字段选择
字段选择可以对数据流中的字段进行字段类型修改、字段改名、字段删除等操作。
# 计算器
计算器可以用于对已有字段进行计算并得出新字段,同时我们也能对计算得出的新字段进行再次计算。另外对于那些仅做临时使用的计算字段,我们还可以在计算步骤之后将其进行移除。
# 剪切字符串
剪切字符串步骤可以将输入流中的指定字段按照指定位置范围进行剪切,并将剪切内容赋值给新的输出流字段。
# 替换字符串
替换字符串步骤可以将输入流中的指定字段按照指定搜索内容进行匹配,匹配上的搜索内容会替换为目标字符串,并将替换结果赋值给新的输出流字段。
# 操作字符串
操作字符串步骤可以将输入流中的指定字段按照指定方式进行操作,例如去除字符串两端空格、大小写转换等,并将操作结果赋值给新的输出流字段。
# 排序记录
排序记录步骤可以根据指定字段对数据流进行升序排序或者降序排序。
# 去除重复记录
去除重复记录步骤用于去重数据流中相同的数据行,在使用该步骤之前必须先对需要去重的字段进行排序。
# 唯一行 (哈希值)
唯一行 (哈希值) 步骤用于删除数据流中重复的行。唯一行 (哈希值) 会给每个数据行建立哈希值,然后通过指定字段的哈希值来比较数据是否重复,该方式去重效率比较高,更建议使用。
# 拆分字段
拆分字段步骤用于将指定字段按照指定分隔符拆分成多个新字段,原字段拆分后会从数据流中消失。
如果拆分后有多个值,但是只定义了一个新字段,那么只会对应第一列的值到那一个新字段上。
# 列拆分为多行
列拆分为多行步骤用于将指定字段按照指定分隔符拆分成多个行,拆分后除了拆分用的字段外,其他字段的值会直接复制原行的。
- 输出中包含行号:如果数据流中包含行号,可以使用该功能重置新数据的行号,而非直接复制原行号。
效果例如:
# 行扁平化
行扁平化步骤用于将同一组的多行数据合并成为一行,可以理解为列拆分为多行的逆向操作。需要注意的是,在使用该步骤之前必须先对数据进行排序、且每个分组的数据条数要保证一致,否则数据会有错乱。
效果例如:
# 列转行
列转行步骤用于将多列转换为一行,列转行会将数据流按照指定的字段,将另一列的字段内容作为新的列字段名,然后将另另一列的字段作为数据。需要注意的是,在使用该步骤之前必须先对数据按照分组字段进行排序。
- 关键字段:要将数据内容变成列名的字段
- 分组字段:列转行后的分组字段
- 目标字段:增加的列的列名字段
- 数据字段:目标字段的数据字段
- 关键字值:对应目标字段的关键字段值
- 类型:目标字段的数据类型
效果例如:
# 行转列
列转行步骤用于将一行转换为多列,可以理解为列转行的逆向操作。
- Key字段:生成的列字段名
- 字段名称:原本数据流中的列字段名
- Key值:Key字段的值,可自定义的,一般和字段名称一样即可。
- Value字段:生成的对应Key值的数据列名
效果例如:
# 应用类步骤
应用类步骤包括多种辅助功能,如邮件发送、文件压缩、日志记录等。
# 替换NULL值
替换NULL值步骤用于将将数据流中的NULL值替换成其他的值。可以替换数据流中指定类型的NULL值,也可以替换数据流中指定字段的NULL值。
# 写日志
写日志步骤一般用于调试,该步骤会将数据流中的指定字段打印到控制台中。
效果例如:
# 流程类步骤
流程类步骤主要用于控制数据流程和数据流向。
# Switch/case
Switch/case步骤用于数据分流,它可以根据指定字段的值来决定要走哪个步骤,让数据流中的数据从一路到多路。
# 过滤记录
过滤记录步骤也用于数据分流,与Switch/case不同,过滤记录相当于If-else,它可以根据指定字段的值来进行条件判断,然后将数据流中的数据从一路分为两路。
- 上面是条件成立时走的步骤,下面是条件不成立时走的步骤。
# 空操作
空操作步骤就是什么都不做,一般作为数据流的终点,比如作为上面的过滤记录步骤中条件不成立时走的步骤。
# 中止
中止步骤就是终止整个转换程序,此步骤一般用于校验数据、调试程序等情况。如果有数据流到此步骤,整个转换程序将中止,并且在控制台输出报错信息。
# 查询类步骤
查询类步骤主要用于查询数据源中的数据,然后合并到数据流中。
# 数据库查询
数据库查询用于从数据库中查询出数据,然后跟数据流中的数据进行左连接。会以数据库表作为左表根据关联条件进行匹配查询,同等于SELECT xxx,xxx FROM xxx a LEFT JOIN b ON a.xxx= b.xxx
。
- 查询所需的关键字:用于指定左连接的关联条件。
- 查询表返回的值:用于指定查询要获取的字段,可以直接通过获取返回字段按钮获取。
# 流查询
流查询步骤用于查询两条数据流中的数据,然后按照指定的字段做等值连接,流查询在查询时会把数据都加载到内存中。
# 连接类步骤
连接类步骤用于将多个数据流通过关键字进行连接形成一个新的数据流。
# 合并记录
合并记录步骤用于将两个不同来源的数据流合并,这两个来源的数据分别为旧数据和新数据,该步骤会将旧数据和新数据按照指定的关键字匹配、比较、合并。
合并后的数据将包括旧数据和新数据中的所有数据,对于变化的数据会优先使用新数据代替旧数据,类似于Python中字典对象的update方法,同时在结果里会有一个标示字段,来指定新旧数据的比较结果。
注意:旧数据和新数据需要事先按照关键字段排序,并且旧数据和新数据要有相同的字段名称。
- 旧数据源:选择旧数据来源的步骤。
- 新数据源:选择新数据来源的步骤。
- 标志字段:标志字段的名称,标志字段用于保存比较的结果,比较结果有以下几种:
- identical - 旧数据和新数据一样。
- changed - 数据发生了变化。
- new - 新数据中有而旧数据中没有的记录。
- deleted - 旧数据中有而新数据中没有的记录。
- 关键字段:用于定位判断两个数据源中的同一条记录的字段。
- 数据字段:对于两个数据源中的同一条记录,指定需要比较的字段。
# 记录集连接
记录集连接步骤用于对两个步骤中的数据流进行内连接、左连接、右连接、外连接。需要注意的是,在进行记录集连接之前,需要分别对两个步骤的数据集的关联的字段进行排序,否则会数据错乱出现NULL值。
# 统计类步骤
统计类步骤用于提供数据的采样和统计功能。
# 分组
分组控件的功能类似于GROUP BY,可以按照指定的一个或者几个字段进行分组,然后其余字段可以按照聚合函数进行合并计算。注意在进行分组之前,数据最好先按分组字段进行排序。
# 映射类步骤
映射类步骤用于定义子转换,方便代码封装和重用。
# 映射
映射步骤用于在转换中调用子转换。
# 映射输入规范
映射输入规范步骤用于主转换向子转换输入数据,需要在子转换中定义。
# 映射输出规范
映射输出规范步骤用于子转换向主转换输出数据,需要在子转换中定义。
# 脚本类步骤
脚本类步骤用于通过写代码完成一些复杂的操作。
# 执行SQL脚本
执行SQL脚本步骤用于在指定数据库执行指定SQL语句。
# Job
1)新建一个作业。
2) 按照下图设置作业项和作业跳。
3)转换作业项设置,选择要嵌入的转换文件。
4) 发送邮件作业项设置。
# 资源库
数据库资源库是将作业和转换相关的信息存储在数据库中,执行的时候直接去数据库读取信息,方便管理资源。
1)点击Kettle右上角Connect,选择Other Resporitory。
2)选择Database Repository,表示使用数据库资源库。另外还有文件资源库,不过不是很常用。
3)建立数据库连接,指定作为资源库的数据库实例和库。
4)填好之后点击Finish。如果该库是新库,则会在指定的库中自动创建资源库表,如果该库之前已经作为资源库,则会直接使用该资源库。
5)连接资源库,默认账号:admin 默认密码:admin。
6)将之前的转换导入资源库。
选择从xml文件导入,然后选择要导入的转换,然后点击保存,选择存储位置及文件名即可,之后就能在资源库里查看了。
# Kettle调优
1)调整JVM大小进行性能优化,修改Kettle根目录下的Spoon脚本,尽量用大一点的内存参数启动Kettle。
-Xmx2048m:设置JVM最大可用内存为2048M。
-Xms1024m:设置JVM促使内存为1024m。此值可以设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存。
-Xmn2g:设置年轻代大小为2G。整个JVM内存大小=年轻代大小 + 年老代大小 + 持久代大小。持久代一般固定大小为64m,所以增大年轻代后,将会减小年老代大小。此值对系统性能影响较大,Sun官方推荐配置为整个堆的3/8。
-Xss128k:设置每个线程的堆栈大小。JDK5.0以后每个线程堆栈大小为1M,以前每个线程堆栈大小为256K。更具应用的线程所需内存大小进行调整。在相同物理内存下,减小这个值能生成更多的线程。但是操作系统对一个进程内的线程数还是有限制的,不能无限生成,大概在3000~5000左右。
2)调整提交记录数大小进行优化,尽量提高批处理的Commit Size。Kettle默认Commit数量为1000,可以根据数据量大小来设置1000~50000。
3)尽量使用数据库连接池。
4)尽量使用缓存,缓存尽量大一些,主要是文本文件和数据流。
5)可以使用SQL来做的一些操作尽量用SQL,例如:Group、merge、stream lookup、split field这些操作都是比较慢的,能用SQL就用SQL。
6)能使用truncate table的时候,就不要使用delete all row这种类似sql合理的分区,如果删除操作是基于某一个分区的,就不要使用delete row这种方式(不管是SQL还是步骤),直接把分区drop掉,再重新创建。
7)尽量缩小输入的数据集的大小 (增量更新也是为了这个目的)。