編者按:筆者在使用PriorityBlockingQueue實現(xiàn)按照優(yōu)先級處理任務(wù)時遇到一類NPE問題,經(jīng)過分析發(fā)現(xiàn)根本原因是在任務(wù)出隊列時調(diào)用比較器異常,進而導致后續(xù)任務(wù)出隊列拋出NullPointerException。本文通過完整的案例復(fù)現(xiàn)來演示在什么情況會觸發(fā)該問題,同時給出了處理建議。希望讀者在編程時加以借鑒,避免再次遇到此類問題。
背景知識
PriorityBlockingQueue是一個無界的基于數(shù)組的優(yōu)先級阻塞隊列,使用一個全局ReentrantLock來控制某一時刻只有一個線程可以進行元素出隊和入隊操作,并且每次出隊都返回優(yōu)先級別最高的或者最低的元素。PriorityBlockingQueue通過以下兩種方式實現(xiàn)元素優(yōu)先級排序:
- 入隊元素實現(xiàn)Comparable接口來比較元素優(yōu)先級;
- PriorityBlockingQueue構(gòu)造函數(shù)指定Comparator來比較元素優(yōu)先級;
關(guān)于PriorityBlockingQueue中隊列操作的部分,基本和PriorityQueue邏輯一致,只不過在操作時加鎖了。在本文中我們主要關(guān)注PriorityBlockingQueue出隊的take方法,該方法通過調(diào)用dequeue方法將元素出隊列。當沒有元素可以出隊的時候,線程就會阻塞等待。
publicEtake()throwsInterruptedException{
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
Eresult;
try{
//嘗試獲取最小元素,即小頂堆第一個元素,然后重新排序,如果不存在表示隊列暫無元素,進行阻塞等待。
while((result=dequeue())==null)
notEmpty.await();
}finally{
lock.unlock();
}
returnresult;
}
現(xiàn)象
在某個業(yè)務(wù)服務(wù)中使用PriorityBlockingQueue實現(xiàn)按照優(yōu)先級處理任務(wù),某一天環(huán)境中的服務(wù)突然間不處理任務(wù)了,查看后臺日志,發(fā)現(xiàn)一直拋出NullPointerException。將進程堆dump出來,使用MAT發(fā)現(xiàn)某個PriorityBlockingQueue中的size值比實際元素個數(shù)多1個(入隊時已經(jīng)對任務(wù)進行非空校驗)。
異常堆棧如下:
java.lang.NullPointerException
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
...
MAT結(jié)果:

