使用logstash同步mysql数据到elasticsearch

it2022-05-05  117

说明:以下文档说到elasticsearch都简称es。

           logstash同步数据不仅仅是数据库到es,它只是一个同步中间件,数据来源和数据的目标存储都是可以在配置里面指定的,根据数据来源和目标存储的不同配合logstash提供的不同插件。

           本文讲解的是从mysql 同步到es实现方式。使用的版本logstash版本是6.6.1,es版本是6.5.4,logstash版本和es版本一定要配套,如果2者版本差距过大,同步过程中会报版本错误。

          写这篇文章的时候es和Logstash最新版本是7.2。

 

 

1、logstash是一款开源的数据同步工具,还是比较优秀的。其他几款同步es数据的工具,比如elasticseach-jdbc等都试过,好久没人维护了。

     官方文档访问地址:https://www.elastic.co/guide/en/logstash/current/index.html

2、es是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。

     官方说明文档访问地址:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

3、logstash和es的安装

     参考官方文档。

4、mysql同步到es的logstash配置文件,命名一个logstash-mysql-es.conf配置文件,其中ip是mysql数据库对应的ip,es_ip是elasticsearch对应的ip内容如下:

 

input {   jdbc {     type1 => "kl_carousel_info"     jdbc_connection_string2 => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false"     jdbc_user3 => "root"     jdbc_password4 => "123456"     jdbc_driver_library5 => "./config/mysql-connector-java-5.1.38.jar"     jdbc_driver_class6 => "com.mysql.jdbc.Driver"     jdbc_paging_enabled7 => true     jdbc_fetch_size8 => 100     jdbc_page_size9 => 100000     jdbc_default_timezone10 =>"Asia/Shanghai"     statement11 => "select * from kl_carousel_info where createtime >= :sql_last_value order by createtime asc"     schedule12 => "*/3 * * * *"     record_last_run13 => true     use_column_value14 => true     tracking_column15 => "createtime"     tracking_column_type16 => "numeric"     last_run_metadata_path17 => "./id/logstash_kl_carousel_info_last_id"     clean_run18 => false     lowercase_column_names19 => false   }   jdbc {     type => "kl_knowledge_article"     jdbc_connection_string => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false"     jdbc_user => "root"     jdbc_password => "123456"     jdbc_driver_library => "./config/mysql-connector-java-5.1.38.jar"     jdbc_driver_class => "com.mysql.jdbc.Driver"     jdbc_paging_enabled => true  jdbc_fetch_size => 100     jdbc_page_size => 100000     jdbc_default_timezone =>"Asia/Shanghai"     statement => "SELECT 'kl_infomation_article' table_name,a.`title` m_title,a.pinyin,a.`type` m_type,a.`knowledge_type_id`,a.`knowledge_type_name`,a.`tags`,a.status,a.`click_rate`, a.id m_id,c.`organization_id`,c.`organization_name`,b.`id`,b.`masterid`,b.`content`,b.`accessory_id`,b.`accessory_path`,b.`state`,b.`sort`,b.`updatetime`,b.`createtime` FROM `kl_infomation_article` b INNER JOIN kl_infomation_master a on a.id=b.masterid INNER JOIN `kl_master_to_organization` c ON a.`id`=c.`master_id` WHERE b.`state`='1' AND a.`state`='1' AND c.`state`='1' AND b.updatetime >= :sql_last_value order by b.updatetime asc"     schedule => "*/3 * * * *"     record_last_run => true     use_column_value => true     tracking_column => "updatetime"     tracking_column_type => "numeric"     last_run_metadata_path => "./id/logstash_kl_knowledge_article_last_id"     clean_run => false     lowercase_column_names => false   }   jdbc {     type => "kl_knowledge_document"     jdbc_connection_string => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false"     jdbc_user => "root"     jdbc_password => "123456"     jdbc_driver_library => "./config/mysql-connector-java-5.1.38.jar"     jdbc_driver_class => "com.mysql.jdbc.Driver"     jdbc_paging_enabled => true  jdbc_fetch_size => 100     jdbc_page_size => 100000     jdbc_default_timezone =>"Asia/Shanghai"     statement => "SELECT 'kl_infomation_document' table_name,a.`title` m_title,a.pinyin,a.`type` m_type,a.`knowledge_type_id`,a.`knowledge_type_name`,a.`tags`,a.status,a.`click_rate`,a.id m_id,c.`organization_id`,c.`organization_name`, b.* from `kl_infomation_document` b INNER JOIN kl_infomation_master a  on a.id=b.master_id INNER JOIN `kl_master_to_organization` c ON a.`id`=c.`master_id`  WHERE b.`state`='1' AND a.`state`='1' AND c.`state`='1' AND b.updatetime >= :sql_last_value order by b.updatetime asc"     schedule => "*/3 * * * *"     record_last_run => true     use_column_value => true     tracking_column => "updatetime"     tracking_column_type => "numeric"     last_run_metadata_path => "./id/logstash_kl_knowledge_document_last_id"     clean_run => false     lowercase_column_names => false   } } output {   if[type]=="kl_carousel_info"20{    elasticsearch {   hosts21 => "es_ip:9200"   index22 => "kl_carousel_info"   document_type23 => "doc"   document_id24 => "%{id}"   manage_template25 => true   template_overwrite26 => true   template_name27 => "kl_carousel_info"   template28 => "/opt/elasticsearch/logstash-6.6.1/template/kl_carousel_info_logstash.json"    }   }   if[type]=="kl_knowledge_article"{       elasticsearch {   hosts => "es_ip:9200"   index => "kl_knowledge"   document_type => "doc"   document_id => "%{id}"   manage_template => true   template_overwrite => true   template_name => "kl_knowledge"   template => "/opt/elasticsearch/logstash-6.6.1/template/kl_knowledge_logstash.json"    }   }   if[type]=="kl_knowledge_document"{       elasticsearch {   hosts => "es_ip:9200"   index => "kl_knowledge"   document_type => "doc"   document_id => "%{id}"   manage_template => true   template_overwrite => true   template_name => "kl_knowledge"   template => "/opt/elasticsearch/logstash-6.6.1/template/kl_knowledge_logstash.json"    }   }   stdout {       codec => json_lines   } }     logstash配置文件说明:     【1】——定义的类型名称,与output中上标【20】对应,说明那个输入到哪个输出类型。     【2】——mysql的jdbc连接串。     【3】,【4】——mysql的用户名和密码。     【5】,【6】——mysql的驱动包与驱动名名称配置。     【7】,【8】,【9】——mysql查询时的分页设置。     【10】——时区设置。     【11】——同步时的查询sql语句,如果是数值型一定要顺序排序。     【12】——定时同步频率设置,以上例子设置的是3分钟同步一次,默认是1分钟。     【13】,【14】,【15】,【16】——【14】使用列的字段做增量判断,否则默认是当前时间【15】增量同步时用于判断的字段名称,【16】字段类类型,以上例子是时间字段的长整型类型,所以是numeric     【17】——用于记录上一次同步时标记字段最后同步一行的值的文件路径,比如同步最后一行createtime值是1559198274577,那么这个值会记录到文件logstash_kl_carousel_info_last_id中,下次同步时仅仅同步createtime大于这个值的数据。     【18】——是否每次同步都清理,最好为false,否则每次同步都是全量同步     【19】——列字段是否都转为小写名称     【20】——与【1】一一对应     【21】——同步插入或者更新到es的目标地址     【22】——es的索引名称     【23】——es的文档类型名称,6.x版本都可以是一个索引对应多个文档类型的,但是不建议这么做,后续es版本只支持一个索引对应一个文档类型。     【24】——es中文档的id,例子中使用表记录的id作为文档的id     【25】——这个如果使用自己配置的模板,必须配置为true     【27】——模板名称,与定义的模板名称对应     【28】—— 使用自定义模板的文件路径,模板用于创建es的索引,决定了索引创建的方式 5、模板的定义      模板是在第一次同步数据时用于创建es索引用的,由es来加载模板。      {       "template": "kl_*"1     "order" : 12     "settings": {               "index.number_of_shards": 5,               "number_of_replicas": 1,               "index.refresh_interval": "60s",         "analysis":{             "analyzer":{                 "pinyin_smart"3:{                     "type":"custom",                     "tokenizer":"ik_smart"4,                     "char_filter": ["html_strip"]5,                     "filter":[                         "my_pinyin"6                     ]                 }             },             "filter":{                 "my_pinyin"7:{                     "type":"pinyin"8,                     "keep_separate_first_letter" : false,                     "keep_full_pinyin" : true,                     "keep_original" : true,                     "limit_first_letter_length" : 16,                     "lowercase" : false,                     "remove_duplicated_term" : true                 }             }         }       },        "mappings":     {                 "doc": {       "dynamic_templates"9: [ {                     "string_fields" : {                     "match" : "*"10,                     "match_mapping_type" : "string"11,                     "mapping" : {                         "analyzer":"pinyin_smart"12,                         "type" : "text"13,                        "fields": {                            "raw": {                                "type":  "keyword",                                "ignore_above": 256                             }                         }                     }                     }                }        ],       "properties"14: {                                "@timestamp":{                                        "type": "date"                                },     "id": {        "type": "keyword"        },     "m_id": {        "type": "keyword"        },     "masterid": {        "type": "keyword"        },     "master_id": {        "type": "keyword"        },     "accessory_id": {        "type": "keyword"        },     "parent_id": {        "type": "keyword"        },     "knowledge_type_id": {        "type": "keyword"        },     "organization_id": {        "type": "keyword"        },     "table_name": {        "type": "keyword"        },     "m_type": {        "type": "keyword"        },     "m_title"15: {        "type": "text",        "norms": true16,        "analyzer":"ik_smart"17,        "search_analyzer":"ik_smart"18,        "fields": {            "pinyin"19: {                "type":     "text",                "analyzer": "pinyin_smart"             },            "raw"20: {                "ignore_above":25621,                "type":  "keyword"22           }        }        },     "title": {        "type": "text",        "norms": true,        "analyzer":"ik_smart",        "search_analyzer":"ik_smart",        "fields": {       "pinyin": {         "type":     "text",         "analyzer": "pinyin_smart"       },       "raw": {           "ignore_above":256,        "type":  "keyword"       }        }        },                 "img_title": {        "type": "text",        "norms": true,        "analyzer":"ik_smart",        "search_analyzer":"ik_smart",        "fields": {       "pinyin": {         "type":     "text",         "analyzer": "pinyin_smart"       },       "raw": {           "ignore_above":256,        "type":  "keyword"       }        }        },        "content": {        "type": "text",        "norms": true,        "analyzer":"ik_smart",        "search_analyzer":"ik_smart",        "fields": {       "pinyin": {         "type":     "text",         "analyzer": "pinyin_smart"       },       "raw": {           "ignore_above":256,        "type":  "keyword"               }        }        }                      }              }        } } es模板的配置文件说明:     【1】——模板名称,这里"kl_*"通配以“kl_”开头的模板名称,与logstash配置的【27】对应。     【2】——模板的优先顺序,如果应用了多个模板则决定合并的优先顺序,值越大优先级越高,es自带的默认模板是0,所以自定义模板最好设置为>0的值。     【3】——分词分析器名称。     【4】——使用了ik分词器的ik_smart方式分词,ik分词器插件需要在es里面安装并且重启才能生效。     【5】——分词时跳过HTML标签,HTML不要去分词。     【6】——拼音分词过滤,支持拼音分词+ik中文分词混合,拼音分词与ik。     【7】——过滤器定义。     【8】——拼音分词器。     【9】——动态模板,没有在。     【10】——匹配字段,*代表匹配任何没有在【14】里面定义字段。     【11】——匹配字段的类型,这里是字符串类型。     【12】——使用自定义的分析器名称。     【13】——对应es的字段类型。    ,【14】——属性字段索引的方式定义。     【15】——字段的名称。     【16】——是否支持打分,如果要打分,一定要设置为true     【17】——使用ik分词器的ik_smart方式分词。     【18】——指定es查询是使用ik分词器的ik_smart方式分词。最好与插入是分词器保持一致,否则搜索结果可能不符合预期。     【19】——定义拼音分词嵌套字段,这里自定义名称为"pinyin"     【20】——定义keyword的嵌套子字段,方便like查询,注意:如果需要支持like查询一定需要定义keyword类型字段,否则可能查询不到,因为被分词了,但是keyword类型最大支持32766个字符或者10922个汉字。     【21】——定义keyword类型在字段长度超过多少个字符时忽略,这里是超过256忽略,这个值定义过大对性能会有显著影响,查询速度会变慢。     【22】——keyword类型。       6、手动删除es索引          比如我要删除索引以"kl_"开头的索引, 在linux下可以执行命令:curl -XDELETE -u elastic:changeme http://localhost:9200/kl_*     7、删除模板          这里踏过一个大坑,在第一次启动logstash,执行模板加载,新建es索引后,后面修改了模板,但是索引并没有生效,原因就是需要删除es里面默认的模板,估计是模板合并导致的,         删除模板语句命令:curl -XDELETE 'http://127.0.0.1:9200/_template/*'  8、启动(开始)同步(Linux)    cd  {lostash_home},    然后执行命令:nohup bin/logstash -f config/logstash-mysql-es.conf > logs/logstash.out 2>&1 &   相关文章:ik与拼音分词器,拓展热词/停止词库   技术合作:         qq:281414283         微信:so-so-life                                                              

     

转载于:https://www.cnblogs.com/javato/p/11139924.html


最新回复(0)