继续创作,加速成长!这是我参与「日新计划 10 月更文应战」的第28天,点击检查活动概况

前语

其实关于这个的话,我从前的几篇博文: SpringBoot+Netty+Vue+Websocket完结在线推送/谈天系统

实用水文篇–SpringBoot整合Netty完结音讯推送服务器

其完结已说的十分理解了,可是每想到后台仍是有人找我要代码,由于完整的代码其实在博文里边都是贴出来了的,当然前面的博文我都是给一个类似于脚手架的东西,没有给出实践的事例。那么今日也是直接给出代码的事例吧,这儿再次声明,一切的代码都是在这两篇博文里边有的,假如做了修改在本文当中会给出提示。此外的话,咱们的项目是彻底开源的,可是在开发阶段不开源,由于有一些铭感信息。在开发完结之后开源,这儿请见谅,当然也是为什么我没有办法给出源码的原因(暂时)也是为什么你能够在我的那两篇博文看到完整的代码信息的原因。

Tips: 在我的频道只会告诉你怎样做饭,不会把饭菜做好了再给你,假如需求,那是别的“价格”。

那么废话不多说,咱们开始。

技术架构

咱们今日来看到咱们的一个事例。首先是咱们的技术架构:

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

那么在咱们今日的话是这样的:

  1. 用户上传博文
  2. 博文经过之后发送审阅音讯给前端
  3. 前端对音讯进行展示

作用图

这个是咱们上传博文,博文经过之后会看到音讯有个提示。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

之后到详细的页面能够看到音讯

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)
由于图片是中午截取的,有个小bug没有调整,可是不要在意这些,这个bug是很简略的,由于一开始我这边做过初始化,没有铲除缓存算了。

后端项目

之后的话来看到咱们的后端的一个服务情况。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

咱们要进行互动的服务便是这三个,一个是网关,一个是博文的服务,还有便是咱们的音讯服务器。由于咱们是用户上传成功后再显示的。

那么关于博客的模块的话在这块有完整的阐明: SpringBoot + Vue完结博文上传+展示+博文列表

咱们这边做的便是一个系列。当然仅仅演示实践上,你用从前我给出的代码是彻底能够完结作用的,咱们这边仅仅说用那一套代码来真正做咱们详细的业务。

音讯数据界说

存储结构

那么在开始之前的话,咱们这边对咱们的音讯服务器规划了对应的数据库用来存储音讯。

这一块看你自己,咱们这边肯定是要的。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

那么咱们这次的事例需求运用到的表是这个表:

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

音讯状况

之后的话,咱们需求对咱们的音讯进行界说。 咱们在这儿对音讯的状况做出如下界说:

  1. 音讯具有两者状况,针对两个情况
  2. 签收状况,即,服务器确定用户在线,并且将音讯传输到了客户端,为签收状况。
  3. 阅读状况,在确保已签收的情况下,用户是否现已阅读音讯,这部分的逻辑有客户端代码处理。
  4. 对应未签收的音讯,用户上线时,恳求服务器是否存在未签收的音讯,假如有,进行一致读取,存储到本地
  5. 对于未读音讯,主要是对用户的状况进行一个判别,音讯现已缓存到用户本地。

那么此刻的话,咱们就现已说清楚了这个。在咱们的数据库里边status这个字段便是用来判别用户是不是签收了音讯的。至于用户到底有没有读取音讯,那么彻底便是客户端需求做的判别了。

当然你也能够规划为全部由服务端来处理。

Nutty音讯服务

项目结构

ok,说完了这个的话,咱们再来看到咱们的音讯服务端是怎样处理的。

首先咱们依然是和从前的博文相同,保留了从前的东西。 可是咱们这边多了Controller,和Dao层。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)
那么在这边的话,咱们需求关注的只有这几个东西:
SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

这几个东西便是咱们来完结前面的作用的实践的业务代码。

