Apache Sqoop的介绍及其操作

it2022-05-05  120

一、Apache Sqoop

Apache p Sqoop 是在 在 p Hadoop 生态体系和 和 S RDBMS 体系 之间传送数据的一种工具。来自Apache 软件基金会提供。Sqoop 工作机制是将导入或导出命令翻译成 mapreduce 程序来实现。在翻译mapreduce 中主要是对 inputformat 和 outputformat 进行定制。Hadoop 生态系统包括:HDFS、Hive、Hbase 等 RDBMS 体系包括:Mysql、Oracle、DB2 等 Sqoop 可以理解为:“SQL 到 Hadoop 和 Hadoop 到 SQL”

[外链图片转存失败(img-qFv41Kej-1563435837917)(C:\Users\10481\AppData\Roaming\Typora\typora-user-images\1563360478848.png)]

站在 Apache 立场看待数据流转问题,可以分为数据的导入导出: Import:数据导入。RDBMS----->Hadoop Export:数据导出。Hadoop---->RDBMS

二、sqoop的安装

安装 sqoop 的前提是已经具备 java 和 hadoop 的环境。 最新稳定版: 1.4.6 配置文件修改:

cd $SQOOP_HOME/conf mv sqoop-env-template.sh sqoop-env.sh vi sqoop-env.sh export HADOOP_COMMON_HOME= /export/servers/hadoop-2.7.5 export HADOOP_MAPRED_HOME= /export/servers/hadoop-2.7.5 export HIVE_HOME= /export/servers/hive

本命令会列出所有 mysql 的数据库。 到这里,整个 Sqoop 安装工作完成。

加入 mysql 的 jdbc 驱动包

cp /hive/lib/mysql-connector-java-5.1.32.jar $SQOOP_HOME/lib/

验证启动

bin/sqoop list-databases \ --connect jdbc:mysql://localhost:3306/ \ --username root --password hadoop

本命令会列出所有 mysql 的数据库。 到这里,整个 Sqoop 安装工作完成。

三、Sqoop导入

“导入工具”导入单个表从 RDBMS 到 HDFS。表中的每一行被视为 HDFS 的记录。所有记录都存储为文本文件的文本数据。 下面的语法用于将数据导入 HDFS。

$ sqoop import (generic-args) (import-args)

Sqoop 测试表数据 在 mysql 中创建数据库 userdb,然后执行参考资料中的 sql 脚本: 创建三张表: p emp 雇员表 、 d emp_add 雇员地址表 、n emp_conn 雇员联系表。

1 . 全量导入 mysql 表数据到 HDFS

下面的命令用于从 MySQL 数据库服务器中的 emp 表导入 HDFS。 #导入的固定配置 bin/sqoop import \ #数据库的连接信息 --connect jdbc:mysql://node03:3306/userdb \ --username root \ --password 123456 \ #如果导入的路径存在,自动删除 --delete-target-dir \ #导入Hadoop中的命令 --target-dir /sqoopresult \ #指定表名 --table emp #m 表示只有map阶段在运行 1 表示只有一个map --m 1

其中 – target- -r dir 可以用来指定导出数据存放至 S HDFS 的目录 ; mysql jdbc url 请使用 ip 地址。

为了验证在 HDFS 导入的数据,请使用以下命令查看导入的数据:

hdfs dfs -cat /sqoopresult/part-m-00000

可以看出它会在 HDFS 上默认用逗号,分隔 emp 表的数据和字段。可以通过下列来指定分隔符。

#指定分隔符 -- fields- - terminated- - by '\ t'

显示结果:

1201,gopal,manager,50000,TP 1202,manisha,Proof reader,50000,TP 1203,khalil,php dev,30000,AC 1204,prasanth,php dev,30000,AC 1205,kranthi,admin,20000,TP

如果表的数据太大,可以启动多个maptask执行操作,如果表没有主键,请指定根据那个主键进行切分

bin/sqoop import \ --connect jdbc:mysql://node03:3306/userdb \ --username root \ --password 123456 \ --target-dir /sqoopresult214 \ --fields-terminated-by '\t' \ #确认根据主键id进行切分 --split-by id \ --table emp #启动两个maptask执行操作 --m 2

总结:全量导入数据到hdfs

mysql的地址尽量不要使用localhost 请使用ip或者host如果不指定 导入到hdfs默认分隔符是 “,”可以通过-- fields-terminated-by '\ t‘ 指定具体的分隔符如果表的数据比较大 可以并行启动多个maptask执行导入操作,如果表没有主键,请指定根据哪个字段进行切分

