问题
-
由于RocketMQ操作CommitLog、ConsumeQueue文件,都是基于内存映射办法并在发动的时分,会加载commitlog、ConsumeQueue目录下的一切文件,为了避免内存与磁盘的浪费,不可能将音讯永久存储在音讯服务器上,所以需求一种机制来删去已过期的文件。
-
假如让你来完成怎样完成,思考一下,借鉴于Redis删去战略,慵懒删去和守时删去,慵懒删去呢,由于音讯能够查询之前的,所以也不主张此办法,那么就剩下守时删去了,发动一个守时使命然后超了某个时刻之后进行删去。
- 守时使命
- 获取文件终究修正时刻
- 是否超了某个时刻点,超了则删去,持续下一个文件
-
下面看看RocketMQ是如何完成的
原理
- RocketMQ顺序写Commitlog、ConsumeQueue文件,一切写操作悉数落在终究一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后,将不会再被更新。
- RocketMQ铲除过期文件的办法是:假如非当时写文件在一守时刻距离内没有再次被更新,则认为是过期文件,能够被删去,RocketMQ不会管这个这个文件上的音讯是否被悉数消费。默认每个文件的过期时刻为72小时。经过在Broker配置文件中设置fileReservedTime来改动过期时刻,单位为小时。接下来详细分析RocketMQ是如何规划与完成上述机制的。想一个问题,是否存在这么一个情况,音讯还在consumerQueue中,可是CommitLog重现已没有了,是不是需求确保CommitLog和ConsumerQueue同步删去
源码
进口在 DefaultMessageStore中,直接看start办法 –> this.addScheduleTask();
主要铲除CommitLog、ConsumeQueue的过期文件。CommitLog 与 ConsumeQueue 关于过期文件的删去算法
addScheduleTask
- RocketMQ 会每隔10s调度一次cleanFilesPeriodically,已检测是否需求铲除过期文件。履行频率能够经过设置cleanResourceInterval,默认为10s。
private void addScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//阶段性的删去文件
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
//。。。。省略其他守时使命。。。。
}
cleanFilesPeriodically()
- 主要铲除CommitLog、ConsumeQueue的过期文件。CommitLog 与 ConsumeQueue 关于过期文件的删去算法
private void cleanFilesPeriodically() {
this.cleanCommitLogService.run();
this.cleanConsumeQueueService.run();
}
- 经过之前的学习,肯定知道这个 Service肯定是 ServiceThread完成类,也便是Thread,此处的run呢,这个就要看看根底了,可不是start办法哈。可是,仔细看,其实啥也不是,便是一个普通的类,仅仅里边有个run办法罢了。额。。。。。。
- 由于两个Service逻辑相似,只去关注commitLog办法了
CleanCommitLogService
-
分成了两步
- 第一个步骤:测验删去过期文件
- 第二个步骤:删去上一步落网之鱼,重试删去被hange(由于被其他线程引证在第一阶段未删去的文件),在这里再重试一次。
class CleanCommitLogService {
//省略了一些特点
public void run() {
try {
//K1 删去过期文件
this.deleteExpiredFiles();
//k1 删去被挂起的文件
this.redeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
deleteExpiredFiles
-
大体分为两步
- 到达了删去的条件
- 进行删去逻辑
删去条件判别
-
获取一些基本的变量信息
- fileReservedTime:文件保存时刻,也便是从终究一次更新时刻到现在,假如超越了该时刻,则认为是过期文件,能够被删去。
- deletePhysicFilesInterval:删去物理文件的距离,由于在一次铲除过程中,可能需求删去的文件不止一个,该值指定两次删去文件的距离时刻。
- destroyMapedFileIntervalForcibly:在铲除过期文件时,假如该文件被其他线程所占用(引证次数大于0,比方读取音讯),此时会阻挠此次删去使命,一起在第一次企图删去该文件时记载当时时刻戳,destroyMapedFileIntervalForcibly表明第一次回绝删去之后能保存的最大时刻,在此时刻内,相同能够被回绝删去,一起会将引证削减1000个,超越该时刻距离后,文件将被强制删去。(上面的东西,感觉有点虚,什么引证啥的,后边持续看看)
//文件保存时刻,也便是从终究一次更新时刻到现在,假如超越了该时刻,则认为是过期文件,能够被删去。 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); //删去物理文件的距离,由于在一次铲除过程中,可能需求删去的文件不止一个,该值指定两次删去文件的距离时刻。 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); //无敌时刻:表明第一次回绝删去之后能保存的最大时刻 // 在铲除过期文件时,假如该文件被其他线程所占用(引证次数大于0,比方读取音讯),此时会阻挠此次删去使命,一起在第一次企图删去该文件时记载当时时刻戳, // destroyMapedFileIntervalForcibly表明第一次回绝删去之后能保存的最大时刻,在此时刻内,相同能够被回绝删去,一起会将引证削减1000个, // 超越该时刻距离后,文件将被强制删去。 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
-
判别条件
- 是否时刻到了,RocketMQ经过deleteWhen设置一天的固守时刻履行一次删去过期文件操作,默认为凌晨4点。
- 判别磁盘空间是否充足,假如不充足,则返回true,表明应该触发过期文件删去操作。
- 预留,手艺触发,能够经过调用excuteDeleteFilesManualy办法手艺触发过期文件删去,目前RocketMQ暂未封装手艺触发文件删去的命令
isSpaceToDelete
-
获取文件删去的临界点,超了次临界点,删去文件
//K1 获取maxUsedSpaceRatio,表明commitlog、consumequeue文件地点磁盘分区的最大运用量,假如超越该值,则需求当即铲除过期文件。 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
-
删去符号位,注意此对象是特点字段,肯定后边删去的时分,也会判别此值
cleanImmediately = false;
-
获取commit的一切存储途径,然后依次进行判别,存在两个非常重要的值
- diskSpaceWarningLevelRatio=0.90:假如磁盘分区运用率超越该阔值,将设置磁盘不可写,此时会回绝新音讯的写入。
- diskSpaceCleanForciblyRatio=0.85:假如磁盘分区运用超越该阔值,主张当即履行过期文件铲除,但不会回绝新音讯的写入。
String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic(); String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); Set<String> fullStorePath = new HashSet<>(); //最小的分区份额 double minPhysicRatio = 100; String minStorePath = null; //K2 获取一切的存储文件途径 for (String storePathPhysic : storePaths) { //K2 获取当时物理磁盘运用率:获取commitlog地点磁盘分区总的存储容量 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); //不断更新最小值 if (minPhysicRatio > physicRatio) { minPhysicRatio = physicRatio; minStorePath = storePathPhysic; } //IMP // diskSpaceWarningLevelRatio=0.90:假如磁盘分区运用率超越该阔值,将设置磁盘不可写,此时会回绝新音讯的写入。 // diskSpaceCleanForciblyRatio=0.85:假如磁盘分区运用超越该阔值,主张当即履行过期文件铲除,但不会回绝新音讯的写入。 //K2 假如磁盘分区运用超越该阔值,主张当即履行过期文件铲除,但不会回绝新音讯的写入。 if (physicRatio > diskSpaceCleanForciblyRatio) { fullStorePath.add(storePathPhysic); } }
-
判别磁盘是否可用,用当时已运用物理磁盘率maxUsedSpaceRatio、diskSpaceWarningLevelRatio、diskSpaceCleanForciblyRatio,假如当时磁盘运用率到达上述阔值,将返回true表明磁盘已满,需求进行过期文件删去操作。此处将 cleanImmediately设置为了true
//IMP CommitLog设置哪些文件满了,可是没有针对文件进行禁止写入的情况 DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); //K2 假如磁盘分区运用率超越该阔值,将设置磁盘不可写,此时会回绝新音讯的写入。 if (minPhysicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio + ", so mark disk full, storePathPhysic=" + minStorePath); } //IMP 当即整理 cleanImmediately = true; //K2 假如磁盘分区运用超越该阔值,主张当即履行过期文件铲除,但不会回绝新音讯的写入。 } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) { //IMP 当即整理 cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio + ", so mark disk ok, storePathPhysic=" + minStorePath); } } if (minPhysicRatio < 0 || minPhysicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + minPhysicRatio + ", storePathPhysic=" + minStorePath); return true; }
删去逻辑
-
上面三种情况任一满足,则进行整理作业
if (timeup || spacefull || manualDelete) { //.....整理作业...... }
-
判别是否马上整理,此处看到了之前设置的 cleanImmediately字段信息
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
-
整理过期文件
//IMP 整理过期文件 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce);
deleteExpiredFile
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
//K1 文件存储太久,或许,之前判别出的当即整理
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
//IMP 删去文件
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
//K1 办理 mappedFiles 中的存储数据
deleteExpiredFile(files);
return deleteCount;
}
-
两种条件履行整理
- 文件现已存储很久了
- 经过之前的判别,文件需求整理,
-
有个疑问,是不是上面判别了,就必须整理文件呢,假如文件再被运用呢。。。。
MapperFile.destory
public boolean destroy(final long intervalForcibly) {
this.shutdown(intervalForcibly);
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
shutDown
- 注意几个点,一开始文件肯定是 available=true,一开始肯定是有用的,相当于一个开关,一开始是可用的,那么设置为不可用,那么文件处理的时分,肯定会判别,文件是否可用这样就能够封闭一切的写入程序。
- 设置timestamp
- 履行release办法。下面是挂起的逻辑,可是现在跑到了上面的逻辑岂不是就跑不到了。。。感觉有点不对呀,正在运用中的文件怎样办呢,岂不是就删去了
public void shutdown(final long intervalForcibly) {
//K1 能够整理
if (this.available) {
//K2 设置available为false
this.available = false;
//K2 记载firstShutdownTimestamp 时刻戳
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
//K1 假如被其他线程引证,本次不删去
} else if (this.getRefCount() > 0) {
//K2 在回绝被删去保护期内(destroyMapedFileIntervalForcibly)每履行一次整理使命,将引证次数减去1000,引证数小于1后,该文件终究将被删去
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
先看看假如走到了 else if的地方,那么肯定是文件现已设置了不可用了,而且文件还被某些文件正在运用中,那么假如超了距离时刻,就将引证削减1000,这样能够确保,超了一守时刻之后,引证肯定会变低,还会履行release办法,该办法详细干了什么,瞅瞅
release办法
- 好吧,本来,里边判别了引证数,假如被引证了,也便是设置avalialbe=false了,这样就很显着了,第一次来了,假如还有引证,那么仅仅更改了可用的符号位,并且引证削减,第二次来的时分,引证数仍是大于0的,超了超时时刻之后,就会履行cleanUp办法,然后履行文件的整理了。其实文件的整理的时分,也是用到了available进行进一步的判别,不得不说,形式规划的好,能够很好的控制一切的流程信息。
public void release() {
long value = this.refCount.decrementAndGet();
//IMP 引证大于0,不删去
if (value > 0)
return;
synchronized (this) {
this.cleanupOver = this.cleanup(value);
}
}
cleanUp
@Override
public boolean cleanup(final long currentRef) {
//IMP 文件被设置了可用,就不去履行
if (this.isAvailable()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have not shutdown, stop unmapping.");
return false;
}
if (this.isCleanupOver()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have cleanup, do not do it again.");
return true;
}
clean(this.mappedByteBuffer);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
TOTAL_MAPPED_FILES.decrementAndGet();
log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
return true;
}
引证学习链接:blog.csdn.net/prestigedin…