草庐IT

Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

一、案例说明现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“\t”键分割,数据内容及数据格式如下:二、前置准备工作项目环境说明LinuxUbuntu16.04jdk-7u75-linux-x64scala-2.10.4kafka_2.10-0.8.2.2spark-1.6.0-bin-hadoop2.6开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。/apps/zookeeper/bin/zkServer.shstartcd/apps/kafkabin/

JAVA实时获取kafka各个主题下分区消息的消费情况

目标通过指定主题和消费者组调用方法,实时查看主题下分区消息的消费情况(消息总数量、消费消息数量、未消费的消息数量)。工具类packagecom.utils.kafka;importjava.util.ArrayList;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Properties;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.consumer.Offs

华为云上的一次kafka集群故障处理

问题现象:  生产者的日志中大量的超时   2022-02-1709:29:41,692[kafka-producer-network-thread|monolith-rule-engine-xm2m-IOT-0003]WARN o.t.s.q.k.TbKafkaProducerTemplate-Producertemplatefailure:Expiring2record(s)fortb_rule_engine.main.0-0:120000mshaspassedsincebatchcreationorg.apache.kafka.common.errors.TimeoutException

华为云上的一次kafka集群故障处理

问题现象:  生产者的日志中大量的超时   2022-02-1709:29:41,692[kafka-producer-network-thread|monolith-rule-engine-xm2m-IOT-0003]WARN o.t.s.q.k.TbKafkaProducerTemplate-Producertemplatefailure:Expiring2record(s)fortb_rule_engine.main.0-0:120000mshaspassedsincebatchcreationorg.apache.kafka.common.errors.TimeoutException

kafka send data 超时问题 报错:xx ms has passed since last append

转行到大数据将近一年了,在工作中经常遇到kafkasenddata超时的报错,今天给各位道友浅谈一下这类问题的经验:报错日志:xxmshaspassedsincelastappend/xxmshaspassedsincebatchcreationpluslingertime/mshaspassedsincelastattemptplusbackofftime此异常错误即RecordBatch#maybeExpire方法抛出,意思是:在设置的timeout时间内send线程没有将client缓存内的请求发送出去。原因通常来说为以下几种:kafka服务端压力过大导致处理请求慢,查看kafka服务端

golang整合kafka

kafka基本概念消息队列1、什么是消息队列消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。消息队列(MessageQueue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。2、消息队列的应用场景应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少

Failed to send data to Kafka

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:FailedtosenddatatoKafka:Themessageis1446026byteswhenserializedwhichislargerthanthemaximumrequestsizeyouhaveconfiguredwiththemax.request.sizeconfiguration.数据太大无法发送至kafka.需要调整produceconfigsinkProperties.setProperty("max.request.size","214

安装Zookeeper和Kafka集群

安装Zookeeper和Kafka集群本文介绍如何安装Zookeeper和Kafka集群。为了方便,介绍的是在一台服务器上的安装,实际应该安装在多台服务器上,但步骤是一样的。安装Zookeeper集群下载安装包从官网上下载安装包:curlhttps://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz-oapache-zookeeper-3.7.1-bin.tar.gz解压:tarxvfapache-zookeeper-3.7.1-bin.tar.gz配置创建目录zk1,然后添加如下配置:z

安装Zookeeper和Kafka集群

安装Zookeeper和Kafka集群本文介绍如何安装Zookeeper和Kafka集群。为了方便,介绍的是在一台服务器上的安装,实际应该安装在多台服务器上,但步骤是一样的。安装Zookeeper集群下载安装包从官网上下载安装包:curlhttps://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz-oapache-zookeeper-3.7.1-bin.tar.gz解压:tarxvfapache-zookeeper-3.7.1-bin.tar.gz配置创建目录zk1,然后添加如下配置:z

【Jeepay】02-Kafka实现延迟消息与广播模式详细设计

在专题的上一章中,重点讲解了项目的改造背景、难点分析传送门:【Jeepay】01-Kafka实现延迟消息与广播模式概要设计在进入正篇之前,想简单说一下,之所以会如此的追本溯源的去记录:第一是因为:一个可以落地的解决方案的敲定,是综合项目各方面的原因得到的。没有完美的架构,只有刚好的架构;没有满足一切的架构,只有满足目标的架构。第二是因为想要通过这样的记录,让后面的同学能快速的理解:实践中并不需要沿用我的解决方案,只要能把思路打开,一定会找到更加适合你们项目的方式。好了废话不多说,本章会就Kafka实现延迟消息与广播模式的技术细节展开讨论。Kafka延迟发送解决思路:Kafka延迟发送的解决思路