2 . 全量入 导入 mysql 表数据到 到 HIVE

2.1. 方式一:先复制表结构到 hive 中再导入数据

将关系型数据的表结构复制到 hive 中

bin/sqoop create-hive-table \ --connect jdbc:mysql://node03:3306/userdb \ --table emp_add \ --username root \ --password 123456 \ --hive-table test.emp_add_sp

其中: –table emp_add 为 mysql 中的数据库 sqoopdb 中的表。 –hive-table emp_add_sp 为 hive 中新建的表名称。

从关系数据库导入文件到 hive 中

bin/sqoop import \ --connect jdbc:mysql://node03:3306/userdb \ --username root \ --password 123456 \ --table emp_add \ --hive-table test.emp_add_sp \ --hive-import \ --m 1

2.2.方式二: 直接复制表结构数据到 到 hive

bin/sqoop import \ --connect jdbc:mysql://node03:3306/userdb \ --username root \ --password 123456 \ --table emp_conn \ --hive-import \ --m 1 \ --hive-database test;

3 . 导入表数据子集(where 过滤)

–where 可以指定从关系数据库导入数据时的查询条件。它执行在数据库服 务器相应的 SQL 查询,并将结果存储在 HDFS 的目标目录。 bin/sqoop import \ --connect jdbc:mysql://node03:3306/sqoopdb \ --username root \ --password 123456 \ --where "city ='sec-bad'" \ --target-dir /wherequery \ --table emp_add --m 1

4 . 导入表数据子集(query 查询)

注意事项: 使用 query sql 语句来进行查找不能加参数–table ; 并且必须要添加 where 条件; 并且 where 条件后面必须带一个$CONDITIONS 这个字符串; 并且这个 sql 语句必须用单引号,不能用双引号;

bin/sqoop import \ --connect jdbc:mysql://node03:3306/userdb \ --username root \ --password 123456 \ --target-dir /wherequery12 \ --query 'select id,name,deg from emp WHERE id>1203 and $CONDITIONS' \ --split-by id \ --fields-terminated-by '\t' \ --m 2

sqoop 命令中,–split-by id 通常配合-m 10 参数使用。用于指定根据哪 个字段进行划分并启动多少个 maptask。

总结:使用 query sql 语句来进行查找不能加参数–table ; 并且必须要添加 where 条件; 并且 where 条件后面必须带一个$CONDITIONS 这个字符串; 并且这个 sql 语句必须用单引号,不能用双引号;

5 . 增量导入

在实际工作当中,数据的导入,很多时候都是只需要导入增量数据即可,并 不需要将表中的数据每次都全部导入到 hive 或者 hdfs 当中去,这样会造成数据 重复的问题。因此一般都是选用一些字段进行增量的导入, sqoop 支持增量的 导入数据。 增量导入是仅导入新添加的表中的行的技术。

– check- - column (col) 用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据 进行导入,和关系型数据库中的自增字段及时间戳类似。 注意:这些被指定的列的类型不能使任意字符类型,如 char、varchar 等类 型都是不可以的,同时-- check-column 可以去指定多个列。 – incremental (mode) append:追加,比如对大于 last-value 指定的值之后的记录进行追加导入。 lastmodified:最后的修改时间,追加 last-value 指定的日期之后的记录 – last- - value (value) 指定自从上次导入后列的最大值(大于该指定的值),也可以自己设定某一值

5.1. Append 模式 模式

执行以下指令先将我们之前的数据导入:

bin/sqoop import \ --connect jdbc:mysql://node03:3306/userdb \ --username root \ --password 123456 \ --target-dir /appendresult \ --table emp --m 1

使用 hadoop fs -cat 查看生成的数据文件,发现数据已经导入到 hdfs 中。 然后在 mysql 的 emp 中插入 2 条增量数据:

insert into `userdb`.`emp` (`id`, `name`, `deg`, `salary`, `dept`) values ('1206', 'allen', 'admin', '30000', 'tp'); insert into `userdb`.`emp` (`id`, `name`, `deg`, `salary`, `dept`) values ('1207', 'woon', 'admin', '40000', 'tp');

执行如下的指令,实现增量的导入:

bin/sqoop import \ --connect jdbc:mysql://node03:3306/userdb \ --username root --password 123456 \ --table emp --m 1 \ --target-dir /appendresult \ --incremental append \ --check-column id \ --last-value 1205

5.2.Lastmodified 模式 模式

