TransmittableThreadLocal线程间传递逻辑 - 简书 (jianshu.com)
通过测试,确实可以

简单的测试代码如下
private final static TransmittableThreadLocal<Integer> LOCAL = new TransmittableThreadLocal<>();
@PostMapping("/a")
public void test() {
final int num = ThreadLocalRandom.current().nextInt(0, 1500);
LOCAL.set(num);
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 150; i++) {
list.add(i);
}
list.stream().parallel().forEach(o -> {
if (!Objects.equals(LOCAL.get(), num)) {
System.out.println("如果不同代表没有正确传递噢");
}
});
}
通过上述代码及通过javaagent引入jar包后不用手动设置TTL的线程池装饰器也可以了!我们接下来看看其中的奥秘!(javaAgent技术大家可以自行搜索学习一下,例如skywalking等无侵入场景应用)
// 通过premain 进行字节码操作添加我们想要的拦截,(attach,premain两个java agent核心方法大家可以搜索详细了解)
public static void premain(String agentArgs, @Nonnull Instrumentation inst) {
// 一个 volatile 变量,不清除这里volatile的必要性,此文章暂不关注
kvs = splitCommaColonStringToKV(agentArgs);
// 日志打印的一个多态,可以通过参数设置。默认只打印 StdErr,也可以设置为stdout
Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));
final Logger logger = Logger.getLogger(TtlAgent.class);
try {
logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst);
// 如果是用javaAgent无侵入的实现TTL逻辑(上一篇讲到正常使用TTL的装饰器线程池来实现)默认是开启的(TimerTask默认不开启) 这里是查看线程池的配置,默认开启,可以在jvm参数设置为关闭
final boolean disableInheritable = isDisableInheritableForThreadPool();
// 需要修改字节码的列表, 通过javassist修改字节码 JavassistTransformlet是TTL 做的抽象对jdk可能出现的3种出现线程传递的情况分别做了实现
final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
// 线程池的
transformletList.add(new TtlExecutorTransformlet(disableInheritable));
// forkjoin的
transformletList.add(new TtlForkJoinTransformlet(disableInheritable));
// timer task默认关闭,如果设置开启也加入
if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet());
// 这是jdk提供的接口,给客户端提供对class 字节码做增强的入口
final ClassFileTransformer transformer = new TtlTransformer(transformletList);
// 将TTL的字节码增强逻辑 织入,加载对应class时调用(也取决于要增强的class对象load时机)
inst.addTransformer(transformer, true);
logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success");
logger.info("[TtlAgent.premain] end");
// 如果使用了 javaAgent 增强,你再手动给线程池包裹装饰器则会直接返回,不需要包装了
ttlAgentLoaded = true;
} catch (Exception e) {
String msg = "Fail to load TtlAgent , cause: " + e.toString();
logger.log(Level.SEVERE, msg, e);
throw new IllegalStateException(msg, e);
}
}
上面贴出了TTL使用javaAgent的premain进行字节码增强的流程下面看具体的实现以及如何使用jdk暴露的ClassFileTransformer接口进行操作字节码
先来看TtlTransformer ,我们的字节码增强逻辑的桥梁
public class TtlTransformer implements ClassFileTransformer {
private static final Logger logger = Logger.getLogger(TtlTransformer.class);
private static final byte[] EMPTY_BYTE_ARRAY = {};
// 要被增强的列表
private final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
TtlTransformer(List<? extends JavassistTransformlet> transformletList) {
// 复制到当前类的成员变量
for (JavassistTransformlet transformlet : transformletList) {
this.transformletList.add(transformlet);
logger.info("[TtlTransformer] add Transformlet " + transformlet.getClass() + " success");
}
}
@Override
public final byte[] transform(@Nonnull final ClassLoader loader, @Nullable final String classFile, final Class<?> classBeingRedefined,
final ProtectionDomain protectionDomain, final byte[] classFileBuffer) {
try {
// Lambda has no class file, no need to transform, just return.
if (classFile == null) return EMPTY_BYTE_ARRAY;
// 获取到当前加载的class的类名
final String className = toClassName(classFile);
for (JavassistTransformlet transformlet : transformletList) {
// 调用子类实现
final byte[] bytes = transformlet.doTransform(className, classFileBuffer, loader);
if (bytes != null) return bytes;
}
} catch (Throwable t) {
String msg = "Fail to transform class " + classFile + ", cause: " + t.toString();
logger.log(Level.SEVERE, msg, t);
throw new IllegalStateException(msg, t);
}
return EMPTY_BYTE_ARRAY;
}
private static String toClassName(final String classFile) {
return classFile.replace('/', '.');
}
}
下面来看具体的实现有四个

