草庐IT

图文并茂!深入了解RocketMQ的过期删除机制

Leo聊技术 2023-03-28 原文
大家好,我是Leo。

今天聊一下RocketMQ的文件过期删除机制

本章概括

源码定位

Broker是RocketMQ的核心,提供了消息的接收,存储,拉取等功能

我们可以先从Broker服务入手。从源码可以得知。RocketMQ启用了一个 ​​BrokerController​​​ 的 ​​start​​ 函数

public static void main(String[] args) {
start(createBrokerController(args));
}

public static BrokerController start(BrokerController controller) {
try {
controller.start();

String tip = "The broker[";
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
// 日志拼接
}

log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}
下列是​​start​​​ 函数启动的异步线程,他启动了一个 ​​messageStore​

public void start() throws Exception {
if (this.messageStore != null) {
this.messageStore.start();
}
}
从 ​​messageStore.start()​​ 函数进入后会有一个消息存储的第三方接口。

public interface MessageStore {

/**
* Load previously stored messages.
*
* @return true if success; false otherwise.
*/
boolean load();

/**
* Launch this message store.
*
* @throws Exception if there is any error.
*/
void start() throws Exception;
}
继续围绕 ​​start​​​ 函数展开实现类查找,可以看到最终由 ​​DefaultMessageStore​​ 实现类实现

定位到具体问题之后,可以看到 ​​start​​​ 调用了一个 ​​addScheduleTask​​ 函数

这个函数主要处理的就是清除过期日志服务。

public void start() throws Exception {
//刷新ConsumeQueue的服务启动
this.flushConsumeQueueService.start();
//CommitLog刷新的服务启动
this.commitLog.start();
//存储状态检测的服务启动
this.storeStatsService.start();

//创建临时文件,来表示是否正常关机
this.createTempFile();
//启动其他服务。比如清除过期日志的服务等
this.addScheduleTask();
this.shutdown = false;
}
这篇文件聊的就是这个 ​​addScheduleTask​​ 函数。言归正传,步入正题!

流程图

过期删除机制

文件过期删除

首次执行时间是60000毫秒=60秒。其余间隔执行都是每10秒执行一次删除。

// 资源回收间隔
private int cleanResourceInterval = 10000;
/**
* {}要执行的任务
* 1.延迟第一次执行的时间
* 2.两次执行之间的时间 10000 资源回收间隔
* 3.毫秒
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
// 延迟第一次执行的时间
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

对于删除过期的时机包括以下3种:

  1. 默认凌晨4点。这个也比较好理解,这个时候用的人也比较少,删除对系统的影响就降到最小。
  2. 磁盘空间不足。当磁盘空间不足的时候,就要删除过期文件以提供更多的空间出来接收消息。
  3. 人工触发,指人为的介入去删除。

删除的文件是过期文件,那哪些文件是过期的呢?

首先是保留时间,默认72小时,也就是3天,超过3天的数据,是需要删除的。

​deleteExpiredFiles​​ 是用于删除过期文件。执行步骤如下:

  1. 首先是需要判断是否需要删除文件,通过两个方法的调用​​isTimeToDelete​​​和​​isSpaceToDelete​​​判断是否达到定时删除时间以及是否磁盘已满需要删除,以及判断属性​​DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes​​是否大于0意味着需要手动删除。如果这三个条件任意为真,意味着需要执行删除,那就继续后续的流程。否则结束当前方法。
  2. 如果是手动删除,则属性​​DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes​​减1.
  3. 如果属性​​MessageStoreConfig#cleanFileForciblyEnable​​​和​​DefaultMessageStore.CleanCommitLogService#cleanImmediately​​为真,声明cleanAtOnece为true,否则为false。
  4. 调用方法​​CommitLog#deleteExpiredFile​​ 进行文件删除。方法需要4个入参,分别是:
  1. expiredTime:过期时间或者说文件删除前的保留时间,默认为72小时。
  2. deleteFilesInterval:文件删除间隔,这里取值为100.
  3. intervalForcibly:该参数用于强制文件强制释放时间间隔,单位是毫秒。这里取值为120*1000,
  4. cleanImmediately:是否立即执行删除,这边使用的就是步骤3中的数据。
/**
* 删除已经失效的
*/
private void deleteExpiredFiles() {
int deleteCount = 0;
// 文件保留时长 72
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 100
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 1000 * 120 = 120000毫秒 = 120秒
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 判断有没到凌晨4点
boolean timeup = this.isTimeToDelete();
// 空间是否上限
boolean spacefull = this.isSpaceToDelete();
// 手动删除 经过20次的调度
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;

boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);

