<字符串、ConcurrentLinkedQueue> 对的 ConcurrentHashMap 不一致地注册添加



背景

我有一个程序,它借助Java代理和ASM,向一个静态的ConcurrentHashMap中添加数据。该映射的值是ConcurrentLinkedQueue,队列中的元素是对Object数组的封装(在下面的代码段中称为ThreadMarker)。映射的键是线程ID,值是该线程对应的ThreadMarker队列。在给定的.class文件中,每遇到一条新的LINENUMBER字节码指令,就会向队列中添加一个元素;执行到该行的线程用自己的id属性来确定要添加到映射中的哪个队列。

代码(注意:QueueMapMediator的映射中的队列不一定非得是ConcurrentLinkedQueue,因为每个队列只会被一个线程访问)。

public class QueueMapMediator {
private static final ConcurrentHashMap<String, ConcurrentLinkedQueue<ThreadMarker>> queueMap;
private static final int CAPACITY;
private static final ConcurrentStack<ConcurrentLinkedQueue<ThreadMarker>> queueStack;
static {
CAPACITY = 16; // arbitrary limit - may be assigned any power of 2
queueMap = new ConcurrentHashMap<>(CAPACITY << 4, 0.75f, CAPACITY << 4);
queueStack = new ConcurrentStack<>();
for (int i = 0; i < CAPACITY; i++) {
queueStack.push(new ConcurrentLinkedQueue<>());
}
}
/**
* Instantiation disallowed
*/
private QueueMapMediator() {
}
private static ConcurrentLinkedQueue<ThreadMarker> newQueue(String id) {
ConcurrentLinkedQueue<ThreadMarker> q = queueStack.pop();
queueMap.put(id, q);
return q;
}
/**
* Used by java agent to get map entry pertaining to
* currently executing thread.
*
* @param id    Thread id stored as a key in queueMap
* @return      value associated with key of value id
*/
public static ConcurrentLinkedQueue<ThreadMarker> getByThreadId(String id) {
ConcurrentLinkedQueue<ThreadMarker> q = queueMap.get(id);
return q != null ? q : newQueue(id);
}
public static String[][] output() {
return queueMap.entrySet().stream()
.map(m -> m.getValue().stream().map(e -> new String[]{
String.valueOf(e.getElements()[0]),
String.valueOf(e.getElements()[1]),
String.valueOf(e.getElements()[2]),
m.getKey()})
.toArray(String[][]::new))
.flatMap(Arrays::stream)
.sorted(QueueMapMediator::compareThreadNanos)
.toArray(String[][]::new);
}
/**
* @param curr      A String array containing ThreadMarker information
* @param other     A String array containing ThreadMarker information
* @return          int result of comparison
*/
private static int compareThreadNanos(String[] curr, String[] other) {
long time1 = Long.parseLong(curr[0]);
long time2 = Long.parseLong(other[0]);
return Long.compare(time1, time2);
}
public static void printOutput() {
for (String[] arr : output()) {
System.out.println(Arrays.toString(arr));
}
}
}
/**
 * Java agent that rewrites loaded classes so that the instrumented code appends
 * an {@code application.ThreadMarker} (elapsed nanoseconds, line number, class
 * name) to the queue returned by {@code application.QueueMapMediator.getByThreadId}
 * for the current thread.
 */
public class Agent {
    /** {@code System.nanoTime()} sampled when this class is initialized; markers store time relative to it. */
    static final long START_TIME = System.nanoTime();

