草庐IT

Kafka-eagle

全部标签

java - Kafka - 使用高级消费者实现延迟队列

想要使用高级消费者api实现延迟消费者主要思想:按key生成消息(每个消息包含创建时间戳)这确保每个分区都按生成时间对消息进行排序。auto.commit.enable=false(将在每个消息处理后显式提交)消费一条消息检查消息时间戳并检查是否已经过了足够的时间处理消息(这个操作永远不会失败)提交1个偏移量while(it.hasNext()){valmsg=it.next().message()//checkstimestampinmsgtoseedelayperiodexceededwhile(!delayedPeriodPassed(msg)){waitSomeTime()//

python - 使用 python Spark 将大型 CSV 发送到 Kafka

我正在尝试向kafka发送大型CSV。基本结构是读取一行CSV并将其与标题一起压缩。a=dict(zip(header,line.split(",")然后将其转换为json:message=json.dumps(a)然后我使用kafka-python库发送消息fromkafkaimportSimpleProducer,KafkaClientkafka=KafkaClient("localhost:9092")producer=SimpleProducer(kafka)producer.send_messages("topic",message)使用PYSPARK我很容易地从CSV文件中

python - 使用 Kerberos 将 Kafka-Python 与集群连接

我正在尝试使用Kafka-Python连接到Kafka,Kafka集群具有Kerberos,我们需要构建一些命令来执行几个步骤。我在集群上创建了一个Topic,并使用./kafka-console-producer.sh和./kafka-console-consumer.sh进行了测试并且效果很好。但是当我尝试连接Kafka-Python时遇到了问题。请参阅下面的代码:defproduce():print('Producer')k_producer=KafkaProducer(bootstrap_servers='hostname:6667',security_protocol='SA

python - 使用气流将文件流式传输到kafka

使用气流将CSV文件流式传输到kafka主题的最佳方法是什么?为气流编写自定义运算符? 最佳答案 可能最好使用PythonOperator逐行处理文件。我有一个用例,我轮询和SFTP服务器获取文件,当我找到一些文件时,我逐行处理它们,将结果写为JSON。我会做一些事情,比如将日期解析为YYYY-MM-DD格式等。这样的事情可能对你有用:defcsv_file_to_kafka(**context):f='/path/to/downloaded/csv_file.csv'csvfile=open(f,'r')reader=csv.Di

python - R中的kafka消费者

我正在寻找用Python或R(最好是R)组合一个kafka消费者。使用kafka控制台消费者,我可以grep获取一个字符串并检索相关数据,但是当涉及到在R中适本地解析它时,我不知所措。有其他语言的kafka客户端(例如:PHP、CPP),但从数据分析的角度来看,R中的客户端会有所帮助。如果此论坛上的专家R开发人员可以暗示/建议资源,使我能够朝这个方向取得进展,那就太好了。Apache卡夫卡:incubator.apache.org/kafka/Kafka消费者客户端:https://github.com/kafka-dev/kafka/tree/master/clients

python - 如何使用 Apache Kafka 修复 Python2.7 中的 "AssertionError: Value must be bytes"错误

我是ApacheKafka技术的新手。我正在尝试使用python2.7将消息作为JSON对象发送到kafka主题,但出现“AssertionError:Valuemustbebytes”错误。我可以成功地以字符串形式发送消息,我可以使用kafka-console-consumer.sh查看我的消息。我正在使用apachekafka2.10-0.8.2.1版本。我在下面给出我的代码。fromkafkaimportKafkaProducerimportyamlproducer=KafkaProducer(bootstap_servers="localhost:9092")msg=yaml.

python - 我怎样才能找到kafka配置文件?

我在尝试创建主题时使用kafkaErrorwhileexecutingtopiccommandreplicationfactor:1largerthanavailablebrokers:0时出错。我通过修改配置文件找到了解决方案(ErrorcreatingKafkaTopics-Replicationfactorlargerthanavailablebrokers)cpconfig/server.propertiesconfig/server-1.propertiescpconfig/server.propertiesconfig/server-2.properties)但我的问题是我

Windows 下 Kafka 2.8.1 启动报错“输入行太长”问题解决方案

问题描述在Windows下运行Kafka2.8.1时有可能报错“输入行太长。命令语法不正确”。出现这个问题的原因是Windows命令长度最长支持8192个字符,Kafka的启动脚本中有命令拼接classpath变量,classpath的内容是每个jar包的绝对路径拼接的字符串。当Kafka的位置比较深,也就是目录比较长的时候,classpath的长度就会超长。之前版本的Kafka没有问题是因为2.8.1版本的Kafka的libs目录下的jar包数量增加了,也就是说之前版本也是存在这个隐患的。解决方案既然知道了是classpath过长引起的,那么缩短其长度就可以了。在使用IDEA开发时已经遇到过

python - 如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?

我正在使用带有通配符的模式订阅Kafka,如下所示。通配符代表一个动态客户ID。consumer.subscribe(pattern='customer.*.validations')这很有效,因为我可以从主题字符串中提取客户ID。但现在我需要扩展功能,以便出于稍微不同的目的收听类似的主题。我们称它为customer.*.additional-validations。代码需要存在于同一个项目中,因为共享了如此多的功能,但我需要能够根据队列类型采用不同的路径。在Kafkadocumentation我可以看到可以订阅一系列主题。然而,这些是硬编码的字符串。不是允许灵active的模式。>>

Kafka集群部署与测试

目录Kafka集群部署与测试一、安装Kafka二、启动kafka服务 Kafka集群部署与测试一、安装Kafka①、Alt+P拷贝安装包  cd/opt/moduletar-zxvf/opt/software/kafka_2.11-2.0.0.tgz-C/opt/module/②、修改配置文件vi/opt/module/kafka_2.11-2.0.0/config/server.properties  增加环境变量vi/etc/profile  exportKAFKA_HOME=/opt/module/kafka_2.11-2.0.0 exportPATH=$PATH:$KAFKA_HOME