首先创建一个 customer 表,指定一个时间戳字段:

create table customertest(id int,name varchar(20),last_mod timestamp default current_timestamp on update current_timestamp);

此处的时间戳设置为在数据的产生和更新时都会发生改变.

分别插入如下记录:

insert into customertest(id,name) values(1,'neil'); insert into customertest(id,name) values(2,'jack'); insert into customertest(id,name) values(3,'martin'); insert into customertest(id,name) values(4,'tony'); insert into customertest(id,name) values(5,'eric');

执行 sqoop 指令将数据全部导入 hdfs:

bin/sqoop import \ --connect jdbc:mysql://node-1:3306/userdb \ --username root \ --password hadoop \ --target-dir /lastmodifiedresult \ --table customertest --m 1

查看此时导出的结果数据…

再次插入一条数据进入 customertest 表

insert into customertest(id,name) values(6,'james')

使用 incremental 的方式进行增量的导入:

bin/sqoop import \ --connect jdbc:mysql://node-1:3306/userdb \ --username root \ --password hadoop \ --table customertest \ --target-dir /lastmodifiedresult \ --check-column last_mod \ --incremental lastmodified \ --last-value "2019-05-28 18:42:06" \ --m 1 \ --append

此处已经会导入我们最后插入的一条记录,但是我们却发现此处插入了 2 条数据,这是为什么呢? 这是因为采用 d lastmodified 于 模式去处理增量时,会将大于等于 last- - e value 值的数据当做增量插入。

5.3. Lastmodified 模式:append 、merge-key

使用 lastmodified 模式进行增量处理要指定增量数据是以 d append 模式(附 加)还是 merge- - key(合并)模式添加 下面演示使用 merge-by 的模式进行增量更新,我们去更新 id 为 1 的 name 字段。

update customertest set name = 'Neil' where id = 1;

更新之后,这条数据的时间戳会更新为更新数据时的系统时间. 执行如下指令,把 id 字段作为 merge-key:

bin/sqoop import \ --connect jdbc:mysql://node-1:3306/userdb \ --username root \ --password hadoop \ --table customertest \ --target-dir /lastmodifiedresult \ --check-column last_mod \ --incremental lastmodified \ --last-value "2019-05-28 18:42:06" \ --m 1 \ --merge-key id

由于 merge-key 模式是进行了一次完整的 mapreduce 操作, 因此最终我们在 lastmodifiedresult 文件夹下可以看到生成的为 part-r-00000 这样的文件,会发现 id=1 的 name 已经得到修改,同时新增了 id=6 的数据。

总结:

增量数据的导入

所谓的增量数据指的是上次至今中间新增加的数据sqoop支持两种模式的增量导入 append追加 根据数值类型字段进行追加导入 大于指定的last-valuelastmodified 根据时间戳类型字段进行追加 大于等于指定的last-value 注意在lastmodified 模式下 还分为两种情形:append merge-key

关于lastmodified 中的两种模式:

append 只会追加增量数据到一个新的文件中 并且会产生数据的重复问题

因为默认是从指定的last-value 大于等于其值的数据开始导入

merge-key 把增量的数据合并到一个文件中 处理追加增量数据之外 如果之前的数据有变化修改

也可以进行修改操作 底层相当于进行了一次完整的mr作业。数据不会重复。

四、 Sqoop 导出

将数据从 Hadoop 生态体系导出到 RDBMS 数据库导出前,目标表必须存在于 目标数据库中。 export 有三种模式: 默认操作是从将文件中的数据使用 INSERT 语句插入到表中。 更新模式:Sqoop 将生成 UPDATE 替换数据库中现有记录的语句。 调用模式:Sqoop 将为每条记录创建一个存储过程调用。 以下是 export 命令语法:

$ sqoop export (generic-args) (export-args)

1 . 默认模式出 导出 HDFS 数据到 mysql

默认情况下,sqoop export 将每行输入记录转换成一条 INSERT 语句,添加到 目标数据库表中。如果数据库中的表具有约束条件(例如,其值必须唯一的主键 列)并且已有数据存在,则必须注意避免插入违反这些约束条件的记录。如果 INSERT 语句失败,导出过程将失败。 此模式主要用于将记录导出到可以接收这些 结果的空表中。通常用于全表数据导出。 导出时可以是将 Hive 表中的全部记录或者 HDFS 数据(可以是全部字段也可 以部分字段)导出到 Mysql 目标表。

1.1.准备 HDFS 数据

在 HDFS 文件系统中“/emp/”目录的下创建一个文件 emp_data.txt:

