本篇知识点沿用知识点15的项目,为大家介绍spring boot如何连接kafka,本章有些长请耐心看完。没有kafka集群的去我主页找各类型大数据集群搭建文档–>大数据原生集群本地测试环境搭建三
第一步:首先导入pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
第二步:修改spring boot配置文件
#kafka集群地址
spring.kafka.bootstrap-servers=192.168.88.186:9092,192.168.88.187:9092,192.168.88.188:9092
#生产者配置
#系列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#重试次数
spring.kafka.producer.retries=0
#采用的ack机制
spring.kafka.producer.acks=1
#批量提交的数据大小 16kb
spring.kafka.producer.batch-size=16384
#生产者暂存数据的缓冲区大小
spring.kafka.producer.buffer-memory=33554432
#消费者配置
#是否自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
#消费消息后间隔多长时间提交偏移量
spring.kafka.consumer.auto-commit-interval=100
#默认的消费者组,代码中可以热键修改
spring.kafka.consumer.group-id=test
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=latest
#系列化方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
第三步:我们建立一个测试类,来认识如何使用生产者
package com.wy.scjg;
import com.wy.scjg.bean.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import javax.annotation.Resource;
@SpringBootTest
public class KfKTest {
@Resource
private KafkaTemplate kafkaTemplate;
@Test
void pro_test(){
User user = new User();
user.setName("张三");
user.setAge(20);
kafkaTemplate.send("test",user.toString());
}
}
我们在服务器端开一个消费者,消费test主题的消息,集群地址改你自己的
./bin/kafka-console-consumer.sh --bootstrap-server hdp1:9092,hdp2:9092,hdp3:9092 --topic test
我们运行测试类,看效果

通过效果,我们可以看到数据成功发送
第四步:我们要知道spring boot整合的kafka为我们提供了两种可供选择的生产方式,上一步是其一叫做异步发送,也是默认的发送方式,另一种是同步发送,区别就在于异步发送是生产者消息发送到集群后一边等集群成功收到消息的回馈一边发送下一条,同步发送是发送后不再去马上准备下一条,而是等收到集群反馈的成功消息才准备下一条,下面我们看一下如何使用同步发送,修改测试类的发送方法
@Test
void pro_test() throws ExecutionException, InterruptedException, TimeoutException {
User user = new User();
user.setName("张三");
user.setAge(20);
//同步发送其实就是发送时强制监听结果
ListenableFuture<SendResult<String, Object>> sendResult = kafkaTemplate.send("test", user.toString());
//开始监听,设置一个时间,超过后放弃此处监听
SendResult<String, Object> result = sendResult.get(3, TimeUnit.SECONDS);
System.out.println("监听到的结果-------"+result.getProducerRecord().value());
}
再次运行,我们可以看到,集群再次收到了消息

控制台也监听到了结果

第五步:生产者我们会用了,我们现在看看消费者如何使用,消费者必须要框架的支撑,因此我们不在使用测试类,我们让它以一个配置类的形式存在,当然大家如果愿意也可以新建一个独立的包存放,然后使用@Component注解向框架注册
package com.wy.scjg.config;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class ConsumerConfig {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
//不指定group,默认取spring boot里配置的
@KafkaListener(id = "test1", topics = "test")
public void listen(String mes) {
logger.info("我收到的数据是-------------"+mes);
}
}
我们在服务器开一个生产者
./bin/kafka-console-producer.sh --broker-lisdp1:9092,hdp2:9092,hdp3:9092 --topic test
运行项目,生产者发生消息,看控制台的输出

