- 本文根据
ThreadLocal
原理,完成了一个上下文状况办理组件Scope
,经过敞开一个自定义的Scope
,在Scope
范围内,能够经过Scope
各个办法读写数据; - 经过自定义线程池完成上下文状况数据的线程间传递;
- 提出了一种根据
Filter
和Scope
的Request
粒度的上下文办理计划。
github:github.com/pengchengSU…
1 ThreadLocal原理
ThreadLocal
主要效果便是完成线程间变量阻隔,关于一个变量,每个线程保护一个自己的实例,避免多线程环境下的资源竞赛,那ThreadLocal
是如何完成这一特性的呢?
图1
从上图可知:
- 每个
Thread
目标中都包含一个ThreadLocal.ThreadLocalMap
类型的threadlocals
成员变量; - 该map对应的每个元素
Entry
目标中:key是ThreadLocal
目标的弱引证,value是该threadlocal
变量在当时线程中的对应的变量实体; - 当某一线程履行获取该
ThreadLocal
目标对应的变量时,首先从当时线程目标中获取对应的threadlocals
哈希表,再以该ThreadLocal
目标为key查询哈希表中对应的value; - 因为每个线程独占一个
threadlocals
哈希表,因此线程间ThreadLocal
目标对应的变量实体也是独占的,不存在竞赛问题,也就避免了多线程问题。
有人可能会问:ThreadLocalMap
是Thread
成员变量(非public,只有包访问权限,Thread和Threadlocal都在java.lang 包下,Thread能够访问ThreadLocal.ThreadLocalMap),定义却在ThreadLocal中,为什么要这么规划?
源码的注释给出了解释:ThreadLocalMap
便是保护线程本地变量规划的,便是让运用者知道ThreadLocalMap
就只做保存线程局部变量这一件事。
set() 办法
public void set(T value) {
Thread t = Thread.currentThread(); //获取当时线程
ThreadLocalMap map = getMap(t); //从当时线程目标中获取threadlocals,该map保存了所用的变量实例
if (map != null) {
map.set(this, value);
} else {
createMap(t, value); //初始threadlocals,并设置当时变量
}
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
get() 办法
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t); //从当时线程目标中获取threadlocals,该map保存了所用的变量实体
if (map != null) {
// 获取当时threadlocal目标对应的变量实体
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 假如map没有初始化,那么在这里初始化一下
return setInitialValue();
}
withInitial()办法
因为经过 ThreadLocal
的 set()
设置的值,只会设置当时线程对应变量实体,无法完成统一初始化一切线程的ThreadLocal
的值。ThreadLocal
提供了一个 withInitial()
办法完成这一功用:
ThreadLocal<String> initValue = ThreadLocal.withInitial(() -> "initValue");
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
// 返回SuppliedThreadLocal类型目标
return new SuppliedThreadLocal<>(supplier);
}
static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
private final Supplier<? extends T> supplier;
SuppliedThreadLocal(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier);
}
@Override
protected T initialValue() {
// 获取初始化值
return supplier.get();
}
}
ThreadLocal中的内存走漏问题
由图1可知,ThreadLocal.ThreadLocalMap
对应的Entry
中,key为ThreadLocal
目标的弱引证,办法履行对应栈帧中的ThreadLocal
引证为强引证。当办法履行过程中,因为栈帧毁掉或者主动开释等原因,开释了ThreadLocal
目标的强引证,即表示该ThreadLocal
目标能够被收回了。又因为Entry
中key为ThreadLocal
目标的弱引证,所以当jvm履行GC操作时是能够收回该ThreadLocal
目标的。
而Entry
中value对应的是变量实体目标的强引证,因此开释一个ThreadLocal
目标,是无法开释ThreadLocal.ThreadLocalMap
中对应的value目标的,也就造成了内存走漏。除非开释当时线程目标,这样整个threadlocals
都被收回了。但是日常开发中会常常运用线程池等线程池化技能,开释线程目标的条件往往无法到达。
JDK处理的办法是,在ThreadLocalMap
进行set()
、get()
、remove()
的时分,都会进行整理:
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
//假如key为null,对应的threadlocal目标现已被收回,整理该Entry
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
2 自定义上下文Scope
在工作中,我们常常需求保护一些上下文,这样能够避免在办法调用过程中传入过多的参数,需求查询/修正一些数据的时分,直接在当时上下文中操作就行了。举个详细点的比如:当web服务器收到一个恳求时,需求解析当时登录态的用户,在后续的事务履行流程中都需求这个用户名。
假如只需求保护一个上下文状况数据还比较好处理,能够经过办法传参的形式,履行每个事务办法的时分都经过增加一个表示用户名办法参数传递进去,但是假如需求保护上下文状况数据比较多的话,这个办法就不太高雅了。
一个可行的计划是经过Threadlocal
完成恳求线程的上下文,只要是同一线程的履行过程,不同办法间不传递上下文状况变量,直接操作ThreadLocal
目标完成状况数据的读写。当存在多个上下文状况的话,则需求保护多个ThreadLocal
,好像也能够勉强接受。但是当遇到事务流程中运用线程池的情况下,从Tomcat传递这些ThreadLocal
到线程池中的线程里就变的比较麻烦了。
根据以上考虑,下面介绍一种根据Threadlocal
完成的上下文办理组件Scope
:
Scope.java
public class Scope {
// 静态变量,保护不同线程的上下文Scope
private static final ThreadLocal<Scope> SCOPE_THREAD_LOCAL = new ThreadLocal<>();
// 实例变量,保护每个上下文中一切的状况数据,为了区别不同的状况数据,运用ScopeKey类型的实例作为key
private final ConcurrentMap<ScopeKey<?>, Object> values = new ConcurrentHashMap<>();
// 获取当时上下文
public static Scope getCurrentScope() {
return SCOPE_THREAD_LOCAL.get();
}
// 在当时上下文设置一个状况数据
public <T> void set(ScopeKey<T> key, T value) {
if (value != null) {
values.put(key, value);
} else {
values.remove(key);
}
}
// 在当时上下文读取一个状况数据
public <T> T get(ScopeKey<T> key) {
T value = (T) values.get(key);
if (value == null && key.initializer() != null) {
value = key.initializer().get();
}
return value;
}
// 敞开一个上下文
public static Scope beginScope() {
Scope scope = SCOPE_THREAD_LOCAL.get();
if (scope != null) {
throw new IllegalStateException("start a scope in an exist scope.");
}
scope = new Scope();
SCOPE_THREAD_LOCAL.set(scope);
return scope;
}
// 封闭当时上下文
public static void endScope() {
SCOPE_THREAD_LOCAL.remove();
}
}
ScopeKey.java
public final class ScopeKey<T> {
// 初始化器,参阅 ThreadLocal 的 withInitial()
private final Supplier<T> initializer;
public ScopeKey() {
this(null);
}
public ScopeKey(Supplier<T> initializer) {
this.initializer = initializer;
}
// 统一初始化一切线程的 ScopeKey 对应的值,参阅 ThreadLocal 的 withInitial()
public static <T> ScopeKey<T> withInitial(Supplier<T> initializer) {
return new ScopeKey<>(initializer);
}
public Supplier<T> initializer() {
return this.initializer;
}
// 获取当时上下文中 ScopeKey 对应的变量
public T get() {
Scope currentScope = getCurrentScope();
return currentScope.get(this);
}
// 设置当时上下文中 ScopeKey 对应的变量
public boolean set(T value) {
Scope currentScope = getCurrentScope();
if (currentScope != null) {
currentScope.set(this, value);
return true;
} else {
return false;
}
}
}
运用办法
@Test
public void testScopeKey() {
ScopeKey<String> localThreadName = new ScopeKey<>();
// 不同线程中履行时,敞开独占的 Scope
Runnable r = () -> {
// 敞开 Scope
Scope.beginScope();
try {
String currentThreadName = Thread.currentThread().getName();
localThreadName.set(currentThreadName);
log.info("currentThread: {}", localThreadName.get());
} finally {
// 封闭 Scope
Scope.endScope();
}
};
new Thread(r, "thread-1").start();
new Thread(r, "thread-2").start();
/** 履行成果
* [thread-1] INFO com.example.demo.testscope.TestScope - currentThread: thread-1
* [thread-2] INFO com.example.demo.testscope.TestScope - currentThread: thread-2
*/
}
@Test
public void testWithInitial() {
ScopeKey<String> initValue = ScopeKey.withInitial(() -> "initVal");
Runnable r = () -> {
Scope.beginScope();
try {
log.info("initValue: {}", initValue.get());
} finally {
Scope.endScope();
}
};
new Thread(r, "thread-1").start();
new Thread(r, "thread-2").start();
/** 履行成果
* [thread-1] INFO com.example.demo.testscope.TestScope - initValue: initVal
* [thread-2] INFO com.example.demo.testscope.TestScope - initValue: initVal
*/
}
上面的测验用例中在代码中手动敞开和封闭Scope
不太高雅,能够在Scope
中增加两个个静态办法包装下Runnable
和Supplier
接口:
public static <X extends Throwable> void runWithNewScope(@Nonnull ThrowableRunnable<X> runnable)
throws X {
supplyWithNewScope(() -> {
runnable.run();
return null;
});
}
public static <T, X extends Throwable> T
supplyWithNewScope(@Nonnull ThrowableSupplier<T, X> supplier) throws X {
beginScope();
try {
return supplier.get();
} finally {
endScope();
}
}
@FunctionalInterface
public interface ThrowableRunnable<X extends Throwable> {
void run() throws X;
}
public interface ThrowableSupplier<T, X extends Throwable> {
T get() throws X;
}
以新的Scope
履行,能够这样写:
@Test
public void testRunWithNewScope() {
ScopeKey<String> localThreadName = new ScopeKey<>();
ThrowableRunnable r = () -> {
String currentThreadName = Thread.currentThread().getName();
localThreadName.set(currentThreadName);
log.info("currentThread: {}", localThreadName.get());
};
// 不同线程中履行时,敞开独占的 Scope
new Thread(() -> Scope.runWithNewScope(r), "thread-1").start();
new Thread(() -> Scope.runWithNewScope(r), "thread-2").start();
/** 履行成果
* [thread-2] INFO com.example.demo.TestScope.testscope - currentThread: thread-2
* [thread-1] INFO com.example.demo.TestScope.testscope - currentThread: thread-1
*/
}
3 在线程池中传递Scope
在上一节中完成的Scope
,经过ThreadLocal
完成了了一个自定义的上下文组件,在同一个线程中经过ScopeKey.set()
/ ScopeKey.get()
读写同一个上下文中的状况数据。
现在需求完成这样一个功用,在一个线程履行过程中敞开了一个Scope
,随后运用线程池履行任务,要求在线程池中也能获取当时Scope
中的状况数据。典型的运用场景是:服务收到一个用户恳求,经过Scope
将登陆态数据存到当时线程的上下文中,随后运用线程池履行一些耗时的操作,希望线程池中的线程也能拿到Scope
中的登陆态数据。
因为线程池中的线程和恳求线程不是一个线程,按照目前的完成,线程池中的线程是无法拿到恳求线程上下文中的数据的。
解决办法是,在提交runnable
时,将当时的Scope
引证存到runnable
目标中,当获得线程履行时,将Scope
替换到履行线程中,履行完成后,再康复现场。在Scope
中新增如下静态办法:
// 以给定的上下文履行 Runnable
public static <X extends Throwable> void runWithExistScope(Scope scope, ThrowableRunnable<X> runnable) throws X {
supplyWithExistScope(scope, () -> {
runnable.run();
return null;
});
}
// 以给定的上下文履行 Supplier
public static <T, X extends Throwable> T supplyWithExistScope(Scope scope, ThrowableSupplier<T, X> supplier) throws X {
// 保存现场
Scope oldScope = SCOPE_THREAD_LOCAL.get();
// 替换成外部传入的 Scope
SCOPE_THREAD_LOCAL.set(scope);
try {
return supplier.get();
} finally {
if (oldScope != null) {
// 康复线程
SCOPE_THREAD_LOCAL.set(oldScope);
} else {
SCOPE_THREAD_LOCAL.remove();
}
}
}
完成支撑Scope
切换的自定义线程池ScopeThreadPoolExecutor
:
public class ScopeThreadPoolExecutor extends ThreadPoolExecutor {
ScopeThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public static ScopeThreadPoolExecutor newFixedThreadPool(int nThreads) {
return new ScopeThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 只要override这一个办法就能够
* 一切submit, invokeAll等办法都会代理到这里来
*/
@Override
public void execute(Runnable command) {
Scope scope = getCurrentScope();
// 提交任务时,把履行 execute 办法的线程中的 Scope 传进去
super.execute(() -> runWithExistScope(scope, command::run));
}
}
测验下ScopeThreadPoolExecutor
是否生效:
@Test
public void testScopeThreadPoolExecutor() {
ScopeKey<String> localVariable = new ScopeKey<>();
Scope.beginScope();
try {
localVariable.set("value out of thread pool");
Runnable r = () -> log.info("localVariable in thread pool: {}", localVariable.get());
// 运用线程池履行,能获取到外部Scope中的数据
ExecutorService executor = ScopeThreadPoolExecutor.newFixedThreadPool(10);
executor.execute(r);
executor.submit(r);
} finally {
Scope.endScope();
}
/** 履行成果
* [pool-1-thread-1] INFO com.example.demo.testscope.TestScope - localVariable in thread pool: value out of thread pool
* [pool-1-thread-2] INFO com.example.demo.testscope.TestScope - localVariable in thread pool: value out of thread pool
*/
}
@Test
public void testScopeThreadPoolExecutor2() {
ScopeKey<String> localVariable = new ScopeKey<>();
Scope.runWithNewScope(() -> {
localVariable.set("value out of thread pool");
Runnable r = () -> log.info("localVariable in thread pool: {}", localVariable.get());
// 运用线程池履行,能获取到外部Scope中的数据
ExecutorService executor = ScopeThreadPoolExecutor.newFixedThreadPool(10);
executor.execute(r);
executor.submit(r);
});
/** 履行成果
* [pool-1-thread-2] INFO com.example.demo.testscope.TestScope - localVariable in thread pool: value out of thread pool
* [pool-1-thread-1] INFO com.example.demo.testscope.TestScope - localVariable in thread pool: value out of thread pool
*/
}
以上两个测验用例,分别经过手动敞开Scope
、借助runWithNewScope
东西办法主动敞开Scope
两种办法验证了自定义线程池ScopeThreadPoolExecutor
的Scope
可传递性。
4 经过Filter、Scope完成Request上下文
接下来介绍如何经过Filter
和Scope
完成Request
粒度的Scope
上下文。思路是:经过注入一个拦截器,在进入拦截器后敞开Scope
作为一个恳求的上下文,解析Request
目标获取获取相关状况信息(如登陆用户),并在Scope
中设置,在脱离拦截器时封闭Scope
。
AuthScope.java
// 获取登录态的东西类
public class AuthScope {
private static final ScopeKey<String> LOGIN_USER = new ScopeKey<>();
public static String getLoginUser() {
return LOGIN_USER.get();
}
public static void setLoginUser(String loginUser) {
if (loginUser == null) {
loginUser = "unknownUser";
}
LOGIN_USER.set(loginUser);
}
}
ScopeFilter.java
@Lazy
@Order(0)
@Service("scopeFilter")
public class ScopeFilter extends OncePerRequestFilter {
@Override
protected String getAlreadyFilteredAttributeName() {
return this.getClass().getName();
}
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
// 敞开Scope
beginScope();
try {
Cookie[] cookies = request.getCookies();
String loginUser = "unknownUser";
if (cookies != null) {
for (Cookie cookie : cookies) {
if (cookie.getName().equals("login_user")) {
loginUser = cookie.getValue();
break;
}
}
}
// 设置该 Request 上下文对用的登陆用户
AuthScope.setLoginUser(loginUser);
filterChain.doFilter(request, response);
} finally {
// 封闭Scope
endScope();
}
}
}
注入Filter
@Slf4j
@Configuration
public class FilterConfig {
@Bean
public FilterRegistrationBean<ScopeFilter> scopeFilterRegistration() {
FilterRegistrationBean<ScopeFilter> registration = new FilterRegistrationBean<>();
registration.setFilter(new ScopeFilter());
registration.addUrlPatterns("/rest/*");
registration.setOrder(0);
log.info("scope filter registered");
return registration;
}
}
UserController.java
@Slf4j
@RestController
@RequestMapping("/rest")
public class UserController {
// curl --location --request GET 'localhost:8080/rest/getLoginUser' --header 'Cookie: login_user=zhangsan'
@GetMapping("/getLoginUser")
public String getLoginUser() {
return AuthScope.getLoginUser();
}
// curl --location --request GET 'localhost:8080/rest/getLoginUserInThreadPool' --header 'Cookie: login_user=zhangsan'
@GetMapping("/getLoginUserInThreadPool")
public String getLoginUserInThreadPool() {
ScopeThreadPoolExecutor executor = ScopeThreadPoolExecutor.newFixedThreadPool(4);
executor.execute(() -> {
log.info("get login user in thread pool: {}", AuthScope.getLoginUser());
});
return AuthScope.getLoginUser();
}
}
经过以下恳求验证,恳求线程和线程池线程是否能获取登录态,其中登录态经过Cookie模仿:
curl --location --request GET 'localhost:8080/rest/getLoginUser' --header 'Cookie: login_user=zhangsan'
curl --location --request GET 'localhost:8080/rest/getLoginUserInThreadPool' --header 'Cookie: login_user=zhangsan'
5 总结
源代码
github:github.com/pengchengSU…