    /**
     * Agent entry point: registers the instrumenting transformer.
     *
     * @param args              agent argument string (unused)
     * @param instrumentation   instrumentation handle supplied by the JVM
     */
    public static void premain(String args, Instrumentation instrumentation) {
        instrumentation.addTransformer((new ClassFileTransformer() {
            @Override
            public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws IllegalClassFormatException {
                if (loader == null) { // bootstrap loader caught by this check - will not load user's code
                    return null;
                }
                // className may be null when the name is not known; the agent's own
                // "application/" classes must not be instrumented. Returning null
                // tells the JVM that no transformation was performed.
                if (className == null || className.startsWith("application/")) {
                    return null;
                }
                ClassNode cn = new ClassNode(ASM9);
                ClassReader cr1 = new ClassReader(classfileBuffer);
                cr1.accept(cn, 0);
                for (MethodNode mn : cn.methods) {
                    InsnList insns = mn.instructions;
                    if (insns.size() == 0) {
                        continue;
                    }
                    int lineNum = -1; // most recent LINENUMBER value not yet instrumented
                    int l1 = -1;      // index of the node the probe is inserted after
                    int l2 = -1;      // index of a later label seen after l1 was set
                    AbstractInsnNode node;
                    int numAdded;
                    for (int i = 0; i < insns.size(); i++) {
                        node = insns.get(i);
                        if (node instanceof LineNumberNode) {
                            lineNum = ((LineNumberNode) node).line;
                        } else if (node instanceof LabelNode) {
                            if (l1 == -1) {
                                l1 = i;
                            } else {
                                l2 = i;
                            }
                        } else if (node instanceof FrameNode) {
                            l1 = i;
                        }
                        if (lineNum > -1 && l1 < l2) {
                            InsnList addedInsns = buildMarkerInsns(lineNum, className);
                            // Capture the size first: insert() moves the nodes out of addedInsns.
                            numAdded = addedInsns.size();
                            insns.insert(insns.get(l1), addedInsns);
                            lineNum = -1;
                            i += numAdded - 1; // -1 to counteract i incrementing with next iteration
                            l1 = -1;
                            l2 = -1;
                        }
                    }
                }
                ClassWriter cw1 = new ClassWriter(ClassWriter.COMPUTE_MAXS);
                cn.accept(cw1);
                return cw1.toByteArray();
            }
        }));
    }

    /**
     * Builds the probe that records one marker; equivalent to
     * {@code QueueMapMediator.getByThreadId(String.valueOf(Thread.currentThread().getId()))
     * .add(new ThreadMarker(System.nanoTime() - START_TIME, lineNum, className))}
     * with the boolean result discarded.
     *
     * @param lineNum       source line number to embed in the marker
     * @param className     internal name of the class being instrumented
     * @return              the instruction sequence, leaving the operand stack unchanged
     */
    private static InsnList buildMarkerInsns(int lineNum, String className) {
        InsnList addedInsns = new InsnList();
        addedInsns.add(new MethodInsnNode(INVOKESTATIC, "java/lang/Thread",
                "currentThread", "()Ljava/lang/Thread;", false));
        addedInsns.add(new MethodInsnNode(INVOKEVIRTUAL, "java/lang/Thread",
                "getId", "()J", false));
        addedInsns.add(new MethodInsnNode(INVOKESTATIC, "java/lang/String",
                "valueOf", "(J)Ljava/lang/String;", false));
        addedInsns.add(new MethodInsnNode(INVOKESTATIC, "application/QueueMapMediator",
                "getByThreadId",
                "(Ljava/lang/String;)Ljava/util/concurrent/ConcurrentLinkedQueue;",
                false));
        addedInsns.add(new TypeInsnNode(NEW, "application/ThreadMarker"));
        addedInsns.add(new InsnNode(DUP));
        addedInsns.add(new MethodInsnNode(INVOKESTATIC, "java/lang/System",
                "nanoTime", "()J", false));
        addedInsns.add(new LdcInsnNode(START_TIME));
        addedInsns.add(new InsnNode(LSUB));
        // BIPUSH can only encode a signed byte (-128..127), which corrupts line
        // numbers above 127; an Integer constant via LDC covers the whole int range.
        addedInsns.add(new LdcInsnNode(lineNum));
        addedInsns.add(new LdcInsnNode(className));
        addedInsns.add(new MethodInsnNode(INVOKESPECIAL, "application/ThreadMarker",
                "<init>", "(JILjava/lang/String;)V", false));
        addedInsns.add(new MethodInsnNode(INVOKEVIRTUAL,
                "java/util/concurrent/ConcurrentLinkedQueue",
                "add", "(Ljava/lang/Object;)Z", false));
        addedInsns.add(new InsnNode(POP)); // discard the boolean returned by add
        return addedInsns;
    }
}
/**
 * Linked LIFO stack whose head reference is held in an {@link AtomicReference}
 * and replaced with compare-and-set, retrying whenever another update wins.
 *
 * @param <E>   type contained in Stack
 * @author      Brian Goetz and Tim Peierls
 */