先看TtlExecutorTransformlet
// 为什么把源码中的注释放进来,因为TTL的注释写的还是挺全。
/**
* TTL {@link JavassistTransformlet} for {@link java.util.concurrent.Executor}.
*
* @author Jerry Lee (oldratlee at gmail dot com)
* @author wuwen5 (wuwen.55 at aliyun dot com)
// 通过字节码操作会覆盖以下线程执行相关类
* @see java.util.concurrent.Executor
* @see java.util.concurrent.ExecutorService
* @see java.util.concurrent.ThreadPoolExecutor
* @see java.util.concurrent.ScheduledThreadPoolExecutor
* @see java.util.concurrent.Executors
// 这是也是TTL的一个实现,之后再看
* @see TtlPriorityBlockingQueueTransformlet
* @since 2.5.1
*/
public class TtlExecutorTransformlet implements JavassistTransformlet {
private static final Logger logger = Logger.getLogger(TtlExecutorTransformlet.class);
// 装我们要拦截操作字节码的类
private static final Set<String> EXECUTOR_CLASS_NAMES = new HashSet<String>();
// 这次套路变了,如果使用javaAgent不用一层一层装饰再去增强了
// 我们的目的就是 在thread执行任务时候,
//任务: 初始化 [捕获capture] -> 目标执行前 [重放replay] -> 目标执行后 还原 [还原restore]即可
// 现在我们聚焦在 Runnable 和 Callable即可,TTL的切入点是在各种线程池
//(如果你直接new,这个原生就支持,可以看看前一篇文章,就算你使用TTL的父类InheritableThreadLocal在new thread时会将第一次主线程的ThreadLocalMap的引用都带过去噢,想一下childValue这个方法)
// 所以我们只要对所有线程池的 参数中含有 Runnable和 Callable替换成Ttl的增强装饰器即可!
private static final Map<String, String> PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS = new HashMap<String, String>();
private static final String THREAD_POOL_EXECUTOR_CLASS_NAME = "java.util.concurrent.ThreadPoolExecutor";
private static final String RUNNABLE_CLASS_NAME = "java.lang.Runnable";
static { EXECUTOR_CLASS_NAMES.add(THREAD_POOL_EXECUTOR_CLASS_NAME);
EXECUTOR_CLASS_NAMES.add("java.util.concurrent.ScheduledThreadPoolExecutor"); PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put(RUNNABLE_CLASS_NAME, "com.alibaba.ttl.TtlRunnable"); PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.util.concurrent.Callable", "com.alibaba.ttl.TtlCallable");
}
private static final String THREAD_FACTORY_CLASS_NAME = "java.util.concurrent.ThreadFactory";
private final boolean disableInheritableForThreadPool;
public TtlExecutorTransformlet(boolean disableInheritableForThreadPool) {
this.disableInheritableForThreadPool = disableInheritableForThreadPool;
}
@Override
public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
//看issues可以知道为什么要 屏蔽java.util,但是具体原因还不明确。
// 因为所有类加载都会进入javaAgent的 premain方法,对应的实现自己选择去操作哪些类的字节码,例如skywalking那么多插件,就是用全类名一个一个指定常见开源框架的可做拦截逻辑的类,然后去添加apm的逻辑增强
// work-around ClassCircularityError:
// https://github.com/alibaba/transmittable-thread-local/issues/278
// https://github.com/alibaba/transmittable-thread-local/issues/234
if (isClassAtPackageJavaUtil(classInfo.getClassName())) return;
final CtClass clazz = classInfo.getCtClass();
// 指定的类
if (EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName())) {
for (CtMethod method : clazz.getDeclaredMethods()) {
updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method);
}
// 首先DisableInheritableThreadFactory和我之前的理解有些出入,但是看实现确实是一个屏蔽TTL的逻辑实现,但是如果使用agent需要添加jvm参数,那么去掉参数了就不会进入当前jar的premain方法
// 而且经过测试,就算让这里为true也不能屏蔽什么,因为就算替换了ThreadFactory但是Runnable,Callable已经在类加载时字节码层面替换为TTL的装饰器了,这里好像亡羊补牢了 - -!虽然不明白但是这里不影响我们使用逻辑
if (disableInheritableForThreadPool) updateConstructorDisableInheritable(clazz);
// 标记类被字节码增强了
classInfo.setModified();
} else {
// 如果不是被指定增强的类
// 基本类型数组,接口,注解直接返回
if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || clazz.isAnnotation()) {
return;
}
// 这里利用了 javassist一些操作,去获取已经加载的类对象,判断当前类是否是指定要被增强类的子类,后续要做一些操作
// 包括其他基于java实现的框架对线程池的扩展
// 我们debug启动时可以看到 tomcat线程池org.apache.tomcat.util.threads.ThreadPoolExecutor 继承了jdk的线程池
// 我们就可以进入到方法内,后续看下面的方法
if (!clazz.subclassOf(clazz.getClassPool().get(THREAD_POOL_EXECUTOR_CLASS_NAME))) return;
logger.info("Transforming class " + classInfo.getClassName());
// 返回一个 操作字节码的结果然后标记该类是否成功 增强了
final boolean modified = updateBeforeAndAfterExecuteMethodOfExecutorSubclass(clazz);
if (modified) classInfo.setModified();
}
}
/**
* @see com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils#doAutoWrap(Runnable)
* @see com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils#doAutoWrap(Callable)
*/
@SuppressFBWarnings("VA_FORMAT_STRING_USES_NEWLINE") // [ERROR] Format string should use %n rather than \n
private void updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull final CtMethod method) throws NotFoundException, CannotCompileException {
final int modifiers = method.getModifiers();
if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) return;
CtClass[] parameterTypes = method.getParameterTypes();
StringBuilder insertCode = new StringBuilder();
for (int i = 0; i < parameterTypes.length; i++) {
// 核心的逻辑,我们只关注 Runnable,Callable,然后对其线程池所有方法含有该参数进行字节码增强,在进入方法并执行方法前替换为TTL的装饰器
final String paramTypeName = parameterTypes[i].getName();
if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
String code = String.format(
// auto decorate to TTL wrapper
"$%d = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doAutoWrap($%<d);",
i + 1);
insertCode.append(code);
}
}
if (insertCode.length() > 0) {
logger.info("insert code before method " + signatureOfMethod(method) + " of class " +
method.getDeclaringClass().getName() + ":\n" + insertCode);
method.insertBefore(insertCode.toString());
}
}
// 省略替换 ThreadFactory的代码
/**
* @see Utils#doUnwrapIfIsAutoWrapper(Runnable)
*/
private boolean updateBeforeAndAfterExecuteMethodOfExecutorSubclass(@NonNull final CtClass clazz) throws NotFoundException, CannotCompileException {
final CtClass runnableClass = clazz.getClassPool().get(RUNNABLE_CLASS_NAME);
final CtClass threadClass = clazz.getClassPool().get("java.lang.Thread");
final CtClass throwableClass = clazz.getClassPool().get("java.lang.Throwable");
boolean modified = false;
try {
// ScheduledThreadPoolExecutor 也是继承自ThreadPoolExecutor,TTL就是利用了ThreadPoolExecutor#beforeExecute(Thread t, Runnable r)
// 和ThreadPoolExecutor#afterExecute(Runnable r, Throwable t)
//这是jdk线程池提供的两个钩子,TTL对第三方框架的线程池类名完全不了解,所以我们无法直接拿到子类含有Runnable的方法(我们不知道继承了几层,无法通过反射去拿到method对象)
// 那么TTL目前通过jdk的 beforeExecute,和 afterExecute来对其他框架或者自定义的线程池进行去除TTL装饰器的逻辑
//猜测目的是无法保证正确控制线程间正确传递,我们直接消除测试开发上的迷惑行为
final CtMethod beforeExecute = clazz.getDeclaredMethod("beforeExecute", new CtClass[]{threadClass, runnableClass});
// unwrap runnable if IsAutoWrapper
String code = "$2 = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doUnwrapIfIsAutoWrapper($2);";
logger.info("insert code before method " + signatureOfMethod(beforeExecute) + " of class " +
beforeExecute.getDeclaringClass().getName() + ": " + code);
beforeExecute.insertBefore(code);
modified = true;
} catch (NotFoundException e) {
// 前提是 目标线程池重写了 beforeExecute 才能进行unwraper操作
// clazz does not override beforeExecute method, do nothing.
}
// 下面逻辑类似 拦截afterExecute
try {
final CtMethod afterExecute = clazz.getDeclaredMethod("afterExecute", new CtClass[]{runnableClass, throwableClass});
// unwrap runnable if IsAutoWrapper
String code = "$1 = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doUnwrapIfIsAutoWrapper($1);";
logger.info("insert code before method " + signatureOfMethod(afterExecute) + " of class " +
afterExecute.getDeclaringClass().getName() + ": " + code);
afterExecute.insertBefore(code);
modified = true;
} catch (NotFoundException e) {
// clazz does not override afterExecute method, do nothing.
}
return modified;
}
上面看了 典型的 ThreadPoolExecutor和ScheduledThreadPoolExecutor的增强以及对第三方扩展线程池一定程度上的(重写jdk线程池的钩子方法才会操作)消除歧义
下面来看关联的TtlPriorityBlockingQueueTransformlet
public class TtlPriorityBlockingQueueTransformlet implements JavassistTransformlet {
private static final Logger logger = Logger.getLogger(TtlPriorityBlockingQueueTransformlet.class);
// 我们要拦截 关于优先级队列的线程池
// https://github.com/alibaba/transmittable-thread-local/issues/330
// 这里有前因后果,因为runnable等接口也实现了 可排序接口,会castClass错误
private static final String PRIORITY_BLOCKING_QUEUE_CLASS_NAME = "java.util.concurrent.PriorityBlockingQueue";
private static final String PRIORITY_QUEUE_CLASS_NAME = "java.util.PriorityQueue";
private static final String COMPARATOR_CLASS_NAME = "java.util.Comparator";
private static final String COMPARATOR_FIELD_NAME = "comparator";
@Override
public void doTransform(@NonNull ClassInfo classInfo) throws IOException, CannotCompileException, NotFoundException {
final String className = classInfo.getClassName();
// 一共两种优先级队列的拦截,但增强逻辑是一样的
if (PRIORITY_BLOCKING_QUEUE_CLASS_NAME.equals(className)) {
updatePriorityBlockingQueueClass(classInfo.getCtClass());
classInfo.setModified();
}
if (PRIORITY_QUEUE_CLASS_NAME.equals(className)) {
updateBlockingQueueClass(classInfo.getCtClass());
classInfo.setModified();
}
}
private void updatePriorityBlockingQueueClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException {
if (!haveComparatorField(clazz)) {
// In Java 6, PriorityBlockingQueue implementation do not have field comparator,
// need transform more fundamental class PriorityQueue
logger.info(PRIORITY_BLOCKING_QUEUE_CLASS_NAME + " do not have field " + COMPARATOR_FIELD_NAME +
", transform " + PRIORITY_QUEUE_CLASS_NAME + " instead.");
return;
}
modifyConstructors(clazz);
}
private void updateBlockingQueueClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException {
final CtClass classPriorityBlockingQueue = clazz.getClassPool().getCtClass(PRIORITY_BLOCKING_QUEUE_CLASS_NAME);
if (haveComparatorField(classPriorityBlockingQueue)) return;
logger.info(PRIORITY_BLOCKING_QUEUE_CLASS_NAME + " do not have field " + COMPARATOR_FIELD_NAME +
", so need transform " + PRIORITY_QUEUE_CLASS_NAME);
modifyConstructors(clazz);
}
private static boolean haveComparatorField(CtClass clazz) {
try {
clazz.getDeclaredField(COMPARATOR_FIELD_NAME);
return true;
} catch (NotFoundException e) {
return false;
}
}
/**
* @see #wrapComparator$by$ttl(Comparator)
*/
private static final String WRAP_METHOD_NAME = "wrapComparator$by$ttl";
/**
* wrap comparator field in constructors
*
* @see #COMPARATOR_FIELD_NAME
*/
private static final String AFTER_CODE_REWRITE_FILED = String.format("this.%s = %s.%s(this.%1$s);",
COMPARATOR_FIELD_NAME, TtlPriorityBlockingQueueTransformlet.class.getName(), WRAP_METHOD_NAME
);
private static void modifyConstructors(@NonNull CtClass clazz) throws NotFoundException, CannotCompileException {
for (CtConstructor constructor : clazz.getDeclaredConstructors()) {
final CtClass[] parameterTypes = constructor.getParameterTypes();
final StringBuilder beforeCode = new StringBuilder();
for (int i = 0; i < parameterTypes.length; i++) {
///////////////////////////////////////////////////////////////
// rewrite Comparator constructor parameter
///////////////////////////////////////////////////////////////
final String paramTypeName = parameterTypes[i].getName();
// 这里的拦截角度比较清奇,因为runnable实现了 Comparator,所有入队的runable都会执行排序方法,那么只要对compator方法增强即可,老配方,在方法执行前拿到参数列表找到带有Comparator的runnable装饰它
if (COMPARATOR_CLASS_NAME.equals(paramTypeName)) {
String code = String.format("$%d = %s.%s($%1$d);",
i + 1, TtlPriorityBlockingQueueTransformlet.class.getName(), WRAP_METHOD_NAME
);
beforeCode.append(code);
}
}
if (beforeCode.length() > 0) {
logger.info("insert code before constructor " + signatureOfMethod(constructor) + " of class " +
constructor.getDeclaringClass().getName() + ": " + beforeCode);
constructor.insertBefore(beforeCode.toString());
}
///////////////////////////////////////////////////////////////
// rewrite Comparator class field
///////////////////////////////////////////////////////////////
logger.info("insert code after constructor " + signatureOfMethod(constructor) + " of class " +
constructor.getDeclaringClass().getName() + ": " + AFTER_CODE_REWRITE_FILED);
constructor.insertAfter(AFTER_CODE_REWRITE_FILED);
}
}
/**
* @see TtlExecutors#getTtlRunnableUnwrapComparatorForComparableRunnable()
* @see TtlExecutors#getTtlRunnableUnwrapComparator(Comparator)
*/
public static Comparator<Runnable> wrapComparator$by$ttl(Comparator<Runnable> comparator) {
if (comparator == null) return TtlExecutors.getTtlRunnableUnwrapComparatorForComparableRunnable();
return TtlExecutors.getTtlRunnableUnwrapComparator(comparator);
}
}
接下来看一下TtlTimerTaskTransformlet
public class TtlTimerTaskTransformlet implements JavassistTransformlet {
private static final Logger logger = Logger.getLogger(TtlTimerTaskTransformlet.class);
private static final String TIMER_TASK_CLASS_NAME = "java.util.TimerTask";
private static final String RUN_METHOD_NAME = "run";
@Override
public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
// work-around ClassCircularityError:
if (isClassAtPackageJavaUtil(classInfo.getClassName())) return;
// TimerTask class is checked by above logic.
//
// if (TIMER_TASK_CLASS_NAME.equals(classInfo.getClassName())) return; // No need transform TimerTask class
final CtClass clazz = classInfo.getCtClass();
if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || clazz.isAnnotation()) {
return;
}
// class contains method `void run()` ?
//TimerTask 简单粗暴就是要找 void run() 方法,因为它本身就是Runnable的子类并且有一个抽象方法 void run(),那么我们只要找到所有TimerTask子类找到其实现的run方法增强即可
try {
final CtMethod runMethod = clazz.getDeclaredMethod(RUN_METHOD_NAME, new CtClass[0]);
if (!CtClass.voidType.equals(runMethod.getReturnType())) return;
} catch (NotFoundException e) {
return;
}
if (!clazz.subclassOf(clazz.getClassPool().get(TIMER_TASK_CLASS_NAME))) return;
logger.info("Transforming class " + classInfo.getClassName());
updateTimerTaskClass(clazz);
classInfo.setModified();
}
/**
* @see Utils#doCaptureWhenNotTtlEnhanced(java.lang.Object)
*/
private void updateTimerTaskClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException {
final String className = clazz.getName();
// add new field
//新增一个 用于中转捕获主线程对象的变量
final String capturedFieldName = "captured$field$added$by$ttl";
final CtField capturedField = CtField.make("private final Object " + capturedFieldName + ";", clazz);
clazz.addField(capturedField, "com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doCaptureWhenNotTtlEnhanced(this);");
logger.info("add new field " + capturedFieldName + " to class " + className);
final CtMethod runMethod = clazz.getDeclaredMethod(RUN_METHOD_NAME, new CtClass[0]);
final String beforeCode = "Object backup = com.alibaba.ttl.TransmittableThreadLocal.Transmitter.replay(" + capturedFieldName + ");";
final String finallyCode = "com.alibaba.ttl.TransmittableThreadLocal.Transmitter.restore(backup);";
// 直接增强方法就不能用装饰器了,直接修改方法加入一个try{}finally逻辑,这本身就是装饰器内部的逻辑
// 这部分是一些代码拼接就不看了
doTryFinallyForMethod(runMethod, beforeCode, finallyCode);
}
}
public class TtlForkJoinTransformlet implements JavassistTransformlet {
private static final Logger logger = Logger.getLogger(TtlForkJoinTransformlet.class);
private static final String FORK_JOIN_TASK_CLASS_NAME = "java.util.concurrent.ForkJoinTask";
private static final String FORK_JOIN_POOL_CLASS_NAME = "java.util.concurrent.ForkJoinPool";
private static final String FORK_JOIN_WORKER_THREAD_FACTORY_CLASS_NAME = "java.util.concurrent.ForkJoinPool$ForkJoinWorkerThreadFactory";
private final boolean disableInheritableForThreadPool;
public TtlForkJoinTransformlet(boolean disableInheritableForThreadPool) {
this.disableInheritableForThreadPool = disableInheritableForThreadPool;
}
@Override
public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
if (FORK_JOIN_TASK_CLASS_NAME.equals(classInfo.getClassName())) {
//如果是 forkJoin的task
updateForkJoinTaskClass(classInfo.getCtClass());
classInfo.setModified();
} else if (disableInheritableForThreadPool && FORK_JOIN_POOL_CLASS_NAME.equals(classInfo.getClassName())) {
// forkJoin的 pool 或者 forkJoin的ThreadFactory
updateConstructorDisableInheritable(classInfo.getCtClass());
classInfo.setModified();
}
}
/**
* @see Utils#doCaptureWhenNotTtlEnhanced(java.lang.Object)
*/
private void updateForkJoinTaskClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException {
final String className = clazz.getName();
// 如果是 ForkJoinTask 同 TimerTask
// add new field
final String capturedFieldName = "captured$field$added$by$ttl";
final CtField capturedField = CtField.make("private final Object " + capturedFieldName + ";", clazz);
clazz.addField(capturedField, "com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doCaptureWhenNotTtlEnhanced(this);");
logger.info("add new field " + capturedFieldName + " to class " + className);
final CtMethod doExecMethod = clazz.getDeclaredMethod("doExec", new CtClass[0]);
final String doExec_renamed_method_name = renamedMethodNameByTtl(doExecMethod);
final String beforeCode = "if (this instanceof " + TtlEnhanced.class.getName() + ") {\n" + // if the class is already TTL enhanced(eg: com.alibaba.ttl.TtlRecursiveTask)
" return " + doExec_renamed_method_name + "($$);\n" + // return directly/do nothing
"}\n" +
"Object backup = com.alibaba.ttl.TransmittableThreadLocal.Transmitter.replay(" + capturedFieldName + ");";
final String finallyCode = "com.alibaba.ttl.TransmittableThreadLocal.Transmitter.restore(backup);";
doTryFinallyForMethod(doExecMethod, doExec_renamed_method_name, beforeCode, finallyCode);
}
private void updateConstructorDisableInheritable(@NonNull final CtClass clazz) throws NotFoundException, CannotCompileException {
// 如果是 ForkJoin的 Pool或者 ThreadFactory 则同线程池的第三方实现逻辑
// 我们需要屏蔽掉ttl的逻辑。
// 为什么要这么做,因为ForkJoin本身是一个很特殊的线程池
// 1. 初始化线程后不一定为谁工作,其实理论上所有初始化线程做TTL操作都没什么意义,存在复用
// 2. 通过线程池执行任务,对于ForkJoin来说并没有真正的执行,是先包装为一个ForkJoinTask然后再进行ForkJoinPool#externalPush(ForkJoinTask<?> task)
// 3. 存在窃取的可能但是也是以ForkJoinTask 维度
for (CtConstructor constructor : clazz.getDeclaredConstructors()) {
final CtClass[] parameterTypes = constructor.getParameterTypes();
final StringBuilder insertCode = new StringBuilder();
for (int i = 0; i < parameterTypes.length; i++) {
final String paramTypeName = parameterTypes[i].getName();
if (FORK_JOIN_WORKER_THREAD_FACTORY_CLASS_NAME.equals(paramTypeName)) {
String code = String.format("$%d = com.alibaba.ttl.threadpool.TtlForkJoinPoolHelper.getDisableInheritableForkJoinWorkerThreadFactory($%<d);", i + 1);
insertCode.append(code);
}
}
if (insertCode.length() > 0) {
logger.info("insert code before method " + signatureOfMethod(constructor) + " of class " +
constructor.getDeclaringClass().getName() + ": " + insertCode);
constructor.insertBefore(insertCode.toString());
}
}
}
}
如上可以看到对于ForkJoin来说我们只对 ForkJoinTask进行增强
TtlRecursiveTask<V> 和TtlRecursiveAction区别是可返回结果的区别,返回结果可以用于CompletableFuture
通过查看源码我们还发现了TtlWrappers可以的对java常见的函数式接口进行修饰。相当于把装饰器维度使用最小的,类似于对Runnable和Callable这种任务执行单元一样
我们就看一个TtlWrappers使用案例
@Async
public void test(Integer integer) {
// 首先这里@Async我们已经将异步线程池使用了TTL的装饰器线程池
int num = ThreadLocalRandom.current().nextInt(5, 500);
try {
Thread.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 这里数字忽略只是为了使用forkJoin
Arrays.asList(1, 2, 3, 4, 5, 6, 1, 2132, 234, 234, 23, 2, 2, 2, 2)
.parallelStream().forEach(TtlWrappers.wrapConsumer(o -> {
// 如果不适用TtlWrappers 这里会不相等,代表这里线程没有正确传递
//如果使用TtlWrappers 则也是我们手动做装饰器了,只不过是针对代码块执行单元的装饰器
if (!Objects.equals(TestController.LOCAL.get(), integer)) {
System.out.println("bbbbbbbbbbb");
}
}));
}

我们再看看作者提到的TtlRecursiveTask,简单点我们只看TtlRecursiveAction 没有返回值的
public abstract class TtlRecursiveAction extends ForkJoinTask<Void> implements TtlEnhanced {
private static final long serialVersionUID = -5753568484583412377L;
// 子类初始化时捕获
private final Object captured = capture();
protected TtlRecursiveAction() {
}
// 延迟到子类进行 任务单元执行
protected abstract void compute();
/**
* see {@link ForkJoinTask#getRawResult()}
*/
public final Void getRawResult() {
return null;
}
/**
* see {@link ForkJoinTask#setRawResult(Object)}
*/
protected final void setRawResult(Void mustBeNull) {
}
/**
* Implements execution conventions for RecursiveActions.
*/
// 熟悉的配方,但是当前方法是重写了 ForkJoinTask的执行方法
protected final boolean exec() {
final Object backup = replay(captured);
try {
compute();
return true;
} finally {
restore(backup);
}
}
}
如何使用
@Async
public void test(Integer integer) {
// 首先这里@Async我们已经将异步线程池使用了TTL的装饰器线程池
int num = ThreadLocalRandom.current().nextInt(5, 500);
try {
Thread.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 10 ; i++) {
// 只能这样使用,有点麻烦啊
TestFork testFork = new TestFork(integer);
// 获取默认的 ForkJoin线程池
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
forkJoinPool.execute(testFork);
}
}
private static class TestFork extends TtlRecursiveAction {
// 这里只是用来做对比的变量
private final Integer test;
public TestFork(Integer test) {
this.test = test;
}
@Override
protected void compute() {
// 这里是你的任务单元的业务代码
if (!Objects.equals(TestController.LOCAL.get(), test)) {
// 不会进到这里,是可用的
System.out.println("bbbbbbbbbbb");
}
}
}
尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub
我正在使用puppet为ruby程序提供一组常量。我需要提供一组主机名,我的程序将对其进行迭代。在我之前使用的bash脚本中,我只是将它作为一个puppet变量hosts=>"host1,host2"我将其提供给bash脚本作为HOSTS=显然这对ruby不太适用——我需要它的格式hosts=["host1","host2"]自从phosts和putsmy_array.inspect提供输出["host1","host2"]我希望使用其中之一。不幸的是,我终其一生都无法弄清楚如何让它发挥作用。我尝试了以下各项:我发现某处他们指出我需要在函数调用前放置“function_”……这
我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是
我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search
我在理解Enumerator.new方法的工作原理时遇到了一些困难。假设文档中的示例:fib=Enumerator.newdo|y|a=b=1loopdoy[1,1,2,3,5,8,13,21,34,55]循环中断条件在哪里,它如何知道循环应该迭代多少次(因为它没有任何明确的中断条件并且看起来像无限循环)? 最佳答案 Enumerator使用Fibers在内部。您的示例等效于:require'fiber'fiber=Fiber.newdoa=b=1loopdoFiber.yieldaa,b=b,a+bendend10.times.m
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
几个月前,我读了一篇关于rubygem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:
从MB升级到新的MBP后,Apple的迁移助手没有移动我的gem。我这次是通过macports安装rubygems,希望在下次升级时避免这种情况。有什么我应该注意的陷阱吗? 最佳答案 如果你想把你的gems安装在你的主目录中(在传输过程中应该复制过来,作为一个附带的好处,会让你以你自己的身份运行geminstall,而不是root),将gemhome:键设置为您在~/.gemrc中的主目录中的路径. 关于通过MacPorts的RubyGems是个好主意吗?,我们在StackOverf
我没有找到太多关于如何执行此操作的信息,尽管有很多关于如何使用像这样的redirect_to将参数传递给重定向的建议:action=>'something',:controller=>'something'在我的应用程序中,我在路由文件中有以下内容match'profile'=>'User#show'我的表演Action是这样的defshow@user=User.find(params[:user])@title=@user.first_nameend重定向发生在同一个用户Controller中,就像这样defregister@title="Registration"@user=Use