fileReservedTime *= 60 * 60 * 1000;

deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {} else if (spacefull) {
// 删除文件失败
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
如果这个文件被其他线程引用了,此时就不会进行删除,记录第一次删除的时间戳,退出本次任务,等120s后,就会把文件的引用减1000,再强制删除。

在删除的过程中,会存在删除多个文件的情况,每个文件之间,还有一个时间间隔,比如第一个文件删除完后,需要等100ms再删除第二个文件。

120s可以通过 destroyMapedFileIntervalForcibly 得知

100ms可以通过 deletePhysicFilesInterval 得知

如果当前删除的文件数量,已经超过了可以删除的最大批量数,则退出本次任务。可以通过上述代码中的 ​​spacefull​​ 得出

/**
* 根据时间删除过期文件
* @param expiredTime 保留时长 一般是 72
* @param deleteFilesInterval 删除间隔 100
* @param intervalForcibly 120秒 延迟
* @param cleanImmediately 是否强制启用
* @return
*/
public int deleteExpiredFileByTime(final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) {
//获取映射文件列表 commitlog文件可能随时有写入,copy一份不影响写入
Object[] mfs = this.copyMappedFiles(0);
//如果映射文件列表为空直接返回
if (null == mfs)
return 0;

int mfsLength = mfs.length - 1;
int deleteCount = 0;
// 存放要删除的MappedFile
List < MappedFile > files = new ArrayList < MappedFile > ();
if (null != mfs) {
//对映射文件进行遍历
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
//文件最后的修改时间 + 过期时间 = 文件最终能够存活的时间
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 如果文件最新修改已经超过三天或者是磁盘空间达到85%以上 而要在此之前需要满足3个条件之一,时间,容量,和手动触发
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
//删除文件,就是解除对文件的引用
if (mappedFile.destroy(intervalForcibly)) {
//要删除的的文件加入到要删除的集合中
files.add(mappedFile);
//增加计数
deleteCount++;

if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}

//如果删除时间间隔大于0,并且没有循环玩,则睡眠指定的删除间隔时长后在杀出
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {}
}
} else break;
} else {
// 避免在中间删除文件
break;
}
}
}
//从文件映射队列中删除对应的文件映射
deleteExpiredFile(files);
//返回删除的文件个数
return deleteCount;
}
由 ​​timeup​​ 变量我们可以引申出 isTimeToDelete函数

RocketMQ会配置执行删除工作的时间,默认是早上四点。如果当前时间在04:00~04:59之间,就返回true。

/**
* 判断时间是否到 凌晨4点
* @return
*/
private boolean isTimeToDelete() {
// 04
String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
if (UtilAll.isItTimeToDo(when)) {
DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
return true;
}
return false;
}
由 ​​spacefull​​ 变量我们可以引申出 isSpaceToDelete函数

判断磁盘空间是否满足删除的条件,判断要求如下:

  1. 使用提交日志的路径,检查其所在的磁盘空间的使用率。默认情况下,使用率超过90%,设置磁盘不可用标志位,并且设置属性​​DefaultMessageStore.CleanCommitLogService#cleanImmediately​​​为true。使用率超过85%,设置属性​​DefaultMessageStore.CleanCommitLogService#cleanImmediately​​为true。其他情况,设置运行状态位为磁盘可用。
  2. 磁盘使用率小于0或者大于属性​​MessageStoreConfig#diskMaxUsedSpaceRatio​​的要求,默认是75%,则返回true给调用。
  3. 针对消费队列的文件路径,上述步骤重复一次。
  4. 如果步骤1~3都没有返回true,则返回false给调用者。意味着此时磁盘空间有剩余,不要求删除。
/**
* 空间是否上限
* @return
*/
private boolean isSpaceToDelete() {
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;

cleanImmediately = false;

{
String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();
String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
Set < String > fullStorePath = new HashSet < > ();
double minPhysicRatio = 100;
String minStorePath = null;
for (String storePathPhysic: storePaths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio;
minStorePath = storePathPhysic;
}
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(storePathPhysic);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
if (minPhysicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
}

cleanImmediately = true;
} else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio + ", so mark disk ok, storePathPhysic=" + minStorePath);
}
}

