Распространение ThreadLocal на новую тему, полученную из ExecutorService
Я запускаю процесс в отдельном потоке с таймаутом, используя ExecutorService и Будущее (пример кода здесь) (происходит поток "нереста" в AOP Aspect).
Теперь основной поток представляет собой запрос Resteasy. Resteasy использует одну или несколько переменных ThreadLocal для хранения некоторой информации контекста, которую мне нужно получить в какой-то момент в вызове метода Rest. Проблема в том, что, поскольку поток Resteasy запущен в новом потоке, переменные ThreadLocal arelost.
Каким будет лучший способ "распространять" любую переменную ThreadLocal, используемую Resteasy для нового потока? Кажется, что Resteasy использует более одной переменной ThreadLocal для отслеживания контекстной информации, и я хотел бы "слепо" передать всю информацию в новый поток.
Я просмотрел подклассификацию ThreadPoolExecutor
и используя метод beforeExecute, чтобы передать текущий поток в пул, но я не мог 't найти способ передать переменные ThreadLocal в пул.
Любое предложение?
Спасибо
Ответы
Ответ 1
Набор экземпляров ThreadLocal
, связанных с потоком, хранится в частных членах каждого Thread
. Ваш единственный шанс перечислить их - сделать некоторое отражение на Thread
; таким образом, вы можете переопределить ограничения доступа в полях потока.
Как только вы можете получить набор ThreadLocal
, вы можете скопировать в потоки фона с помощью beforeExecute()
и afterExecute()
перехватчиков ThreadPoolExecutor
, или создав обертку Runnable
для ваших задач, которая перехватывает run()
вызов, чтобы установить ненужные экземпляры ThreadLocal
. Фактически, последний способ может работать лучше, поскольку он предоставит вам удобное место для хранения значений ThreadLocal
во время постановки задачи в очередь.
Обновление: здесь более конкретная иллюстрация второго подхода. Вопреки моему первоначальному описанию, все, что хранится в оболочке, является вызывающим потоком, который допрашивается при выполнении задачи.
static Runnable wrap(Runnable task)
{
Thread caller = Thread.currentThread();
return () -> {
Iterable<ThreadLocal<?>> vars = copy(caller);
try {
task.run();
}
finally {
for (ThreadLocal<?> var : vars)
var.remove();
}
};
}
/**
* For each {@code ThreadLocal} in the specified thread, copy the thread
* value to the current thread.
*
* @param caller the calling thread
* @return all of the {@code ThreadLocal} instances that are set on current thread
*/
private static Collection<ThreadLocal<?>> copy(Thread caller)
{
/* Use a nasty bunch of reflection to do this. */
throw new UnsupportedOperationException();
}
Ответ 2
Как я понимаю вашу проблему, вы можете взглянуть на InheritableThreadLocal, который предназначен для передачи переменных ThreadLocal
из контекста родительского потока Контекст дочернего потока
Ответ 3
Основываясь на ответе @erickson, я написал этот код. Он работает для inheritableThreadLocals. Он создает список inheritableThreadLocals, используя тот же метод, что и в контрукторе Thread. Конечно, я использую размышления, чтобы сделать это. Также я переопределяю класс исполнителя.
public class MyThreadPoolExecutor extends ThreadPoolExecutor
{
@Override
public void execute(Runnable command)
{
super.execute(new Wrapped(command, Thread.currentThread()));
}
}
Упаковочный:
private class Wrapped implements Runnable
{
private final Runnable task;
private final Thread caller;
public Wrapped(Runnable task, Thread caller)
{
this.task = task;
this.caller = caller;
}
public void run()
{
Iterable<ThreadLocal<?>> vars = null;
try
{
vars = copy(caller);
}
catch (Exception e)
{
throw new RuntimeException("error when coping Threads", e);
}
try {
task.run();
}
finally {
for (ThreadLocal<?> var : vars)
var.remove();
}
}
}
метод копирования:
public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
{
List<ThreadLocal<?>> threadLocals = new ArrayList<>();
Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
field.setAccessible(true);
Object map = field.get(caller);
Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
table.setAccessible(true);
Method method = ThreadLocal.class
.getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
method.setAccessible(true);
Object o = method.invoke(null, map);
Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
field2.setAccessible(true);
field2.set(Thread.currentThread(), o);
Object tbl = table.get(o);
int length = Array.getLength(tbl);
for (int i = 0; i < length; i++)
{
Object entry = Array.get(tbl, i);
Object value = null;
if (entry != null)
{
Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
"get");
referentField.setAccessible(true);
value = referentField.invoke(entry);
threadLocals.add((ThreadLocal<?>) value);
}
}
return threadLocals;
}
Ответ 4
Вот пример, чтобы передать текущий LocaleContext в родительском потоке дочерний поток, натянутый на CompletableFuture [По умолчанию используется ForkJoinPool].
Просто определите все, что вы хотели сделать в дочернем потоке внутри блока Runnable. Поэтому, когда CompletableFuture выполняет блок Runnable, его дочерний поток, который находится под контролем и voila, у вас есть родительский материал ThreadLocal, установленный в Child ThreadLocal.
Проблема здесь заключается не в полном копировании ThreadLocal. Скопирован только файл LocaleContext. Поскольку ThreadLocal имеет частный доступ только к Thread, он тоже использует Reflection и пытается получить и установить в Child, это слишком много шуточных вещей, которые могут привести к утечке памяти или повышению производительности.
Итак, если вы знаете параметры, которые вас интересуют от ThreadLocal, то это решение работает чище.
public void parentClassMethod(Request request) {
LocaleContext currentLocale = LocaleContextHolder.getLocaleContext();
executeInChildThread(() -> {
LocaleContextHolder.setLocaleContext(currentLocale);
//Do whatever else you wanna do
}));
//Continue stuff you want to do with parent thread
}
private void executeInChildThread(Runnable runnable) {
try {
CompletableFuture.runAsync(runnable)
.get();
} catch (Exception e) {
LOGGER.error("something is wrong");
}
}
Ответ 5
Мне не нравится подход Reflection. Альтернативным решением было бы реализовать оболочку исполнителя и передать объект непосредственно в контексте ThreadLocal
ко всем дочерним потокам, распространяющим родительский контекст.
public class PropagatedObject {
private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();
//put, set, merge methods, etc
}
== >
public class ObjectAwareExecutor extends AbstractExecutorService {
private final ExecutorService delegate;
private final PropagatedObject objectAbsorber;
public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){
this.delegate = delegate;
this.objectAbsorber = objectAbsorber;
}
@Override
public void execute(final Runnable command) {
final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
delegate.execute(() -> {
try{
objectAbsorber.set(parentContext);
command.run();
}finally {
parentContext.putAll(objectAbsorber.get());
objectAbsorber.clean();
}
});
objectAbsorber.merge(parentContext);
}
Ответ 6
Если вы посмотрите код ThreadLocal, вы можете увидеть:
public T get() {
Thread t = Thread.currentThread();
...
}
текущий поток не может быть перезаписан.
Возможные решения:
-
Посмотрите на механизм java 7 fork/join (но я думаю, что это плохой способ)
-
Посмотрите одобренный механизм, чтобы перезаписать класс ThreadLocal
в вашей JVM.
-
Попробуйте переписать RESTEasy (вы можете использовать инструменты Refactor в своей среде IDE для замены всего использования ThreadLocal, это выглядит легко)