草庐IT

Flink CDC 监听 Postgresql表的变化

dk168 2023-03-28 原文

前言

最近看文章说如何把Postgresql的数据同步给别的数据源,可以利用它的WAL,具体怎么操作没有说,我自己找到一篇文章https://www.cnblogs.com/xiongmozhou/p/14817641.html 可以利用Flink CDC。 我自己正好前段时间也看过Flink,把这个知识串起来也很有意义,于是开始动手试了一下,期间也遇到些困难,也尝试解决了,有些原理不是很清晰,记录下来,后面看能不能解决。

Postgresql配置

我们使用上篇文章搭建的Postgresql数据库,要让Postgresql支持同步给其它数据源,一个最关键的配置是更改wal日志方式为logical, 这个配置在postgresql.conf, 而我们docker里面的postgresql.conf这个配置又在哪个目录呢? 网上找到了答案:https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
进入psql后,使用如下命令

SHOW config_file;

得到如下的结果
/var/lib/postgresql/data/postgresql.conf
得到路径后, 我打算像平时一样用vi去修改,发现不行,这个postgresql的Image并没有安装vim。
如何修改呢,继续网上找答案 https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
方法很多,我们用个简单的,使用sed命令来修改

sed -i -e"s/^#wal_level = replica.*$/wal_level = logical/" /var/lib/postgresql/data/postgresql.conf

就是查找到“#wal_level = replica“,把它替换为“wal_level = logical”
修改后需要重启postgresql,执行如下命令

su - postgres -c "PGDATA=$PGDATA /usr/lib/postgresql/15/bin/pg_ctl -w restart"

执行后会退出docker,需要重新进入

新建用户和授予权限参考https://www.cnblogs.com/xiongmozhou/p/14817641.html
注意文档中使用CREATE USER user它建的用户是user,我用的这个用户名是不成功的,提示语法错误
感觉是把user当作保留命令参数了,用户名改为user1可以成功。

我们参考官方文档https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#connector-options
首先在已有的Flink项目中加入如下的pom

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>2.3.0</version>
            <scope>provided</scope>
        </dependency>

这里代码参考文档

        SourceFunction postgreSQLSource = PostgreSQLSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("postgres") // set captured database
                .tableList("postgres.market_price") // set captured table
                .username("user1")
                .password("pwd")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
        .addSource(postgreSQLSource)
        .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print Postgres Snapshot + WAL");

有一点需要注意,官方文档中没有.decodingPluginName("pgoutput"),使用默认的decoderbufs,运行程序会提示
“PSQLException: ERROR: could not access file "decoderbufs": No such file or directory”, 修改成pgoutput,才能成功。 这里应该是要安装插件decoderbufs在Postgresql里面。这里暂时留下这个疑问,后面还有wal2json,看怎么把wal的值转成json格式显示出来。

程序运行起来后我们往表里插入和删除数据,可以在控制台中打印出变化来。
这里直接贴图

这里也有个疑问,我对表操作了三次,结果控制台打印出超过3条的信息,这里应该和是否commit有关
暂时也没有细究。

程序运行后,我们可以使用这个命令查看这个slot,
SELECT * FROM pg_replication_slots;

如果我们直接修改配置,比如把pgoutput改为别的,会提示slot flink已经存在,我们需要在postgresql里面把它先删除掉。

总结

总体上这个流程是打通了,但是对于里面的细节没有深入,比如flink怎么消费,里面的记录怎么显示出来,它里面实现的原理是什么,都需要花时间去研究,先开个头在这里。