if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + minPhysicRatio + ", storePathPhysic=" + minStorePath);
return true;
}
}

{
String storePathLogics = DefaultMessageStore.this.getStorePathLogic();
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
if (logicsRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
}

cleanImmediately = true;
} else if (logicsRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
}
}

if (logicsRatio < 0 || logicsRatio > ratio) {
DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
return true;
}
}

return false;
}

消费队列过期删除

​CLeanConsumeQueueService​​​的​​run​​方法就是直接委托这个方法来实现。这个方法的作用就是删除无效的消费队列条目内容或者文件本身。其代码逻辑如下:

  1. 通过方法​​CommitLog#getMinOffset​​获取提交日志最小的偏移量,声明为minOffset。
  2. 如果​​minOffset​​​大于类属性​​lastPhysicalMinOffset​​,那么意味着当前提交日志的最小偏移量对比上一次查询的值发生了变化,也就是说必然是最少一个提交日志文件被删除,那么相应的在消费队列中的过期数据也可以被删除,就执行后面的流程。反之,则意味着不需要执行任何操作,结束方法即可。
  3. 将​​minOffset​​​赋值给​​lastPhysicalMinOffset​​。
  4. 对属性​​consumeQueueTable​​​进行遍历,遍历其中每一个​​ConsumeQueue​​​对象。使用本次的​​minOffset​​​作为入参,调用方法​​ConsumeQueue#deleteExpiredFile​​​删除过期的消费队列文件以及更新消费队列的最小偏移量。如果有删除到文件,则休眠​​MessageStoreConfig#deleteConsumeQueueFilesInterval​​配置的时间,继续对下一个消费队列执行删除。
  5. 当循环执行完毕,使用参数​​minOffset​​​作为入参,调用方法​​IndexService#deleteExpiredFile(long)​​来删除索引文件中已经完全无效的索引文件。
public void run() {
try {
this.deleteExpiredFiles();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}

private void deleteExpiredFiles() {
// 0.1秒
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
// 得到commitlog中第一个文件的起始物理offset
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
if (minOffset > this.lastPhysicalMinOffset) {
// 发现上次的已经变小了 说明commitlog已经发生过删除操作了
this.lastPhysicalMinOffset = minOffset;

ConcurrentMap < String, ConcurrentMap < Integer, ConsumeQueue >> tables = DefaultMessageStore.this.consumeQueueTable;

for (ConcurrentMap < Integer, ConsumeQueue > maps: tables.values()) {
for (ConsumeQueue logic: maps.values()) {
// 对某一个消费队列做删除 参数是commitlog最小的物理点位
int deleteCount = logic.deleteExpiredFile(minOffset);

if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
// 当上一个ConsumeQueue成功删除之后,下一个ConsumeQueue删除需要等待0.1s
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException ignored) {

}
}
}
}
// 删除索引文件
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}

索引文件删除

索引文件的删除是在消费队列删除完成后,调用方法 ​​deleteExpiredFile​​ 完成的。

该方法是用于删除索引文件中的无效文件。执行流程如下:

  1. 首先需要确认,索引文件中是否存在无效文件。获取第一个索引文件,获取其​​endPhyOffset​​​属性,判断该属性的值是否小于入参的​​offset​​。如果是的话,至少意味着有一个文件是无效的,则执行后续流程。否则没有无效文件,则直接结束整个方法。
  2. 声明一个局部变量​​fileList​​​,遍历索引文件​​IndexFile​​​对象,如果其​​endPhyOffset​​​小于入参的​​offset​​​,说明该文件是无效的,添加到​​fileList​​中。
  3. 使用第二步的​​fileList​​​作为入参,调用方法​​IndexService#deleteExpiredFile(List)​​​。该方法内部调用了​​IndexFile#destory​​​方法,内部也是委托了​​MappedFile#destory​​​方法实现的文件销毁。并且删除成功的​​IndexFile​​​还会从属性​​indexFileList​​列表中删除对应的对象。
