问题

  1. 由于RocketMQ操作CommitLog、ConsumeQueue文件,都是基于内存映射办法并在发动的时分,会加载commitlog、ConsumeQueue目录下的一切文件,为了避免内存与磁盘的浪费,不可能将音讯永久存储在音讯服务器上,所以需求一种机制来删去已过期的文件。

  2. 假如让你来完成怎样完成,思考一下,借鉴于Redis删去战略,慵懒删去和守时删去,慵懒删去呢,由于音讯能够查询之前的,所以也不主张此办法,那么就剩下守时删去了,发动一个守时使命然后超了某个时刻之后进行删去。

    1. 守时使命
    2. 获取文件终究修正时刻
    3. 是否超了某个时刻点,超了则删去,持续下一个文件
  3. 下面看看RocketMQ是如何完成的

原理

  • RocketMQ顺序写Commitlog、ConsumeQueue文件,一切写操作悉数落在终究一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后,将不会再被更新。
  • RocketMQ铲除过期文件的办法是:假如非当时写文件在一守时刻距离内没有再次被更新,则认为是过期文件,能够被删去,RocketMQ不会管这个这个文件上的音讯是否被悉数消费。默认每个文件的过期时刻为72小时。经过在Broker配置文件中设置fileReservedTime来改动过期时刻,单位为小时。接下来详细分析RocketMQ是如何规划与完成上述机制的。想一个问题,是否存在这么一个情况,音讯还在consumerQueue中,可是CommitLog重现已没有了,是不是需求确保CommitLog和ConsumerQueue同步删去

源码

进口在 DefaultMessageStore中,直接看start办法 –> this.addScheduleTask();

主要铲除CommitLog、ConsumeQueue的过期文件。CommitLog 与 ConsumeQueue 关于过期文件的删去算法

addScheduleTask

  • RocketMQ 会每隔10s调度一次cleanFilesPeriodically,已检测是否需求铲除过期文件。履行频率能够经过设置cleanResourceInterval,默认为10s。
private void addScheduleTask() {
​
 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
   //阶段性的删去文件
   DefaultMessageStore.this.cleanFilesPeriodically();
   }
  }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
 //。。。。省略其他守时使命。。。。
}

cleanFilesPeriodically()

  • 主要铲除CommitLog、ConsumeQueue的过期文件。CommitLog 与 ConsumeQueue 关于过期文件的删去算法
private void cleanFilesPeriodically() {
 this.cleanCommitLogService.run();
 this.cleanConsumeQueueService.run();
}
  • 经过之前的学习,肯定知道这个 Service肯定是 ServiceThread完成类,也便是Thread,此处的run呢,这个就要看看根底了,可不是start办法哈。可是,仔细看,其实啥也不是,便是一个普通的类,仅仅里边有个run办法罢了。额。。。。。。
  • 由于两个Service逻辑相似,只去关注commitLog办法了

CleanCommitLogService

  • 分成了两步

    • 第一个步骤:测验删去过期文件
    • 第二个步骤:删去上一步落网之鱼,重试删去被hange(由于被其他线程引证在第一阶段未删去的文件),在这里再重试一次。
class CleanCommitLogService {
​
 //省略了一些特点public void run() {
  try {
​
   //K1 删去过期文件
   this.deleteExpiredFiles();
​
   //k1 删去被挂起的文件
   this.redeleteHangedFile();
   } catch (Throwable e) {
   DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
   }
  }
}

deleteExpiredFiles

  • 大体分为两步

    1. 到达了删去的条件
    2. 进行删去逻辑