除了这些当然还有咱们的Dao,可是这个是根据你的业务来的,这儿我就不展示了,类比嘛。

改动

那么说完了这些,咱们来看到和从前的代码有哪些改动的东西。

音讯bean

首先是咱们的音讯的改动。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

@AllArgsConstructor
@NoArgsConstructor
@ToString
/**
 * 由于咱们这边Nutty处理的音讯只有注册,所以话这儿只需求
 * 保留action和userid即可
 * */
public class DataContent implements Serializable {
    private Integer action;
    private String userid;
}

那么咱们的音讯的类型是这样的:

public enum MessageActionEnum {
    //界说音讯类型
    CONNECT(1,"第一次(或重连)初始化衔接"),
    CHAT(2,"谈天音讯"),
    SIGNED(3,"音讯签收"),
    KEEPALIVE(4,"客户端坚持心跳"),
    PULL_FRIEND(5, "拉取好友"),
    HOLEADUITMSG(6,"审阅音讯");
    public final Integer type;
    public final String content;
    MessageActionEnum(Integer type,String content) {
        this.type = type;
        this.content = content;
    }
}

音讯处理器

既然咱们的这个音讯类型变了,那么咱们的这个代码也变了:

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

@Component
@ChannelHandler.Sharable
public class ServerListenerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(ServerBoot.class);
    static {
        //先初始化出来
        UserConnectPool.getChannelMap();
        UserConnectPool.getChannelGroup();
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String content = msg.text();
        /**获取客户端传过来的音讯*/
        DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
        assert dataContent != null;
        Integer action = dataContent.getAction();
        Channel channel =  ctx.channel();
        /**
         * 根据音讯类型对其进行处理,咱们这儿只做两个事情
         * 1. 注册用户
         * 2. 心跳在线
         * */
        if(Objects.equals(action, MessageActionEnum.CONNECT.type)){
            /**
             * 2.1 当websocket 第一次 open 的时分,
             * 初始化channel,把用的 channel 和 userid 关联起来
             * */
            String userid = dataContent.getUserid();
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(userid);
            UserConnectPool.getChannelMap().put(userid,channel);
            UserConnectPool.output();
        } else if(Objects.equals(action, MessageActionEnum.KEEPALIVE.type)){
            /**
             * 心跳包的处理
             * */
            System.out.println("收到来自channel 为["+channel+"]的心跳包"+dataContent);
            channel.writeAndFlush(
                    new TextWebSocketFrame(
                            JsonUtils.objectToJson(R.ok("回来心跳包").
                                    put("type", MessageActionEnum.KEEPALIVE.type))
                    )
            );
            System.out.println("已回来音讯");
        }
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //接纳到恳求
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent("temp");
        UserConnectPool.getChannelGroup().add(ctx.channel());
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String chanelId = ctx.channel().id().asShortText();
        log.info("客户端被移除:channel id 为:"+chanelId);
        removeUserId(ctx);
        UserConnectPool.getChannelGroup().remove(ctx.channel());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //发生了异常后封闭衔接,同时从channelgroup移除
        ctx.channel().close();
        removeUserId(ctx);
        UserConnectPool.getChannelGroup().remove(ctx.channel());
    }
    /**
     * 删去用户与channel的对应联系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        UserConnectPool.getChannelMap().remove(userId);
    }
}

这个便是咱们核心的音讯处理器。

那么其他的关于Nutty的玩意我压根没有改动。

音讯转化pojo东西

这儿还有咱们的音讯转化的东西类。这个的话,我也给一下:

public class JsonUtils {
    // 界说jackson目标
    private static final ObjectMapper MAPPER = new ObjectMapper();
    /**
     * 将目标转化成json字符串。
     * <p>Title: pojoToJson</p>
     * <p>Description: </p>
     * @param data
     * @return
     */
    public static String objectToJson(Object data) {
        try {
            String string = MAPPER.writeValueAsString(data);
            return string;
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * 将json结果集转化为目标
     *
     * @param jsonData json数据
     * @param beanType 目标类型
     * @return
     */
    public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
        try {
            T t = MAPPER.readValue(jsonData, beanType);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * 将json数据转化成pojo目标list
     * <p>Title: jsonToList</p>
     * <p>Description: </p>
     * @param jsonData
     * @param beanType
     * @return
     */
    public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
        JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
        try {
            List<T> list = MAPPER.readValue(jsonData, javaType);
            return list;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

前面应该是有给的。

要点便是咱们的这个Controller这一块。

审阅音讯处理

controller

咱们首先来看到咱们的Controller

@RestController
@RequestMapping("/message/holeAduit")
public class HoleAduitMsgController {
    @Autowired
    HoleAduitMsgService holeAduitMsgService;
    @PostMapping("/aduit")
    public R holeAduitMsg(@Validated @RequestBody HoleAduitMsgQ holeAduitMsgQ){
        return holeAduitMsgService.holeaduitMsg(holeAduitMsgQ);
    }
}

咱们只看到这一个接口,由于其他的都是类似的。

那么这儿的话咱们仍是需求一个恳求的实体类的。 那么这个实体类的话是这个样子的:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class HoleAduitMsgQ {
    @NotEmpty
    private String userid;
    private String msg;
    private String msgtitle;
    private Long linkid;
    private Integer type;
}

这个实体类的话,是被我封装了这儿:

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)
由于咱们是一个微服务,所以的话,对应的这个恳求咱们都是放在了第三方的一个包下面。 那么对于的还有咱们暴露出来的服务。


@FeignClient("message")
@RequestMapping("/message/holeAduit")
public interface FeignHoleAduitMsgService {
    @PostMapping("/aduit")
    public R holeAduitMsg(@RequestBody HoleAduitMsgQ holeAduitMsgQ);
}

之后的话,咱们能够看到详细的完结类。

完结类

@Service
public class HoleAduitMsgServiceImpl implements HoleAduitMsgService {
    @Autowired
    HoleAuditService auditService;
    @Override
    public R holeaduitMsg(HoleAduitMsgQ holeAduitMsgQ) {
        //1.对音讯进行存储,只要用户在线的话,咱们就直接先给他签收一下
        String userid = holeAduitMsgQ.getUserid();
        Channel channel = UserConnectPool.getChannelFromMap(userid);
        HoleAuditEntity holeAuditEntity = new HoleAuditEntity();
        BeanUtils.copyProperties(holeAduitMsgQ,holeAuditEntity);
        holeAuditEntity.setCreateTime(DateUtils.getCurrentTime());
        if(channel!=null){
            //这边仅仅确保存在,双层稳妥,这个时分的话便是在线
            Channel realChannel = UserConnectPool.getChannelGroup().find(channel.id());
            if(realChannel!=null){
                holeAuditEntity.setStatus(1);
                //咱们这边直接转发音讯就好了,不需求再额外处理
                realChannel.writeAndFlush(
                        new TextWebSocketFrame(
                                JsonUtils.objectToJson(
                                        Objects.requireNonNull(R.ok().put("data", holeAuditEntity))
                                                .put("type", MessageActionEnum.HOLEADUITMSG.type)
                                )
                        )
                );
            }
        }
        //这儿进行音讯的存储
        auditService.save(holeAuditEntity);
        return R.ok();
    }
}

这儿边的逻辑其实十分简略,就几个步骤。

1.承受恳求 2.判别用户是否在线,在线推送,并保存设置为已签收(音讯) 假如不在线,不进行推送,可是保存音讯并设置为未签收

这儿的话便是十分简略的。

服务调用

之后的话,便是咱们的调用。咱们的调用是在咱们的博客服务进行调用的。

咱们先看到咱们完整的博客服务的完结类。

public class BlogUpServiceImpl implements BlogUpService {
    @Autowired
    FeignUserService feignUserService;
    @Autowired
    ContentService contentService;
    @Autowired
    FeignHeadimgService feignHeadimgService;
    @Autowired
    WordFilter wordFilter;
    @Autowired
    BlogService blogService;
    @Autowired
    FeignLogActicleService feignLogActicleService;
    @Autowired
    RedisUtils redisUtils;
    @Autowired
    FeignHoleAduitMsgService feignHoleAduitMsgService;
    private final static Double threshold = 0.05;
    /**
     * 接口对用户进行十分钟限制
     *  1.完结用户博文的上传
     *  2.存储用户博文,博文对应信息
     *  3.修改用户日志
     * */
    @Override
    public R blogUp(UpBlogEntity entity) {
        String userid = entity.getUserid();
        String backMessage = "success";
        //接口限流
        if(redisUtils.hasKey(RedisTransKey.getBlogUpKey(entity.getUserid()))){
            return R.error(BizCodeEnum.OVER_UPBLOG.getCode(), BizCodeEnum.OVER_UPBLOG.getMsg());
        }
        R info = feignUserService.info(userid);
        String userString = FastJsonUtils.toJson(info.get("user"));
        UserEntity user = FastJsonUtils.fromJson(userString, UserEntity.class);
        if(user!=null){
            String context = entity.getContext();
            String blogInfo = entity.getInfo();
            /**
             * 先对context和bloginfo进行校验,是否为存在不友好的信息
             * */
            int countContext = wordFilter.wordCount(context);
            int countInfo = wordFilter.wordCount(blogInfo);
            int status = 1;
            //博文的摘要过滤,只要摘要没有过,直接先打回去!
            if(countInfo>=blogInfo.length()*threshold){
                return R.error(BizCodeEnum.BAD_BLOGINFO.getCode(),BizCodeEnum.BAD_BLOGINFO.getMsg());
            }
            //博文内容的过滤
            if(countContext>=context.length()*threshold){
                //直接便是没有经过审阅
                return R.error(BizCodeEnum.BAD_CONTEXT.getCode(),BizCodeEnum.BAD_CONTEXT.getMsg());
            }else if (countContext>0&&countContext<context.length()*threshold){
                backMessage="哇!您的提交直接经过了呢!";
            }else {
                status = 2;
                context = wordFilter.replace(context, '*');
                backMessage="您的发问已提交,正在等候审阅哟!";
            }
            //预存储content
            ContentEntity contentEntity = new ContentEntity();
            contentEntity.setContent(context);
            contentEntity.setVersion("1.0");
            contentEntity.setCreateTime(DateUtils.getCurrentTime());
            contentService.save(contentEntity);
            Long contentid = contentEntity.getContentid();
            //预存储博文
            BlogEntity blogEntity = new BlogEntity();
            blogEntity.setBlogTitle(entity.getBlogTitle());
            blogEntity.setLevel(entity.getLevel());
            blogEntity.setBlogtype(entity.getBlogtype());
            //查询用户的头像信息
            R RHeadImg = feignHeadimgService.headimg(userid);
            String headImgString = FastJsonUtils.toJson(RHeadImg.get("headimg"));
            final HeadimgEntity headimg = FastJsonUtils.fromJson(headImgString, HeadimgEntity.class);
            if(headimg!=null){
                blogEntity.setUserImg(headimg.getImgpath());
            }
            blogEntity.setCreateTime(DateUtils.getCurrentTime());
            blogEntity.setUserNickname(user.getNickname());
            blogEntity.setUserid(userid);
            blogEntity.setStatus(status);
            blogEntity.setInfo(blogInfo);
            blogService.save(blogEntity);
            Long blogid = blogEntity.getBlogid();
            //完结正式存储
            contentEntity.setBlogid(blogid);
            blogEntity.setContentid(contentid);
            blogService.updateById(blogEntity);
            contentService.updateById(contentEntity);
            /**
             * 更新用户日志
             * */
            LogActicleEntity logActicleEntity = new LogActicleEntity();
            logActicleEntity.setAction(1);
            logActicleEntity.setUserid(userid);
            logActicleEntity.setArticleid(blogEntity.getBlogid());
            logActicleEntity.setArticleTitle(blogEntity.getBlogTitle());
            logActicleEntity.setCreteTime(blogEntity.getCreateTime());
            feignLogActicleService.save(logActicleEntity);
            /**
             * 发送音讯
             * */
            if(status==1){
                /**
                 * 此刻是直接经过了审阅,那么直接进行发送
                 * 假如没有的话,那么便是后台经过审阅由MQ发送音讯
                 * */
                HoleAduitMsgQ holeAduitMsgQ = new HoleAduitMsgQ();
                holeAduitMsgQ.setMsg("您的博文"+blogEntity.getBlogTitle()+"直接经过了审阅");
                holeAduitMsgQ.setMsgtitle("博文审阅经过");
                holeAduitMsgQ.setUserid(user.getUserid());
                holeAduitMsgQ.setLinkid(blogid);
                holeAduitMsgQ.setType(1);
                feignHoleAduitMsgService.holeAduitMsg(holeAduitMsgQ);
            }
            /**
             * 设置标志
             */
            redisUtils.set(RedisTransKey.setBlogUpKey(entity.getUserid())
                    ,1,10, TimeUnit.MINUTES
            );
        }else{
            return R.error(BizCodeEnum.NO_SUCHUSER.getCode(),BizCodeEnum.NO_SUCHUSER.getMsg());
        }
        return R.ok(backMessage);
    }
}

这儿边有注释,要点便是那个发送音讯的。

到此的话,咱们的后端就没啥事情了。

前端

之后的话便是咱们的前端。

咱们的前端主要是担任两件事情

  1. 向服务器注册(假如用户登录了的话)
  2. 坚持心跳在线
  3. 接纳服务器发送过来的音讯,并保存,然后要通知用户

衔接代码

首先是咱们从前博文有提到的,咱们的衔接封装好的代码。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)
我这儿是放在了socket包下面,其他的你们自己看着办。


// 导出socket目标
export {
  socket
}
import { Message } from 'element-ui'
// socket主要目标
var socket = {
  websock: null,
  /**
   * 这个是咱们的ws的地址
   * */
  ws_url: "ws://localhost:9000/ws",
  userid: null,
  msgfunc: null,
  /**
   * 敞开标识
   * */
  socket_open: false,
  /**
   * 心跳timer
   * */
  hearbeat_timer: null,
  /**
   * 心跳发送频率
   * */
  hearbeat_interval: 10000,
  /**
   * 是否敞开重连
   * */
  is_reonnect: true,
  /**
   * 从头衔接的次数
   * */
  reconnect_count: 3,
  /**
   * 当时从头衔接的次数,默以为:1
   * */
  reconnect_current: 1,
  /**
   * 从头衔接的时间类型
   * */
  reconnect_timer: null,
  /**
   * 从头衔接的间隔
   * */
  reconnect_interval: 3000,
  /**
   * 登录后才进行衔接
   * */
  /**
   * 初始化衔接
   */
  init: () => {
    let loginToken = localStorage.getExpire("LoginToken");
    let userid = localStorage.getExpire("userid");
    if(loginToken==null && userid==null) {
      Message({
        message: '当时正在以游客身份拜访',
        type: 'info',
      });
      return ;
    }
    if (!("WebSocket" in window)) {
      Message({
        message: '当时浏览器与网站不兼容丫',
        type: 'error',
      });
      console.log('浏览器不支持WebSocket')
      return null
    }
    // 现已创立过衔接不再重复创立
    if (socket.websock) {
      return socket.websock
    }
    socket.websock = new WebSocket(socket.ws_url)
    socket.websock.onmessage = function (e) {
      socket.receive(e)
    }
    // 封闭衔接
    socket.websock.onclose = function (e) {
      console.log('衔接已断开')
      console.log('connection closed (' + e.code + ')')
      clearInterval(socket.hearbeat_interval)
      socket.socket_open = false
      // 需求从头衔接
      if (socket.is_reonnect) {
        socket.reconnect_timer = setTimeout(() => {
          // 超过重连次数
          if (socket.reconnect_current > socket.reconnect_count) {
            clearTimeout(socket.reconnect_timer)
            return
          }
          // 记录重连次数
          socket.reconnect_current++
          socket.reconnect()
        }, socket.reconnect_interval)
      }
    }
    // 衔接成功
    socket.websock.onopen = function () {
      Message({
        message: 'Welcome here',
        type: 'success',
      });
      let userid = localStorage.getExpire("userid");
      socket.userid = userid;
      console.log('衔接成功')
      socket.socket_open = true
      socket.is_reonnect = true
      // 敞开心跳
      socket.heartbeat()
      //注册用户
      let resit={
        "action": 1,
        "userid": userid
      }
      socket.send(resit)
    }
    // 衔接发生过错
    socket.websock.onerror = function (err) {
      Message({
        message: '无法衔接至服务器!',
        type: 'error',
      });
      console.log('WebSocket衔接发生过错')
    }
  },
  /**
   * 获取websocket目标
   * */
  getSocket:()=>{
    //创立了直接回来,反之重来
    if (socket.websock) {
      return socket.websock
    }else {
      socket.init();
    }
  },
  getStatus:()=> {
    if (socket.websock.readyState === 0) {
      return "未衔接";
    } else if (socket.websock.readyState === 1) {
      return "已衔接";
    } else if (socket.websock.readyState === 2) {
      return "衔接正在封闭";
    } else if (socket.websock.readyState === 3) {
      return "衔接已封闭";
    }
  },
  /**
   * 发送音讯
   * @param {*} data 发送数据
   * @param {*} callback 发送后的自界说回调函数
   */
  send: (data, callback = null) => {
    // 敞开状况直接发送
    if (socket.websock.readyState === socket.websock.OPEN) {
      socket.websock.send(JSON.stringify(data))
      if (callback) {
        callback()
      }
      // 正在敞开状况,则等候1s后从头调用
    } else if (socket.websock.readyState === socket.websock.CONNECTING) {
      setTimeout(function () {
        socket.send(data, callback)
      }, 1000)
      // 未敞开,则等候1s后从头调用
    } else {
      socket.init()
      setTimeout(function () {
        socket.send(data, callback)
      }, 1000)
    }
  },
  /**
   * 接纳音讯
   * @param {*} message 接纳到的音讯
   */
  receive: (message) => {
    var recData = JSON.parse(message.data)
    /**
     *这部分是咱们详细的对音讯的处理
     * */
    if(socket.msgfunc==null){
      Message({
        message: 'receive需求传入一个func进行大局音讯处理!',
        type: 'error',
      });
    }else {
      socket.msgfunc(recData)
    }
  },
  /**
   * 心跳
   */
  heartbeat: () => {
    console.log('socket', 'ping')
    if (socket.hearbeat_timer) {
      clearInterval(socket.hearbeat_timer)
    }
    socket.hearbeat_timer = setInterval(() => {
      //发送心跳包
      let data = {
        "action": 4,
        "userid": socket.userid
      }
      socket.send(data)
    }, socket.hearbeat_interval)
  },
  /**
   * 自动封闭衔接
   */
  close: () => {
    console.log('自动断开衔接')
    clearInterval(socket.hearbeat_interval)
    socket.is_reonnect = false
    socket.websock.close()
  },
  /**
   * 从头衔接
   */
  reconnect: () => {
    console.log('发起从头衔接', socket.reconnect_current)
    if (socket.websock && socket.socket_open) {
      socket.websock.close()
    }
    socket.init()
  },
}

这段代码里边主要有两个重要的当地、

  1. 初始化的衔接
  2. 发送音讯(这边的话咱们基本上是经过http发送到详细的服务,然后由音讯服务器转发的,可是心跳在线仍是需求这个的)
  3. 接纳到音讯的处理办法(这个要自己完结的,怎样完结待会会有阐明)

初始化

首先是初始化,我这边的话是做音讯的推送,包含用户的谈天,所以的话,咱们这边是需求大局运用的。那么这边的话,咱们初始化需求在你的根页面进行运用。那么我这边是在这边在这儿。你们自己的自己看着办。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

承受音讯

之后是咱们承受音讯的当地。

哪里需求哪里运用。我这边是在这个当地运用:

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

这部分的完整代码是这样的:

 created() {
    /**
     * 在这儿咱们担任对音讯进行处理,咱们把音讯存储到缓存当中
     * 到详细的页面的时分,咱们就加载,这样就好了。
     * */
    socket.msgfunc=this.msgfunc;
      //加载音讯
    let messageNum_Local = localStorage.getExpire("messageNum");
    let messageContent_Local = localStorage.getExpire("messageContent");
    if(messageNum_Local){
      this.messageNum = messageNum_Local;
      this.total = messageNum_Local.total;
    }else {
      this.messageNum = messageNum;
      localStorage.setExpire("messageNum", this.messageNum,this.OverTime);
      this.total = 0;
    }
    if(messageContent_Local){
      this.messageContent = messageContent_Local;
    }else {
      this.messageContent = messageContent;
      //由于一开始有初始值,这个初始值是不要的
      delete this.messageContent[0];
      localStorage.setExpire("messageContent",this.messageContent,this.OverTime);
    }
  },
    msgfunc(res){
      //这个msgfunc是担任处理大局信息的
      if(res.type===4){
        //心跳正常
      }else {
        /**
         * 这儿边便是咱们接下来要做的逻辑了,
         * 这儿的话音讯是分为两个状况的,签收和读取
         * */
        let messageNum_Local = localStorage.getExpire("messageNum");
        messageNum_Local.total+=1;
        this.total = messageNum_Local.total;
        this.messageNum = messageNum_Local;
        //加载音讯
        this.messageContent = localStorage.getExpire("messageContent");
        if(res.type===6){
          //这个时分是咱们的审阅音讯
          this.messageNum.auditNum+=1;
          //头部刺进音讯
          this.messageContent.auditContent.unshift(res.data)
        }
        //当token过期的时分,咱们这些东西都需求进行删去
        localStorage.setExpire("messageNum",this.messageNum,this.OverTime);
        localStorage.setExpire("messageContent",this.messageContent,this.OverTime);
      }
    },

咱们在这个页面完结了对音讯的处理,并且让咱们的socket进行了指定。

那么这段代码主要做了这些事情

  1. 指定了音讯的处理办法(初始化部分)
  2. 在音讯处理部分,主要是针对音讯的类型,进行不同的存储

咱们着这边界说了两个音讯存储的玩意。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

这个便是咱们界说存储音讯的玩意

let messageNum = {
  total: 0,
  auditNum: 0,
};
let messageContent = {
    auditContent:[
      {
        userid: null,
        msg: null,
        msgtitle: null,
        linkid: null,
        createTime: null,
        msgid: null,
        status: null,
        type: null,
      }
    ]
}
export {
  messageNum,
  messageContent
}

刚刚的那一段代码便是为了存储音讯。这儿的话,目前是只界说了一个音讯类型。

音讯的展示

尽然咱们把音讯保存起来了,那么咱们就需求进行读取了。 那么读取的话很简略啊,咱们都存起来了,直接读取加载不就好了。 那么在这边的话,便是咱们的这个页面

<template>
  <div style="background-color: rgba(239,250,246,0.53)">
    <br>
    <div class="head" style="width: 90%;margin: 0 auto">
      <el-button style="margin-left:80%" type="primary" plain>清空一切</el-button>
    </div>
    <br>
    <div style="width: 80%;margin-left: 1%" class="main">
      <el-card shadow="hover" v-for="(message,index) in Messages" :key="index">
        <div style="height:80px">
          <div style="display:inline-block;margin-left: 5%">
            <p class="message">
              <router-link v-if="message.type==1" class="alink"
                           :to="{ path: '/blogshow',query:{'blogid':message.linkid}}"
              >
              {{message.msgtitle}}&nbsp;&nbsp;<i class="el-icon-circle-check"></i>
              </router-link>
            </p>
            <el-tooltip class="item" effect="dark" :content="message.msg" placement="top-start">
            <p class="message">
              检查概况:{{message.msg}}
            </p>
            </el-tooltip>
          </div>
          <div style="display:inline-block;margin-left: 20%">
            <p>
              <el-button  icon="el-icon-delete" ></el-button>
            </p>
            <p style="font-size: 8px">
              {{message.createTime}}
            </p>
          </div>
        </div>
        <br>
      </el-card>
    </div>
    <br>
    <div class="footer" style="margin: 0 auto;width: 100%;">
      <div class="block" >
        <el-pagination
          background
          layout="total, prev, pager, next, jumper"
          :total=total>
        </el-pagination>
      </div>
    </div>
  </div>
</template>
<script>
import {messageContent} from "../../socket/message";
export default {
  //这儿的话是咱们审阅音讯的详细的页面
  name: "auditInformation",
  data(){
    return{
      total: 0,
      Messages: messageContent.auditContent,
    }
  },
  created() {
    //假如在咱们这边本地有缓存的话,那么咱们就加载缓存的
    //假如没有缓存的话,那么咱们就需求去求取服务器,加载
    let messageContent= localStorage.getExpire("messageContent");
    if(messageContent){
      //分页的话,咱们在这边做切片即可
      this.Messages = messageContent.auditContent;
    }else {
      //这儿去求取服务器拿到数据,然后从头保存在本地
    }
  }
}
</script>
<style scoped>
.message{
  width: 20em;
  overflow: hidden;
  text-overflow:ellipsis;
  white-space: nowrap;
}
.alink{
  text-decoration: none;
  color: #333333;
}
</style>

这块的完结现在仍是很粗糙,横竖便是一个事例。这下子的话完整的进程我是现已阐理解了的。

其实假如有时间的话,彻底是能够把这个封装成一个专门的音讯处理中间件的。刚好有用,而且很有必要。这儿的话,我能够有机会试试,仿制一下,也搞一个音讯中间件玩玩。

作用

之后的 话便是咱们的测试作用。 登录有提示:

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)
这边有心跳包:
SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)
后端也能够看到衔接
SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

编写文章的时分,上传成功后有音讯提示:

这个是咱们上传博文,博文经过之后会看到音讯有个提示。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

然后还能够到详细的页面检查。

SpringBoot+SpringCloud+Nutty打造分布式在线消息推送服务(实例)

总结

这儿的话,假如这个系列ok了的话,那么此刻你现已学会了 CURD+Nutty了。此刻你现已能够脱离XXX办理系统了,能够做一个微信谈天之类的东西了。那么在咱们这边的不足之处的话,还差一个便是谈天的信息加密,可是这个东西,怎样说呢,假如是类似于md5这种类型的加密,那么是不可能有人破解,可是我也不可能还原,我还原不了,你也就只能看到乱码。我要是能够解密,那么仅仅确保了在传输的时分是安全的,可是我相同能够看,除非谈天秘钥在你手里,可是你的秘钥总仍是要再咱们的服务器存储的,至少是要上传的,不然我怎样给你解密。所以这个我可能都不会去加密,顶多传输的时分加一下,乃至不加上一个https就完了。