但是消费者还需要两个东西,这里提供给大家
package com.wy.scjg.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
@Configuration
public class ConsumerConfig {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
//不指定group,默认取spring boot里配置的
@KafkaListener(id = "test1", topics = "test")
public void listen(String mes) {
logger.info("我收到的数据是-------------"+mes);
}
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public NewTopic topic() {
//主题 分区数 副本数
return new NewTopic("test", 1, (short) 1);
}
}
我这里说一下这两个东西有什么用,NewTopic 是消费一个没有的主题时框架会创建,一般工作中如果用到了,切记副本数一定要按需求做更改,一般不可能是1,RecordMessageConverter是很早之前我碰到一个没有这个bean的bug,所以大家如果也遇到了可以怎么写一个
但其实消费者不止这一种写法,你还可以写成下面这个样子
@Component
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
//不指定group,默认取spring boot配置文件里面的
@KafkaListener(topics = {"test"})
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("message:{}", msg);
}
}
}
第六步:前面我们在发送消息时对同步的发送做了监听,那大家有没有想过,异步发送这么监听?有人会说同步监听不就相当于异步监听了吗,但本质上是不一样的,异步优点就是发送消息不会由于监听结果而造成阻塞,所以这个时候也就需要去注册一个异步监听的配置类
package com.wy.scjg.config;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Configuration
public class ProSendYBLen {
private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);
@Resource
KafkaTemplate kafkaTemplate;
//配置监听
@PostConstruct
private void listener() {
kafkaTemplate.setProducerListener(new ProducerListener() {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
logger.info("我已经接收到消息-----message={}", producerRecord.value());
}
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
logger.error("接收失败--------message={}", producerRecord.value());
}
});
}
}
想要看到效果,我们就需要有一个生产者的Controller,因为如果你还使用测试类的话生产者发送完消息后测试程序就结束了,和项目没关联的
package com.wy.scjg.controller;
import com.wy.scjg.bean.User;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@Controller
@RequestMapping("/kfk")
public class KfkController {
@Resource
private KafkaTemplate kafkaTemplate;
@RequestMapping("/send")
@ResponseBody
public String pro_test() throws ExecutionException, InterruptedException, TimeoutException {
User user = new User();
user.setName("张三");
user.setAge(20);
kafkaTemplate.send("test", user.toString());
return "发送完成";
}
}
注意如果你是跟着知识点走的,那就把拦截器设置一下,不然影响演示

然后运行项目,对发送消息的kafka控制器发出请求,之后你会在控制台中发现如下输出

到此我希望大家不要钻牛角尖,去考虑消费者监听怎么写,消费者进程本身就是个监听器,没有说像生产者一样再去写一个的说法
第七步:这一步是个理论知识,可以跳过
我们知道生产者和消费者是有系列化方式的,一般默认用org.apache.kafka.common.serialization.StringDeserializer,同时数据的发送一般都是字符串就足以满足需求,很少用到其他的类型,但是我们要知道系列化方式不止有String,还有Long等等的其他类型,大家可以看下序列化包下的类,如下图