删去条件判别

  1. 获取一些基本的变量信息

    1. fileReservedTime:文件保存时刻,也便是从终究一次更新时刻到现在,假如超越了该时刻,则认为是过期文件,能够被删去。
    2. deletePhysicFilesInterval:删去物理文件的距离,由于在一次铲除过程中,可能需求删去的文件不止一个,该值指定两次删去文件的距离时刻。
    3. destroyMapedFileIntervalForcibly:在铲除过期文件时,假如该文件被其他线程所占用(引证次数大于0,比方读取音讯),此时会阻挠此次删去使命,一起在第一次企图删去该文件时记载当时时刻戳,destroyMapedFileIntervalForcibly表明第一次回绝删去之后能保存的最大时刻,在此时刻内,相同能够被回绝删去,一起会将引证削减1000个,超越该时刻距离后,文件将被强制删去。(上面的东西,感觉有点虚,什么引证啥的,后边持续看看
    //文件保存时刻,也便是从终究一次更新时刻到现在,假如超越了该时刻,则认为是过期文件,能够被删去。
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    ​
    //删去物理文件的距离,由于在一次铲除过程中,可能需求删去的文件不止一个,该值指定两次删去文件的距离时刻。
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    ​
    ​
    //无敌时刻:表明第一次回绝删去之后能保存的最大时刻
    //  在铲除过期文件时,假如该文件被其他线程所占用(引证次数大于0,比方读取音讯),此时会阻挠此次删去使命,一起在第一次企图删去该文件时记载当时时刻戳,
    //  destroyMapedFileIntervalForcibly表明第一次回绝删去之后能保存的最大时刻,在此时刻内,相同能够被回绝删去,一起会将引证削减1000个,
    //  超越该时刻距离后,文件将被强制删去。
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    
  2. 判别条件

    1. 是否时刻到了,RocketMQ经过deleteWhen设置一天的固守时刻履行一次删去过期文件操作,默认为凌晨4点。
    2. 判别磁盘空间是否充足,假如不充足,则返回true,表明应该触发过期文件删去操作。
    3. 预留,手艺触发,能够经过调用excuteDeleteFilesManualy办法手艺触发过期文件删去,目前RocketMQ暂未封装手艺触发文件删去的命令
isSpaceToDelete
  1. 获取文件删去的临界点,超了次临界点,删去文件

    //K1 获取maxUsedSpaceRatio,表明commitlog、consumequeue文件地点磁盘分区的最大运用量,假如超越该值,则需求当即铲除过期文件。
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    
  2. 删去符号位,注意此对象是特点字段,肯定后边删去的时分,也会判别此值

    cleanImmediately = false;
    
  3. 获取commit的一切存储途径,然后依次进行判别,存在两个非常重要的值

    1. diskSpaceWarningLevelRatio=0.90:假如磁盘分区运用率超越该阔值,将设置磁盘不可写,此时会回绝新音讯的写入。
    2. diskSpaceCleanForciblyRatio=0.85:假如磁盘分区运用超越该阔值,主张当即履行过期文件铲除,但不会回绝新音讯的写入。
    String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();
    String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
    Set<String> fullStorePath = new HashSet<>();
    ​
    //最小的分区份额
    double minPhysicRatio = 100;
    String minStorePath = null;
    ​
    //K2 获取一切的存储文件途径
    for (String storePathPhysic : storePaths) {
    ​
     //K2 获取当时物理磁盘运用率:获取commitlog地点磁盘分区总的存储容量
     double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
    ​
     //不断更新最小值
     if (minPhysicRatio > physicRatio) {
      minPhysicRatio = physicRatio;
      minStorePath = storePathPhysic;
      }
    ​
     //IMP
     //  diskSpaceWarningLevelRatio=0.90:假如磁盘分区运用率超越该阔值,将设置磁盘不可写,此时会回绝新音讯的写入。
     //  diskSpaceCleanForciblyRatio=0.85:假如磁盘分区运用超越该阔值,主张当即履行过期文件铲除,但不会回绝新音讯的写入。
     //K2 假如磁盘分区运用超越该阔值,主张当即履行过期文件铲除,但不会回绝新音讯的写入。
     if (physicRatio > diskSpaceCleanForciblyRatio) {
      fullStorePath.add(storePathPhysic);
      }
    }
    
  4. 判别磁盘是否可用,用当时已运用物理磁盘率maxUsedSpaceRatio、diskSpaceWarningLevelRatio、diskSpaceCleanForciblyRatio,假如当时磁盘运用率到达上述阔值,将返回true表明磁盘已满,需求进行过期文件删去操作。此处将 cleanImmediately设置为了true

    //IMP CommitLog设置哪些文件满了,可是没有针对文件进行禁止写入的情况
    DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
    ​
    //K2 假如磁盘分区运用率超越该阔值,将设置磁盘不可写,此时会回绝新音讯的写入。
    if (minPhysicRatio > diskSpaceWarningLevelRatio) {
     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
     if (diskok) {
      DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
                     ", so mark disk full, storePathPhysic=" + minStorePath);
      }
    ​
     //IMP 当即整理
     cleanImmediately = true;
    ​
     //K2 假如磁盘分区运用超越该阔值,主张当即履行过期文件铲除,但不会回绝新音讯的写入。
    } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
    ​
     //IMP 当即整理
     cleanImmediately = true;
    } else {
    ​
     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
     if (!diskok) {
      DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
                     ", so mark disk ok, storePathPhysic=" + minStorePath);
      }
    }
    ​
    if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
     DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
                    + minPhysicRatio + ", storePathPhysic=" + minStorePath);
     return true;
    }
    

删去逻辑

  1. 上面三种情况任一满足,则进行整理作业

    if (timeup || spacefull || manualDelete) {
     //.....整理作业......
    }
    
  2. 判别是否马上整理,此处看到了之前设置的 cleanImmediately字段信息

    boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
    
  1. 整理过期文件

    //IMP 整理过期文件
    deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                                      destroyMapedFileIntervalForcibly, cleanAtOnce);
    
