import java.util.concurrent.atomic.AtomicReference;
public class LockFreeQueue<T> {
private static class Node<E> {
final E value;
volatile Node<E> next;
Node(E value) {
this.value = value;
}
}
private AtomicReference<Node<T>> head, tail;
public LockFreeQueue() {
// have both head and tail point to a dummy node
Node<T> dummyNode = new Node<T>(null);
head = new AtomicReference<Node<T>>(dummyNode);
tail = new AtomicReference<Node<T>>(dummyNode);
}
/**
* Puts an object at the end of the queue.
*/
public void putObject(T value) {
Node<T> newNode = new Node<T>(value);
Node<T> prevTailNode = tail.getAndSet(newNode);
prevTailNode.next = newNode;
}
/**
* Gets an object from the beginning of the queue. The object is removed
* from the queue. If there are no objects in the queue, returns null.
*/
public T getObject() {
Node<T> headNode, valueNode;
// move head node to the next node using atomic semantics
// as long as next node is not null
do {
headNode = head.get();
valueNode = headNode.next;
// try until the whole loop executes pseudo-atomically
// (i.e. unaffected by modifications done by other threads)
} while (valueNode != null && !head.compareAndSet(headNode, valueNode));
T value = (valueNode != null ? valueNode.value : null);
// release the value pointed to by head, keeping the head node dummy
if (valueNode != null)
valueNode.value = null;
return value;
}
}
#1 楼
是的。易失性和比较和交换操作的组合足以确保安全地发布Node对象。
比较和交换必须是在这两种方法中都将其分配给
volatile
变量之前,所以您可以在那很好。它们不需要原子发生。队列表现出奇怪的行为。假设线程1在CAS之后但在最后一次分配之前在
putObject()
中停止。下一个线程2整体执行putObject()
。下一个线程3调用getObject()
,即使线程2完全完成,它也看不到前两个对象中的任何一个。在内存中建立了一个小链。只有在线程1完成putObject()
之后,两个对象才在队列中可见,这有些令人惊讶,并且比大多数非阻塞数据结构具有更弱的语义。拥有一个非阻塞的put()
很高兴,但是拥有一个非阻塞的get()
很奇怪。这意味着该队列必须与重复的轮询和睡眠一起使用。评论
\ $ \ begingroup \ $
感谢您的评论。我没有将value字段设置为final,以便能够将其设置为null以避免潜在的内存泄漏。 (检查getObject()中的最后一个if语句)。
\ $ \ endgroup \ $
– Hosam Aly
2011年1月27日13:45
\ $ \ begingroup \ $
对于奇怪的行为,我同意你的看法,但这是设计决策,目的是简化代码并提高性能。我认为将其用于简单的生产者-消费者方案并不重要。你怎么看?
\ $ \ endgroup \ $
– Hosam Aly
2011年1月27日13:47
\ $ \ begingroup \ $
@Hosam我确实认为这在消费者方面是有问题的。
\ $ \ endgroup \ $
– Craig
2011年1月27日在18:30
\ $ \ begingroup \ $
@Hosam:我的经验法则是:最终的或易变的。如果您不做最后决定,至少要使其可变。 (这正是克雷格所说的,我想对此加以强调。)
\ $ \ endgroup \ $
–克里斯·杰斯特·杨(Chris Jester-Young)
2011年1月27日20:26
\ $ \ begingroup \ $
@Craig:谢谢。对不起,我的评论很晚,但是您可能知道过去一段时间埃及的天气很热。的确,此队列将需要重复轮询和休眠。 (IMHO)适用于需要简单消息队列般行为的情况。不过,我喜欢您关于非阻塞get()的想法。我将考虑如何实施。再次感谢。
\ $ \ endgroup \ $
– Hosam Aly
2011-2-13在19:06
#2 楼
我不喜欢你的“放假”习惯。其他人注意到的“奇怪行为”意味着该算法失去了“无锁”的主要优势之一:不受优先级倒置或异步终止线程的影响。如果线程在队列写操作的中间被放置,除非或直到队列完成其工作,否则队列将被完全破坏。在更新“ tail”指针之前;如果“最后一个节点”的“下一个”指针非空,则意味着它不再是最后一个节点,必须遍历链接以找到真正的最后一个节点,在其上重复CompareAndSet,并希望它仍然是最后一个节点。评论
\ $ \ begingroup \ $
感谢@supercat。您关于异步终止线程的观点当然很有价值,但是我不确定您的建议如何解决它。您能用一些代码进一步解释吗?
\ $ \ endgroup \ $
– Hosam Aly
2011-2-13在19:13
\ $ \ begingroup \ $
@Hosam:队列的最后一项是“下一个”指针为null的项。要将一个项目添加到队列中,请使真正的最后一个项目(即“下一个”指针为空的项目)指向新项目。尽管队列保留了“最后一项”指针,但它并不总是指向最后一项。它可能指向以前是最后一个项目的项目,但现在不再指向。只要准备添加试图添加项目的代码来搜索真正的最后一个项目,正确性所需要的就是“最后一个项目”指针指向队列中的某个位置。
\ $ \ endgroup \ $
–超级猫
2011-2-14在0:48
\ $ \ begingroup \ $
@Hosam:将项目添加到队列有两件事:-1-使真正的最后一个项目的“下一个”指针指向新项目; -2-尝试更新队列的“最后一项”指针。如果同时尝试执行步骤#1,则其中之一的CompareExchange将成功,而另一方将失败。 CompareExchange失败的那一个将搜索队列的新结尾,并在此之后添加自己。注意,在某种意义上,尝试更新队列结束指针是可选的。如果从未更新过,队列将变得越来越慢,但是偶尔的失败也不会成为问题。
\ $ \ endgroup \ $
–超级猫
2011-2-14在0:55
#3 楼
尽管OP的解决方案很适合我,但Ron的改进却发明了一种竞争条件:线程1(在getObject()中):
1在这里被挂起,因此它尚未执行
= null
线程2(在getObject()中): 1个继续。
#4 楼
我稍微调整了一下代码,但是我认为您的方法很好。如果有很多消费者的话,会花一会儿。 NPE。我对getObject进行了一些重构,以删除冗余的空检查。
import java.util.concurrent.atomic.AtomicReference;
public class LockFreeQueue<T> {
private static class Node<E> {
E value;
volatile Node<E> next;
Node(E value) {
this.value = value;
}
}
private final AtomicReference<Node<T>> refHead, refTail;
public LockFreeQueue() {
// have both head and tail point to a dummy node
Node<T> dummy = new Node<T>(null);
refHead = new AtomicReference<Node<T>>(dummy);
refTail = new AtomicReference<Node<T>>(dummy);
}
/**
* Puts an object at the end of the queue.
*/
public void putObject(T value) {
if (value == null) throw new NullPointerException();
Node<T> node = new Node<T>(value);
Node<T> prevTail = refTail.getAndSet(node);
prevTail.next = node;
}
/**
* Gets an object from the beginning of the queue. The object is removed
* from the queue. If there are no objects in the queue, returns null.
*/
public T getObject() {
Node<T> head, next;
// move head node to the next node using atomic semantics
// as long as next node is not null
do {
head = refHead.get();
next = head.next;
if (next == null) {
// empty list
return null;
}
// try until the whole loop executes pseudo-atomically
// (i.e. unaffected by modifications done by other threads)
} while (!refHead.compareAndSet(head, next));
T value = next.value;
// release the value pointed to by head, keeping the head node dummy
next.value = null;
return value;
}
}
评论
\ $ \ begingroup \ $
与原始代码相同的问题...值需要易变。
\ $ \ endgroup \ $
– Craig
2011年1月27日在20:13
\ $ \ begingroup \ $
我声称它不需要不稳定。写入和读取value字段的线程将通过AtomicReferences遇到内存隔离。
\ $ \ endgroup \ $
–罗恩
11年1月27日在21:07
\ $ \ begingroup \ $
哦,是的!我需要再次阅读JCIP!
\ $ \ endgroup \ $
– Craig
11年1月27日在21:42
\ $ \ begingroup \ $
谢谢。抛出NPE是一个好主意,删除冗余的空检查是一个受欢迎的选择。但是正如@Tino所指出的,断言可能在竞争条件下失败(codereview.stackexchange.com/questions/224/…)。
\ $ \ endgroup \ $
– Hosam Aly
2011-2-13在19:35
评论
getObject()的head.get()。next与putObject()的prevTailNode.next = newNode之间存在一场竞赛。我希望codereview网站有行号。在tail.getAndSet(newNode)发生之后但在prevTailNode.next分配通过之前,getObject可以调用head.get(),然后读取headNode.next,此时该值将为null,导致getObject()错误地返回null 。@Ron我在回答中谈到这种行为是多么奇怪,但至少是线程安全的。
@克雷格好吧,我明白了。 getObject返回null可以。我猜这是一个性能缺陷,即对getObject中head.compareAndSet的调用是否可以保证向前取得进展。
@Ron是的。通常,编写非阻塞结构要比阻塞结构提供更好的性能,但是不能保证在这种情况下。特别是因为执行CAS操作的线程无法帮助其他线程向前发展。