草庐IT

Elasticsearch:将关系数据库中的数据提取到 Elasticsearch 集群中

Elastic 中国社区官方博客 2023-11-27 原文

本指南介绍了如何使用 Logstash JDBC 输入插件通过 Logstash 将关系数据库中的数据提取到 Elasticsearch 集群中。 它演示了如何使用 Logstash 高效地复制记录并从关系数据库接收更新,然后将它们发送到 Elasticsearch 中。

此处提供的代码和方法已经过 MySQL 测试。 他们应该也适用于其他关系数据库。

Logstash Java 数据库连接 (JDBC) 输入插件使你能够从许多流行的关系数据库(包括 MySQL 和 Postgres)中提取数据。 从概念上讲,JDBC 输入插件运行一个循环,该循环定期轮询关系数据库以查找自该循环的最后一次迭代以来插入或修改的记录。

在今天的展示中,我将使用最新的 Elastic Stack 8.5.0 来进行演示。

将关系数据库中的数据提取到 Elasticsearch 集群中

安装

Elastic Stack

如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参阅我之前的文章:

在安装时,请参考最新的 Elastic Stack 8.x 的指南来进行安装。在默认的情况下,Elasticsearch 的访问是需要 HTTPS 的。

MySQL

对于本教程,你需要一个供 Logstash 读取的源 MySQL 实例。 MySQL Community Downloads 站点的 MySQL Community Server 部分提供了免费版本的 MySQL。我们可以通过如下的命令来登录 MySQL:

在上面我们需要输入用户 root 的密码进行登录。

下载 MySQL JDBC 驱动

Logstash JDBC 输入插件不包含任何数据库连接驱动程序。你需要用于关系数据库的 JDBC 驱动程序才能执行后面即将描述的步骤。从 MySQL 社区下载站点的 Connector/J 部分下载并解压缩 MySQL 的 JDBC 驱动程序。根据后续步骤的需要记下驱动程序的位置。

准备源 MySQL 数据库

我们来看一个简单的数据库,你将从中导入数据并将其发送到 Elasticsearch 集群中。 此示例使用带有时间戳记录的 MySQL 数据库。 时间戳使你能够轻松确定自最近一次数据传输到 Elasticsearch 以来数据库中发生了什么变化。

考虑数据库结构和设计

对于这个例子,让我们创建一个新的数据库 es_db 和表 es_table,作为我们的 Elasticsearch 数据的来源。

1)运行以下 SQL 语句以生成具有三个列表的新 MySQL 数据库:

CREATE DATABASE es_database;
USE es_database;
DROP TABLE IF EXISTS es_table;
CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

让我们探讨一下这个 SQL 片段中的关键概念:

条目描述
es_table存储数据的表的名称。
id记录的唯一标识符。 id 被定义为 PRIMARY KEY 和 UNIQUE KEY 来保证每个 id 在当前表中只出现一次。 这被转换为 _id 用于更新文档或将文档插入 Elasticsearch。
client_name最终将被引入 Elasticsearch 的数据。 为简单起见,此示例仅包含一个数据字段。
modification_time插入或最后更新记录的时间戳。 此外,你可以使用此时间戳来确定自上次数据传输到 Elasticsearch 以来发生了什么变化。

2)考虑如何处理删除以及如何将它们通知 Elasticsearch。 通常,删除一条记录会导致它立即从 MySQL 数据库中删除。 没有删除的记录。 Logstash 未检测到更改,因此该记录保留在 Elasticsearch 中。

有两种可能的方法来解决这个问题

  • 你可以在源数据库中使用 “软删除(soft deletes)”。 本质上,记录首先通过布尔标志标记为删除。 当前正在使用你的源数据库的其他程序必须在其查询中过滤掉 “软删除”。 “软删除” 被发送到 Elasticsearch,在那里可以处理它们。 之后,你的源数据库和 Elasticsearch 都必须删除这些 “软删除”。
  • 你可以定期清除基于数据库的 Elasticsearch 索引,然后使用新摄取的数据库内容刷新 Elasticsearch。

3)登录到你的 MySQL 服务器并将三个记录添加到你的新数据库:

use es_database
INSERT INTO es_table (id, client_name)
VALUES (1,"Targaryen"),
(2,"Lannister"),
(3,"Stark");

4) 使用 SQL 语句验证你的数据:

select * from es_table;

现在,让我们回到 Logstash 并将其配置为接收此数据。

使用 JDBC 输入插件配置 Logstash 管道

让我们设置一个示例 Logstash 输入管道,以从新的 JDBC 插件和 MySQL 数据库中获取数据。 除了 MySQL,你还可以从任何支持 JDBC 的数据库输入数据。

