草庐IT

分布式消息队列Kafka(二)- 生产者

weixin_42232931 2023-12-15 原文

1.生产者消息发送流程

(1)消息发送原理

​ 在消息发送的过程中,涉及到了两个线程——main线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

(2)生产者重要参数列表

参数名称描述
bootstrap.servers生产者连接集群所需的 broker 地 址 清 单 。例如 hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。
key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m。
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms。
enable.idempotence是否开启幂等性,默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

2.生产者异步发送API

(1)普通异步发送

(2)发送代码

1)创建工程

2)pom依赖

<dependencies>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>

3)普通异步代码

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {
 public static void main(String[] args) throws InterruptedException {
  // 1. 创建 kafka 生产者的配置对象
  Properties properties = new Properties();
  // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 
  // key,value 序列化(必须):key.serializer,value.serializer
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
 
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  // 3. 创建 kafka 生产者对象
  KafkaProducer<String, String> kafkaProducer = new 
  KafkaProducer<String, String>(properties);
  // 4. 调用 send 方法,发送消息
  for (int i = 0; i < 5; i++) {
    kafkaProducer.send(new ProducerRecord<>("test","PHP " + i));
  }
 // 5. 关闭资源
  kafkaProducer.close();
 }
}

4)观察kafka消费者控制台是否接收到消息

[zrclass@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test
PHP 0
PHP 1
PHP 2
PHP 3
PHP 4

5)带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
 public static void main(String[] args) throws InterruptedException {
		// 1. 创建 kafka 生产者的配置对象
		Properties properties = new Properties();
		// 2. 给 kafka 配置对象添加配置信息
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
		"hadoop102:9092");
		// key,value 序列化(必须):key.serializer,value.serializer
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
		StringSerializer.class.getName());
		
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
		StringSerializer.class.getName());
		// 3. 创建 kafka 生产者对象
		KafkaProducer<String, String> kafkaProducer = new 
		KafkaProducer<String, String>(properties);
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
		 // 添加回调
		  kafkaProducer.send(new ProducerRecord<>("first", "PHP " + i), new Callback() {
		    // 该方法在 Producer 收到 ack 时调用,为异步调用
		    @Override
		    public void onCompletion(RecordMetadata metadata, Exception exception) {
			  if (exception == null) {
			  // 没有异常,输出信息到控制台
			  System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition());
			  } else {
			   // 出现异常打印
			   exception.printStackTrace();
			  }
		    }
		   });
		  // 延迟一会会看到数据发往不同分区
		  Thread.sleep(2);
		}
		// 5. 关闭资源
		kafkaProducer.close();
	} 
 }

3.生产者分区

(1)kafka分区优势

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-irf7Miq4-1682404805995)(分布式消息队列Kafka.assets/

(2)生产者发送消息的分区策略

1)默认的分区器 DefaultPartitioner

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

2)生产者消息类ProduceRecord构造方法

<1>指定partition的情况下,直接发送消息到指定的partition分区

// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
 kafkaProducer.send(new ProducerRecord<>("test", 1,"","java " + i));

<2>没有指定partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值,例如:key1的hash值=5,key2的hash值=6,topic的partition数=2,那么key1对应的value1写入1号分区,key2对应的value2写入0号分区

// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
 kafkaProducer.send(new ProducerRecord<>("test", "a","java " + i));

<3>没有指定分区也没有指定key,kafka会随机选择一个分区使用

<4>自定义分区

例如我们实现一个分区器实现,发送过来的数据中如果包含 java,就发往 0 号分区, 不包含 java,就发往 1 号分区。

实现步骤

1>定义类实现 Partitioner 接口。

2>重写 partition()方法。

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
 /**
  * 返回信息对应的分区
  * @param topic 主题
  * @param key 消息的 key
  * @param keyBytes 消息的 key 序列化后的字节数组
  * @param value 消息的 value
  * @param valueBytes 消息的 value 序列化后的字节数组
  * @param cluster 集群元数据可以查看分区信息
  * @return
  */
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		// 获取消息
		String msgValue = value.toString();
		// 创建 partition
		int partition;
		// 判断消息是否包含 java
		if (msgValue.contains("java")){
		partition = 0;
		}else {
		partition = 1;
		}
		// 返回分区号
		return partition;
	}
	// 关闭资源
	@Override
	public void close() {
	}
	// 配置方法
	@Override
	public void configure(Map<String, ?> configs) {
	} 
 }

