说明
首要,感谢您抽出宝贵的时刻来读我的文章。我知道您的时刻很宝贵,并且有许多其他风趣的事情能够做。可是您还是挑选了点进来,这让我感到十分欣慰。
在我前面的文章中有说到长衔接选型,经过各种综合考虑后面咱们选了gRpc,现在距离gRpc成功落地并上线已有几个月,在此期间也是遇到了一些坑,也翻遍了许多社区和文档,最终结合自己的一些思考,成功落地了一套gRpc服务端依据Java微服务、分布式、高可用的长衔接服务。
关于标题
请允许我说为什么或许是全网最全实战篇,我并不是标题党,在许多关于gRpc的社区和文章中,多是偏基础运用入门的demo,比较难满意出产环境的运用需求,本文主要针对出产环境落地实践进行阐述,结合直播事务功用点对gRpc进行详细的运用说明。
系统、版别
gRpc服务端
依据Java,SpringCloud Alibaba/SpringBoot、Nacos 3.0(3.0版别的Nacos才支持gRpc协议,但假如你不需求在微服务的rpc调用中运用gRpc,则无需关怀此项)
gRpc客户端
Java微服务、安卓、IOS
服务依靠、插件
gRpc、protobuf 依靠
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.51.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.51.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.51.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.7.1</version>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
<version>1.4</version>
</dependency>
protobuf文件代码主动生成插件
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.34.1:exe:${os.detected.classifier}</pluginArtifact>
<!--设置grpc生成代码到指定路径-->
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<!--生成代码前是否清空目录-->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 设置多个源文件夹 -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- 增加主源码目录 -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/src/main/gen</source>
<source>${project.basedir}/src/main/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
gRpc Client SpringBoot集成依靠
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<version>2.14.0.RELEASE</version>
</dependency>
gRpc Server SpringBoot集成依靠
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>2.14.0.RELEASE</version>
</dependency>
proto文件接口界说、代码生成
咱们依据实践事务出发,先简单来个麦位办理接口练练手
syntax = "proto3";
import "google/protobuf/any.proto";
package com.xxx.grpc.room;
option java_multiple_files = true;
option java_package = "com.xxx.xxx.room";
option objc_class_prefix = "PB3Room";
//麦位接口
service Mic{
//获取房间麦位列表
rpc getMicList(GetMicListRequest) returns (GetMicListResponse) {};
//上麦
rpc micUp(MicUpRequest) returns (MicUpResponse) {};
//下麦
rpc micDown(MicDownRequest) returns (MicDownResponse) {};
//更改麦位状况
rpc micStateUpdate(MicStateUpdateRequest) returns (MicStateUpdateResponse) {};
//抱上麦位
rpc micHoldUp (MicHoldUpRequest) returns (MicHoldUpResponse) {};
//抱下麦位
rpc micHoldDown (MicHoldDownRequest) returns (MicHoldDownResponse) {};
//恳求排麦
rpc micUpApply (MicUpApplyRequest) returns (MicUpApplyResponse) {};
//撤销排麦
rpc micUpApplyCancel (MicUpApplyCancelRequest) returns (MicUpApplyCancelResponse) {};
}
//request----------------------------
message GetMicListRequest {
int64 roomId = 1; //房间ID
}
message MicUpRequest {
int64 roomId = 1; //房间ID
int32 position = 2; //麦位次序
bool auto = 3; //是否需求主动上麦 true:需求 false:不需求 此参数为true时主动忽略position字段的值 为false时才会取position字段进行手动上麦
}
message MicDownRequest {
int64 roomId = 1; //房间ID
int32 position = 2; //麦位次序
}
message MicStateUpdateRequest {
int64 roomId = 1; //房间ID
int32 position = 2; //麦位次序
MicState state = 3; //状况
int64 userId = 4; //用户ID
}
message MicHoldUpRequest{
int64 roomId = 1; //房间ID
int64 userId = 2; //被操作的用户ID
int32 position = 3; //麦位 (有position时先判别position 没有判别userId地点position)
}
message MicHoldDownRequest{
int64 roomId = 1; //房间ID
int64 userId = 2; //被操作的用户ID
int32 position = 3; //麦位次序
}
message MicUpApplyRequest{
int64 roomId = 1; //房间ID
}
message MicUpApplyCancelRequest{
int64 roomId = 1; //房间ID
}
// response----------------------------------------------------------
message GetMicListResponse{
repeated MicInfo micList = 1; //麦位列表
}
message MicUpResponse{
bool success = 1; //是否成功
}
message MicDownResponse{
bool success = 1; //是否成功
}
message MicStateUpdateResponse{
bool success = 1; //是否成功
int32 position = 2; //当时修正的麦位
}
message MicHoldUpResponse{
bool success = 1; //是否成功
}
message MicHoldDownResponse{
bool success = 1; //是否成功
}
message MicUpApplyResponse{
bool success = 1; //是否成功
}
message MicUpApplyCancelResponse{
bool success = 1; //是否成功
}
// object---------------------
//麦位信息
message MicInfo{
int64 userId = 1; //用户ID
int32 position = 2; //麦位
string nickname = 3;//昵称
string avatar = 4;// 头像
bool bossPosition = 5;//是否老板麦位
MicState state = 6; //麦位状况
int64 charmScore = 7; //在麦位上的魅力值
string avatarFrameUrl = 8; //用户佩带头像框
}
//麦位状况
enum MicState{
OPEN = 0;//麦位麦克风正常翻开状况
CLOSE = 1;//麦位麦克风被封闭状况
BAN = 2; //麦位被封禁
}
//反常状况码
enum ErrorCode{
//服务器反常
SERVER_ERROR = 0;
//非法操作
ILLEGAL_OPERATION = 1;
//有人在麦上
SOMEONE_IN_MIC = 1000;
//麦位被封禁
MIC_BAN = 1001;
//没有闲暇麦位--麦下用户主动上麦时提示
NO_FREE_MIC = 1002;
//用户现已脱离房间
NOT_IN_ROOM = 1003;
//无权限操作
NO_ACCESS = 1999;
}
找到proto文件地点项目-Lifecycle-complile双击主动生成代码
生成的代码如下,主动帮咱们完结了序列化与反序列化的协议,一起它也是一种契约,一切的客户端+服务端都用着同一套契约
开端运用
@GrpcService注解效果
在依据 Spring Boot 的 gRPC 服务中,这个注解它的效果是将 gRPC 服务完结类符号为可被 gRPC 服务器扫描和注册的组件。
@GrpcService
注解的效果包含:
-
服务注册:经过将
@GrpcService
注解运用于 gRPC 服务完结类,将其主动注册到 gRPC 服务器中。这样,gRPC 服务器能够知道有哪些服务可用,并能够处理客户端的恳求。 -
依靠注入:在依据 Spring Boot 的运用中,
@GrpcService
注解通常与依靠注入结构(如 Spring)一起运用。当运用@GrpcService
注解符号服务完结类时,依靠注入结构会主动扫描并创立该类的实例,并将其注入到 gRPC 服务器中。这使得你能够在服务完结类中运用其他依靠注入的组件或进行相关的事务逻辑。 -
简化装备:运用
@GrpcService
注解能够简化 gRPC 服务的装备。经过符号服务完结类,你无需手动编写繁琐的注册和装备代码,而是让注解和依靠注入结构处理这些细节,使得服务的开发愈加便利和高效。
需求注意的是,@GrpcService
注解是针对依据 Spring Boot 的项目,并与 gRPC 结构集成时的一种常用做法。假如你运用其他的 gRPC 结构或渠道,或许会有不同的办法来注册和办理 gRPC 服务完结类。
用户鉴权
咱们在运用惯例Http恳求的的时分,通用的鉴权办法是经过HttpRequest拿到header中的token信息来对用户进行鉴权,在gRPC中并没有这种直接获取恳求头部信息的办法,可是咱们能够经过gRpc给咱们供给的阻拦器 ServerInterceptor
自己简单写一个相似的功用,代码如下。
public class MetadataConstants {
/**
* 令牌自界说标识
*/
public static final String AUTHENTICATION = "Authorization";
/**
* 客户端类型
*/
public static final String CLIENT_TYPE = "clientType";
}
public class ThreadLocalUtil {
private static final ThreadLocal<Map<String, Object>> threadLocal = ThreadLocal.withInitial(() -> new HashMap<>(10));
public static Map<String, Object> getThreadLocal() {
return threadLocal.get();
}
public static Object get(String key) {
Map<String, Object> map = threadLocal.get();
return map.get(key);
}
public static void set(String key, Object value) {
Map<String, Object> map = threadLocal.get();
map.put(key, value);
}
public static void set(Map<String, Object> keyValueMap) {
Map<String, Object> map = threadLocal.get();
map.putAll(keyValueMap);
}
public static void remove() {
threadLocal.remove();
}
public static <T> T remove(String key) {
Map<String, Object> map = threadLocal.get();
return (T) map.remove(key);
}
}
@Data
@Builder
public class GrpcThreadData {
/**
* 用户ID
*/
private Long userId;
/**
* 渠道类型
*/
private int clientType;
}
public class GrpcUtil {
private static final String GRPC_USER_KEY = "grpc-user";
public static Long getUserId() {
GrpcThreadData grpcThreadData = (GrpcThreadData) ThreadLocalUtil.get(GRPC_USER_KEY);
return grpcThreadData.getUserId();
}
public static int getClientType() {
GrpcThreadData grpcThreadData = (GrpcThreadData) ThreadLocalUtil.get(GRPC_USER_KEY);
return grpcThreadData.getClientType();
}
}
public class CommonGrpcServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
String token = metadata.get(Metadata.Key.of(MetadataConstants.AUTHENTICATION, Metadata.ASCII_STRING_MARSHALLER)),
clientType = metadata.get(Metadata.Key.of(MetadataConstants.CLIENT_TYPE, Metadata.ASCII_STRING_MARSHALLER));
if (StringUtils.isBlank(token)) {
log.error("grpc恳求反常,token为空.");
serverCall.close(Status.UNAUTHENTICATED, null);
}
Long userId = JWTUtil.getUserId(token);
if (userId ==null){
log.error("grpc恳求反常,token验证失败.");
serverCall.close(Status.UNAUTHENTICATED, null);
}
log.info("grpc恳求 | clientType:{} | userId:{} | methodName:{}", clientType, userId, serverCall.getMethodDescriptor().getFullMethodName());
ThreadLocalUtil.set(ThreadLocalConstant.GRPC_USER_KEY,
GrpcThreadData.builder()
.userId(userId)
.clientType(Integer.valueOf(clientType))
.build());
return serverCallHandler.startCall(serverCall, metadata);
}
}
经过阻拦器,咱们能够简单的对用户进行鉴权操作,并且每次调用咱们的接口前将用户鉴权相关信息保存到 ThreadLocal
中,就能够在咱们实践的gRpc接口中轻松获取用户鉴权信息了。
这儿顺带再提一下 ServerInterceptor
的其他用法
-
认证和授权:
ServerInterceptor
可用于在服务端对恳求进行认证和授权的处理。经过阻拦恳求,你能够验证客户端的身份并对其进行授权,以确保只需经过验证的客户端才能访问受保护的服务。 -
监控和日志:经过
ServerInterceptor
,你能够阻拦恳求和呼应,以记录和监控服务端的功能指标、恳求时刻、过错日志等信息。这关于追踪问题、功能优化和日志记录十分有用。 -
转化和修正:
ServerInterceptor
允许你在恳求到达服务端之前或呼应回来客户端之前对音讯进行转化和修正。这能够包含音讯格式转化、恳求参数校验、呼应数据的加工处理等,以满意特定事务需求。 -
过错处理:经过
ServerInterceptor
,你能够捕获并处理服务端的反常和过错。这使得你能够自界说过错处理逻辑,例如回来特定的过错码、供给友爱的过错信息等。 -
功能优化:经过
ServerInterceptor
,你能够施行缓存、恳求合并、限流等功能优化策略,以进步服务端的呼应功率和稳定性。
经过完结和注册自界说的 ServerInterceptor
,你能够对 gRPC 服务端进行灵敏的定制和扩展。多个 ServerInterceptor
能够以链式的办法组合,构成阻拦器的处理链,顺次对恳求进行处理。
客户端创立存根的办法
- 堵塞式存根(Blocking Stub): 堵塞式存根供给了同步的办法调用,即客户端在发送恳求后会一直等候服务器的呼应,并回来成果。在办法调用期间,客户端线程会被堵塞,直到服务器呼应或超时。 堵塞式存根的办法一般以
blocking
或sync
最初,例如myMethod
。 - 异步存根(Async Stub): 异步存根供给了非堵塞的办法调用,客户端能够异步发送恳求,然后经过回调机制或 Future 目标来获取呼应。在异步办法调用中,客户端线程不会被堵塞,能够继续执行其他操作。 异步存根的办法一般以
future
、listen
、observe
等关键词最初,例如myMethodAsync
。 - 流存根(Streaming Stub): 流存根用于处理流式数据,支持客户端流、服务器流和双向流三种模式。客户端流表明客户端向服务器发送多个恳求,服务器回来一个呼应。服务器流表明服务器向客户端发送多个呼应,客户端回来一个恳求。双向流表明客户端和服务器之间一起发送多个恳求和呼应。 流存根的办法一般以
stream
、streaming
等关键词最初,例如myMethodStream
。
一元式运用
客户端一元式代码
public class TestClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 32001)
.usePlaintext()
.build();
// 创立一元式堵塞式存根
MicGrpc.MicBlockingStub blockingStub = MicGrpc.newBlockingStub(channel);
// 创立恳求目标
GetMicListRequest request = GetMicListRequest.newBuilder()
.setRoomId(20012520L)
.build();
// 发送恳求麦位列表信息并接纳呼应
GetMicListResponse response = blockingStub.getMicList(request);
// 处理麦位列表信息呼应
System.out.println("Received mic list response: " + response);
}
}
服务端一元式代码
@GrpcService
@Slf4j
public class RoomMicGrpcService extends MicGrpc.MicImplBase {
//注入实践事务服务接口,不便利贴完结代码
@Autowired
private LiveRoomMicClient liveRoomMicClient;
@Override
public void getMicList(GetMicListRequest getMicListRequest, StreamObserver<GetMicListResponse> responseObserver) {
Long userId = GrpcUtil.getUserId(),
roomId = getMicListRequest.getRoomId();
log.info("用户获取麦位信息 | roomId:{} | userId:{}", roomId, userId);
//拿到 8个麦位里的信息
List<LiveRoomMicVo> list = liveRoomMicClient.getMicList(roomId, userId);
//回来数据给客户端
responseObserver.onNext(GrpcBuildMessageUtil.buildMicListResponse(list));
//告知客户端该 gRPC 调用现已完结
responseObserver.onCompleted();
}
}
这样子,一个简单的gRpc一元式接口就完结了。
双向流运用
这儿我直接讲双向流,跳过了客户端流和服务端流的运用,由于双向流是它们两个的扩展,触及的知识点也比较多,所以只需把握了双向流,其他两个都不是什么大问题。
运用场景
在直播间内,有麦位信息更新(上下麦、魅力值改变时)需求对一切用户进行一个信息更新
客户端与服务端树立衔接
proto文件增加一个双向流协议
service Rtmp{
rpc listener(stream RtmpMessage) returns (stream RtmpMessage) {};
}
//实时音讯协议信息
message RtmpMessage{
int64 roomId = 1; //房间ID
MessageType messageType = 2; //音讯类型
repeated MicInfo micList = 3; //麦位信息列表
enum MessageType{
//麦位音讯,entity:MicInfo
MIC = 0;
}
}
这边以Java客户端代码为例,写一个Demo
// 创立与 gRPC 服务器的通道
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 32001)
.usePlaintext() // 在本示例中运用明文衔接,出产环境请运用安全衔接
.build();
//当时房间ID
Long roomId = 20012520L;
// 创立双向流 Stub
RtmpGrpc.RtmpStub stub = RtmpGrpc.newStub(channel);
// 创立双向流观察者
StreamObserver<RtmpMessage> requestObserver = stub.listener(new StreamObserver<RtmpMessage>() {
@Override
public void onNext(RtmpMessage response) {
// 处理从服务端接纳到的呼应音讯
System.out.println("Received response: " + response.getRoomId());
}
@Override
public void onError(Throwable t) {
// 处理过错状况
System.err.println("Error occurred: " + t.getMessage());
}
@Override
public void onCompleted() {
// 处理流完毕的完结状况
System.out.println("Stream completed");
}
});
// 发送音讯到服务端
requestObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.MIC)
.setRoomId(roomId)
.build());
requestObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.MIC)
.setRoomId(roomId)
.build());
// 完毕流通讯
requestObserver.onCompleted();
// 封闭通道
channel.shutdown();
服务端保护客户端存根
@GrpcService
@Slf4j
public class RoomRtmpGrpcService extends RtmpGrpc.RtmpImplBase {
// 保护客户端存根
private static ConcurrentHashMap<Long, StreamObserver> userStreamMap = new ConcurrentHashMap<>();
@Override
public StreamObserver<RtmpMessage> listener(StreamObserver<RtmpMessage> streamObserver) {
// server => client
Long userId = GrpcUtil.getUserId();
putUserStream(streamObserver);
// client => server
return new StreamObserver<RtmpMessage>() {
@Override
public void onNext(RtmpMessage value) {
log.info("接纳到客户端音讯:rtmp message type:{} | roomId:{}", value.getMessageTypeValue(), value.getRoomId());
}
@Override
public void onError(Throwable t) {
log.info("grpc反常 | {}", t.getMessage());
}
@Override
public void onCompleted() {
log.info("客户端 rtmp 流运用完结");
StreamObserver<RtmpMessage> rtmpMessageStreamObserver = getUserStream(userId);
if (null != rtmpMessageStreamObserver) {
rtmpMessageStreamObserver.onCompleted();
}
removeUserStream(roomId, userId);
}
};
}
private void putUserStream(StreamObserver<RtmpMessage> streamObserver) {
Long userId = GrpcUtil.getUserId();
log.info("客户端翻开双向流 | userId:{}", userId);
userStreamMap.put(userId, streamObserver);
}
private void removeUserStream(Long roomId, Long userId) {
userId = userId == null ? GrpcUtil.getUserId() : userId;
log.info("除掉客户端流 | roomId:{} | userId:{}", roomId, userId);
userStreamMap.remove(userId);
}
private StreamObserver<RtmpMessage> getUserStream(Long userId) {
return userStreamMap.get(userId == null ? GrpcUtil.getUserId() : userId);
}
}
如上所示,客户端的双向流存根咱们就保护好了,接下来能够考虑集群环境下衔接和推送音讯的问题了。
集群模式下gRpc长衔接的处理计划
考虑关键
-
负载均衡:负载均衡器应该能够保护和办理长衔接,确保衔接的均衡分布在后端服务上。负载均衡器应依据后端服务的负载状况和可用性,动态地挑选和分配长衔接。
-
心跳检测:为了坚持长衔接的健康状况,能够运用心跳机制进行守时的衔接检测。心跳检测能够经过守时发送心跳音讯并等候呼应来确认衔接是否依然有用。假如衔接出现反常或超时,则能够进行相应的重连或过错处理。
-
断线重连:在长衔接的状况下,或许会出现网络中止、服务器故障等问题导致衔接中止。在这种状况下,客户端和服务器应该能够进行断线重连,以康复衔接并继续通讯。断线重连机制应该是主动的,并具有恰当的重试策略和指数退避机制,以避免过度的重连尝试。
-
优雅封闭:当客户端或服务器需求封闭衔接时,应该运用优雅封闭的办法来停止衔接。优雅封闭能够确保正在进行的恳求得到处理,并在封闭前完结。这能够经过发送特定的封闭信号或协议音讯来触发。
-
装备调整:长衔接的参数和装备能够依据实践需求进行调整。这包含衔接超时时刻、心跳距离、衔接池大小等。依据负载和功能需求,能够进行恰当的调整和优化。
SpringBoot下gRpc长衔接装备调整优化
grpc:
server:
port: 32001
#当启用保活时,gRPC 将周期性地发送心跳音讯来检测衔接的状况,并坚持衔接处于活动状况
enable-keep-alive: true
#表明当发送完ping packet后多久没收到client回应算超时
keep-alive-timeout: 1s
#表明当grpc衔接没有数据传递时,多久之后开端向client发送ping packet
keep-alive-time: 10s
#是否允许客户端发送坚持活泼的HTTP/2 ping,即便在衔接上没有未完结的rpc。默以为false。
permit-keep-alive-without-calls: true
max-inbound-message-size: 10485760 # 10Mb 该参数操控服务端能够接纳的最大音讯大小。关于高并发场景,主张将其设置为较小的值,以削减网络推迟和内存占用。
客户端断线重连机制
- 守时检测性重连机制
/**
- 运用 `ManagedChannelBuilder` 设置 `usePlaintext()` 或 `useTransportSecurity()` 办法,以指定衔接的安全性。
- 运用 `withWaitForConnected()` 办法设置等候从头衔接的时刻距离。
- 运用 `withReconnectBackoff()` 办法设置重连的退避策略,例如指数退避。
- 经过调用 `build()` 办法创立 `ManagedChannel` 目标。
- 当与服务器的衔接中止时,gRPC 客户端会主动尝试从头衔接。
**/
ManagedChannel channel = ManagedChannelBuilder
.forAddress('localhost', 32001)
.usePlaintext()
.withWaitForConnected()
.withReconnectBackoff(...)
.build();
- 监听衔接封闭重连机制
@Component
public class GRpcClientReconnect implements ApplicationListener<ApplicationReadyEvent> {
@Value("${grpc.server.host}")
private String serverHost;
@Value("${grpc.server.port}")
private int serverPort;
private ManagedChannel channel;
private RtmpGrpc.RtmpStub client;
@PostConstruct
private void initialize() {
channel = createChannel();
client = RtmpGrpc.newStub(channel);
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// 在运用程序发动后,能够运用 client 执行 gRPC 恳求
}
private ManagedChannel createChannel() {
ManagedChannel channel = ManagedChannelBuilder
.forAddress(serverHost, serverPort)
.usePlaintext()
.build();
// 增加断线重连的 ClientInterceptor
ClientInterceptor reconnectInterceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// 创立一个新的 Call 实例
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
// 包装 Call 实例,增加断线重连逻辑
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// 创立一个新的监听器,以便处理衔接中止和从头衔接逻辑
Listener<RespT> reconnectingListener = new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
private boolean isReconnecting = false;
@Override
public void onClose(Status status, Metadata trailers) {
// 当衔接封闭时,判别是否需求重连
if (status.getCode() == Status.Code.UNAVAILABLE && !isReconnecting) {
System.out.println("Connection lost. Reconnecting...");
isReconnecting = true;
// 完结自界说的重连逻辑,例如等候一段时刻后从头创立 Channel
// 这儿简单地等候 5 秒后从头创立 Channel
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ManagedChannel newChannel = createChannel();
ClientCall<ReqT, RespT> newCall = newChannel.newCall(method, callOptions);
} else {
super.onClose(status, trailers);
}
}
};
delegate().start(reconnectingListener, headers);
}
};
}
};
// 增加断线重连的 ClientInterceptor
channel = (ManagedChannel) ClientInterceptors.intercept(channel, reconnectInterceptor);
return channel;
}
}
长衔接负载均衡
惯例负载计划
客户端的长衔接经过网关负载均衡或许走到不同的长衔接服务器上,大致如图所示
此架构下处理的问题
- 负载均衡
- 长衔接服务高可用
此架构下存在的问题
- 不确认用户长衔接详细涣散在哪一台机器,无法对用户进行点对点的音讯推送
客户端拉服务路由表,Hash算法负载
针对以上问题咱们能够依据用户ID进行hash算法计算出客户端需求路由到的长衔接服务
此架构下处理的问题
- 负载均衡
- 确认用户详细涣散机器,能进行点对点推送
此架构下存在的问题
- 长衔接服务非高可用,重启、部分机器宕时机导致其下一切衔接受影响
客户端拉本地集群路由表,Hash算法集群负载
针对以上两种计划的问题咱们出个多集群模式的架构计划进行改进
此架构下处理的问题
- 负载均衡
- 长衔接服务高可用
- 确认用户长衔接详细涣散集群机器,可进行精准点对点
- 处理了重启、部分机器宕机导致衔接不可用的问题
存在问题
- 本钱稍大
- 滚动发布时单个集群内至少需求确保有长衔接服务是可用状况
双向流音讯推送
点对点音讯推送
为什么是播送消费呢?
用户长衔接落在一块集群,当然是能够知道在哪一台机器,但由于MQ音讯分发是依据主题和标签的发布/订阅模型,只需播送消费,咱们才能在消费时去过滤音讯中对应的用户ID是否在对应机器上有树立长衔接,有的话推送音讯出去。
代码
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class GrpcRoomRtmpMessageDto extends BaseMqMessage implements Serializable {
/**
* 房间ID
*/
private Long roomId;
/**
* 播送一切房间
*/
private boolean broadcastAllRoom;
/**
* 用户ID
*/
private Long userId;
/**
* 用户ID调集
*/
private Set<Long> userIdSet;
/**
* rtmp音讯类型
*/
private int rtmpMessageType;
/**
* 推送类型
*/
private int pushType;
/**
* 数据
*/
private String data;
}
对应图上gRpc集群服务A中的服务下MQ消费监听
@Slf4j
@Service
@MqGrpcGroupConsumeEvent(event = RocketMqBizConstant.Grpc.Broadcast.ROOM\_RTMP\_MESSAGE)
public class RoomRtmpMqMessageHandle extends BaseGrpcGroupMqHandler {
@Override
public void handleMessage(String message) {
log.info("RTMP事务音讯处理:{}", message);
GrpcRoomRtmpMessageDto grpcRoomRtmpMessageDto = GsonUtil.GsonToBean(message, GrpcRoomRtmpMessageDto.class);
int pushType = grpcRoomRtmpMessageDto.getPushType();
if (pushType == RtmpPushType.PPP.getType()) {
//点对点
if (rtmpMessageType == RtmpMessage.MessageType.ROOM_VALUE) {
//房间点信息对点
if (CollectionUtils.isEmpty(userIdSet)) {
roomRtmpGrpcService.sendRoomChangeMessage(roomId, userId);
} else {
userIdSet.forEach(o -> {
roomRtmpGrpcService.sendRoomChangeMessage(roomId, userId);
});
}
}
}
}
}
public void sendRoomChangeMessage(Long roomId, Long userId) {
StreamObserver streamObserver = userStreamMap.get(userId);
if (null == streamObserver) {
//假如此条音讯不在此实例,直接过滤
return;
}
log.info("推送客户端房间改变音讯 | roomId:{} | userId:{}", roomId, userId);
try {
streamObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.ROOM)
.build());
log.info("推送客户端房间改变音讯成功 | roomId:{} | userId:{}", roomId, userId);
} catch (IllegalStateException illegalStateException) {
log.error("客户端长衔接状况反常 | userId:{} | message", userId,illegalStateException.getMessage());
} catch (Exception e) {
log.error(String.format("推送客户端房间改变音讯反常 | roomId:%s | userId:%s", roomId, userId), e);
}
}
播送音讯推送
如图所示,假定图上的用户都在同一个房间,他们的长衔接都在不同的集群、机器上,这个时分假定有一个用户在房间内上麦,需求向房间内一切用户推送麦位信息更新的播送。
代码
对应图上gRpc集群服务A中的服务下MQ消费监听
@Slf4j
@Service
@MqGrpcGroupConsumeEvent(event = RocketMqBizConstant.Grpc.Broadcast.ROOM_RTMP_MESSAGE)
public class RoomRtmpMqMessageHandle extends BaseGrpcGroupMqHandler {
@Override
public void handleMessage(String message) {
log.info("RTMP事务音讯处理:{}", message);
GrpcRoomRtmpMessageDto grpcRoomRtmpMessageDto = GsonUtil.GsonToBean(message, GrpcRoomRtmpMessageDto.class);
int pushType = grpcRoomRtmpMessageDto.getPushType();
if (pushType == RtmpPushType.BROADCAST.getType()) {
//播送
if (rtmpMessageType == RtmpMessage.MessageType.MIC_VALUE) {
//麦位播送
roomRtmpGrpcService.broadcastRoom(roomId, broadcastAllRoom, userId, RtmpMessage.MessageType.MIC);
}
}
}
}
public void broadcastRoom(Long roomId, boolean broadcastAllRoom, Long currentUserId, Long sendUserId, RtmpMessage.MessageType messageType) {
Set<Long> roomUserSet;
if (broadcastAllRoom) {
log.info("全服房间播送");
roomUserSet = userStreamMap.keySet().stream().collect(Collectors.toSet());
} else {
log.info("房间播送 | roomId:{}", roomId);
//事务服务从redis获取房间内一切用户列表
roomUserSet = SetUtil.defaultSet(liveRoomUserClient.getLiveRoomUserList(roomId));
if (CollectionUtils.isEmpty(roomUserSet)) {
log.info("房间没人,不播送 | roomId:{}", roomId);
return;
}
}
if (null == messageType) {
return;
}
if (messageType.equals(RtmpMessage.MessageType.MIC)) {
//播送麦位改变
//事务服务获取全部麦位信息
List<LiveRoomMicVo> list = liveRoomMicClient.getMicList(roomId, currentUserId == null ? GrpcUtil.getUserId() : currentUserId);
GetMicListResponse getMicListResponse = GrpcBuildMessageUtil.buildMicListResponse(list);
roomUserSet.forEach(o -> sendMicChangeMessage(roomId, o, getMicListResponse));
}
}
public void sendMicChangeMessage(Long roomId, Long userId, GetMicListResponse getMicListResponse) {
StreamObserver streamObserver = userStreamMap.get(userId);
if (null == streamObserver) {
return;
}
log.info("推送客户端麦位改变音讯 | roomId:{} | userId:{}", roomId, userId);
try {
streamObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.MIC).setRoomId(roomId)
.addAllMicList(getMicListResponse.getMicListList())
.build());
log.info("推送客户端麦位改变音讯成功 | roomId:{} | userId:{}", roomId, userId);
} catch (IllegalStateException illegalStateException) {
log.error("客户端长衔接状况反常 | userId:{} | message", userId,illegalStateException.getMessage());
} catch (Exception e) {
log.error(String.format("推送客户端麦位改变音讯反常 | roomId:%s | userId:%s", roomId, userId), e);
}
}
推送音讯优化
上面的推送音讯代码仅仅一个简单的示例,假如放在出产环境,功能是欠安的,并且或许存在一定问题,主要优化点有以下几点
同一时刻内对同一用户推送多条音讯,音讯没有次序性
咱们都对A-B-A十分熟知了,或许导致的问题就不必多说了,我这直接说常用的几种处理计划
-
序列号:在每条音讯中增加一个序列号字段,客户端能够依据序列号来判别音讯的次序,并进行处理。服务端发送音讯时,依照预订的次序设置序列号。
-
推迟处理:客户端接纳到多条音讯后,能够将这些音讯存储在缓冲区中,依照一定的规矩进行排序和处理。例如,能够运用优先级行列或时刻戳来确认音讯的次序,并逐个处理。
-
约束并发推送:服务端能够约束并发推送给同一用户的音讯数量,确保只需一条音讯被推送并处理,直到客户端完结当时音讯的处理后再推送下一条音讯。
-
引进应对机制:服务端在推送音讯后,等候客户端的应对,确认上一条音讯现已处理完毕,然后再推送下一条音讯。这能够经过客户端发送一个确认音讯的办法完结。
1,2两点很显然客户端处理就行,第4点没有那么必要,在高并发场景下,一切音讯都引进应对机制那功能消耗太大,并且有些播送音讯咱们甚至都不关怀是否需求100%送达,所以第4点一般只合适对推送音讯送达可靠性需求确保的事务,咱们这边服务端这边直接实践第3点 运用Sentinel约束并发推送
@SentinelResource(value = "sendMicChangeMessage", blockHandler = "handleBlock")
public void sendMicChangeMessage(Long roomId, Long userId, GetMicListResponse getMicListResponse) {
StreamObserver streamObserver = userStreamMap.get(userId);
if (null == streamObserver) {
return;
}
log.info("推送客户端麦位改变音讯 | roomId:{} | userId:{}", roomId, userId);
try {
streamObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.MIC).setRoomId(roomId)
.addAllMicList(getMicListResponse.getMicListList())
.build());
log.info("推送客户端麦位改变音讯成功 | roomId:{} | userId:{}", roomId, userId);
} catch (IllegalStateException illegalStateException) {
log.error("客户端长衔接状况反常 | userId:{} | message", userId,illegalStateException.getMessage());
} catch (Exception e) {
log.error(String.format("推送客户端麦位改变音讯反常 | roomId:%s | userId:%s", roomId, userId), e);
}
}
public String handleBlock(Long roomId, Long userId, GetMicListResponse getMicListResponse, BlockException ex) {
return "Blocked for user: " + userId;
}
@Component
public class SentinelConfig {
@PostConstruct
public void init() {
// 创立参数流控规矩
List<ParamFlowRule> rules = new ArrayList<>();
// 创立参数并发规矩
ParamFlowRule rule = new ParamFlowRule();
rule.setResource("sendMicChangeMessage"); // 资源称号,与事务接口的 @SentinelResource 注解中的 value 值对应
rule.setParamIdx(1); // 参数索引,0 表明第一个参数
rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // 限流模式为 QPS
rule.setCount(10); // 每秒最大恳求数为 10
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); // 并发约束模式为 RateLimiter
rule.setMaxQueueingTimeMs(100); // 最大排队等候时刻为 100 毫秒
rules.add(rule);
ParamFlowRuleManager.loadRules(rules);
}
}
ok,至此,整个gRpc服务1.0阶段的全链路架构、运用办法都已完毕,便利并不完美,敬请期待后续2.0版别的晋级。