说明:以下文档说到elasticsearch都简称es。
logstash同步数据不仅仅是数据库到es,它只是一个同步中间件,数据来源和数据的目标存储都是可以在配置里面指定的,根据数据来源和目标存储的不同配合logstash提供的不同插件。
本文讲解的是从mysql 同步到es实现方式。使用的版本logstash版本是6.6.1,es版本是6.5.4,logstash版本和es版本一定要配套,如果2者版本差距过大,同步过程中会报版本错误。
写这篇文章的时候es和Logstash最新版本是7.2。
1、logstash是一款开源的数据同步工具,还是比较优秀的。其他几款同步es数据的工具,比如elasticseach-jdbc等都试过,好久没人维护了。
官方文档访问地址:https://www.elastic.co/guide/en/logstash/current/index.html
2、es是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。
官方说明文档访问地址:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
3、logstash和es的安装
参考官方文档。
4、mysql同步到es的logstash配置文件,命名一个logstash-mysql-es.conf配置文件,其中ip是mysql数据库对应的ip,es_ip是elasticsearch对应的ip内容如下:
input { jdbc { type1 => "kl_carousel_info" jdbc_connection_string2 => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false" jdbc_user3 => "root" jdbc_password4 => "123456" jdbc_driver_library5 => "./config/mysql-connector-java-5.1.38.jar" jdbc_driver_class6 => "com.mysql.jdbc.Driver" jdbc_paging_enabled7 => true jdbc_fetch_size8 => 100 jdbc_page_size9 => 100000 jdbc_default_timezone10 =>"Asia/Shanghai" statement11 => "select * from kl_carousel_info where createtime >= :sql_last_value order by createtime asc" schedule12 => "*/3 * * * *" record_last_run13 => true use_column_value14 => true tracking_column15 => "createtime" tracking_column_type16 => "numeric" last_run_metadata_path17 => "./id/logstash_kl_carousel_info_last_id" clean_run18 => false lowercase_column_names19 => false } jdbc { type => "kl_knowledge_article" jdbc_connection_string => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false" jdbc_user => "root" jdbc_password => "123456" jdbc_driver_library => "./config/mysql-connector-java-5.1.38.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => true jdbc_fetch_size => 100 jdbc_page_size => 100000 jdbc_default_timezone =>"Asia/Shanghai" statement => "SELECT 'kl_infomation_article' table_name,a.`title` m_title,a.pinyin,a.`type` m_type,a.`knowledge_type_id`,a.`knowledge_type_name`,a.`tags`,a.status,a.`click_rate`, a.id m_id,c.`organization_id`,c.`organization_name`,b.`id`,b.`masterid`,b.`content`,b.`accessory_id`,b.`accessory_path`,b.`state`,b.`sort`,b.`updatetime`,b.`createtime` FROM `kl_infomation_article` b INNER JOIN kl_infomation_master a on a.id=b.masterid INNER JOIN `kl_master_to_organization` c ON a.`id`=c.`master_id` WHERE b.`state`='1' AND a.`state`='1' AND c.`state`='1' AND b.updatetime >= :sql_last_value order by b.updatetime asc" schedule => "*/3 * * * *" record_last_run => true use_column_value => true tracking_column => "updatetime" tracking_column_type => "numeric" last_run_metadata_path => "./id/logstash_kl_knowledge_article_last_id" clean_run => false lowercase_column_names => false } jdbc { type => "kl_knowledge_document" jdbc_connection_string => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false" jdbc_user => "root" jdbc_password => "123456" jdbc_driver_library => "./config/mysql-connector-java-5.1.38.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => true jdbc_fetch_size => 100 jdbc_page_size => 100000 jdbc_default_timezone =>"Asia/Shanghai" statement => "SELECT 'kl_infomation_document' table_name,a.`title` m_title,a.pinyin,a.`type` m_type,a.`knowledge_type_id`,a.`knowledge_type_name`,a.`tags`,a.status,a.`click_rate`,a.id m_id,c.`organization_id`,c.`organization_name`, b.* from `kl_infomation_document` b INNER JOIN kl_infomation_master a on a.id=b.master_id INNER JOIN `kl_master_to_organization` c ON a.`id`=c.`master_id` WHERE b.`state`='1' AND a.`state`='1' AND c.`state`='1' AND b.updatetime >= :sql_last_value order by b.updatetime asc" schedule => "*/3 * * * *" record_last_run => true use_column_value => true tracking_column => "updatetime" tracking_column_type => "numeric" last_run_metadata_path => "./id/logstash_kl_knowledge_document_last_id" clean_run => false lowercase_column_names => false } } output { if[type]=="kl_carousel_info"20{ elasticsearch { hosts21 => "es_ip:9200" index22 => "kl_carousel_info" document_type23 => "doc" document_id24 => "%{id}" manage_template25 => true template_overwrite26 => true template_name27 => "kl_carousel_info" template28 => "/opt/elasticsearch/logstash-6.6.1/template/kl_carousel_info_logstash.json" } } if[type]=="kl_knowledge_article"{ elasticsearch { hosts => "es_ip:9200" index => "kl_knowledge" document_type => "doc" document_id => "%{id}" manage_template => true template_overwrite => true template_name => "kl_knowledge" template => "/opt/elasticsearch/logstash-6.6.1/template/kl_knowledge_logstash.json" } } if[type]=="kl_knowledge_document"{ elasticsearch { hosts => "es_ip:9200" index => "kl_knowledge" document_type => "doc" document_id => "%{id}" manage_template => true template_overwrite => true template_name => "kl_knowledge" template => "/opt/elasticsearch/logstash-6.6.1/template/kl_knowledge_logstash.json" } } stdout { codec => json_lines } } logstash配置文件说明: 【1】——定义的类型名称,与output中上标【20】对应,说明那个输入到哪个输出类型。 【2】——mysql的jdbc连接串。 【3】,【4】——mysql的用户名和密码。 【5】,【6】——mysql的驱动包与驱动名名称配置。 【7】,【8】,【9】——mysql查询时的分页设置。 【10】——时区设置。 【11】——同步时的查询sql语句,如果是数值型一定要顺序排序。 【12】——定时同步频率设置,以上例子设置的是3分钟同步一次,默认是1分钟。 【13】,【14】,【15】,【16】——【14】使用列的字段做增量判断,否则默认是当前时间【15】增量同步时用于判断的字段名称,【16】字段类类型,以上例子是时间字段的长整型类型,所以是numeric 【17】——用于记录上一次同步时标记字段最后同步一行的值的文件路径,比如同步最后一行createtime值是1559198274577,那么这个值会记录到文件logstash_kl_carousel_info_last_id中,下次同步时仅仅同步createtime大于这个值的数据。 【18】——是否每次同步都清理,最好为false,否则每次同步都是全量同步 【19】——列字段是否都转为小写名称 【20】——与【1】一一对应 【21】——同步插入或者更新到es的目标地址 【22】——es的索引名称 【23】——es的文档类型名称,6.x版本都可以是一个索引对应多个文档类型的,但是不建议这么做,后续es版本只支持一个索引对应一个文档类型。 【24】——es中文档的id,例子中使用表记录的id作为文档的id 【25】——这个如果使用自己配置的模板,必须配置为true 【27】——模板名称,与定义的模板名称对应 【28】—— 使用自定义模板的文件路径,模板用于创建es的索引,决定了索引创建的方式 5、模板的定义 模板是在第一次同步数据时用于创建es索引用的,由es来加载模板。 { "template": "kl_*"1, "order" : 12, "settings": { "index.number_of_shards": 5, "number_of_replicas": 1, "index.refresh_interval": "60s", "analysis":{ "analyzer":{ "pinyin_smart"3:{ "type":"custom", "tokenizer":"ik_smart"4, "char_filter": ["html_strip"]5, "filter":[ "my_pinyin"6 ] } }, "filter":{ "my_pinyin"7:{ "type":"pinyin"8, "keep_separate_first_letter" : false, "keep_full_pinyin" : true, "keep_original" : true, "limit_first_letter_length" : 16, "lowercase" : false, "remove_duplicated_term" : true } } } }, "mappings": { "doc": { "dynamic_templates"9: [ { "string_fields" : { "match" : "*"10, "match_mapping_type" : "string"11, "mapping" : { "analyzer":"pinyin_smart"12, "type" : "text"13, "fields": { "raw": { "type": "keyword", "ignore_above": 256 } } } } } ], "properties"14: { "@timestamp":{ "type": "date" }, "id": { "type": "keyword" }, "m_id": { "type": "keyword" }, "masterid": { "type": "keyword" }, "master_id": { "type": "keyword" }, "accessory_id": { "type": "keyword" }, "parent_id": { "type": "keyword" }, "knowledge_type_id": { "type": "keyword" }, "organization_id": { "type": "keyword" }, "table_name": { "type": "keyword" }, "m_type": { "type": "keyword" }, "m_title"15: { "type": "text", "norms": true16, "analyzer":"ik_smart"17, "search_analyzer":"ik_smart"18, "fields": { "pinyin"19: { "type": "text", "analyzer": "pinyin_smart" }, "raw"20: { "ignore_above":25621, "type": "keyword"22 } } }, "title": { "type": "text", "norms": true, "analyzer":"ik_smart", "search_analyzer":"ik_smart", "fields": { "pinyin": { "type": "text", "analyzer": "pinyin_smart" }, "raw": { "ignore_above":256, "type": "keyword" } } }, "img_title": { "type": "text", "norms": true, "analyzer":"ik_smart", "search_analyzer":"ik_smart", "fields": { "pinyin": { "type": "text", "analyzer": "pinyin_smart" }, "raw": { "ignore_above":256, "type": "keyword" } } }, "content": { "type": "text", "norms": true, "analyzer":"ik_smart", "search_analyzer":"ik_smart", "fields": { "pinyin": { "type": "text", "analyzer": "pinyin_smart" }, "raw": { "ignore_above":256, "type": "keyword" } } } } } } } es模板的配置文件说明: 【1】——模板名称,这里"kl_*"通配以“kl_”开头的模板名称,与logstash配置的【27】对应。 【2】——模板的优先顺序,如果应用了多个模板则决定合并的优先顺序,值越大优先级越高,es自带的默认模板是0,所以自定义模板最好设置为>0的值。 【3】——分词分析器名称。 【4】——使用了ik分词器的ik_smart方式分词,ik分词器插件需要在es里面安装并且重启才能生效。 【5】——分词时跳过HTML标签,HTML不要去分词。 【6】——拼音分词过滤,支持拼音分词+ik中文分词混合,拼音分词与ik。 【7】——过滤器定义。 【8】——拼音分词器。 【9】——动态模板,没有在。 【10】——匹配字段,*代表匹配任何没有在【14】里面定义字段。 【11】——匹配字段的类型,这里是字符串类型。 【12】——使用自定义的分析器名称。 【13】——对应es的字段类型。 ,【14】——属性字段索引的方式定义。 【15】——字段的名称。 【16】——是否支持打分,如果要打分,一定要设置为true 【17】——使用ik分词器的ik_smart方式分词。 【18】——指定es查询是使用ik分词器的ik_smart方式分词。最好与插入是分词器保持一致,否则搜索结果可能不符合预期。 【19】——定义拼音分词嵌套字段,这里自定义名称为"pinyin" 【20】——定义keyword的嵌套子字段,方便like查询,注意:如果需要支持like查询一定需要定义keyword类型字段,否则可能查询不到,因为被分词了,但是keyword类型最大支持32766个字符或者10922个汉字。 【21】——定义keyword类型在字段长度超过多少个字符时忽略,这里是超过256忽略,这个值定义过大对性能会有显著影响,查询速度会变慢。 【22】——keyword类型。 6、手动删除es索引 比如我要删除索引以"kl_"开头的索引, 在linux下可以执行命令:curl -XDELETE -u elastic:changeme http://localhost:9200/kl_* 7、删除模板 这里踏过一个大坑,在第一次启动logstash,执行模板加载,新建es索引后,后面修改了模板,但是索引并没有生效,原因就是需要删除es里面默认的模板,估计是模板合并导致的, 删除模板语句命令:curl -XDELETE 'http://127.0.0.1:9200/_template/*' 8、启动(开始)同步(Linux) cd {lostash_home}, 然后执行命令:nohup bin/logstash -f config/logstash-mysql-es.conf > logs/logstash.out 2>&1 & 相关文章:ik与拼音分词器,拓展热词/停止词库 技术合作: qq:281414283 微信:so-so-life
转载于:https://www.cnblogs.com/javato/p/11139924.html