1201,gopal,manager,50000,TP 1202,manisha,preader,50000,TP 1203,kalil,php dev,30000,AC 1204,prasanth,php dev,30000,AC 1205,kranthi,admin,20000,TP 1206,satishp,grpdes,20000,GR

1.2. 手动创建 mysql

mysql> USE userdb; mysql> CREATE TABLE employee ( id INT NOT NULL PRIMARY KEY, name VARCHAR(20), deg VARCHAR(20), salary INT, dept VARCHAR(10))

1.3. 执行导出命令

bin/sqoop export \ --connect jdbc:mysql://node-1:3306/userdb \ --username root \ --password hadoop \ #导出表的表名 --table employee \ #导出表的路径 --export-dir /emp/emp_data

1.4. 相关配置 相关配置 参数

–input-fields-terminated-by ‘\t’(默认以逗号来切割) 指定文件中的分隔符 –columns(指定目标表明中的字段顺序) 选择列并控制它们的排序。当导出数据文件和目标表字段列顺序完全一 致的时候可以不写。否则以逗号为间隔选择和排列各个列。没有被包含在– columns 后面列名或字段要么具备默认值,要么就允许插入空值。否则数据 库会拒绝接受 sqoop 导出的数据,导致 Sqoop 作业失败 –export-dir 导出目录,在执行导出的时候,必须指定这个参数,同时需要具 备–table 或–call 参数两者之一,–table 是指的导出数据库当中对应的表, –call 是指的某个存储过程。 –input-null-string --input-null-non-string 如果没有指定第一个参数,对于字符串类型的列来说,“NULL”这个字符 串就回被翻译成空值,如果没有使用第二个参数,无论是“NULL”字符串还 是说空字符串也好,对于非字符串类型的字段来说,这两个类型的空串都会 被翻译成空值。比如: –input-null-string “\N” --input-null-non-string "\N"

总结:注意:

导出的目标表需要自己手动提前创建 也就是sqoop并不会帮我们创建复制表结构

导出有三种模式:

默认模式 目标表是空表 底层把数据一条条insert进去更新模式 底层是update语句调用模式 调用存储过程

相关配置参数:

导出文件的分隔符 如果不指定 默认以“,”去切割读取数据文件 --input-fields-terminated-by如果文件的字段顺序和表中顺序不一致 需要–columns 指定 多个字段之间以","导出的时候需要指定导出数据的目的 export-dir 和导出到目标的表名或者存储过程名针对空字符串类型和非字符串类型的转换 “\n”

2 . 更新 导出 (updateonly 模式

2.1.参数说明

– update-key,更新标识,即根据某个字段进行更新,例如 id,可以指定多 个更新标识的字段,多个字段之间用逗号分隔。 – updatemod,指定 updateonly(默认模式),仅仅更新已存在的数据记录, 不会插入新纪录。

2.2.准备 HDFS 数据

在 HDFS “/updateonly_1/”目录的下创建一个文件 updateonly_1.txt:

1201,gopal,manager,50000 1202,manisha,preader,50000 1203,kalil,php dev,30000

2.3. 手动创建 mysql

mysql> USE userdb; mysql> CREATE TABLE updateonly ( id INT NOT NULL PRIMARY KEY, name VARCHAR(20), deg VARCHAR(20), salary INT);

2.4. 先执行全部导出操作

bin/sqoop export \ --connect jdbc:mysql://node-1:3306/userdb \ --username root \ --password hadoop \ --table updateonly \ --export-dir /updateonly_1/

2.5. . 查看此时 mysql

可以发现是全量导出,全部的数据

2.6. . 新增一个文件

updateonly_2.txt。修改了前三条数据并且新增了一条记录。上传至 /updateonly_2/目录下:

1201,gopal,manager,1212 1202,manisha,preader,1313 1203,kalil,php dev,1414 1204,allen,java,1515

2.7. . 执行更新导出

bin/sqoop export \ --connect jdbc:mysql://node-1:3306/userdb \ --username root --password hadoop \ --table updateonly \ --export-dir /updateonly_2/ \ --update-key id \ --update-mode updateonly

2.8. . 查看最终结果

最终只进行了更新操作

3 . 更新 导出 (allowinsert 模式

3.1. 参数说明:

– update-key,更新标识,即根据某个字段进行更新,例如 id,可以指定多 个更新标识的字段,多个字段之间用逗号分隔。 – updatemod,指定 allowinsert,更新已存在的数据记录,同时插入新纪录。 实质上是一个 insert & update 的操作。

3.2. . 准备 HDFS 数据

在 HDFS “/allowinsert_1/”目录的下创建一个文件 allowinsert_1.txt:

1201,gopal,manager,50000 1202,manisha,preader,50000 1203,kalil,php dev,30000

3.3. . 手动创建 mysql 中的目标表

mysql> USE userdb; mysql> CREATE TABLE allowinsert ( id INT NOT NULL PRIMARY KEY, name VARCHAR(20), deg VARCHAR(20), salary INT);

3.4. . 先执行全部导出操作

bin/sqoop export \ --connect jdbc:mysql://node-1:3306/userdb \ --username root \ --password hadoop \ --table allowinsert \ --export-dir /allowinsert_1/

3.5. . 查看此时 mysql 中的数据

可以发现是全量导出,全部的数据

3.6. . 新增一个文件

allowinsert_2.txt。修改了前三条数据并且新增了一条记录。上传至/ allowinsert_2/目录下:

1201,gopal,manager,1212 1202,manisha,preader,1313 1203,kalil,php dev,1414 1204,allen,java,1515

3.7. . 执行更新导出

bin/sqoop export \ --connect jdbc:mysql://node-1:3306/userdb \ --username root --password hadoop \ --table allowinsert \ --export-dir /allowinsert_2/ \ --update-key id \ --update-mode allowinsert

3.8. . 查看最终结果

导出时候的日志显示导出 4 条记录: 数据进行更新操作的同时也进行了新增的操作

**总结:**更新导出

updateonly 只更新已经存在的数据 不会执行insert增加新的数据allowinsert 更新已有的数据 插入新的数据 底层相当于insert&update

五、 Sqoop job 作业

1 . job 语法

$ sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)] $ sqoop-job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]

