草庐IT

【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

赵四司机 2023-06-16 原文

个人简介: 

> 📦个人主页:赵四司机
> 🏆学习方向:JAVA后端开发 
> 📣种一棵树最好的时间是十年前,其次是现在!
> ⏰往期文章:SpringBoot项目整合微信支付
> 🧡喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。

前言:

1.前面基于Springboot的单体项目介绍已经完结了,至于项目中的其他功能实现我这里就不打算介绍了,因为涉及的知识点不难,而且都是简单的CRUD操作,假如有兴趣的话可以私信我我再看看要不要写几篇文章做个介绍。

2.完成上一阶段的学习,我就投入到了微服务的学习当中,所用教程为B站上面黑马的微服务教程。由于我的记性不是很好,所以对于新事物的学习我比较喜欢做笔记以加强理解,在这里我会将笔记的重点内容做个总结发布到“微服务学习”笔记栏目中。我是赵四,一名有追求的程序员,希望大家能多多支持,能给我点个关注就更好了。

目录

一:Kafka消息发送快速入门

1.传递字符串消息

(1)发送消息

 (2)监听消息

(3)测试结果

2.传递对象消息

(1)修改生产者代码

(2)结果测试

二:功能引入

1.需求分析

2.逻辑分析

三:前期准备

1.引入依赖

2.定义常量

3.Kafka配置信息

四:代码实现

1.自媒体端

2.移动端


一:Kafka消息发送快速入门

1.传递字符串消息

(1)发送消息

创建一个Controller包并编写一个测试类用于发送消息

package com.my.kafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("hello")
    public String helloProducer(){
        kafkaTemplate.send("my-topic","Hello~");
        return "ok";
    }
}

 (2)监听消息

编写测试类用于接收消息:

package com.my.kafka.listener;

import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class HelloListener {
    @KafkaListener(topics = "my-topic")
    public void helloListener(String message) {
        if(StringUtils.isNotBlank(message)) {
            System.out.println(message);
        }
    }
}

(3)测试结果

打开浏览器输入localhost:9991/hello,然后到控制台查看消息,可以看到成功消息监听到并且进行了消费。

2.传递对象消息

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式:

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不做介绍。

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式。

(1)修改生产者代码

@GetMapping("hello")
public String helloProducer(){
    User user = new User();
    user.setName("赵四");
    user.setAge(20);
    kafkaTemplate.send("my-topic", JSON.toJSONString(user));
    return "ok";
}

(2)结果测试

 可以看到成功接收都对象参数,后期要使用该对象只需要将其转换成User对象即可。

二:功能引入

1.需求分析

        发布文章之后,可能会由于文章出现某些错误或者其他原因,我们会在文章管理端实现文章的上下架功能(见下图),也即当管理端实现对文章下架之后移动端将不会再展示该文章,只有该文章重新被上架之后才能在移动端看到该文章信息。

​ 

2.逻辑分析

        后端接收到前端传过来的参数之后要先做一个校验,参数不为空才能继续往下执行,首先应该根据前端传过来的文章id(自媒体端文章id)查询自媒体数据库的文章信息并判断该文章是否已是发布状态,因为只有审核成功并成功发布了的文章才能进行上下架操作。自媒体端微服务对文章上下架状态进行修改之后便可以向Kafka发送一条消息,该消息为Map对象,里面存储的数据为移动端的文章id以及前端传过来的上下架参数enable,当然要将该Map对象转换成JSON字符串才能进行发送。

        文章微服务监听到Kafka发送过来的消息之后将JSON字符串转换成Map对象之后再获取相关参数对移动端文章的上下架状态进行修改。

三:前期准备

1.引入依赖

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2.定义常量

package com.my.common.constans;

public class WmNewsMessageConstants {

    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

3.Kafka配置信息

由于我是用Nacos来作为注册中心,所以配置信息放置在Nacos上面即可。

(1)自媒体端配置

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2)移动端配置

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

四:代码实现

1.自媒体端

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 文章下架或上架
 * @param id
 * @param enable
 * @return
 */
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
    log.info("执行文章上下架操作...");
    if(id == null || enable == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //根据id获取文章
    WmNews news = getById(id);
    if(news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
    }
    //获取当前文章状态
    Short status = news.getStatus();
    if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
    }

    //更改文章状态
    news.setEnable(enable.shortValue());
    updateById(news);
    log.info("更改文章上架状态{}-->{}",status,news.getEnable());

    //发送消息到Kafka
    Map<String, Object> map = new HashMap<>();
    map.put("articleId",news.getArticleId());
    map.put("enable",enable.shortValue());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
    log.info("发送消息到Kafka...");

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

2.移动端

(1)设置监听器

package com.my.article.listener;

import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;


@Slf4j
@Component
public class EnableListener {
    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void downOrUp(String message) {
        if(StringUtils.isNotBlank(message)) {
            log.info("监听到消息{}",message);
            apArticleService.downOrUp(message);
        }
    }
}

(2)获取消息并修改文章状态

/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
    Map map = JSON.parseObject(message, Map.class);
    //获取文章id
    Long articleId = (Long) map.get("articleId");
    //获取文章待修改状态
    Integer enable = (Integer) map.get("enable");
    //查询文章配置
    ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
            (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
    if(apArticleConfig != null) {
        //上架
        if(enable == 1) {
            log.info("文章重新上架");
            apArticleConfig.setIsDown(false);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
        //下架
        if(enable == 0) {
            log.info("文章下架");
            apArticleConfig.setIsDown(true);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
    }
    else {
        throw new RuntimeException("文章信息不存在");
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

有关【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能的更多相关文章

  1. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  2. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  3. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  4. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

    在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

  5. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  6. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  7. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  8. ruby-on-rails - Cucumber 是否只是 rspec 的包装器以帮助将测试组织成功能? - 2

    只是想确保我理解了事情。据我目前收集到的信息,Cucumber只是一个“包装器”,或者是一种通过将事物分类为功能和步骤来组织测试的好方法,其中实际的单元测试处于步骤阶段。它允许您根据事物的工作方式组织您的测试。对吗? 最佳答案 有点。它是一种组织测试的方式,但不仅如此。它的行为就像最初的Rails集成测试一样,但更易于使用。这里最大的好处是您的session在整个Scenario中保持透明。关于Cucumber的另一件事是您(应该)从使用您的代码的浏览器或客户端的角度进行测试。如果您愿意,您可以使用步骤来构建对象和设置状态,但通常您

  9. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  10. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

随机推荐