1)在 <localpath>/logstash-8.5.0/ 中,创建一个名为 jdbc.conf 的新文本文件。
2)将以下代码复制并粘贴到这个新的文本文件中。 此代码通过 JDBC 插件创建一个 Logstash 管道。

input {
  jdbc {
    jdbc_driver_library => "<driverpath>/mysql-connector-java-<versionNumber>.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db" 
    jdbc_user => "<myusername>" 
    jdbc_password => "<mypassword>" 
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  stdout { codec =>  "rubydebug"}
}

说明:

  • 指定本地 JDBC 驱动程序 .jar 文件的完整路径(包括版本号)。 例如:jdbc_driver_library => "/usr/share/mysql/mysql-connector-java-8.0.24.jar"
  • 提供 MySQL 主机的 IP 地址或主机名和端口。 例如,jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/es_database"
  • 提供您的 MySQL 凭据。 用户名和密码都必须用引号引起来。

注意:如果你使用的是 MariaDB(MySQL 的一个流行的开源社区分支),你需要做一些不同的事情:

  • 代替 MySQL JDBC 驱动程序,下载并解压缩 MariaDB 的 JDBC 驱动程序
  • 替换 jdbc.conf 代码中的以下行,包括最后一行中的 ANSI_QUOTES 片段:
jdbc_driver_library => "<driverPath>/mariadb-java-client-<versionNumber>.jar"
jdbc_driver_class => "org.mariadb.jdbc.Driver"
jdbc_connection_string => "jdbc:mariadb://<mySQLHost>:3306/es_db?sessionVariables=sql_mode=ANSI_QUOTES"

以下是有关 Logstash 管道代码的一些其他详细信息:

条目描述
jdbc_driver_libraryLogstash JDBC 插件未与 JDBC 驱动程序库一起打包。 必须使用 jdbc_driver_library 配置选项将 JDBC 驱动程序库显式传递到插件中。
tracking_column此参数指定字段 unix_ts_in_secs,该字段跟踪 Logstash 从 MySQL 读取的最后一个文档,存储在磁盘上的 logstash_jdbc_last_run 中。 该参数确定 Logstash 在其轮询循环的下一次迭代中请求的文档的起始值。 存储在 logstash_jdbc_last_run 中的值可以在 SELECT 语句中作为 sql_last_value 访问。
unix_ts_in_secs由 SELECT 语句生成的字段,其中包含 modification_time 作为标准 Unix 时间戳(自 epoch 以来的秒数)。 该字段由跟踪列引用。 Unix 时间戳用于跟踪进度而不是普通时间戳,因为普通时间戳可能会由于在 UMT 和本地时区之间正确来回转换的复杂性而导致错误。
sql_last_value这是一个包含 Logstash 轮询循环当前迭代起点的内置参数,它在 JDBC 输入配置的 SELECT 语句行中被引用。 此参数设置为从 .logstash_jdbc_last_run 中读取的 unix_ts_in_secs 的最新值。 该值是在 Logstash 轮询循环中执行的 MySQL 查询返回的文档的起点。 在查询中包含此变量可确保我们不会重新发送已存储在 Elasticsearch 中的数据。
schedule这使用 cron 语法来指定 Logstash 应多久轮询一次 MySQL 以获取更改。 规范 */5 * * * * * 告诉 Logstash 每 5 秒联系 MySQL。 来自此插件的输入可以安排为根据特定时间表定期运行。 此调度语法由 rufus-scheduler 提供支持。 语法类似于 cron,带有一些特定于 Rufus 的扩展(例如,时区支持)。
modification_time < NOW()SELECT 的这一部分将在下一节中详细解释。
filter在本节中,值 id 从 MySQL 记录复制到名为 _id 的元数据字段中,稍后在输出中引用该字段以确保每个文档都使用正确的 _id 值写入 Elasticsearch。 使用元数据字段可确保此临时值不会导致创建新字段。 id、@version 和 unix_ts_in_secs 字段也从文档中删除,因为它们不需要写入 Elasticsearch。
output本节指定应使用 ruby​​debug 输出将每个文档写入标准输出以帮助调试。

请参照我之前的文章 “Logstash:把 MySQL 数据导入到 Elasticsearch 中”。我们需要把下载的 JDBC 驱动拷贝到如下的目录中:

$ pwd
/Users/liuxg/elastic/logstash-8.5.0
$ ls logstash-core/lib/jars/mysql-connector-j-8.0.31.jar 
logstash-core/lib/jars/mysql-connector-j-8.0.31.jar

根据我的情况,我的 jdbc.conf 的代码如下:

jdbc.conf

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/es_database" 
    jdbc_user => "root" 
    jdbc_password => "1234" 
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}

filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output {
  stdout { codec =>  "rubydebug"}
}

3)使用新的 JDBC 配置文件启动 Logstash:

bin/logstash -f jdbc.conf

Logstash 通过标准输出 (stdout),你的命令行界面输出您的 MySQL 数据。 初始数据加载的结果应类似于以下内容:

 Logstash 结果会定期显示 SQL SELECT 语句,即使在 MySQL 数据库中没有任何新内容或修改内容时也是如此:

 4)打开你的 MySQL 控制台。 让我们使用以下 SQL 语句将另一条记录插入该数据库:

use es_database
INSERT INTO es_table (id, client_name)
VALUES (4,"Baratheon");

切换回你的 Logstash 控制台。 Logstash 检测到新记录,控制台显示类似如下结果:

5)查看 Logstash 输出结果以确保你的数据看起来正确。 使用 CTRL+C 关闭 Logstash。

输出到 Elasticsearch

在本节中,我们配置 Logstash 以将 MySQL 数据发送到 Elasticsearch。 我们修改在使用 JDBC 输入插件配置 Logstash 管道部分中创建的配置文件,以便数据直接输出到 Elasticsearch。 我们启动 Logstash 发送数据,然后登录 Kibana 来验证数据。

关于如何配置带有 HTTPS 连接的 Elasticsearch,请阅读我的文章 “Logstash:如何连接到带有 HTTPS 访问的集群”。我们首先按照那篇文章中步骤生成一个 truststore.p12 证书:

1)打开 Logstash 文件夹下的 jdbc.conf 文件进行编辑。

2)用以下内容更新输出部分:

jdbc.conf

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/es_database" 
    jdbc_user => "root" 
    jdbc_password => "1234" 
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}

filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output {
  stdout { codec =>  "rubydebug"}

  elasticsearch {
    hosts => ["https://localhost:9200"]
    index => "data-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "Hx_-Z6cnuT-T_LIqGJoV"
    ssl_certificate_verification => true
    truststore => "/Users/liuxg/elastic/elasticsearch-8.5.0/config/certs/truststore.p12"
    truststore_password => "password"
  }
}

当然你也可以选择使用 API key 来进行连接。详细步骤请参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群”。

3)现在,如果您只是像使用新输出一样重新启动 Logstash,则不会有 MySQL 数据发送到我们的 Elasticsearch 索引。

这是为什么呢?Logstash 保留了以前的 sql_last_value 时间戳,并看到从那时起 MySQL 数据库中没有发生新的更改。 因此,根据我们配置的 SQL 查询,没有新数据要发送到 Logstash。

解决方案:在 jdbc.conf 文件的 JDBC 输入部分添加 clean_run => true 作为新行。 当设置为 true 时,此参数会将 sql_last_value 重置回零。

input {
  jdbc {
      ...
      clean_run => true
      ...
    }
}

在将 clean_run 设置为 true 运行一次 Logstash 后,你可以删除 clean_run 行,除非您希望在每次重新启动 Logstash 时再次发生重置行为。

4)打开命令行界面实例,转到您的 Logstash 安装路径,然后启动 Logstash:

bin/logstash -f jdbc.conf

运行上面的命令后,Logstash 就会把数据传入到 Elasticsearch 中。

在 Kibana 中检查数据

我们在 Kibana 的 console 中打入如下的命令:

GET data-*/_search

我们看到有四个数据已经被写入到 Elasticsearch 中了。

现在,你应该很好地了解如何配置 Logstash 以通过 JDBC 插件从关系数据库中获取数据。 你有一些设计注意事项来跟踪新的、修改的和删除的记录。 你应该具备开始试验自己的数据库和 Elasticsearch 所需的基础知识。

有关Elasticsearch:将关系数据库中的数据提取到 Elasticsearch 集群中的更多相关文章

  1. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  2. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  3. ruby-on-rails - Ruby net/ldap 模块中的内存泄漏 - 2

    作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代

  4. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  5. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  6. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  7. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

  8. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  9. ruby-on-rails - form_for 中不在模型中的自定义字段 - 2

    我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢

  10. ruby - rspec 需要 .rspec 文件中的 spec_helper - 2

    我注意到像bundler这样的项目在每个specfile中执行requirespec_helper我还注意到rspec使用选项--require,它允许您在引导rspec时要求一个文件。您还可以将其添加到.rspec文件中,因此只要您运行不带参数的rspec就会添加它。使用上述方法有什么缺点可以解释为什么像bundler这样的项目选择在每个规范文件中都需要spec_helper吗? 最佳答案 我不在Bundler上工作,所以我不能直接谈论他们的做法。并非所有项目都checkin.rspec文件。原因是这个文件,通常按照当前的惯例,只

随机推荐