3>定义kafka配置信息时指定自定义分区

// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zrclass.kafka.producer.MyPartitioner");
 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

4.生产者如何提高吞吐量

合理调整以下参数的组合

• batch.size:批次大小,默认16k

• linger.ms:等待时间,默认0

• compression.type:压缩,默认 none,可配置值 gzip、snappy、 lz4 和 zstd

• RecordAccumulator:缓冲区大小,默认32M

5.生产者消息可靠性(消息发送不丢失)

(1)消息的ack机制

1)ACK应答级别

0:生产者发送过来的数据,不需要等数据落盘应答

1:生产者发送过来的数据,Leader收到数据后应答。

-1all:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。

可靠性总结:

2)代码设置

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {
	public static void main(String[] args) throws InterruptedException {
		// 1. 创建 kafka 生产者的配置对象
		Properties properties = new Properties();
		// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
		
		// key,value 序列化(必须):key.serializer,value.serializer
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
		StringSerializer.class.getName());
		
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
		StringSerializer.class.getName());
		// 设置 acks
		properties.put(ProducerConfig.ACKS_CONFIG, "all");
		// 重试次数 retries,默认是 int 最大值,2147483647
		properties.put(ProducerConfig.RETRIES_CONFIG, 3);
		// 3. 创建 kafka 生产者对象
		KafkaProducer<String, String> kafkaProducer = new 
		KafkaProducer<String, String>(properties);
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
			kafkaProducer.send(new ProducerRecord<>("first","java " + i));
		}
		// 5. 关闭资源
		kafkaProducer.close();
	}
}

6.生产者发送消息不重复

(1)ACK机制引入保证了消息不丢失,但没有处理消息重复的问题

至少一次(At Least Once= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

最多一次(At Most Once= ACK级别设置为0

总结:

At Least Once可以保证数据不丢失,但是不能保证数据不重复;

At Most Once可以保证数据不重复,但是不能保证数据不丢失。

• 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

(2)幂等性

1)幂等性原理

幂等性 :就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once=幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其

中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2bapiMoo-1682404805998)(分布式消息队列Kafka.assets/image-20230424143930098.png)]

2)开启幂等性

开启参数 enable.idempotence 默认为 true,false 关闭。

(3)生产者事务

1)kafka事务api

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

2)生产者开启事务代码

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {
 public static void main(String[] args) throws InterruptedException {
	// 1. 创建 kafka 生产者的配置对象
	Properties properties = new Properties();
	// 2. 给 kafka 配置对象添加配置信息
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
	// key,value 序列化
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
	
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
	// 设置事务 id(必须),事务 id 任意起名
	properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
	// 3. 创建 kafka 生产者对象
	KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
	// 初始化事务
	kafkaProducer.initTransactions();
	// 开启事务
	kafkaProducer.beginTransaction();
	try {
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
		// 发送消息
		kafkaProducer.send(new ProducerRecord<>("test", "java " + i));
		}
		// int i = 1 / 0;
		// 提交事务
		kafkaProducer.commitTransaction();
	} catch (Exception e) {
		// 终止事务
		kafkaProducer.abortTransaction();
	} finally {
		// 5. 关闭资源
		kafkaProducer.close();
	}
  }
 }

7.kafka在一定条件下保证单分区数据有序

有关分布式消息队列Kafka(二)- 生产者的更多相关文章

  1. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  2. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  3. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  4. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  5. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  6. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

  7. ruby-on-rails - 在 Flash 警报 Rails 3 中显示错误消息 - 2

    如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]

  8. ruby-on-rails - 在 RSpec 中,如何以任意顺序期望具有不同参数的多条消息? - 2

    RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)

  9. ruby-on-rails - 闪存消息存储在哪里? - 2

    我以为它们存储在cookie中-但不,检查cookie没有任何结果。session也不存储它们。那么,我在哪里可以找到它们?我需要这个来直接设置它们(而不是通过flashhash)。 最佳答案 它们存储在inyoursessionstore.自rails2.0以来的默认设置是cookie存储,但请检查config/initializers/session_store.rb以检查您是否使用默认设置以外的东西。 关于ruby-on-rails-闪存消息存储在哪里?,我们在StackOverf

  10. ruby-on-rails - Ruby 长时间运行的进程对队列事件使用react - 2

    我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby​​脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几

随机推荐