我正在使用 Kafka Streams (v0.10.0.1) 编写应用程序,并希望使用查找数据丰富我正在处理的记录。此数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录。
如何在 Kafka Streams 应用程序中加载它并加入实际的 KStream?
当新文件到达那里时从 HDFS 重新读取数据的最佳做法是什么?
或者切换到 Kafka Connect 并将 RDBMS 表内容写入 Kafka 主题,所有 Kafka Streams 应用程序实例都可以使用它会更好吗?
更新:
正如建议的那样,Kafka Connect 将是必经之路。因为查找数据在 RDBMS 中以每日为基础进行更新,所以我正在考虑按计划运行 Kafka Connect one-off job而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断等的开销。对我来说,在这种情况下进行预定的提取看起来更安全。
查找数据不大,记录可能被删除/添加/修改。我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录。启用日志压缩并为已删除的键发送空值可能不会起作用,因为我不知道源系统中删除了什么。此外,据我所知,压缩发生时我无法控制。
最佳答案
推荐的方法确实是将查找数据也提取到 Kafka 中——例如通过 Kafka Connect——正如您在上面自己建议的那样。
But in this case how can I schedule the Connect job to run on a daily basis rather than continuously fetch from the source table which is not necessary in my case?
也许您可以更新您的问题,您不想让 Kafka Connect 作业连续运行?您是否担心资源消耗(数据库上的负载),如果不是“每日更新”,您是否担心处理的语义,或者...?
Update: As suggested Kafka Connect would be the way to go. Because the lookup data is updated in the RDBMS on a daily basis I was thinking about running Kafka Connect as a scheduled one-off job instead of keeping the connection always open. Yes, because of semantics and the overhead of keeping a connection always open and making sure that it won't be interrupted..etc. For me having a scheduled fetch in this case looks safer.
Kafka Connect 是安全的,并且 JDBC 连接器的构建正是为了以健壮、容错和高性能的方式将数据库表输入 Kafka(已经有许多生产部署).所以我建议不要仅仅因为“它看起来更安全”而回退到“批量更新”模式;就我个人而言,我认为触发每日摄取在操作上不如保持连续(实时!)摄取运行方便,而且它还会为您的实际用例带来一些不利影响(请参阅下一段)。
当然,您的进度可能会有所不同——因此,如果您打算每天只更新一次,那就去做吧。但是你失去了 a) 在充实发生的时间点用最新的数据库数据来充实你的传入记录的能力,并且,相反地,b) 你可能实际上用陈旧的/旧数据来充实传入的记录,直到下一天更新已完成,这很可能会导致您向下游发送/提供给其他应用程序使用的数据不正确。例如,如果客户更新了她的送货地址(在数据库中),但您每天只将此信息提供给您的流处理应用程序(以及可能的许多其他应用程序)一次,那么订单处理应用程序会将包裹运送到错误的地方地址,直到下一次每日摄取完成。
The lookup data is not big and records may be deleted / added / modified. I don't know either how I can always have a full dump into a Kafka topic and truncate the previous records. Enabling log compaction and sending null values for the keys that have been deleted would probably won't work as I don't know what has been deleted in the source system.
Kafka Connect 的 JDBC 连接器已经为您自动处理了这个问题:1. 它确保数据库插入/更新/删除正确反射(reflect)在 Kafka 主题中,以及 2. Kafka 的日志压缩确保目标主题不会增长出界。也许您可能想阅读文档中的 JDBC 连接器以了解您刚刚免费获得的功能:http://docs.confluent.io/current/connect/connect-jdbc/docs/ ?
关于hadoop - 具有在 HDFS 上查找数据的 Kafka Streams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39295017/
我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我刚刚被困在这个问题上一段时间了。以这个基地为例:moduleTopclassTestendmoduleFooendend稍后,我可以通过这样做在Foo中定义扩展Test的类:moduleTopmoduleFooclassSomeTest但是,如果我尝试通过使用::指定模块来最小化缩进:moduleTop::FooclassFailure这失败了:NameError:uninitializedconstantTop::Foo::Test这是一个错误,还是仅仅是Ruby解析变量名的方式的逻辑结果? 最佳答案 Isthisabug,or
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
我正在尝试解析一个CSV文件并使用SQL命令自动为其创建一个表。CSV中的第一行给出了列标题。但我需要推断每个列的类型。Ruby中是否有任何函数可以找到每个字段中内容的类型。例如,CSV行:"12012","Test","1233.22","12:21:22","10/10/2009"应该产生像这样的类型['integer','string','float','time','date']谢谢! 最佳答案 require'time'defto_something(str)if(num=Integer(str)rescueFloat(s
我正在使用Rails3.1并在一个论坛上工作。我有一个名为Topic的模型,每个模型都有许多Post。当用户创建新主题时,他们也应该创建第一个Post。但是,我不确定如何以相同的形式执行此操作。这是我的代码:classTopic:destroyaccepts_nested_attributes_for:postsvalidates_presence_of:titleendclassPost...但这似乎不起作用。有什么想法吗?谢谢! 最佳答案 @Pablo的回答似乎有你需要的一切。但更具体地说...首先改变你View中的这一行对此#
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01 客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02 数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co