有关Flink CDC 监听 Postgresql表的变化的更多相关文章

  1. ruby-on-rails - 启用 Rack::Deflater 时 ETag 发生变化 - 2

    在启用Rack::Deflater来gzip我的响应主体时偶然发现了一些奇怪的东西。也许我遗漏了一些东西,但启用此功能后,响应被压缩,但是资源的ETag在每个请求上都会发生变化。这会强制应用程序每次都响应,而不是发送304。这在没有启用Rack::Deflater的情况下有效,我已经验证页面源没有改变。我正在运行一个使用thin作为Web服务器的Rails应用程序。Gemfile.lockhttps://gist.github.com/2510816有没有什么方法可以让我从Rack中间件获得更多的输出,这样我就可以看到发生了什么?提前致谢。 最佳答案

  2. ruby-on-rails - 如何让 datamapper 与 postgresql 数据库一起工作? - 2

    我已经找到了几个使用datamapper的示例,并且能够让它们正常工作。不过,所有这些示例都是针对sqlite数据库的。我正在尝试将数据映射器与postgresql一起使用。我将datamapper中的调用从sqlite3更改为postgres,并且我已经安装了dm-postgres-adapter。但它仍然不起作用。我还需要做什么? 最佳答案 与SQLite不同,PostgreSQL不将数据库存储在单个文件中。在你拥有createdyourdatabase之后,尝试这样的事情:DataMapper.setup:default,{:

  3. sql - 在 Rails Console for PostgreSQL 的表中显示数据 - 2

    我找到了这样的东西:Rails:Howtolistdatabasetables/objectsusingtheRailsconsole?这一行没问题:ActiveRecord::Base.connection.tables并返回所有表但是ActiveRecord::Base.connection.table_structure("users")产生错误:ActiveRecord::Base.connection.table_structure("projects")我认为table_structure不是Postgres方法。如何列出Postgres数据库的Rails控制台中表中的所有

  4. ruby-on-rails - ruby 中两个哈希之间的变化 - 2

    我有两个具有以下格式的哈希mydetails[x['Id']]=x['Amount']这将包含如下数据hash1={"A"=>"0","B"=>"1","C"=>"0","F"=>"1"}hash2={"A"=>"0","B"=>"3","C"=>"0","E"=>"1"}我期待这样的输出:Differencesinhash:"B,F,E"非常感谢任何帮助。 最佳答案 这个解决方案可能更容易理解:(hash1.keys|hash2.keys).select{|key|hash1[key]!=hash2[key]}Array#|返回2

  5. ruby-on-rails - Rails 迁移中的 PostgreSQL 点类型 - 2

    我想使用PostgreSQL中的point类型。我已经完成了:railsgmodelTestpoint:point最终的迁移是:classCreateTests当我运行时:rakedb:migrate结果是:==CreateTests:migrating====================================================--create_table(:tests)rakeaborted!Anerrorhasoccurred,thisandalllatermigrationscanceled:undefinedmethod`point'for#/hom

  6. ruby-on-rails - Ruby on Rails 单表继承(STI)和单元测试问题(使用 PostgreSQL) - 2

    我正在使用带有单个“帐户”表的STI模型来保存用户和技术人员的信息(即用户...8)错误:test_the_truth(用户测试):ActiveRecord::StatementInvalid:PGError:ERROR:关系“技术人员”不存在:从“技术人员”中删除...从本质上讲,标准框架不承认Technicians和Users表(或PostgreSQL称它们为“关系”)不存在,事实上,应该别名为Accounts。有什么想法吗?我对RoR比较陌生,不知道如何解决这个问题而又不完全删除STI。 最佳答案 原来问题是由于存在:./te

  7. ruby - rails 3.2.2(或 3.2.1)+ Postgresql 9.1.3 + Ubuntu 11.10 连接错误 - 2

    我正在使用PostgreSQL9.1.3(x86_64-pc-linux-gnu上的PostgreSQL9.1.3,由gcc-4.6.real(Ubuntu/Linaro4.6.1-9ubuntu3)4.6.1,64位编译)和在ubuntu11.10上运行3.2.2或3.2.1。现在,我可以使用以下命令连接PostgreSQLsupostgres输入密码我可以看到postgres=#我将以下详细信息放在我的config/database.yml中并执行“railsdb”,它工作正常。开发:adapter:postgresqlencoding:utf8reconnect:falsedat

  8. ruby-on-rails - Rails 是否支持监听 UDP 套接字的简洁方式? - 2

    在Rails中,什么是集成更新模型某些元素的UDP监听过程的最佳方式(特别是它将向其中一个表添加行)。简单的答案似乎是在同一个进程中使用UDP套接字对象启动一个线程,但我什至不清楚我应该在哪里做适合Rails方式的事情。有没有一种巧妙的方法来开始收听UDP?具体来说,我希望能够编写一个UDPController并在每个数据报消息上调用一个特定的方法。理想情况下,我希望避免在UDP上使用HTTP(因为它会浪费一些在这种情况下非常宝贵的空间),但我完全控制消息格式,因此我可以为Rails提供它需要的任何信息。 最佳答案 Rails是一个

  9. ruby-on-rails - 如何在文本字段rails postgresql中查询json数据 - 2

    我有一个数据库表列(collection),datatypetext我在collection中插入了json数据这是我的json数据{"name":"test","age":"25","country":"xxx"}但是现在我该如何查询这个json数据。有没有如下选项User.where(collection::name)#Herenamedenotestest 最佳答案 由于JSON是一个字符串,您只能使用SQLite/MySQL(或任何其他数据库)进行字符串比较。此外,尽量将数据类型保持为二进制。您可以执行:LIKE操作以在您存

  10. ruby-on-rails - Rails Postgresql 多个模式和相同的表名 - 2

    我在两个不同的模式中有两个表,例如案例和事件。在每个模式中我都有基本表events.basiccases.basic这个表有关系:events.basic有一个cases.basic(cases.basic有多个events.basic)我的尝试失败了:文件cases_basic.rbclassCasesBasic'EventsBasic',:foreign_key=>'case_id'end文件events_basic.rbclassEventsBasic'CasesBasic',:foreign_key=>'case_id'end环境:Ruby1.9.3、Rails3.1.3、ge

随机推荐