背景

运营反馈,有用户在直播间内发送公屏,A用户发送的公屏 结果直播间内所有人员都显示了B用户发送的,而实践却是A用户发的。

原因剖析

日志排查

  1. 问询运营要到问题截图

【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

  1. 定位公屏内容和出现问题的时刻点
    【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

3.查看log打印代码位置

【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

好家伙,发现服务端没有没错,实践发消息的用户ID和大家收到公屏的发送人不一样? 又让客户端排查了下日志,发现客户端A发送的,实践到了服务端从Token中获取到的当时发送公屏用户竟然是B?

代码剖析

gRpc恳求阻拦器代码

@Slf4j(topic = SLSTopicType.TOPIC_GRPC)
public class CommonGrpcServerInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
                                                                 ServerCallHandler<ReqT, RespT> serverCallHandler) {
        String token = metadata.get(Metadata.Key.of(MetadataConstants.AUTHENTICATION, Metadata.ASCII_STRING_MARSHALLER)),
                clientType = metadata.get(Metadata.Key.of(MetadataConstants.CLIENT_TYPE, Metadata.ASCII_STRING_MARSHALLER));
        if (StringUtils.isBlank(token)) {
            log.error("grpc恳求异常,token为空.");
            serverCall.close(Status.UNAUTHENTICATED, null);
        }
        Long userId = JWTUtil.getUserId(token);
        log.info("grpc | token:{} | methodName:{} | userId:{}", token, serverCall.getMethodDescriptor().getFullMethodName(),
                userId);
        try {
            ThreadLocalUtil.set(ThreadLocalConstant.GRPC_USER_KEY,
                    GrpcThreadData.builder()
                            .userId(userId)
                            .clientType(Integer.valueOf(clientType))
                            .build());
            return serverCallHandler.startCall(serverCall, metadata);
        } finally {
            ThreadLocalUtil.remove(ThreadLocalConstant.GRPC_USER_KEY);
        }
    }
}

gRpc发送公屏消息接口

@Override
public void sendMessage(SendBarrageMessageRequest request, StreamObserver<SendBarrageMessageResponse> responseStreamObserver) {
    Long roomId = request.getRoomId(), currentUserId = GrpcUtil.getUserId();
    int type = request.getType().getNumber(), barrageType = BarrageInfo.MessageType.TEXT_VALUE;
    String content = request.getContent();
    log.info("用户发送房间公屏消息 | roomId:{} | currentUserId:{} | content:{} | type {}", roomId, currentUserId, content, type);
    rocketMqTemplate.send(RocketMqBizConstant.User.Cluster.BARRAGE_RECORD_SAVE_MSG, dto);
    String filterContent = content;
    if (type != SendBarrageMessageRequest.Type.EMOJI_VALUE) {
        if (SensitiveWordUtil.WORD_FILTER.include(StringUtils.deleteWhitespace(filterContent))) {
            filterContent = SensitiveWordUtil.WORD_FILTER.replace(StringUtils.deleteWhitespace(content));
        }
    }
    //用户发送公屏
    rocketMqTemplate.send(RocketMqBizConstant.Grpc.Broadcast.ROOM_RTMP_MESSAGE, GrpcRoomRtmpMessageDto.builder()
            .roomId(roomId)
            .pushType(RtmpPushType.BROADCAST.getType())
            .rtmpMessageType(RtmpMessage.MessageType.BARRAGE_VALUE)
            .userId(currentUserId)
            .sendUserId(currentUserId)
            .data(GsonUtil.GsonString(LiveRoomSendBarrageDto.builder()
                    .barrageType(barrageType)
                    .content(filterContent)
                    .build()))
            .build());
    responseStreamObserver.onNext(SendBarrageMessageResponse.newBuilder()
            .setSuccess(true)
            .setContent(filterContent)
            .build());
    responseStreamObserver.onCompleted();
}

每次gRpc恳求进来前,在阻拦其中解析token并将对应的用户ID设置到 ThreadLocal中,这样从接口中可以直接从ThreadLocal的值中获取恳求的用户ID,看着如同没啥问题,莫非是ThreadLocal设置的时候有线程安全问题?

来看一下ThreadLocal设置中的代码

public class ThreadLocalUtil {
    private static final ThreadLocal<Map<String, Object>> threadLocal = ThreadLocal.withInitial(() -> new HashMap<>(10));
    public static Map<String, Object> getThreadLocal() {
        return threadLocal.get();
    }
    public static Object get(String key) {
        Map<String, Object> map = threadLocal.get();
        return map.get(key);
    }
    public static void set(String key, Object value) {
        Map<String, Object> map = threadLocal.get();
        map.put(key, value);
    }
    public static void set(Map<String, Object> keyValueMap) {
        Map<String, Object> map = threadLocal.get();
        map.putAll(keyValueMap);
    }
    public static void remove() {
        threadLocal.remove();
    }
    public static <T> T remove(String key) {
        Map<String, Object> map = threadLocal.get();
        return (T) map.remove(key);
    }
}

ThreadLocal.withInitial(() -> new HashMap<>(10)); 办法可以为每个线程供给独立的初始值,确保每个线程都具有自己的初始状况,看起来是线程安全的,没啥毛病,那就排除了ThreadLocal的问题

继续排查日志-日志上下文剖析

【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

好家伙,恍然大悟,阻拦器中与gRpc接口代码履行的竟然不是同一个线程,大概率是gRpc结构的线程池处理线程不行用时运用线程切换形成,并发不大的时候还真看不出来~ 看到这的时候我已经打自己两巴掌了,由于在我看来这如同算是一个初级的错误…

运用ThreadLocal和gRpc结构供给Context的差异点

有请CHAT-GPT给我们解释一下两者的差异吧:

Context.current().withValue(key, value)ThreadLocalset 办法都可以用于在恳求处理链路中传递数据,但它们之间有一些关键的差异:

  1. 效果范围:Context 是 gRPC 结构供给的上下文传递机制,可以在整个 gRPC 恳求处理链路中传递数据。而 ThreadLocal 是基于线程的局部变量,只在同一线程中有效。

  2. 线程切换:当恳求在 gRPC 结构中进行处理时,可能会涉及到线程的切换。在这种情况下,ThreadLocal 中的值无法在不同的线程之间同享,而 Context 可以跨线程传递数据。

  3. 显式传递:运用 Context 时,您需求在代码中显式地传递上下文对象。在 gRPC 恳求处理链路中,经过调用 Context.current().withValue(key, value) 办法,您可以将数据与当时的上下文相关起来,并在后续的办法调用中获取该数据。

  4. 线程安全性:Context 在规划上是线程安全的,可以在多个线程之间同享数据。它是为了支撑并发环境而规划的,可以在异步恳求和并发恳求处理中安全地传递数据。

总的来说,ContextThreadLocal 比较,更适用于 gRPC 结构中的恳求处理链路,可以在不同的线程和办法调用中传递数据,供给更灵活的上下文传递机制。而 ThreadLocal 则更适用于在单个线程内部同享数据。

运用 Context 时需求留意上述差异,并依据需求选择适宜的机制来传递数据。如果您需求在整个 gRPC 恳求处理链路中传递数据,而且可以跨线程同享,那么运用 Context 是一个更好的选择。

总结

  1. ThreadLocal不要轻易运用在恣意的阻拦器中,除非你能保证阻拦器的代码与实践履行的事务代码严厉在同一个线程里,而不是运用线程池履行,否则在履行线程不行用时线程切换导致ThreadLocal读取失效
  2. 运用新组件时尽量运用组件供给的解决方案,这需求在运用前认真阅览文档,必要时多读读组件源码加深了解。