/**
* 删除索引文件
* @param offset
*/
public void deleteExpiredFile(long offset) {
Object[] files = null;
try {
this.readWriteLock.readLock().lock();
if (this.indexFileList.isEmpty()) {
return;
}

long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
if (endPhyOffset < offset) {
files = this.indexFileList.toArray();
}
} catch (Exception e) {
log.error("destroy exception", e);
} finally {
this.readWriteLock.readLock().unlock();
}

if (files != null) {
List < IndexFile > fileList = new ArrayList < IndexFile > ();
for (int i = 0; i < (files.length - 1); i++) {
IndexFile f = (IndexFile) files[i];
if (f.getEndPhyOffset() < offset) {
fileList.add(f);
} else {
break;
}
}

this.deleteExpiredFile(fileList);
}
}

文件恢复机制

从源码定位中,我们可以看到执行 ​​./mqbroker​​ 命令后,会启动main函数的 createBrokerController函数。

在函数中调用了一个 ​​initialize​​​ 初始化 ,我们在初始化函数中找到了 ​​this.messageStore.load​

public static void main(String[] args) {
start(createBrokerController(args));
}
public static BrokerController createBrokerController(String[] args) {
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
public boolean initialize() throws CloneNotSupportedException {
result = result && this.messageStore.load();

if (result) {

}
return result;
}
这里的 ​​load​​ 和上面的代码一样,都是接口实现类。统一由 DefaultMessageStore 实现。

所以文件恢复函数 ​​recover​​ 从 Broker启动之后,就会随之启动。启动之后

  1. 检查当前文件是否损坏(异常关闭)或者存不存在 (检查依据已在下列代码的尾部贴出)
  2. 加载Commit Log 和 Consume Queue文件。加载成功之后进行​​recover​​ 文件恢复
/**
* 检查abort文件是不是存在,如果存在表示上次是异常关闭,这个文件是一个空文件,在启动之后会创建,正常关闭的情况会删除掉。
* 加载延迟消息相关的配置,加载 Commit Log文件,加载Consume Queue文件
* 如果步骤2成功加载,则加载checkpoint文件,加载indexFile然后进行文件的恢复逻辑
* 对于文件的恢复逻辑在recover方法中,会调用CommitLog类中的方法
* @throws IOException
*/
public boolean load() {
boolean result = true;

try {
//是否存在abort文件,如果存在说明上次服务关闭时异常关闭的
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

// 加载 Commit Log文件
result = result && this.commitLog.load();

// 加载 Consume Queue文件
result = result && this.loadConsumeQueue();

//检查前面3个文件是不是加载成功
if (result) {
//加载成功则继续加载checkpoint文件
this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
//加载indexFile
this.indexService.load(lastExitOK);
//进行文件的恢复逻辑
this.recover(lastExitOK);

log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());

if (null != scheduleMessageService) {
result = this.scheduleMessageService.load();
}
}

} catch (Exception e) {
log.error("load exception", e);
result = false;
}

if (!result) {
this.allocateMappedFileService.shutdown();
}

return result;
}

// 检查依据是从这个路径中
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
recover 函数的实现逻辑

从 ConsumeQueue文件的集合中取出,从倒数第三个文件开始,逐条遍历消息,如果取出的物理点位大于0并且message的size大于0,说明数据有效。

恢复commitlog分正常退出和非正常退出。

正常退出的commitlog所有数据都是flush完成的,所以只要从倒数第三个文件开始恢复即可,遍历每一个message,并校验其CRC。

非正常退出则从最后一个文件开始恢复,一般出现问题的都是最后一个文件,然后获取文件中的第一个message,其存储时间是否小于checkpoint时间点中的最小的一个,如果是,表示其就是需要恢复的起始文件。然后检验每一个message的CRC,并将通过校验的数据dispatch到consumelog和index文件中。