2 . 创建 job

在这里,我们创建一个名为 myjob,这可以从 RDBMS 表的数据导入到 HDFS 作业。 下面的命令用于创建一个从DB数据库的emp表导入到HDFS文件的作业。

bin/sqoop job --create myjob -- import --connect jdbc:mysql://node-1:3306/userdb \ --username root \ --password hadoop \ --target-dir /sqoopresult333 \ --table emp --m 1

注意 import 前要有空格

3 . 验证 job

‘–list’ 参数是用来验证保存的作业。下面的命令用来验证保存 Sqoop 作业的 列表。

bin/sqoop job --list

4 . 检查 job

‘–show’ 参数用于检查或验证特定的工作,及其详细信息。以下命令和样本 输出用来验证一个名为 myjob 的作业。

bin/sqoop job --show myjob

5 . 执行 job

‘–exec’ 选项用于执行保存的作业。下面的命令用于执行保存的作业称为 myjob。

bin/sqoop job --exec myjob

6 . 免密执行 job

sqoop 在创建 job 时,使用–password-file 参数,可以避免输入 mysql 密码, 如果使用–password将出现警告,并且每次都要手动输入密码才能执行job,sqoop 规定密码文件必须存放在 HDFS 上,并且权限必须是 400。 并且检查 sqoop 的 sqoop-site.xml 是否存在如下配置:

<property> <name>sqoop.metastore.client.record.password</name> <value>true</value> <description>If true, allow saved passwords in the metastore. </description> </property> bin/sqoop job --create myjob1 -- import --connect jdbc:mysql://cdh-1:3306/userdb \ --username root \ --password-file /input/sqoop/pwd/mymysql.pwd \ --target-dir /sqoopresult333 \ --table emp --m 1

ob

### 5 . 执行 job ‘--exec’ 选项用于执行保存的作业。下面的命令用于执行保存的作业称为 myjob。 ```shell bin/sqoop job --exec myjob

6 . 免密执行 job

sqoop 在创建 job 时,使用–password-file 参数,可以避免输入 mysql 密码, 如果使用–password将出现警告,并且每次都要手动输入密码才能执行job,sqoop 规定密码文件必须存放在 HDFS 上,并且权限必须是 400。 并且检查 sqoop 的 sqoop-site.xml 是否存在如下配置:

<property> <name>sqoop.metastore.client.record.password</name> <value>true</value> <description>If true, allow saved passwords in the metastore. </description> </property> bin/sqoop job --create myjob1 -- import --connect jdbc:mysql://cdh-1:3306/userdb \ --username root \ --password-file /input/sqoop/pwd/mymysql.pwd \ --target-dir /sqoopresult333 \ --table emp --m 1

最新回复(0)