在网络上你会看到自己写系列化类的,比如我想要发出去的数据本身就是JSON格式,那就自定义一个生产者用的系列化类,实现Serializable接口,重写public byte[] serialize(String s, Object o) {}方法,然后在有一个消费者用的反系列化类,实现Deserializer接口,重写public Object deserialize(String s, byte[] bytes) 方法,最后将这两个类配置在spring boot的kafka配置里
这一套理论上是可行的,但是实际开发中我们不会去做,因为没有必要,一个搞不好出问题还不好解决,同时之所以说是理论上可行,还因为Serializable和Deserializer接口的方法编译不让重写会报错
第八步:在生产和消费的时候,我们可以指定发送到那个分区下,想要指定分区有两种方式
#直接指定0分区
kafkaTemplate.send("test", 0, key, "");
#不指定数字分区时,按照key的hash决定分区
kafkaTemplate.send("test", key, "")
消费的时候你也可以指定消费的分区,不过只能直接指定
@KafkaListener(topics = {"test"},topicPattern = "0")
你如果想用给自定义的也可以,那你就需要自定义一个分区的策略类
package com.wy.scjg.config;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
String keyStr = key+"";
if (keyStr.startsWith("0")){
return 0;
}else {
return 1;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
然后自定义一个Kafka的配置类
package com.wy.scjg.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyPartitionTemplate {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
KafkaTemplate kafkaTemplate;
@PostConstruct
public void setKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//注意分区器在这里!!!
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
}
public KafkaTemplate getKafkaTemplate(){
return kafkaTemplate;
}
}
解释一下PostConstruct的作用,它用来标注一个非静态的方法,用来在spring boot注入一个对象后被调用,对该对象做配置
这样你就不能直接用KafkaTemplate了,你要注入你自定义的MyPartitionTemplate,我们用前面介绍生产者时的测试类修改一下
package com.wy.scjg;
import com.wy.scjg.config.MyPartitionTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@SpringBootTest
public class KfKTest {
@Autowired
private MyPartitionTemplate myPartitionTemplate;
@Test
void pro_test() throws ExecutionException, InterruptedException, TimeoutException {
myPartitionTemplate.getKafkaTemplate().send("test",0,"0", "0开头的数据");
}
}
同时你的消费者配置类也要改一下,我改的时候为了和前面呼应,我将消费者改成了另一种写法,大家注意一下
package com.wy.scjg.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import java.util.Optional;
@Configuration
public class ConsumerConfig {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(id = "test1", topics = "test" ,topicPattern = "0")
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("partition=0,message:{}", msg);
}
}
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public NewTopic topic() {
//主题 分区数 副本数
return new NewTopic("test", 1, (short) 1);
}
}
此时运行项目,通过测试类向不同分区发送消息,看控制台的输出,你就会发现,你只能接收到0分区的内容

