草庐IT

同步到Kafka

全部标签

java - Spark Streaming Kafka 消息未被消费

我想使用Spark(1.6.2)Streaming从Kafka(代理v0.10.2.1)中的主题接收消息。我正在使用Receiver方法。代码如下:publicstaticvoidmain(String[]args)throwsException{SparkConfsparkConf=newSparkConf().setAppName("SimpleStreamingApp");JavaStreamingContextjavaStreamingContext=newJavaStreamingContext(sparkConf,newDuration(5000));//MaptopicM

kafka为什么性能这么高?

Kafka系统架构Kafka是一个分布式流处理平台,具有高性能和可伸缩性的特点。它使用了一些关键的设计原则和技术,以实现其高性能。上图是Kafka的架构图,Producer生产消息,以Partition的维度,按照一定的路由策略,提交消息到Broker集群中各Partition的Leader节点,Consumer以Partition的维度,从Broker中的Leader节点拉取消息并消费消息。Producer发送消息:Producer生产消息会涉及大量的消息网络传输,如果Producer每生产一个消息就发送到Broker会造成大量的网络消耗,严重影响到Kafka的性能。为了解决这个问题,Kaf

java - Spring Kafka 生产者抛出 TimeoutExceptions

问题我在Kubernetes中有一个Kafka设置和三个代理,根据https://github.com/Yolean/kubernetes-kafka上的指南设置.从Java客户端生成消息时出现以下错误消息。2018-06-0611:15:44.103ERROR1---[ad|producer-1]o.s.k.support.LoggingProducerListener:Exceptionthrownwhensendingamessagewithkey='null'andpayload='[...redacted...]':org.apache.kafka.common.errors

java - 与边缘同步的编译器是否在两个方向上重新排序障碍?

我有一个关于Java内存模型的问题。给定以下示例:action1action2synchronized(monitorObject){//acquireaction3}//releaseaction4acquire和release可以是任何同步边缘(锁定、解锁、启动线程、加入线程、检测线程中断、volatile-write、volatile-read、等等)是否保证action3在获取之前不能移动并且在释放之后不能移动?p>是否保证action2不能在获取之后(无论是在发布之前还是发布之后)和action4不能在发布之前移动(无论是在获取之前还是之后)?对于编译器的重新排序操作,与边同

java - 确保所有任务完成的同步对象

我应该使用哪个Java同步对象来确保完成任意数量的任务?约束是:每项任务都需要很长时间才能完成,适合并行执行任务。有太多任务无法放入内存(即我无法将每个任务的Future放入Collection中,然后调用get在所有future上)。我不知道会有多少任务(即我不能使用CountDownLatch)。ExecutorService可能是共享的,所以我不能使用awaitTermination(long,TimeUnit)例如,对于GrandCentralDispatch,我可能会这样做:letworkQueue=dispatch_get_global_queue(QOS_CLASS_BA

消息引擎系统KAFKA

消息引擎介绍消息引擎:用于在不同系统之间传输消息传输消息的格式:信息表达业务语义无歧义最大限度地提供可重用性通用性kafka传输消息格式:二进制的字节序列传输消息的协议:点对点模型,也叫消息队列模型发布/订阅模型,发送方也成为发布者,接受方成为订阅者,与点对点不同的是,这个模型可能存在多个发布者向相同的主题(topic)发送消息,而订阅者也可能有多个,它们都能接收到相同主题的消息kafka传输消息的协议:以上两种都支持JMS(JavaMessageService):支持上面两种消息引擎模型,但它非传输协议,而仅仅是一组API,ActiveMQ、RabbitMQ、IBM的WebSphereMQ和

Spark写入kafka(批数据和流式)

Spark写入(批数据和流式处理)Spark写入kafka批处理写入kafka基础#spark写入数据到kafkafrompyspark.sqlimportSparkSession,functionsasFss=SparkSession.builder.getOrCreate()#创建df数据df=ss.createDataFrame([[9,'王五',21,'男'],[10,'大乔',20,'女'],[11,'小乔',22,'女']],schema='idint,namestring,ageint,genderstring')df.show()#todo注意一:需要拼接一个value#在写入

apache-kafka - Kafka-connect sink任务忽略文件偏移存储属性

我在使用ConfluentJDBC连接器时遇到了非常奇怪的行为。我很确定它与Confluent堆栈无关,而是与Kafka-connect框架本身有关。因此,我将offset.storage.file.filename属性定义为默认/tmp/connect.offsets并运行我的接收器连接器。显然,我希望连接器在给定文件中保留偏移量(它在文件系统中不存在,但应该自动创建,对吧?)。文档说:offset.storage.file.filenameThefiletostoreconnectoroffsetsin.Bystoringoffsetsondisk,astandaloneproce

java - kafka消费者轮询超时

我正在使用Kafka并尝试使用它的数据。从下面这行,我可以轮询来自Kafka的数据。while(true){ConsumerRecordsrecords=consumer.poll(Long.MAX_VALUE);for(ConsumerRecordrecord:records){//retrievedata}}我的问题是,与提供200作为超时相比,我通过提供Long.MAX_VALUE作为超时获得的好处是什么。将运行生产的系统的最佳实践是什么。谁能解释一下高超时与低超时的区别,以及应该在生产系统中使用哪个? 最佳答案 设置MAX_

java - 对于在静态哈希表上同步的 java.util.Calendar 构造函数,我们可以做些什么?

我惊恐地看到我们的许多应用程序线程在从日历的构造函数访问的java.util.Hashtable.get(xx)方法上竞争同步。atjava.util.Hashtable.get(java.lang.Object)atjava.util.Calendar.setWeekCountData(java.util.Locale)atjava.util.Calendar.(java.util.TimeZone,java.util.Locale)atjava.util.GregorianCalendar.(java.util.TimeZone,java.util.Locale)ctor查找一个静