linkedblockingqueue是线程安全的么
1个回答
展开全部
数据结构
链表节点
既然是链表,那么肯定少不了节点,节点自然包括节点内容和next指针。jdk开发人员,设计的节点是这样的:
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
在这里,用到了java范型的机制,用来保存不同类型的对象。
上述节点,提供了一个构造函数,用来传入需要保存的内容,这里的构造函数没有判断传入参数是否合法,因为在所有public方法中,已经判断过了,这里无需进行再次判断。
链表的指针
LInkedBlockingQueue中的链表,包含头指针和尾指针,其中:
头指针用来管理元素出队,和 take(), poll(), peek() 三个操作关联
尾指针用来管理元素入队,和 put(), offer() 两个操作关联
具体的数据结构定义如下:
private transient Node<E> head; /* 头结点, 头节点不保存数据信息 */
private transient Node<E> last; /* 尾节点, 尾节点保存最新入队的数据信息 */
链表的容量和大小
LinkedBlockingQueue是有大小限制的,当队列满后不能继续入队,同时,也有一个变量记录当前队列中的元素数量:
private final int capacity; /* 队列容量一般使用中,构造LinkedBlockingQueue时,需要传入当前队列大小,如果不传入,默认是Integer.MAX_VALUE */
private final AtomicInteger count = new AtomicInteger(0); // 队列当前大小
注意:这里的count对象,是原子类型,而不是一般的int类型,与ArrayBlockingQueue中的不符,这是因为LinkedBlockingQueue使用读和写两把锁来控制并发操作,读和写可能同时修改count字段的值,而ArrayBlockingQueue只有一把锁用于控制读写操作,所以count对象是普通的,线程不安全的类型
控制并发的lock和condition
LinkedBlockingQueue中,读和写分别由两把锁控制,两把锁分别管理head节点和last节点的操作,如下所示:
private final ReentrantLock takeLock = new ReentrantLock(); /* 读锁 */
private final Condition notEmpty = takeLock.newCondition(); /* 读锁对应的条件 */
private final ReentrantLock putLock = new ReentrantLock(); /* 写锁 */
private final Condition notFull = putLock.newCondition(); /* 写锁对应的条件 */
jdk文档中,解释说,这两把锁的控制,是“two lock queue”算法的一种实现,但具体操作与其有些差异(A variant of the "two lock queue" algorithm.)
关于two lock queue可以参考:http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
关键代码解读
入队和出队的核心操作
入队和出队的核心操作,就是对于链表头结点和尾节点的操作,与我们大学学习的数据结构基本一致。因为这些操作,都是private方法,外部已经进行了正确的同步,所以这些方法中,不带任何加锁和解锁的操作。
入队的代码如下所示:
private void enqueue(E x) {
last = last.next = new Node<E>(x);
}
上述代码其实是将三行写成了一行,为了方便学习,这里把其拆开:
private void enqueue(E x) {
Node<E> newNode = new Node<E>(x); /* 新建一个Node对象,此对象的数据部分是新元素的指针,next指针为null */
last.next = newNode ; /* 将目前list节点的next指针指向新对象 */
last =last.next; /* 将last指针向后移动一个元素,指向新的尾端 */
}
出队的代码如下所示:
private E dequeue() {
Node<E> h = head; /* 记录目前头节点的指针 */
Node<E> first = h.next; /* 得到头结点后的第一个节点,即需要出队的数据节点 */
/* 将需要出队的数据的尾节点设置为自己,让此对象变为孤立对象,GC可以进行回收,更重要的是,如果 迭代器引用此节点,迭代器可以通过判断next是否等于自己,来了解迭代器的下一个节点是否应该重定向为头节点 */
h.next = h;
head = first; /* 将头指针指向新的头节点 */
E x = first.item; /* 获取数据元素的内容 */
/* 将头节点所在数据元素设置为null,因为头节点的数据已出队,如果此时再持有其引用,可能造成内存泄漏 */
first.item = null;
return x;
}
入队的public方法
入队的方法有两种,一种是阻塞的方法,另一种是非阻塞的方法,其中:
put()算法,为阻塞算法,直到队列有空余时,才能为队列加入新元素
offer()算法为非阻塞算法,如果队列已满,立即返回或等待一会再返回,通过返回值ture或false,标记本次入队操作是否成功
put的操作算法如下所示:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); /* 如果完成当前入队操作后,队列依然有剩余的空间,那么再唤醒另一个等待入队的线程 */
} finally {
putLock.unlock();
}
if (c == 0) /* 如果入队前,队列大小为空,那么唤醒一个等待出队的线程 */
signalNotEmpty();
}
offer的算法与put类似,这里不再赘述。
出队的public方法
出队的方法与入队类似,也分为阻塞和非阻塞两种,其中:
take()算法为阻塞算法,直到队列有非空时,才将允许调用线程取出数据
poll()算法为非阻塞算法,如果队列为空,立即返回或等待一会再返回,通过返回值ture或false,标记本次出队操作是否成功
peek()算法比较特殊,只返回队列中的第一个元素,既不出队,也不阻塞,如果没有元素,就返回null
task操作的算法如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); /* 如果完成当前入队操作后,队列依然有剩余的元素,那么再唤醒另一个等待出队的线程 */
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); /* 如果出队前,队列是满的,那么出队后,队列就空了,需要通知一个等待入队的线程 */
return x;
}
其余算法,与上述算法类似,这里不再赘述。
多线程安全的迭代器
LinkedBlockingQueue的迭代器,是多线程安全的,在获取元素之前,会对上述读锁和写锁同时加锁,同时,为了防止死锁,读锁和写锁的加解锁顺序,也是经过设计的,代码如下:
void fullyLock() {
putLock.lock(); // 先加写锁
takeLock.lock(); // 再加读锁
}
void fullyUnlock() {
takeLock.unlock(); /* 先解锁读锁 */
putLock.unlock(); /* 再解锁写锁 */
}
LinkedBlockingQueue的迭代器中,保存了以下内容:
private Node<E> current; /* 迭代器的下一个位置 */
private Node<E> lastRet; /* 当前迭代器的位置 */
private E currentElement; /* 当前需要返回的元素内容 */
刚看到代码时,觉得好像只要一个指向当前位置的指针就行了,干嘛这么麻烦呢,但JDK的开发人员考虑的比我们周全多了,这三个参数,足以应付任何多线程的问题:
首先,保存了当前需要返回的内容,可以保证在当前节点移除的情况下,迭代器的next()方法,也能返回当前指向的内容,即使先调用hasNext()方法,其他线程删除了当前对象,那么next()方法也可以保证返回正确对象
其次,如果在迭代器中,调用remove()方法,删除了当前对象,那么 lastRet方法就用上了,可以通过再次遍历列表,找到需要删除的对象,并将其删除,同时为了防止remove()方法被调用两次,在删除时,会将 lastRet设置为null,如果只有这一个指针,那么remove()之后,这个迭代器就啥也干不了了
最后,current保存了迭代器的下一个指向的位置,调用hasNext()时,可以立即直到是否还有空余对象,更重要的是,如果在迭代器创建后,其他线程多次调用了出队的方法,可能导致 lastRet和current都变成悬挂的指针了,这时,只要判断current的next是否为自己,就可以知道自己是否已经被出队,是否需要重定向current的位置
关于迭代器的代码精髓,就是上面的描述了,具体代码,不再赘述。
对锁的精巧使用和思考
LinkedBlockingQueue将读和写操作分离,可以让读写操作在不干扰对方的情况下,完成各自的功能,提高并发吞吐量。
在写这篇文章时,我曾经考虑过,如果使用java内置的同步机制,即 synchronized 关键字进行此类读写锁控制,但实际上实现不了,因为java对象在wait和notify时,需要对lock变量加锁,这样就失去了双锁的优势,同时会导致死锁。
防止内存泄漏
设计链表,最大的一点,就是不能出现内存泄漏。
LinkedBlockingQueue在这点上已经做的很优秀,每次移除节点,都将节点的内容字段设置为null,迭代器也是如此,确保不会发生内存泄漏。
链表节点
既然是链表,那么肯定少不了节点,节点自然包括节点内容和next指针。jdk开发人员,设计的节点是这样的:
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
在这里,用到了java范型的机制,用来保存不同类型的对象。
上述节点,提供了一个构造函数,用来传入需要保存的内容,这里的构造函数没有判断传入参数是否合法,因为在所有public方法中,已经判断过了,这里无需进行再次判断。
链表的指针
LInkedBlockingQueue中的链表,包含头指针和尾指针,其中:
头指针用来管理元素出队,和 take(), poll(), peek() 三个操作关联
尾指针用来管理元素入队,和 put(), offer() 两个操作关联
具体的数据结构定义如下:
private transient Node<E> head; /* 头结点, 头节点不保存数据信息 */
private transient Node<E> last; /* 尾节点, 尾节点保存最新入队的数据信息 */
链表的容量和大小
LinkedBlockingQueue是有大小限制的,当队列满后不能继续入队,同时,也有一个变量记录当前队列中的元素数量:
private final int capacity; /* 队列容量一般使用中,构造LinkedBlockingQueue时,需要传入当前队列大小,如果不传入,默认是Integer.MAX_VALUE */
private final AtomicInteger count = new AtomicInteger(0); // 队列当前大小
注意:这里的count对象,是原子类型,而不是一般的int类型,与ArrayBlockingQueue中的不符,这是因为LinkedBlockingQueue使用读和写两把锁来控制并发操作,读和写可能同时修改count字段的值,而ArrayBlockingQueue只有一把锁用于控制读写操作,所以count对象是普通的,线程不安全的类型
控制并发的lock和condition
LinkedBlockingQueue中,读和写分别由两把锁控制,两把锁分别管理head节点和last节点的操作,如下所示:
private final ReentrantLock takeLock = new ReentrantLock(); /* 读锁 */
private final Condition notEmpty = takeLock.newCondition(); /* 读锁对应的条件 */
private final ReentrantLock putLock = new ReentrantLock(); /* 写锁 */
private final Condition notFull = putLock.newCondition(); /* 写锁对应的条件 */
jdk文档中,解释说,这两把锁的控制,是“two lock queue”算法的一种实现,但具体操作与其有些差异(A variant of the "two lock queue" algorithm.)
关于two lock queue可以参考:http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
关键代码解读
入队和出队的核心操作
入队和出队的核心操作,就是对于链表头结点和尾节点的操作,与我们大学学习的数据结构基本一致。因为这些操作,都是private方法,外部已经进行了正确的同步,所以这些方法中,不带任何加锁和解锁的操作。
入队的代码如下所示:
private void enqueue(E x) {
last = last.next = new Node<E>(x);
}
上述代码其实是将三行写成了一行,为了方便学习,这里把其拆开:
private void enqueue(E x) {
Node<E> newNode = new Node<E>(x); /* 新建一个Node对象,此对象的数据部分是新元素的指针,next指针为null */
last.next = newNode ; /* 将目前list节点的next指针指向新对象 */
last =last.next; /* 将last指针向后移动一个元素,指向新的尾端 */
}
出队的代码如下所示:
private E dequeue() {
Node<E> h = head; /* 记录目前头节点的指针 */
Node<E> first = h.next; /* 得到头结点后的第一个节点,即需要出队的数据节点 */
/* 将需要出队的数据的尾节点设置为自己,让此对象变为孤立对象,GC可以进行回收,更重要的是,如果 迭代器引用此节点,迭代器可以通过判断next是否等于自己,来了解迭代器的下一个节点是否应该重定向为头节点 */
h.next = h;
head = first; /* 将头指针指向新的头节点 */
E x = first.item; /* 获取数据元素的内容 */
/* 将头节点所在数据元素设置为null,因为头节点的数据已出队,如果此时再持有其引用,可能造成内存泄漏 */
first.item = null;
return x;
}
入队的public方法
入队的方法有两种,一种是阻塞的方法,另一种是非阻塞的方法,其中:
put()算法,为阻塞算法,直到队列有空余时,才能为队列加入新元素
offer()算法为非阻塞算法,如果队列已满,立即返回或等待一会再返回,通过返回值ture或false,标记本次入队操作是否成功
put的操作算法如下所示:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); /* 如果完成当前入队操作后,队列依然有剩余的空间,那么再唤醒另一个等待入队的线程 */
} finally {
putLock.unlock();
}
if (c == 0) /* 如果入队前,队列大小为空,那么唤醒一个等待出队的线程 */
signalNotEmpty();
}
offer的算法与put类似,这里不再赘述。
出队的public方法
出队的方法与入队类似,也分为阻塞和非阻塞两种,其中:
take()算法为阻塞算法,直到队列有非空时,才将允许调用线程取出数据
poll()算法为非阻塞算法,如果队列为空,立即返回或等待一会再返回,通过返回值ture或false,标记本次出队操作是否成功
peek()算法比较特殊,只返回队列中的第一个元素,既不出队,也不阻塞,如果没有元素,就返回null
task操作的算法如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); /* 如果完成当前入队操作后,队列依然有剩余的元素,那么再唤醒另一个等待出队的线程 */
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); /* 如果出队前,队列是满的,那么出队后,队列就空了,需要通知一个等待入队的线程 */
return x;
}
其余算法,与上述算法类似,这里不再赘述。
多线程安全的迭代器
LinkedBlockingQueue的迭代器,是多线程安全的,在获取元素之前,会对上述读锁和写锁同时加锁,同时,为了防止死锁,读锁和写锁的加解锁顺序,也是经过设计的,代码如下:
void fullyLock() {
putLock.lock(); // 先加写锁
takeLock.lock(); // 再加读锁
}
void fullyUnlock() {
takeLock.unlock(); /* 先解锁读锁 */
putLock.unlock(); /* 再解锁写锁 */
}
LinkedBlockingQueue的迭代器中,保存了以下内容:
private Node<E> current; /* 迭代器的下一个位置 */
private Node<E> lastRet; /* 当前迭代器的位置 */
private E currentElement; /* 当前需要返回的元素内容 */
刚看到代码时,觉得好像只要一个指向当前位置的指针就行了,干嘛这么麻烦呢,但JDK的开发人员考虑的比我们周全多了,这三个参数,足以应付任何多线程的问题:
首先,保存了当前需要返回的内容,可以保证在当前节点移除的情况下,迭代器的next()方法,也能返回当前指向的内容,即使先调用hasNext()方法,其他线程删除了当前对象,那么next()方法也可以保证返回正确对象
其次,如果在迭代器中,调用remove()方法,删除了当前对象,那么 lastRet方法就用上了,可以通过再次遍历列表,找到需要删除的对象,并将其删除,同时为了防止remove()方法被调用两次,在删除时,会将 lastRet设置为null,如果只有这一个指针,那么remove()之后,这个迭代器就啥也干不了了
最后,current保存了迭代器的下一个指向的位置,调用hasNext()时,可以立即直到是否还有空余对象,更重要的是,如果在迭代器创建后,其他线程多次调用了出队的方法,可能导致 lastRet和current都变成悬挂的指针了,这时,只要判断current的next是否为自己,就可以知道自己是否已经被出队,是否需要重定向current的位置
关于迭代器的代码精髓,就是上面的描述了,具体代码,不再赘述。
对锁的精巧使用和思考
LinkedBlockingQueue将读和写操作分离,可以让读写操作在不干扰对方的情况下,完成各自的功能,提高并发吞吐量。
在写这篇文章时,我曾经考虑过,如果使用java内置的同步机制,即 synchronized 关键字进行此类读写锁控制,但实际上实现不了,因为java对象在wait和notify时,需要对lock变量加锁,这样就失去了双锁的优势,同时会导致死锁。
防止内存泄漏
设计链表,最大的一点,就是不能出现内存泄漏。
LinkedBlockingQueue在这点上已经做的很优秀,每次移除节点,都将节点的内容字段设置为null,迭代器也是如此,确保不会发生内存泄漏。
本回答被提问者采纳
已赞过
已踩过<
评论
收起
你对这个回答的评价是?
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询