草庐IT

Java代码实现kafka的消息生产与消费

Fairy要carry 2023-04-11 原文

目录

生产者的同步发送消息

异步发送

生产端ack的配置

发送消息缓冲机制

 代码实现

生产者消费者实例


生产者的同步发送消息

 生产者发送消息到我们的topic分区上,需要等待我们kafka返回的ack,如果没有返回就会进入3s的阻塞,retry3次——>抛出异常(这里面我们可以将信息记录到文件日志中)

(43条消息) NIO学习_Fairy要carry的博客-CSDN博客

package com.wyh.kafka_demo.kafka;

import com.alibaba.fastjson.JSON;
import com.wyh.kafka_demo.pojo.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;


public class SimpleProducer {
    /**
     * 1.创建主题
     */
    private final static String TOPIC_NAME = "jqTopic5";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /**
         * 2.kafka的broker
         */
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "82.157.198.247:9092,82.157.198.247:9093");

        /**
         * 2.1序列化:Key+value
         */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        /**
         * 封装到发消息的客户端中
         */
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        /**
         * 3.模拟发送五条消息
         */
        int maxNum = 5;

        /**
         * 3.1创建消息
         * hash(key)%partition
         * ProducerRecord:消息记录
         * key:消息标识key,帮助告诉发送到哪个分许通过hash计算
         * value:json数据,消息内容
         */
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"TestKey", "hello,kafka");

        /**
         * 3.2通过异步的形式发送消息
         */
        RecordMetadata recordMetadata = producer.send(producerRecord).get();
        System.out.println("同步方式得到的消息:" + "topic-" + recordMetadata.topic()
                + "|partition-" + recordMetadata.partition()
                + "|offset-" + recordMetadata.offset());

    }

}

异步发送

我们可以利用异步回调,生产者不管你消费者有没有消费发送完就往下执行自己的了;当我们拿到数据后通过回调机制异步返回(比如zk的watch机制就是这样)

(43条消息) Zookeeper_Fairy要carry的博客-CSDN博客

优点:速度较快,像银行转账业务异步就毕竟好,反馈较快

缺点:可能会出现消息丢失的情况,例如我们rabbitmq可以利用spring的retry机制解决,另外分布式事务也是需要解决的

(43条消息) RabbitMQ的高级特性(消息可靠性)_Fairy要carry的博客-CSDN博客

(43条消息) MQ-消息延迟_Fairy要carry的博客-CSDN博客_mq 延迟消息

像消息堆积我们kafka不需要考虑这方面,消息放在硬盘中

生产端ack的配置

1.两个broker上创建2个分区配上两个副本 

bin/kafka-topics.sh --bootstrap-server xxxx:9092,xxxx:9093 --create --partitions 2 --replication-factor 2 --topic jqTopic5

2.生产者配置 

bin/kafka-console-producer.sh --broker-list xxxx:9092,xxxx:9093 --topic jqTopic5

同步发送的前提,生产者在获取ack前会一直阻塞:三种ack 

ack=0:生产者只需要把消息给到broker,而不需要到partition中kafka就会把ack返回给生产者,速度较快容易丢消息

ack=1:多副本都接收到消息(leader进行同步),并且将消息传到log中kafka就返回ack

ack=all:leader和follower同步完后才会将ack返回

发送消息缓冲机制

生产者并不是直接把消息推送给kafka的——>利用了消息缓冲的机制,kafka本地线程默认会创建一个缓冲区 ,用来存放发送的数据(先放到缓冲区,当到达16k就进行发送)

如果没有满足16k,就等待10ms再发送消息

 代码实现

   /**
         * 2.kafka的broker
         */
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"82.157.198.247:9092,82.157.198.247:9093");

        /**
         * 2.1发送消息持久化机制参数ack
         * 生产者会等3s,然后重试3次
         */
        properties.put(ProducerConfig.ACKS_CONFIG,"1");

        /**
         * 2.12重试间隔的设置
         * 默认间隔为100ms,次数为3次
         */
        properties.put(ProducerConfig.RETRIES_CONFIG,3);
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);

        /**
         * 2.2设置消息的缓冲区
         * 消息先放到缓冲区,如果到16k就发送,未满足则进行等待10ms再发送
         */
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        /**
         * 2.3序列化:Key+value
         */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        /**
         * 封装到发消息的客户端中
         */
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

生产者消费者实例

生产者代码

package com.wyh.kafka_demo.kafka;

import com.alibaba.fastjson.JSON;
import com.wyh.kafka_demo.pojo.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


public class MyProducer {
    /**
     * 1.创建主题
     */
    private final static String TOPIC_NAME="jqTopic5";