第九步:说完分区,我们再说一下消费者组,你可以为了提高效率,在消费者中写多个方法接收数据,如下面这样
//组1,消费者1
@KafkaListener(topics = {"test"},groupId = "group1")
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("group:group1-1 , message:{}", msg);
}
}
//组1,消费者2
@KafkaListener(topics = {"test"},groupId = "group1")
public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("group:group1-2 , message:{}", msg);
}
}
//组2,只有一个消费者
@KafkaListener(topics = {"test"},groupId = "group2")
public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("group:group2 , message:{}", msg);
}
}
注意如果你的主题分区数大于同组下消费者数,则正常按照消费者策略划分,但如果你的同组消费者数大于主题分区数,那就会出现空闲的消费者进程,这种情况你最好做处理,即使在正式工作开发中,也不要有这种情况,因为说不准空闲的消费者进程就会导致任务阻塞超时,接收不到数据就会认为网络卡顿,一直在那等着,不给你返回结果,处理方式要不对主题加分区,要不就是生成新的主题
第十步:最后我们说一下偏移量,一般情况下,项目开发用的是自动提交偏移量,但是有的时候不排除让你手动提交偏移量,这个时候也就需要自定义配置类了,和自定义发送分区差不多
首先你要有一个配置类,主要作用是修改手动提交的配置
@Configuration
public class MyOffsetConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 注意这里!!!设置手动提交
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
// ack模式:
// AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:
//
// RECORD
// 每处理一条commit一次
//
// BATCH(默认)
// 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
//
// TIME
// 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
//
// COUNT
// 累积达到ackCount次的ack去commit
//
// COUNT_TIME
// ackTime或ackCount哪个条件先满足,就commit
//
// MANUAL
// listener负责ack,但是背后也是批量上去
//
// MANUAL_IMMEDIATE
// listner负责ack,每调用一次,就立即commit
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
然后再消费者中你就可以在消费者中选择一种下列代码中核实你的一种写法
@Component
public class MyOffsetConsumer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = "test", groupId = "myoffset-group-1", containerFactory = "manualKafkaListenerContainerFactory")
public void manualCommit(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
logger.info("手动提交偏移量 , partition={}, msg={}", partition, message);
// 同步提交
consumer.commitSync();
//异步提交
//consumer.commitAsync();
// ack提交也可以,会按设置的ack策略走(参考MyOffsetConfig.java里的ack模式)
// ack.acknowledge();
}
@KafkaListener(topics = "test", groupId = "myoffset-group-2", containerFactory = "manualKafkaListenerContainerFactory")
public void noCommit(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
logger.info("忘记提交偏移量, partition={}, msg={}", partition, message);
// 不做commit!这里如果你要用一定要提交,这个方法是用来做不提交,会重复消费的测试的
}
/**
* 现实状况:
* commitSync和commitAsync组合使用
* <p>
* 手工提交异步 consumer.commitAsync();
* 手工同步提交 consumer.commitSync()
* <p>
* commitSync()方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,
* commitSync()会一直重试,但是commitAsync()不会。
* <p>
* 一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题
* 因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。
* 但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。否则就会造成重复消费
* 因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
*/
@KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory")
public void manualOffset(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
try {
logger.info("同步异步搭配 , partition={}, msg={}", partition, message);
//先异步提交
consumer.commitAsync();
//继续做别的事
} catch (Exception e) {
System.out.println("commit failed");
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}
/**
* 甚至可以手动提交,指定任意位置的偏移量
* 不推荐日常使用!!!
*/
@KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory")
public void offset(ConsumerRecord record, Consumer consumer) {
logger.info("手动指定任意偏移量, partition={}, msg={}", record.partition(), record);
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(currentOffset);
}
}
到此spring boot整合kafka就介绍完了,最后在强调一遍,如果关闭了自动提交偏移量,那一定要手动提交,并且推荐使用manualOffset这种提交方式
我还要给各位开发者提一个醒,千万要分清直接使用kafka和spring boot整合kafka这两种开发手段的区别,他们两个虽然原理相同,但是由于是两个相对独立的东西,所以在配置和使用以及理解上千万不要混为一谈,就比如最开始说的spring boot整合kafka的同步监听,这个只是恰巧spring boot给你提供了这种可操作手段,和kafka本身的ack是互不影响的两个东西
到此本篇知识点讲解结束,此外 本次整体讲解的spring boot项目已上传github
在VMware16.2.4安装Ubuntu一、安装VMware1.打开VMwareWorkstationPro官网,点击即可进入。2.进入后向下滑动找到Workstation16ProforWindows,点击立即下载。3.下载完成,文件大小615MB,如下图:4.鼠标右击,以管理员身份运行。5.点击下一步6.勾选条款,点击下一步7.先勾选,再点击下一步8.去掉勾选,点击下一步9.点击下一步10.点击安装11.点击许可证12.在百度上搜索VM16许可证,复制填入,然后点击输入即可,亲测有效。13.点击完成14.重启系统,点击是15.双击VMwareWorkstationPro图标,进入虚拟机主
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭9年前。我最近开始学习Ruby,这是我的第一门编程语言。我对语法感到满意,并且我已经完成了许多只教授相同基础知识的教程。我已经写了一些小程序(包括我自己的数组排序方法,在有人告诉我谷歌“冒泡排序”之前我认为它非常聪明),但我觉得我需要尝试更大更难的东西来理解更多关于Ruby.关于如何执行此操作的任何想法?
深度学习12.CNN经典网络VGG16一、简介1.VGG来源2.VGG分类3.不同模型的参数数量4.3x3卷积核的好处5.关于学习率调度6.批归一化二、VGG16层分析1.层划分2.参数展开过程图解3.参数传递示例4.VGG16各层参数数量三、代码分析1.VGG16模型定义2.训练3.测试一、简介1.VGG来源VGG(VisualGeometryGroup)是一个视觉几何组在2014年提出的深度卷积神经网络架构。VGG在2014年ImageNet图像分类竞赛亚军,定位竞赛冠军;VGG网络采用连续的小卷积核(3x3)和池化层构建深度神经网络,网络深度可以达到16层或19层,其中VGG16和VGG
开门见山|拉取镜像dockerpullelasticsearch:7.16.1|配置存放的目录#存放配置文件的文件夹mkdir-p/opt/docker/elasticsearch/node-1/config#存放数据的文件夹mkdir-p/opt/docker/elasticsearch/node-1/data#存放运行日志的文件夹mkdir-p/opt/docker/elasticsearch/node-1/log#存放IK分词插件的文件夹mkdir-p/opt/docker/elasticsearch/node-1/plugins若你使用了moba,直接右键新建即可如上图所示依次类推创建
如果您希望在Spring中启用定时任务功能,则需要在主类上添加 @EnableScheduling 注解。这样Spring才会扫描 @Scheduled 注解并执行定时任务。在大多数情况下,只需要在主类上添加 @EnableScheduling 注解即可,不需要在Service层或其他类中再次添加。以下是一个示例,演示如何在SpringBoot中启用定时任务功能:@SpringBootApplication@EnableSchedulingpublicclassApplication{publicstaticvoidmain(String[]args){SpringApplication.ru
软件特点部署后能通过浏览器查看线上日志。支持Linux、Windows服务器。采用随机读取的方式,支持大文件的读取。支持实时打印新增的日志(类终端)。支持日志搜索。使用手册基本页面配置路径配置日志所在的目录,配置后按回车键生效,下拉框选择日志名称。选择日志后点击生效,即可加载日志。windows路径E:\java\project\log-view\logslinux路径/usr/local/XX历史模式历史模式下,不会读取新增的日志。针对历史文件可以分页读取,配置分页大小、跳转。历史模式下,支持根据关键词搜索。目前搜索引擎使用的是jdk自带类库,搜索速度相对较低,优点是比较简单。2G日志全文搜
1.依赖导入org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-validation2.validation常用注解@Null被注释的元素必须为null@NotNull被注释的元素不能为null,可以为空字符串@AssertTrue被注释的元素必须为true@AssertFalse被注释的元素必须为false@Min(value)被注释的元素必须是一个数字,其值必须大于等于指定的最小值@Max(value)被注释的元素必须是一个数字,其值必须小于等于指定的最大值@D
我正在尝试找到一种更好的方法将IRB与我的常规ruby开发集成。目前我很少在我的代码中使用IRB。我只用它来验证语法或尝试一些小的东西。我知道我可以将我自己的代码加载到ruby中作为一个require'mycode'但这通常不符合我的编程风格。有时我要检查的变量超出范围或在循环内。有没有一种简单的方法可以启动我的脚本并在IRB内的某个点卡住?我想我正在寻找一种更简单的方法来调试我的ruby代码而不破坏我的F5(编译)键。也许有经验的ruby开发者可以和我分享一个更精简的开发方法。 最佳答案 安装ruby-debugg
我开始了一个小型网络项目并使用Drupal来构建它。到目前为止,还不错:您可以快速建立一个不错的面向CMS的网站,通过模块添加社交功能,并且您有一个广泛的API可以在一个架构良好的平台中进行自定义。现在问题来了:网站的增长超出了最初的计划,我发现自己正处于认真开始为它编写代码的境地。由于Drupal项目,我对PHP有了新的认识,但我想用Ruby来做。我会感觉更舒服,以后维护起来更容易,我可以在其他Ruby/Rails应用程序中重用它。随着时间的推移,我想我会用Ruby重写Drupal中的现有部分。基于此,问题是:是否有人将两者(成功或失败的故事)结合起来?这是一个相当大的决定,但我在G
Iparking停车收费管理系统-可商用介绍Iparking是一款基于springBoot的停车收费管理系统,支持封闭车场和路边车场,支持微信支付宝多种支付渠道,支持多种硬件,涵盖了停车场管理系统的所有基础功能。技术栈Springboot,MybatisPlus,Beetl,Mysql,Redis,RabbitMQ,UniApp功能云端功能序号模块功能描述1系统管理菜单管理配置系统菜单2系统管理组织管理管理组织机构3系统管理角色管理配置系统角色,包含数据权限和功能权限配置4系统管理用户管理管理后台用户5系统管理租户管理多租户管理6系统管理公众号配置租户公众号配置7系统管理操作日志审计日志8系统