public class ConcurrentStack <E> {
    AtomicReference<Node<E>> top = new AtomicReference<>();

    /**
     * Places {@code item} on top of the stack.
     *
     * @param item  element to store
     */
    public void push(E item) {
        Node<E> fresh = new Node<E>(item);
        for (;;) {
            Node<E> current = top.get();
            fresh.next = current;
            if (top.compareAndSet(current, fresh)) {
                return;
            }
            // Head changed in the meantime; re-read it and try again.
        }
    }

    /**
     * Removes and returns the element on top of the stack.
     *
     * @return  the most recently pushed element, or {@code null} if the stack is empty
     */
    public E pop() {
        for (;;) {
            Node<E> current = top.get();
            if (current == null) {
                return null;
            }
            Node<E> successor = current.next;
            if (top.compareAndSet(current, successor)) {
                return current.item;
            }
        }
    }

    /** Singly linked cell holding one element and a link to the cell below it. */
    private static class Node <E> {
        public final E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

问题

我通常会得到预期的输出(即,根据每个线程各自执行到的源代码行数,每个线程应该有相应数量的条目),但偶尔会丢失一些条目,这通常发生在隔了一段时间之后的第一次运行时。

示例

如果我有以下简单的多线程程序:

1  package test.usercode;
2 
3  import application.QueueMapMediator;
4
5  public class BasicMultithreadedPrinting {
6
7      public static void main(String[] args) throws InterruptedException {
8          for (int i = 0; i < 3; i++) {
9              Thread th = new Thread(() -> {
10                 System.out.println("Hello");
11                 System.out.println("Hello again");
12                 System.out.println("Hello once more");
13             });
14             th.start();
15         }
16         QueueMapMediator.printOutput(); // included to view result of instrumentation
17     }
18 }

如果我使用上面的代理进行检测,我可能会得到以下意外输出:

Hello
Hello
Hello
Hello again
Hello again
Hello once more
Hello again
Hello once more
Hello once more
[44959100, 8, test/usercode/BasicMultithreadedPrinting, 1]
[45050000, 9, test/usercode/BasicMultithreadedPrinting, 1]
[45701600, 14, test/usercode/BasicMultithreadedPrinting, 1]
[45762500, 8, test/usercode/BasicMultithreadedPrinting, 1]
[45767300, 9, test/usercode/BasicMultithreadedPrinting, 1]
[45776400, 14, test/usercode/BasicMultithreadedPrinting, 1]
[45813400, 8, test/usercode/BasicMultithreadedPrinting, 1]
[45817400, 9, test/usercode/BasicMultithreadedPrinting, 1]
[45825400, 14, test/usercode/BasicMultithreadedPrinting, 1]
[45862800, 8, test/usercode/BasicMultithreadedPrinting, 1]
[45866700, 9, test/usercode/BasicMultithreadedPrinting, 1]
[45868500, 16, test/usercode/BasicMultithreadedPrinting, 1]
[45892200, 10, test/usercode/BasicMultithreadedPrinting, 23]
[45938500, 11, test/usercode/BasicMultithreadedPrinting, 23]
[46329900, 11, test/usercode/BasicMultithreadedPrinting, 25]
[46500300, 12, test/usercode/BasicMultithreadedPrinting, 23]
[46656400, 12, test/usercode/BasicMultithreadedPrinting, 25]

其中一个派生线程根本没有被记录,另一个id为25的线程只记录了它执行到的3行中的2行。此后,我尝试在关键位置添加print语句,以确定故障发生的位置,但还没能重现该问题。

提前感谢所有能回答这个相当长的问题的人。我已经包含了相当多的代码,希望它能产生更有意义的建议。我已经包含了javaagents标签,以防代理与所描述的问题有关,但我没有看到。

如果getByThreadId被不同的线程并发调用,那么交错执行的get和put(先检查、后写入,并非原子操作)可能出错,导致其中一个值丢失。最好换成原子操作,以确保每个键只对应一个队列:

queueMap.computeIfAbsent(id, k -> new ConcurrentLinkedQueue<>() /* or queueStack.pop() */);