草庐IT

kafka-clients

全部标签

java - Kafka 0.11 中 sendOffsetsToTransaction 的含义

新的Kafka版本(0.11)支持exactly-once语义。https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging我在Java中使用kafka事务代码设置了一个生产者,就像这样。producer.initTransactions();try{producer.beginTransaction();for(ProducerRecordrecord:payload){producer.send(record);}Mapgrou

Java- apache http client- 使用示例显示了 cookie 的使用和从 HTTPResponse 对象中提取响应

我在javaweb应用程序中使用apachehttp客户端(v4),我遇到了以下情况,为此我需要简单的使用示例--(1)如何在ApacheHTTP客户端中使用Cookies,不同的cookies使用选项(2)当响应在HTTPResponse对象中可用时,提取字符集、mimetype、响应头(作为KeyValuePair)和伙伴(作为byte[])。 最佳答案 1)至于cookie,请参见示例:httpcomponents-client-4.1.3\examples\org\apache\http\examples\client\Cl

【登陆harbor仓库报错——解决方法】Error response from daemon:Get... http: server gave HTTP response to HTTs client

大家好,我是好学的小师弟。今天和大家分享下我在部署harbor的过程中遇到的一些问题的解决方法。问题:通过服务器docker登录harbor,发现登陆报错;Errorresponsefromdaemon:Get“https:.//.../v2/"":http:servergaveHTTPresponsetoHTTsclient解决方法:1.在服务器中,cd到docker目录下cd/etc/docker2.看这个目录下有没有daemon.json这个文件,如果没有就手动创建touchdaemon.json,然后vimdaemon.jsontouchdaemon.jsonvimdaemon.jso

java - 如何找到哪个消费者分配给kafka中主题的哪个分区?

我正在构建一个kafka管理器工具,我需要检查哪个主题分区分配给了消费者组中的哪个消费者。假设有消费者组group-A消费主题topic-A,n个分区,那么在group-A托管在不同的VM中。那么如何找到哪个分区分配给哪个消费者主机呢?在kafka0.9.1中可以吗?提前致谢。 最佳答案 您可以检查$KAFKA_HOME/bin/kafka-consumer-groups.sh的工作原理并将其实现集成到您的kafka管理器工具中,该工具将向您展示详细信息组所有者信息(例如,分区分配、滞后、IP)。小组主题分区CURRENT-OFFS

java - 如何使用 Java 中的结构化流从 Kafka 反序列化记录?

我使用Spark2.1。我正在尝试使用SparkStructuredStreaming从Kafka读取记录,反序列化它们并在之后应用聚合。我有以下代码:SparkSessionspark=SparkSession.builder().appName("Statistics").getOrCreate();Datasetdf=spark.readStream().format("kafka").option("kafka.bootstrap.servers",kafkaUri).option("subscribe","Statistics").option("startingOffset

java - 如何连接 Apache Kafka 和 Amazon S3?

我想使用KafkaConnect将来自Kafka的数据存储到存储桶s3中。我已经运行了一个Kafka的主题,并且创建了一个存储桶s3。我的主题有关于Protobuffer的数据,我尝试使用https://github.com/qubole/streamx我得到了下一个错误:[2018-10-0413:35:46,512]INFORevokingpreviouslyassignedpartitions[]forgroupconnect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)[

Java 套接字 : One Server and Multiple Clients

所以我用java创建了一个基本的客户端-服务器程序。它开始是这样的:客户端连接到服务器服务器询问客户的名字客户回复姓名服务器问候客户在此之后,客户端说话,服务器重复这些话我使用thistutorial没有太多麻烦就让它工作了.每当我尝试介绍多个客户时,问题就会出现。我认为它会工作,因为我使用的是多线程,但是,第二个客户端只是挂起,直到第一个客户端退出然后它开始工作(服务器确实接受来自第二个客户端的输入,但它没有响应直到第一个客户退出。这是我使用的代码:importjava.net.*;importjava.io.*;publicclassServerextendsThread{priv

java - 在Java中创建之前检查kafka中是否存在主题

我正在尝试使用以下方法在kafka0.8.2中创建一个主题:AdminUtils.createTopic(zkClient,myTopic,2,1,properties);如果我在本地多次运行代码进行测试,则会失败,因为主题已经创建。有没有办法在创建主题之前检查主题是否存在?TopicCommandapi似乎没有为listTopics或describeTopic返回任何内容. 最佳答案 您可以使用kakfa-client版本0.11.0.0的AdminClient示例代码:Propertiesconfig=newProperties

java - 如何在jersey-client java中实现重试机制

我正在使用jersey-client进行一些httprestapi调用。现在我想重试失败请求。说如果返回错误代码不是200那么我想再试几次。如何使用Jersey客户端做到这一点 最佳答案 来晚了,但是您可以使用几种不同的机制。同步方法看起来像这样:publicResponseexecWithBackoff(Callablei){ExponentialBackOffbackoff=newExponentialBackOff.Builder().build();longdelay=0;Responseresponse;do{try{Th

java - "error_description": "AADSTS70002: The request body must contain the following parameter: ' client_secret or client_assertion'

我使用了https://github.com/AzureAD/azure-activedirectory-library-for-java/blob/master/src/samples/public-client-app-sample/src/main/java/PublicClient.java中的代码.唯一的区别是CLIENT_ID已更新。我一直收到错误消息"error_description":"AADSTS70002:Therequestbodymustcontainthefollowingparameter:'client_secretorclient_assertion'