deleteExpiredFile
public int deleteExpiredFile(
 final long expiredTime,
 final int deleteFilesInterval,
 final long intervalForcibly,
 final boolean cleanImmediately
) {
 return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
public int deleteExpiredFileByTime(final long expiredTime,
                  final int deleteFilesInterval,
                  final long intervalForcibly,
                  final boolean cleanImmediately) {
 Object[] mfs = this.copyMappedFiles(0);
​
 if (null == mfs)
  return 0;
​
 int mfsLength = mfs.length - 1;
 int deleteCount = 0;
 List<MappedFile> files = new ArrayList<MappedFile>();
 if (null != mfs) {
  for (int i = 0; i < mfsLength; i++) {
   MappedFile mappedFile = (MappedFile) mfs[i];
   long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
​
   //K1 文件存储太久,或许,之前判别出的当即整理
   if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
​
    //IMP 删去文件
    if (mappedFile.destroy(intervalForcibly)) {
     files.add(mappedFile);
     deleteCount++;
​
     if (files.size() >= DELETE_FILES_BATCH_MAX) {
      break;
      }
​
     if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
      try {
       Thread.sleep(deleteFilesInterval);
       } catch (InterruptedException e) {
       }
      }
     } else {
     break;
     }
    } else {
    //avoid deleting files in the middle
    break;
    }
   }
  }
​
 //K1 办理 mappedFiles 中的存储数据
 deleteExpiredFile(files);
​
 return deleteCount;
}
  • 两种条件履行整理

    • 文件现已存储很久了
    • 经过之前的判别,文件需求整理,
  • 有个疑问,是不是上面判别了,就必须整理文件呢,假如文件再被运用呢。。。。

MapperFile.destory
public boolean destroy(final long intervalForcibly) {
 this.shutdown(intervalForcibly);
​
 if (this.isCleanupOver()) {
  try {
   this.fileChannel.close();
   log.info("close file channel " + this.fileName + " OK");
​
   long beginTime = System.currentTimeMillis();
   boolean result = this.file.delete();
   log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
        + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
        + this.getFlushedPosition() + ", "
        + UtilAll.computeElapsedTimeMilliseconds(beginTime));
   } catch (Exception e) {
   log.warn("close file channel " + this.fileName + " Failed. ", e);
   }
​
  return true;
  } else {
  log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
       + " Failed. cleanupOver: " + this.cleanupOver);
  }
​
 return false;
}
shutDown
  • 注意几个点,一开始文件肯定是 available=true,一开始肯定是有用的,相当于一个开关,一开始是可用的,那么设置为不可用,那么文件处理的时分,肯定会判别,文件是否可用这样就能够封闭一切的写入程序。
  • 设置timestamp
  • 履行release办法。下面是挂起的逻辑,可是现在跑到了上面的逻辑岂不是就跑不到了。。。感觉有点不对呀,正在运用中的文件怎样办呢,岂不是就删去了
public void shutdown(final long intervalForcibly) {
​
 //K1 能够整理
 if (this.available) {
  //K2 设置available为false
  this.available = false;
​
  //K2 记载firstShutdownTimestamp 时刻戳
  this.firstShutdownTimestamp = System.currentTimeMillis();
  this.release();
​
  //K1 假如被其他线程引证,本次不删去
  } else if (this.getRefCount() > 0) {
​
  //K2 在回绝被删去保护期内(destroyMapedFileIntervalForcibly)每履行一次整理使命,将引证次数减去1000,引证数小于1后,该文件终究将被删去
  if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
   this.refCount.set(-1000 - this.getRefCount());
   this.release();
   }
  }
}

先看看假如走到了 else if的地方,那么肯定是文件现已设置了不可用了,而且文件还被某些文件正在运用中,那么假如超了距离时刻,就将引证削减1000,这样能够确保,超了一守时刻之后,引证肯定会变低,还会履行release办法,该办法详细干了什么,瞅瞅

release办法
  • 好吧,本来,里边判别了引证数,假如被引证了,也便是设置avalialbe=false了,这样就很显着了,第一次来了,假如还有引证,那么仅仅更改了可用的符号位,并且引证削减,第二次来的时分,引证数仍是大于0的,超了超时时刻之后,就会履行cleanUp办法,然后履行文件的整理了。其实文件的整理的时分,也是用到了available进行进一步的判别,不得不说,形式规划的好,能够很好的控制一切的流程信息。
public void release() {
   long value = this.refCount.decrementAndGet();
​
   //IMP 引证大于0,不删去
   if (value > 0)
    return;
​
   synchronized (this) {
    this.cleanupOver = this.cleanup(value);
    }
   }
cleanUp
@Override
public boolean cleanup(final long currentRef) {
 
 //IMP 文件被设置了可用,就不去履行
 if (this.isAvailable()) {
  log.error("this file[REF:" + currentRef + "] " + this.fileName
       + " have not shutdown, stop unmapping.");
  return false;
  }
​
 if (this.isCleanupOver()) {
  log.error("this file[REF:" + currentRef + "] " + this.fileName
       + " have cleanup, do not do it again.");
  return true;
  }
​
 clean(this.mappedByteBuffer);
 TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
 TOTAL_MAPPED_FILES.decrementAndGet();
 log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
 return true;
}

引证学习链接:blog.csdn.net/prestigedin…