草庐IT

kafka-admin-client-thread

全部标签

java - Kafka Java 消费者仅适用于本地主机,无法用于远程服务器

我使用Kafka已经两个月了,我使用这段代码在本地消费消息。我最近决定分发Zookeeper和Kafka,一切似乎都运行良好。当我尝试从远程IP使用消费者代码时,我的问题就开始了;一旦我将seeds.add("127.0.0.1");更改为seeds.add("104.131.40.xxx");我收到此错误消息:run:ErrorcommunicatingwithBroker[104.131.40.xxx]tofindLeaderfor[temperature,0]Reason:java.net.ConnectException:ConnectionrefusedCan'tfindme

java - 无法使用kafka Producer API与kafka服务器通信

我已经在单个节点上设置了kafka,并启动了zookeeper和kafka服务器。我在控制台上针对内部生产者和消费者对其进行了测试,并且运行良好。但是当我在控制台上运行内部kafka消费者时,我的自定义生产者它不起作用。下面是我的Producer类Propertiesprops=newProperties();props.put("metadata.broker.list","xx.xx.xx.xx:9092");props.put("serializer.class","kafka.serializer.StringEncoder");props.put("partitioner.c

java - 'Thread termination due to failure'指的是什么?

ExecutorService的javadoc有时会提到线程“由于失败”而终止的情况。但是,目前尚不清楚这指的是哪种故障。例如,singlethreadexecutor文档说ifthissinglethreadterminatesduetoafailureduringexecutionpriortoshutdown,anewonewilltakeitsplaceifneededtoexecutesubsequenttasks我本来以为这种情况可能发生在Exception的情况下,也可能是RuntimeException,但似乎并不是这样。运行以下代码似乎给出了相同的线程名称和线程ID。

java - kafka log-compaction消费数据

我正在阅读最新版本的kafka中的日志压缩,我很好奇这对消费者有何影响。消费者是否像以前一样工作,或者是否有一个新的流程来获取所有最新值?对于“标准”Kafka主题,我使用消费者组来维护指向最新值的指针。但是,如果Kafka基于键而不是时间来保存值,我想知道消费者群体将如何运作? 最佳答案 它不会影响消费者的工作方式。如果你只对每个键的最新值感兴趣并阅读整个主题,你可能仍然会看到一个键的“重复项”(如果不是所有重复项都被消除,或者在上次压缩运行后写入新消息)因此你只关心关于每个键的最新值。关于消费者组:当主题被压缩时,有效偏移量范围

Java Thread.sleep 最短时间

这个问题在这里已经有了答案:HowaccurateisThread.sleep?(3个答案)关闭5年前。TimeUnit.sleep(longtimeout)文档这样描述它的论点:timeout-theminimumtimetosleep.但是,我发现—至少在带有Java8update141的Windows764位上—线程hibernate的时间通常少于:publicstaticvoidmain(String[]args)throwsInterruptedException{finallongfrom=TimeUnit.MILLISECONDS.toNanos(100);finallo

java - 为什么 Kafka Direct Stream 会为每条消息创建一个新的解码器?

我有一个用Java编写并使用Spark2.1的Spark流应用程序。我正在使用KafkaUtils.createDirectStream来读取来自Kafka的消息。我正在为kafka消息使用kryo编码器/解码器。我在Kafkaproperties->key.deserializer,value.deserializer,key.serializer,value.deserializer中指定了这个当Spark在微批中拉取消息时,使用kryo解码器成功解码消息。但是我注意到Spark执行程序创建了一个新的kryo解码器实例,用于解码从kafka读取的每条消息。我通过将日志放入解码器构造

java - 使用 Spring Boot 1.5 避免 Kafka Streams 在测试中启动

在我的SpringBoot应用程序的测试过程中,我遇到了一个非常烦人的问题。我有一个使用KafkaStreams并在专用配置文件中声明它们的应用程序。@EnableKafka@EnableKafkaStreams@ConfigurationpublicclassKafkaStreamConfiguration{@Bean(name=KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)publicStreamsConfigkStreamsConfigs(){//Omissis}@BeanpublicKStre

java - Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

我如何在KafkaStreamsAPI中使用具有多个约束的.groupby。与下面的Java8StreamsAPI示例相同publicvoidtwoLevelGrouping(Listpersons){finalMap>>personsByCountryAndCity=persons.stream().collect(groupingBy(Person::getCountry,groupingBy(Person::getCity)));System.out.println("PersonslivinginLondon:"+personsByCountryAndCity.get("UK"

java - 单例对象工厂 : is this code thread-safe?

我有一个用于许多单例实现的通用接口(interface)。接口(interface)定义了可以抛出检查异常的初始化方法。我需要一个工厂来按需返回缓存的单例实现,想知道以下方法是否线程安全?UPDATE1:请不要建议任何第三部分库,因为由于可能的许可问题,这将需要获得法律许可:-)更新2:此代码可能会在EJB环境中使用,因此最好不要产生额外的线程或使用类似的东西。interfaceSingleton{voidinit()throwsSingletonException;}publicclassSingletonFactory{privatestaticConcurrentMap>CACH

java - ORA-12519, TNS :no appropriate service handler found while inserting into Oracle Database with X threads

我正在尝试插入到具有两列的Oracle数据库中-IDPrimaryKeyvarchar2(4000)ACCOUNTvarchar2(4000)我为此编写了一个多线程程序。并且每个线程每次都使用uniqueid插入ID列,因为ID是主键。我在某些时候面临的唯一问题是-下面的代码在运行几秒钟后抛出以下异常。1)NullPointerException2)java.sql.SQLException:Listenerrefusedtheconnectionwiththefollowingerror:ORA-12519,TNS:noappropriateservicehandlerfound我无