我在使用 Confluent JDBC 连接器时遇到了非常奇怪的行为。我很确定它与 Confluent 堆栈无关,而是与 Kafka-connect 框架本身有关。
因此,我将 offset.storage.file.filename 属性定义为默认 /tmp/connect.offsets 并运行我的接收器连接器。显然,我希望连接器在给定文件中保留偏移量(它在文件系统中不存在,但应该自动创建,对吧?)。文档说:
offset.storage.file.filenameThe file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off.
但是 Kafka 的行为方式完全不同。
这是一个错误,还是更有可能,我不明白如何使用此配置?我了解两种持久偏移量方法之间的区别,文件存储更符合我的需求。
最佳答案
offset.storage.file.filename 仅在源 连接器中以独立模式使用。它用于在输入数据源上放置一个书签并记住它停止读取它的位置。创建的文件包含诸如文件行号(对于文件源)或表行号(对于一般的 jdbc 源或数据库)之类的内容。
在分布式模式下运行 Kafka Connect 时,此文件将替换为默认命名为 connect-offsets 的 Kafka 主题,应复制该主题以容忍故障。
就 sink 连接器而言,无论使用哪种插件或模式(独立/分布式),它们都会将上次停止读取其输入主题的位置存储在名为 __consumer_offsets 就像任何 Kafka 消费者一样。这允许使用 kafka-consumer-groups.sh 命令行工具等传统工具来查看接收器连接器滞后的程度。
Confluent Kafka replicator ,尽管是源连接器,但可能是一个异常(exception),因为它从远程 Kafka 读取并可能使用 Kafka 消费者,但只有一个集群会维护那些原始消费者组偏移量。
我同意文档不明确,无论连接器类型是什么(源或接收器)都需要此设置,但它仅由源连接器使用。这个设计决定背后的原因是单个 Kafka Connect worker(我的意思是单个 JVM 进程)可以运行多个连接器,可能包括源连接器和接收器连接器。换句话说,此设置是工作级别设置,而不是连接器设置。
关于apache-kafka - Kafka-connect sink任务忽略文件偏移存储属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42072854/
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我希望我的UserPrice模型的属性在它们为空或不验证数值时默认为0。这些属性是tax_rate、shipping_cost和price。classCreateUserPrices8,:scale=>2t.decimal:tax_rate,:precision=>8,:scale=>2t.decimal:shipping_cost,:precision=>8,:scale=>2endendend起初,我将所有3列的:default=>0放在表格中,但我不想要这样,因为它已经填充了字段,我想使用占位符。这是我的UserPrice模型:classUserPrice回答before_val
我有一个包含模块的模型。我想在模块中覆盖模型的访问器方法。例如:classBlah这显然行不通。有什么想法可以实现吗? 最佳答案 您的代码看起来是正确的。我们正在毫无困难地使用这个确切的模式。如果我没记错的话,Rails使用#method_missing作为属性setter,因此您的模块将优先,阻止ActiveRecord的setter。如果您正在使用ActiveSupport::Concern(参见thisblogpost),那么您的实例方法需要进入一个特殊的模块:classBlah
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2
我有这个html标记:我想得到这个:我如何使用Nokogiri做到这一点? 最佳答案 require'nokogiri'doc=Nokogiri::HTML('')您可以通过xpath删除所有属性:doc.xpath('//@*').remove或者,如果您需要做一些更复杂的事情,有时使用以下方法遍历所有元素会更容易:doc.traversedo|node|node.keys.eachdo|attribute|node.deleteattributeendend 关于ruby-Nokog
我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr
对于Rails模型,是否可以/建议让一个类的成员不持久保存到数据库中?我想将用户最后选择的类型存储在session变量中。由于我无法从我的模型中设置session变量,我想将值存储在一个“虚拟”类成员中,该成员只是将值传递回Controller。你能有这样的类(class)成员吗? 最佳答案 将非持久属性添加到Rails模型就像任何其他Ruby类一样:classUser扩展解释:在Ruby中,所有实例变量都是私有(private)的,不需要在赋值前定义。attr_accessor创建一个setter和getter方法:classUs
我想这样组织C源代码:+/||___+ext||||___+native_extension||||___+lib||||||___(Sourcefilesarekeptinhere-maycontainsub-folders)||||___native_extension.c||___native_extension.h||___extconf.rb||___+lib||||___(Rubysourcecode)||___Rakefile我无法使此设置与mkmf一起正常工作。native_extension/lib中的文件(包含在native_extension.c中)将被完全忽略。