/**
* 进行文件的恢复逻辑
* @param lastExitOK
*/
private void recover(final boolean lastExitOK) {
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

//上次服务关闭是不是正常关闭
if (lastExitOK) {
//正常情况关闭
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
//异常情况关闭
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}

//恢复topic消费相关相关的缓存
this.recoverTopicQueueTable();
}
/**
* 计算恢复ConsumeQueue文件集合的下标
*/
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
for (ConcurrentMap < Integer, ConsumeQueue > maps: this.consumeQueueTable.values()) {
for (ConsumeQueue logic: maps.values()) {
logic.recover();
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
return maxPhysicOffset;
}
/**
* 恢复topic消费相关相关的缓存
*/
public void recoverTopicQueueTable() {
/* topic-queueid */
/* offset */
HashMap < String, Long > table = new HashMap < String, Long > (1024);
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentMap < Integer, ConsumeQueue > maps: this.consumeQueueTable.values()) {
for (ConsumeQueue logic: maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
table.put(key, logic.getMaxOffsetInQueue());
logic.correctMinOffset(minPhyOffset);
}
}

this.commitLog.setTopicQueueTable(table);
}
/**
* 当正常退出、数据恢复时,所有内存数据均已刷新
* 服务正常恢复 加载的映射文件列表进行遍历,对文件进行校验,和文件中的消息的魔数进行校验,来判断哪些数据是正常的,
* 并计算出正常的数据的最大偏移量。然后,根据偏移量设置对应的提交和刷新的位置以及不正常数据的删除。
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List < MappedFile > mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
//如果文件列表大于3就从倒数第3个开始,否则从第一个开始
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;

MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
//校验消息,然后返回转发请求,根据Magic_code正确,并且crc32正确,并且消息的msgSize记录大小和消息整体大小相等。则表示是合格的消息
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
// 是一个合格的消息并且消息体大于0
if (dispatchRequest.isSuccess() && size > 0) {
// 则读取的偏移量mapedFileOffset累加msgSize
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the return 0 representatives met last hole, this can not be included in truncate offset
// 是合格的消息,但是消息体为0,表示读取到了文件的最后一块信息
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
// 文件读完了
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
// 最后读取的MapedFile对象的fileFromOffset加上最后读取的位置mapedFileOffset值
processOffset += mappedFileOffset;
// 设置文件刷新到的offset
this.mappedFileQueue.setFlushedWhere(processOffset);
// 设置文件提交到的offset
this.mappedFileQueue.setCommittedWhere(processOffset);
// 删除offset之后的脏数据文件
this.mappedFileQueue.truncateDirtyFiles(processOffset);

// Clear ConsumeQueue redundant data
// 清除ConsumeQueue冗余数据
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted 案例文件被删除
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}

结尾

本篇文件介绍的就是RocketMQ的过期删除机制,与恢复机制。

文件过期删除机制 触发主要有三点

  1. 默认凌晨4点。这个也比较好理解,这个时候用的人也比较少,删除对系统的影响就降到最小。
  2. 磁盘空间不足。当磁盘空间不足的时候,就要删除过期文件以提供更多的空间出来接收消息。
  3. 人工触发,指人为的介入去删除。
由上述三种情况展开聊了一些文件过大,被占用,文件损坏的一些安全性处理。

恢复机制 没有硬性条件,主要有以下2点

  1. 检查当前文件是否损坏(异常关闭)或者存不存在
  2. 加载Commit Log 和 Consume Queue文件。加载成功之后才执行
消息队列过期删除

取出commitlog中第一个文件的起始物理offset位置,与末次最小物理坐标offset做对比。如果发现上次的下标已经变小了,说明commitlog已经发生过删除操作了

索引过期删除

执行完消息队列的过期删除,根据坐标直接删掉对应的索引

非常欢迎大家加我个人微信有关后端方面的问题我们在群内一起讨论! 我们下期再见!

欢迎『点赞』、『在看』、『转发』三连支持一下,下次见~

有关图文并茂!深入了解RocketMQ的过期删除机制的更多相关文章

  1. ruby-on-rails - 如何从 format.xml 中删除 <hash></hash> - 2

    我有一个对象has_many应呈现为xml的子对象。这不是问题。我的问题是我创建了一个Hash包含此数据,就像解析器需要它一样。但是rails自动将整个文件包含在.........我需要摆脱type="array"和我该如何处理?我没有在文档中找到任何内容。 最佳答案 我遇到了同样的问题;这是我的XML:我在用这个:entries.to_xml将散列数据转换为XML,但这会将条目的数据包装到中所以我修改了:entries.to_xml(root:"Contacts")但这仍然将转换后的XML包装在“联系人”中,将我的XML代码修改为

  2. ruby - 我可以使用 Ruby 从 CSV 中删除列吗? - 2

    查看Ruby的CSV库的文档,我非常确定这是可能且简单的。我只需要使用Ruby删除CSV文件的前三列,但我没有成功运行它。 最佳答案 csv_table=CSV.read(file_path_in,:headers=>true)csv_table.delete("header_name")csv_table.to_csv#=>ThenewCSVinstringformat检查CSV::Table文档:http://ruby-doc.org/stdlib-1.9.2/libdoc/csv/rdoc/CSV/Table.html

  3. ruby - 我可以使用 aws-sdk-ruby 在 AWS S3 上使用事务性文件删除/上传吗? - 2

    我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的

  4. ruby - 如何安全地删除文件? - 2

    在Ruby中是否有Gem或安全删除文件的方法?我想避免系统上可能不存在的外部程序。“安全删除”指的是覆盖文件内容。 最佳答案 如果您使用的是*nix,一个很好的方法是使用exec/open3/open4调用shred:`shred-fxuz#{filename}`http://www.gnu.org/s/coreutils/manual/html_node/shred-invocation.html检查这个类似的帖子:Writingafileshredderinpythonorruby?

  5. ruby-on-rails - 标准化文件名的字符串,删除重音和特殊字符 - 2

    我正在尝试找到一种方法来规范化字符串以将其作为文件名传递。到目前为止我有这个:my_string.mb_chars.normalize(:kd).gsub(/[^\x00-\x7F]/n,'').downcase.gsub(/[^a-z]/,'_')但第一个问题:-字符。我猜这个方法还有更多问题。我不控制名称,名称字符串可以有重音符、空格和特殊字符。我想删除所有这些,用相应的字母('é'=>'e')替换重音符号,并将其余的替换为'_'字符。名字是这样的:“Prélèvements-常规”“健康证”...我希望它们像一个没有空格/特殊字符的文件名:“prelevements_routin

  6. ruby-on-rails - 为什么在 Rails 5.1.1 中删除了 session 存储初始化程序 - 2

    我去了这个website查看Rails5.0.0和Rails5.1.1之间的区别为什么5.1.1不再包含:config/initializers/session_store.rb?谢谢 最佳答案 这是删除它的提交:Setupdefaultsessionstoreinternally,nolongerthroughanapplicationinitializer总而言之,新应用没有该初始化器,session存储默认设置为cookie存储。即与在该初始值设定项的生成版本中指定的值相同。 关于

  7. ruby - 如果它是标点符号,我怎么能从字符串中删除最后一个字符,在 ruby​​ 中? - 2

    啊,正则表达式有点困惑。我正在尝试删除字符串末尾所有可能的标点符号:ifstr[str.length-1]=='?'||str[str.length-1]=='.'||str[str.length-1]=='!'orstr[str.length-1]==','||str[str.length-1]==';'str.chomp!end我相信有更好的方法来做到这一点。有什么指点吗? 最佳答案 str.sub!(/[?.!,;]?$/,'')[?.!,;]-字符类。匹配这5个字符中的任何一个(注意,。在字符类中并不特殊)?-前一个字符或组

  8. 键删除后 ruby​​ 哈希内存泄漏 - 2

    你好,我无法成功如何在散列中删除key后释放内存。当我从哈希中删除键时,内存不会释放,也不会在手动调用GC.start后释放。当从Hash中删除键并且这些对象在某处泄漏时,这是预期的行为还是GC不释放内存?如何在Ruby中删除Hash中的键并在内存中取消分配它?例子:irb(main):001:0>`ps-orss=-p#{Process.pid}`.to_i=>4748irb(main):002:0>a={}=>{}irb(main):003:0>1000000.times{|i|a[i]="test#{i}"}=>1000000irb(main):004:0>`ps-orss=-p

  9. ruby - 我可以删除 Ruby 中的方法别名吗? - 2

    假设我有一段Ruby代码,我想在其中为一个方法设置别名(我不知道为什么;让我们假设我有一个很好的理由)。classStringalias_method:contains?,:include?end我可以在本节之后删除这个别名吗? 最佳答案 remove_method在大多数情况下应该有效。但是,如果您的alias_method覆盖了现有方法,您可能需要通过单独的alias_method调用来保存原始方法。#assuming:contains?isalreadyamethodalias_method:original_contains

  10. ruby - 如何以编程方式删除实例上的 "singleton information"以使其编码(marshal)? - 2

    我创建了一个由于“在运行时执行的单例元类定义”而无法编码的对象(这段代码的描述是否正确?)。这是通过以下代码执行的:#defineclassXthatmyusesingletonclassmetaprogrammingfeatures#throughcallofmethod:break_marshalling!classXdefbreak_marshalling!meta_class=class我该怎么做才能使对象编码正确?是否可以从对象instance_of_x的classX中“移除”单例组件?我真的需要一个建议,因为我们的一些对象需要通过Marshal.dump序列化机制进行缓存。

随机推荐