    public static void main(String[] args) throws InterruptedException {
        /**
         * 2.kafka的broker
         */
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"82.157.198.247:9092,82.157.198.247:9093");

        /**
         * 2.1发送消息持久化机制参数ack
         * 生产者会等3s,然后重试3次
         */
        properties.put(ProducerConfig.ACKS_CONFIG,"1");

        /**
         * 2.12重试间隔的设置
         * 默认间隔为100ms,次数为3次
         */
        properties.put(ProducerConfig.RETRIES_CONFIG,3);
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);

        /**
         * 2.2设置消息的缓冲区
         * 消息先放到缓冲区,如果到16k就发送,未满足则进行等待10ms再发送
         */
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        /**
         * 2.3序列化:Key+value
         */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        /**
         * 封装到发消息的客户端中
         */
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        /**
         * 3.模拟发送五条消息
         */
        int maxNum=5;
        final CountDownLatch countDownLatch = new CountDownLatch(maxNum);
        for (int i = 1; i <= 5; i++) {
            Order order = new Order((long) i, i);
            /**
             * 3.1消息记录发送给指定的分区
             * hash(key)%partition
             * ProducerRecord:消息记录
             * key:消息标识key,帮助告诉发送到哪个分许通过hash计算
             * value:json数据,消息内容
             */
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));

            /**
             * 3.2通过异步的形式发送消息
             */
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e!=null){
                        System.err.println("发送消息失败:"+e.getStackTrace());
                    }
                    if(recordMetadata!=null){
                        System.out.println("异步发送的消息为:"+"topic-"+recordMetadata.topic()
                                +"|partition-"+recordMetadata.partition()
                        +"|offset-"+recordMetadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });
        }

        /**
         * 4.主线程等待countDownlatch
         */
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();
    }

}

消费者代码:

 

package com.wyh.kafka_demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    private final static String TOPIC_NAME="jqTopic5";
    private final static String CONSUMER_GROUP_NAME="testGroup";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"82.157.198.247:9092,82.157.198.247:9093");
        /**
         * 1.消费分组名
         */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"0");

        /**
         * 1.2设置序列化
         */
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        /**
         * 2.创建一个消费者的客户端
         */
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        /**
         * 3.消费者订阅主题列表
         */
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while(true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("收到的消息:partition= %d,offset= %d,key= %s,value=%s %n",record.partition(),
                        record.offset(),record.key(),record.value());
            }
        }
    }
}

有关Java代码实现kafka的消息生产与消费的更多相关文章

  1. ruby - 如何在 buildr 项目中使用 Ruby 代码? - 2

    如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby​​

  2. ruby-on-rails - Rails 源代码 : initialize hash in a weird way? - 2

    在rails源中:https://github.com/rails/rails/blob/master/activesupport/lib/active_support/lazy_load_hooks.rb可以看到以下内容@load_hooks=Hash.new{|h,k|h[k]=[]}在IRB中,它只是初始化一个空哈希。和做有什么区别@load_hooks=Hash.new 最佳答案 查看rubydocumentationforHashnew→new_hashclicktotogglesourcenew(obj)→new_has

  3. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  4. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  5. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  6. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  7. ruby-on-rails - 浏览 Ruby 源代码 - 2

    我的主要目标是能够完全理解我正在使用的库/gem。我尝试在Github上从头到尾阅读源代码,但这真的很难。我认为更有趣、更温和的踏脚石就是在使用时阅读每个库/gem方法的源代码。例如,我想知道RubyonRails中的redirect_to方法是如何工作的:如何查找redirect_to方法的源代码?我知道在pry中我可以执行类似show-methodmethod的操作,但我如何才能对Rails框架中的方法执行此操作?您对我如何更好地理解Gem及其API有什么建议吗?仅仅阅读源代码似乎真的很难,尤其是对于框架。谢谢! 最佳答案 Ru

  8. ruby - 模块嵌套代码风格偏好 - 2

    我的假设是moduleAmoduleBendend和moduleA::Bend是一样的。我能够从thisblog找到解决方案,thisSOthread和andthisSOthread.为什么以及什么时候应该更喜欢紧凑语法A::B而不是另一个,因为它显然有一个缺点?我有一种直觉,它可能与性能有关,因为在更多命名空间中查找常量需要更多计算。但是我无法通过对普通类进行基准测试来验证这一点。 最佳答案 这两种写作方法经常被混淆。首先要说的是,据我所知,没有可衡量的性能差异。(在下面的书面示例中不断查找)最明显的区别,可能也是最著名的,是你的

  9. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  10. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

随机推荐