原因分析
在此我們分析下PriorityBlockingQueue是如何出隊列的,PriorityBlockingQueue最終通過調(diào)用dequeue方法出隊列,dequeue方法處理邏輯如下:
- 將根節(jié)點(array[0])賦值給result;
- array[n] 賦值給 arrary[0];
- 將 array[n] 設(shè)置為 null;
- 調(diào)用siftDownComparable或siftDownUsingComparator對隊列元素重新排序;
- size大小減1;
- 返回result;
如果在第4步中出現(xiàn)異常,就會出現(xiàn)隊列中的元素個數(shù)比實際的元素個數(shù)多1個的現(xiàn)象。此時size未發(fā)生改變,arry[n]已經(jīng)被置為null,再進行siftDown操作時就會拋出NullPointerException。繼續(xù)分析第4步中在什么情況下會出現(xiàn)異常,通過代碼走讀我們可以發(fā)現(xiàn)只有在調(diào)用Comparable#compareTo或者Comparator#compare方法進行元素比較的時候才可能出現(xiàn)異常。這塊代碼的處理邏輯和業(yè)務(wù)相關(guān),如果業(yè)務(wù)代碼處理不當拋出異常,就會導致上述現(xiàn)象。
/**
*Mechanicsforpoll().Callonlywhileholdinglock.
*/
privateEdequeue(){
intn=size-1;
if(n0)
returnnull;
else{
Object[]array=queue;
Eresult=(E)array[0];//step1
Ex=(E)array[n];//step2
array[n]=null;//step3
Comparator?superE>cmp=comparator;
if(cmp==null)//step4 如果指定了comparator,就按照指定的comparator來比較。否則就按照默認的
siftDownComparable(0,x,array,n);
else
siftDownUsingComparator(0,x,array,n,cmp);
size=n;//step5
returnresult;//step6
}
}
privatestaticvoidsiftDownComparable(intk,Tx,Object[]array,intn){
if(n>0){
Comparable?superT>key=(Comparable?superT>)x;
inthalf=n>>>1;
while(kintchild=(k<1)+1;
Objectc=array[child];
intright=child+1;
if(rightsuperT>)c).compareTo((T)array[right])>0)
c=array[child=right];
if(key.compareTo((T)c)<=?0)
break;
array[k]=c;
k=child;
}
array[k]=key;
}
}
privatestaticvoidsiftDownUsingComparator(intk,Tx,Object[]array,intn,
Comparator?superT>cmp){
if(n>0){
inthalf=n>>>1;
while(kintchild=(k<1)+1;
Objectc=array[child];
intright=child+1;
if(right0)
c=array[child=right];
if(cmp.compare(x,(T)c)<=?0)
break;
array[k]=c;
k=child;
}
array[k]=x;
}
}
復(fù)現(xiàn)代碼
importjava.util.ArrayList;
importjava.util.List;
importjava.util.concurrent.PriorityBlockingQueue;
publicclassPriorityBlockingQueueTest{
staticclassEntityimplementsComparable<Entity>{
privateintid;
privateStringname;
privatebooleanflag;
publicvoidsetFlag(booleanflag){
this.flag=flag;
}
publicEntity(intid,Stringname){
this.id=id;
this.name=name;
}
@Override
publicintcompareTo(Entityentity){
if(flag){
thrownewRuntimeException("TestException");
}
if(entity==null||this.id>entity.id){
return1;
}
returnthis.id==entity.id?0:-1;
}
}
publicstaticvoidmain(String[]args){
intnum=5;
PriorityBlockingQueuepriorityBlockingQueue=newPriorityBlockingQueue<>();
Listentities=newArrayList<>();
for(inti=0;inewEntity(i,"entity"+i);
entities.add(entity);
priorityBlockingQueue.offer(entity);
}
entities.get(num-1).setFlag(true);
intsize=entities.size();
for(inti=0;itry{
priorityBlockingQueue.take();
}catch(Exceptione){
e.printStackTrace();
}
}
}
執(zhí)行結(jié)果如下:
java.lang.RuntimeException:TestException
atPriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:31)
atPriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:8)
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
atPriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)
java.lang.NullPointerException
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
atPriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)
規(guī)避方案
可以通過以下兩種方法規(guī)避:
- 在take方法出現(xiàn)NPE時,清除隊列元素,將未處理的元素重新進入隊列;
- 在 Comparable#compareTo 或 Comparator#compare 方法中做好異常處理,對異常情況進行默認操作;
建議使用后者。
案例引申
使用PriorityBlockingQueue作為緩存隊列來創(chuàng)建線程池時,使用submit提交任務(wù)會出現(xiàn) java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to 異常,而使用execute沒有問題。
觀察submit源碼可以發(fā)現(xiàn)在submit內(nèi)部代碼會將Runable封裝成RunnableFuture對象,然后調(diào)用execute提交任務(wù)。
publicFuture>submit(Runnabletask){
if(task==null)thrownewNullPointerException();
RunnableFutureftask=newTaskFor(task,null);
execute(ftask);
returnftask;
}
以Comparable為例,任務(wù)入隊列時,最終會調(diào)用siftUpComparable方法。該方法第一步將RunnableFuture強轉(zhuǎn)為Comparable類型,而RunnableFuture類未實現(xiàn)Comparable接口,進而拋出ClassCastException異常。
publicbooleanoffer(Ee){
if(e==null)
thrownewNullPointerException();
finalReentrantLocklock=this.lock;
lock.lock();
intn,cap;
Object[]array;
while((n=size)>=(cap=(array=queue).length))
tryGrow(array,cap);
try{
Comparator?superE>cmp=comparator;
if(cmp==null)
siftUpComparable(n,e,array);
else
siftUpUsingComparator(n,e,array,cmp);
size=n+1;
notEmpty.signal();
}finally{
lock.unlock();
}
returntrue;
}
privatestaticvoidsiftUpComparable(intk,Tx,Object[]array){
Comparable?superT>key=(Comparable?superT>)x;
while(k>0){
intparent=(k-1)>>>1;
Objecte=array[parent];
if(key.compareTo((T)e)>=0)
break;
array[k]=e;
k=parent;
}
array[k]=key;
}
這也是常見的比較器調(diào)用異常案例,本文不再贅述,可自行參考其他文章。
總結(jié)
在使用PriorityBlockingQueue時,注意在比較器中做好異常處理,避免出現(xiàn)類似問題。
審核編輯 :李倩
-
比較器
+關(guān)注
關(guān)注
14文章
1936瀏覽量
112115 -
數(shù)組
+關(guān)注
關(guān)注
1文章
420瀏覽量
27483
原文標題:畢昇 JDK | PriorityBlockingQueue比較器異常導致的NPE問題分析
文章出處:【微信號:openEulercommunity,微信公眾號:openEuler】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
RDMA設(shè)計43:隊列刪除及連接斷開功能測試
RDMA設(shè)計41:隊列管理及連接建立功能驗證與分析2
RDMA設(shè)計40:隊列管理及連接建立功能驗證與分析
RDMA設(shè)計25:隊列管理模塊之發(fā)送模塊詳細設(shè)計分析
RDMA設(shè)計26:隊列管理模塊設(shè)計之接收隊列模塊詳細分析
RDMA設(shè)計24:隊列管理模塊設(shè)計
RDMA設(shè)計18:隊列管理模塊設(shè)計3
RDMA設(shè)計17:隊列管理模塊設(shè)計2
NVMe高速傳輸之擺脫XDMA設(shè)計54:如何測試隊列管理功能2
優(yōu)先級隊列介紹
基于環(huán)形隊列的UART收發(fā)回顯實驗
關(guān)于PriorityBlockingQueue中隊列操作
評論