利用datax数据同步工具将MySLQ的数据导入到Hbase数据库-----------全套流程

it2022-05-05  112

参考文档:https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md

第一:MySQL数据库中的表的结构如下: DROP TABLE IF EXISTS mhl_device; CREATE TABLE mhl_device ( 0 id bigint(20) NOT NULL, 1 update_time datetime DEFAULT NULL COMMENT ‘更新时间’, 2 create_time datetime DEFAULT NULL COMMENT ‘创建时间’, 3 mac_address varchar(255) DEFAULT NULL, 4 cpu_type varchar(255) DEFAULT NULL, 5 serial_number varchar(100) DEFAULT NULL COMMENT ‘序列号’, 6 firmware_version varchar(50) DEFAULT NULL COMMENT ‘固件版本’, 7 system_version varchar(50) DEFAULT NULL COMMENT ‘系统版本’, 8 registration_id varchar(100) DEFAULT NULL COMMENT ‘极光推送标记id’, 9 bluetooth_address varchar(255) DEFAULT NULL COMMENT ‘蓝牙地址’, 10 wifi_address varchar(255) DEFAULT NULL COMMENT ‘wifi 地址’, 11 android_id varchar(255) DEFAULT NULL COMMENT ‘android_id’, 12 status int(5) DEFAULT NULL COMMENT ‘状态 0:正常;1:故障;2睡眠’, 13 merchant_id bigint(20) DEFAULT NULL COMMENT ‘门店id’, 14 activate_status int(5) DEFAULT NULL COMMENT ‘是否激活 0:未激活;1已激活’, 15 activate_user_id bigint(20) DEFAULT NULL COMMENT ‘负责人id’, 16 cumulative_online_time bigint(20) DEFAULT ‘0’, 17 online_time bigint(20) DEFAULT ‘0’, 18 batch varchar(20) DEFAULT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

注意:如果使用的Mysql库在大数据集群上,则直接jdbc,如果使用的Mysql是业务库(比如:Java业务的后端mysql库),如果业务库不在你的大数据集群上,那么就要去找运维的去要这个业务库的jdbc的地址,它会给你一个相同域下的jdbc地址,并且这个业务库不会给你频繁访问,所以一般都是运维给的这个库的备份库。

第二:Hbase中的建表语句:create ‘mhl_device_bigdata’,‘mhl’

第三:Datax的配置json文件:取名字:mhl_device_bigdata.json

注意重点: 配置文件中rowkey的设计:一般都会要有一个唯一的字段的记录每次都不一样来和唯一标识的字段来一起设计,比如:如果你的msyql中有1500条记录,那么假如你的hbase中rowkey设置为唯一标识的id,那么如果mysql中有相同的id,那么就有可能在导入hbase的时候出现id相同的数据会进行插入数据的时候会进行数据的更新而导致数据的记录数减少,所以不能只用一个唯一表示id来设计rowkey,当我们有唯一标识id和每次唯一的字段的记录每次不一样比喻时间,那么在数据插入hbase的时候,每次rowkey不一样就不会进行插入数据的更新记录数就不会少。

{ “job”: { “setting”: { “speed”: { “channel”: 1 } }, “content”: [ { “reader”: { “name”: “mysqlreader”, “parameter”: { “column”: [ “id”, “update_time”, “create_time”, “mac_address”, “cpu_type”, “serial_number”, “firmware_version”, “system_version”, “registration_id”, “bluetooth_address”, “wifi_address”, “android_id”, “status”, “merchant_id”, “activate_status”, “activate_user_id”, “cumulative_online_time”, “online_time”, “batch” ], “connection”: [ { “jdbcUrl”: [“jdbc:mysql://172.20.2.40:3306/mhl_advert”], “table”: [“mhl_device”] } ], “password”: “mhlslave12*”, “username”: “user_big”, “where”: “” } }, “writer”: { “name”: “hbase11xwriter”, “parameter”: { “hbaseConfig”: { “hbase.zookeeper.quorum”: “data-1:2181,data-2:2181,data-3:2181” }, “table”: “mhl_device_bigdata”, “mode”: “normal”, “rowkeyColumn”: [ #这里的rowkey设计是以:我们的id和create_time(创建时间)来组合作为rowkey中间用“”来连接 { “index”:0, “type”:“string” }, { “index”:-1, “type”:“string”, “value”:"" }, { “index”:2, “type”:“string” } ], “column”: [ { “index”:1, “name”: “mhl:update_time”, “type”: “string” }, { “index”:2, “name”: “mhl:create_time”, “type”: “string” }, { “index”:3, “name”: “mhl:mac_address”, “type”: “string” }, { “index”:4, “name”: “mhl:cpu_type”, “type”: “string” }, { “index”:5, “name”: “mhl:serial_number”, “type”: “string” }, { “index”:6, “name”: “mhl:firmware_version”, “type”: “string” }, { “index”:7, “name”: “mhl:system_version”, “type”: “string” }, { “index”:8, “name”: “mhl:registration_id”, “type”: “string” }, { “index”:9, “name”: “mhl:bluetooth_address”, “type”: “string” }, { “index”:10, “name”: “mhl:wifi_address”, “type”: “string” }, { “index”:11, “name”: “mhl:android_id”, “type”: “string” }, { “index”:12, “name”: “mhl:status”, “type”: “int” }, { “index”:13, “name”: “mhl:merchant_id”, “type”: “long” }, { “index”:14, “name”: “mhl:activate_status”, “type”: “int” }, { “index”:15, “name”: “mhl:activate_user_id”, “type”: “long” }, { “index”:16, “name”: “mhl:cumulative_online_time”, “type”: “long” }, { “index”:17, “name”: “mhl:online_time”, “type”: “long” }, { “index”:18, “name”: “mhl:batch”, “type”: “string” }, ], “versionColumn”:{ “index”: -1, “value”:“123456789” }, “encoding”: “utf-8” } } } ] } }

注意:这里用的是全量导入,所以where设置为:" ",如果要增量来导入则可以到reader中来使用where,使用如下: “where”: "create_time >${create_time}”

任务的调度部分:这里使用crontab命令来调度,如下:

(1)shell脚本: #!/bin/bash #注意:source /etc/profile表示你给的当前环境,因为我们要用到crontab,避免环境问题,所以先给出 source /etc/profile python /data2/shuairui/datax/datax/bin/datax.py /data2/shuairui/mhl_device_bigdata/mhl_device_bigdata.json

(2)crontab调度命令:crontab -e 设置每5分钟调度一次 */5 * * * * /bin/sh /data2/shuairui/mhl_device_bigdata/mhl_device_bigdata.sh >> /data2/shuairui/mhl_device_bigdata/mhl_device_bigdata_log 2>&1

注:这样datax的数据传输的日志就可以在mhl_device_bigdata_log中去查看

最后需要注意的是: 下面是我使用的json文件,有两点需要注意 1、我在 where 使用了sql 语句 create_time > FROM_UNIXTIME( c r e a t e t i m e ) a n d c r e a t e t i m e < F R O M U N I X T I M E ( {create_time}) and create_time < FROM_UNIXTIME( createtime)andcreatetime<FROMUNIXTIME({end_time}) ,其中FROM_UNIXTIME()是mysql时间戳转换为时间格式的函数,${name}是datax提供的占位符后面会使用到,而我们这里的mysql库中的create_time是已经格式化了。 2、reader中连接字符串添加了useUnicode=true&characterEncoding=utf8 ,因为没有加这个导入到目标数据库中文乱码了,虽然我两边的数据库都是utf8mb4格式的,比如: jdbc:mysql://172.20.2.40:3306/mhl_advert?useUnicode=true&characterEncoding=utf8。

下面的例子是参考其他博主的:从Hbase的数据导入到MySQL数据库中:

参考文档地址:https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md

1 准备配置文件hbase2mysql.json (可以看出,从hbase中拿出group1列簇中拿出2个字段,按照顺序对应下面mysql字段) { “job”: { “content”: [ { “reader”: { “name”: “hbase11xreader”, “parameter”: { “hbaseConfig”: { “hbase.zookeeper.quorum”: “linux01:2181,linux02:2181,linux03:2181” }, “table”: “mysql2hbase”, “encoding”: “utf-8”, “mode”: “normal”, “column”: [ { “name”: “group1: id”, “type”: “string” }, { “name”: “group1: name”, “type”: “string” } ], “range”: { “startRowkey”: “”, “endRowkey”: “”, “isBinaryRowkey”: true } } }, “writer”: { “name”: “mysqlwriter”, “parameter”: { “column”: [“id”,“username”], “connection”: [ { “jdbcUrl”: “jdbc:mysql://192.168.43.20:3306/test”, “table”: [“target_user”] } ], “password”: “111111”, “preSql”: [], “session”: [], “username”: “root”, “writeMode”: “insert” } } } ], “setting”: { “speed”: { “channel”: “1” } } } }


最新回复(0)