1*b1cdbd2cSJim Jagielski /************************************************************** 2*b1cdbd2cSJim Jagielski * 3*b1cdbd2cSJim Jagielski * Licensed to the Apache Software Foundation (ASF) under one 4*b1cdbd2cSJim Jagielski * or more contributor license agreements. See the NOTICE file 5*b1cdbd2cSJim Jagielski * distributed with this work for additional information 6*b1cdbd2cSJim Jagielski * regarding copyright ownership. The ASF licenses this file 7*b1cdbd2cSJim Jagielski * to you under the Apache License, Version 2.0 (the 8*b1cdbd2cSJim Jagielski * "License"); you may not use this file except in compliance 9*b1cdbd2cSJim Jagielski * with the License. You may obtain a copy of the License at 10*b1cdbd2cSJim Jagielski * 11*b1cdbd2cSJim Jagielski * http://www.apache.org/licenses/LICENSE-2.0 12*b1cdbd2cSJim Jagielski * 13*b1cdbd2cSJim Jagielski * Unless required by applicable law or agreed to in writing, 14*b1cdbd2cSJim Jagielski * software distributed under the License is distributed on an 15*b1cdbd2cSJim Jagielski * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16*b1cdbd2cSJim Jagielski * KIND, either express or implied. See the License for the 17*b1cdbd2cSJim Jagielski * specific language governing permissions and limitations 18*b1cdbd2cSJim Jagielski * under the License. 19*b1cdbd2cSJim Jagielski * 20*b1cdbd2cSJim Jagielski *************************************************************/ 21*b1cdbd2cSJim Jagielski 22*b1cdbd2cSJim Jagielski 23*b1cdbd2cSJim Jagielski 24*b1cdbd2cSJim Jagielski package com.sun.star.lib.uno.environments.remote; 25*b1cdbd2cSJim Jagielski 26*b1cdbd2cSJim Jagielski import com.sun.star.lang.DisposedException; 27*b1cdbd2cSJim Jagielski 28*b1cdbd2cSJim Jagielski /** 29*b1cdbd2cSJim Jagielski * The <code>JobQueue</code> implements a queue for jobs. 30*b1cdbd2cSJim Jagielski * For every jobs thread id exists a job queue which is registered 31*b1cdbd2cSJim Jagielski * at the <code>ThreadPool</code>. 32*b1cdbd2cSJim Jagielski * A JobQueue is splitted in a sync job queue and an async job queue. 33*b1cdbd2cSJim Jagielski * The sync job queue is the registerd queue, it delegates async jobs 34*b1cdbd2cSJim Jagielski * (put by <code>putjob</code>) into the async queue, which is only 35*b1cdbd2cSJim Jagielski * known by the sync queue. 36*b1cdbd2cSJim Jagielski * <p> 37*b1cdbd2cSJim Jagielski * @version $Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $ 38*b1cdbd2cSJim Jagielski * @author Kay Ramme 39*b1cdbd2cSJim Jagielski * @see com.sun.star.lib.uno.environments.remote.ThreadPool 40*b1cdbd2cSJim Jagielski * @see com.sun.star.lib.uno.environments.remote.Job 41*b1cdbd2cSJim Jagielski * @see com.sun.star.lib.uno.environments.remote.ThreadId 42*b1cdbd2cSJim Jagielski * @since UDK1.0 43*b1cdbd2cSJim Jagielski */ 44*b1cdbd2cSJim Jagielski public class JobQueue { 45*b1cdbd2cSJim Jagielski /** 46*b1cdbd2cSJim Jagielski * When set to true, enables various debugging output. 47*b1cdbd2cSJim Jagielski */ 48*b1cdbd2cSJim Jagielski private static final boolean DEBUG = false; 49*b1cdbd2cSJim Jagielski 50*b1cdbd2cSJim Jagielski protected Job _head; // the head of the job list 51*b1cdbd2cSJim Jagielski protected Job _tail; // the tail of the job list 52*b1cdbd2cSJim Jagielski 53*b1cdbd2cSJim Jagielski protected ThreadId _threadId; // the thread id of the queue 54*b1cdbd2cSJim Jagielski protected int _ref_count = 0; // the stack deepness 55*b1cdbd2cSJim Jagielski protected boolean _createThread; // create a worker thread, if needed 56*b1cdbd2cSJim Jagielski protected boolean _createThread_now; // create a worker thread, if needed 57*b1cdbd2cSJim Jagielski protected Thread _worker_thread; // the thread that does the jobs 58*b1cdbd2cSJim Jagielski 59*b1cdbd2cSJim Jagielski protected Object _disposeId; // the active dispose id 60*b1cdbd2cSJim Jagielski protected Object _doDispose = null; 61*b1cdbd2cSJim Jagielski protected Throwable _throwable; 62*b1cdbd2cSJim Jagielski 63*b1cdbd2cSJim Jagielski protected JobQueue _async_jobQueue; // chaining job qeueus for asyncs 64*b1cdbd2cSJim Jagielski protected JobQueue _sync_jobQueue; // chaining job qeueus for syncs 65*b1cdbd2cSJim Jagielski 66*b1cdbd2cSJim Jagielski protected boolean _active = false; 67*b1cdbd2cSJim Jagielski 68*b1cdbd2cSJim Jagielski protected JavaThreadPoolFactory _javaThreadPoolFactory; 69*b1cdbd2cSJim Jagielski 70*b1cdbd2cSJim Jagielski /** 71*b1cdbd2cSJim Jagielski * A thread for dispatching jobs 72*b1cdbd2cSJim Jagielski */ 73*b1cdbd2cSJim Jagielski class JobDispatcher extends Thread { 74*b1cdbd2cSJim Jagielski Object _disposeId; 75*b1cdbd2cSJim Jagielski JobDispatcher(Object disposeId)76*b1cdbd2cSJim Jagielski JobDispatcher(Object disposeId) { 77*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId); 78*b1cdbd2cSJim Jagielski 79*b1cdbd2cSJim Jagielski _disposeId = disposeId; 80*b1cdbd2cSJim Jagielski } 81*b1cdbd2cSJim Jagielski getThreadId()82*b1cdbd2cSJim Jagielski ThreadId getThreadId() { 83*b1cdbd2cSJim Jagielski return _threadId; 84*b1cdbd2cSJim Jagielski } 85*b1cdbd2cSJim Jagielski run()86*b1cdbd2cSJim Jagielski public void run() { 87*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread()); 88*b1cdbd2cSJim Jagielski 89*b1cdbd2cSJim Jagielski try { 90*b1cdbd2cSJim Jagielski enter(2000, _disposeId); 91*b1cdbd2cSJim Jagielski } 92*b1cdbd2cSJim Jagielski catch(Throwable throwable) { 93*b1cdbd2cSJim Jagielski if(_head != null || _active) { // there was a job in progress, so give a stack 94*b1cdbd2cSJim Jagielski System.err.println(getClass().getName() + " - exception occurred:" + throwable); 95*b1cdbd2cSJim Jagielski throwable.printStackTrace(System.err); 96*b1cdbd2cSJim Jagielski } 97*b1cdbd2cSJim Jagielski } 98*b1cdbd2cSJim Jagielski finally { 99*b1cdbd2cSJim Jagielski release(); 100*b1cdbd2cSJim Jagielski } 101*b1cdbd2cSJim Jagielski 102*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId); 103*b1cdbd2cSJim Jagielski 104*b1cdbd2cSJim Jagielski // try { 105*b1cdbd2cSJim Jagielski // Object object = new Object(); 106*b1cdbd2cSJim Jagielski // synchronized(object) { 107*b1cdbd2cSJim Jagielski // object.wait(); 108*b1cdbd2cSJim Jagielski // } 109*b1cdbd2cSJim Jagielski // } 110*b1cdbd2cSJim Jagielski // catch(InterruptedException interruptedException) { 111*b1cdbd2cSJim Jagielski // } 112*b1cdbd2cSJim Jagielski } 113*b1cdbd2cSJim Jagielski } 114*b1cdbd2cSJim Jagielski 115*b1cdbd2cSJim Jagielski 116*b1cdbd2cSJim Jagielski /** 117*b1cdbd2cSJim Jagielski * Constructs a async job queue with the given thread id 118*b1cdbd2cSJim Jagielski * which belongs to the given sync job queue. 119*b1cdbd2cSJim Jagielski * <p> 120*b1cdbd2cSJim Jagielski * @param threadId the thread id 121*b1cdbd2cSJim Jagielski * @param sync_jobQueue the sync queue this async queue belongs to 122*b1cdbd2cSJim Jagielski * @see com.sun.star.lib.uno.environments.remote.ThreadID 123*b1cdbd2cSJim Jagielski */ JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId)124*b1cdbd2cSJim Jagielski JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) { 125*b1cdbd2cSJim Jagielski _javaThreadPoolFactory = javaThreadPoolFactory; 126*b1cdbd2cSJim Jagielski _threadId = ThreadId.createFresh(); 127*b1cdbd2cSJim Jagielski 128*b1cdbd2cSJim Jagielski _sync_jobQueue = javaThreadPoolFactory.getJobQueue(threadId); 129*b1cdbd2cSJim Jagielski if(_sync_jobQueue == null) { 130*b1cdbd2cSJim Jagielski _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true); 131*b1cdbd2cSJim Jagielski _sync_jobQueue.acquire(); 132*b1cdbd2cSJim Jagielski } 133*b1cdbd2cSJim Jagielski 134*b1cdbd2cSJim Jagielski _sync_jobQueue._async_jobQueue = this; 135*b1cdbd2cSJim Jagielski 136*b1cdbd2cSJim Jagielski _createThread = true; 137*b1cdbd2cSJim Jagielski _createThread_now = true; 138*b1cdbd2cSJim Jagielski 139*b1cdbd2cSJim Jagielski acquire(); 140*b1cdbd2cSJim Jagielski 141*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId); 142*b1cdbd2cSJim Jagielski } 143*b1cdbd2cSJim Jagielski 144*b1cdbd2cSJim Jagielski /** 145*b1cdbd2cSJim Jagielski * Constructs a sync job queue with the given thread id and the given thread. 146*b1cdbd2cSJim Jagielski * <p> 147*b1cdbd2cSJim Jagielski * @param threadId the thread id 148*b1cdbd2cSJim Jagielski * @param createThread if true, the queue creates a worker thread if needed 149*b1cdbd2cSJim Jagielski * @see com.sun.star.lib.uno.environments.remote.ThreadID 150*b1cdbd2cSJim Jagielski */ JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread)151*b1cdbd2cSJim Jagielski JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){ 152*b1cdbd2cSJim Jagielski _javaThreadPoolFactory = javaThreadPoolFactory; 153*b1cdbd2cSJim Jagielski _threadId = threadId; 154*b1cdbd2cSJim Jagielski _createThread = createThread; 155*b1cdbd2cSJim Jagielski _createThread_now = createThread; 156*b1cdbd2cSJim Jagielski 157*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId + " " + createThread); 158*b1cdbd2cSJim Jagielski } 159*b1cdbd2cSJim Jagielski 160*b1cdbd2cSJim Jagielski /** 161*b1cdbd2cSJim Jagielski * Gives the thread id of this queue 162*b1cdbd2cSJim Jagielski * <p> 163*b1cdbd2cSJim Jagielski * @return the thread id 164*b1cdbd2cSJim Jagielski * @see com.sun.star.lib.uno.environments.remote.ThreadID 165*b1cdbd2cSJim Jagielski */ getThreadId()166*b1cdbd2cSJim Jagielski ThreadId getThreadId() { 167*b1cdbd2cSJim Jagielski return _threadId; 168*b1cdbd2cSJim Jagielski } 169*b1cdbd2cSJim Jagielski acquire()170*b1cdbd2cSJim Jagielski synchronized void acquire() { 171*b1cdbd2cSJim Jagielski // add only synchronous queues . 172*b1cdbd2cSJim Jagielski if(_ref_count <= 0 && _sync_jobQueue == null ) 173*b1cdbd2cSJim Jagielski _javaThreadPoolFactory.addJobQueue(this); 174*b1cdbd2cSJim Jagielski 175*b1cdbd2cSJim Jagielski ++ _ref_count; 176*b1cdbd2cSJim Jagielski } 177*b1cdbd2cSJim Jagielski release()178*b1cdbd2cSJim Jagielski synchronized void release() { 179*b1cdbd2cSJim Jagielski -- _ref_count; 180*b1cdbd2cSJim Jagielski 181*b1cdbd2cSJim Jagielski if(_ref_count <= 0) { 182*b1cdbd2cSJim Jagielski // only synchronous queues needs to be removed . 183*b1cdbd2cSJim Jagielski if( _sync_jobQueue == null ) 184*b1cdbd2cSJim Jagielski _javaThreadPoolFactory.removeJobQueue(this); 185*b1cdbd2cSJim Jagielski 186*b1cdbd2cSJim Jagielski 187*b1cdbd2cSJim Jagielski if(_sync_jobQueue != null) { 188*b1cdbd2cSJim Jagielski _sync_jobQueue._async_jobQueue = null; 189*b1cdbd2cSJim Jagielski _sync_jobQueue.release(); 190*b1cdbd2cSJim Jagielski } 191*b1cdbd2cSJim Jagielski } 192*b1cdbd2cSJim Jagielski } 193*b1cdbd2cSJim Jagielski 194*b1cdbd2cSJim Jagielski /** 195*b1cdbd2cSJim Jagielski * Removes a job from the queue. 196*b1cdbd2cSJim Jagielski * <p> 197*b1cdbd2cSJim Jagielski * @return a job or null if timed out 198*b1cdbd2cSJim Jagielski * @param waitTime the maximum amount of time to wait for a job 199*b1cdbd2cSJim Jagielski */ removeJob(int waitTime)200*b1cdbd2cSJim Jagielski private Job removeJob(int waitTime) { 201*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId); 202*b1cdbd2cSJim Jagielski 203*b1cdbd2cSJim Jagielski Job job = null; 204*b1cdbd2cSJim Jagielski synchronized (this) { 205*b1cdbd2cSJim Jagielski // wait max. waitTime time for a job to enter the queue 206*b1cdbd2cSJim Jagielski boolean waited = false; 207*b1cdbd2cSJim Jagielski while(_head == null && (waitTime == 0 || !waited)) { 208*b1cdbd2cSJim Jagielski if(_doDispose == _disposeId) { 209*b1cdbd2cSJim Jagielski _doDispose = null; 210*b1cdbd2cSJim Jagielski throw (DisposedException) 211*b1cdbd2cSJim Jagielski new DisposedException().initCause(_throwable); 212*b1cdbd2cSJim Jagielski } 213*b1cdbd2cSJim Jagielski 214*b1cdbd2cSJim Jagielski // notify sync queues 215*b1cdbd2cSJim Jagielski notifyAll(); 216*b1cdbd2cSJim Jagielski 217*b1cdbd2cSJim Jagielski try { 218*b1cdbd2cSJim Jagielski // wait for new job 219*b1cdbd2cSJim Jagielski wait(waitTime); 220*b1cdbd2cSJim Jagielski } 221*b1cdbd2cSJim Jagielski catch(InterruptedException interruptedException) { 222*b1cdbd2cSJim Jagielski throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); 223*b1cdbd2cSJim Jagielski } 224*b1cdbd2cSJim Jagielski 225*b1cdbd2cSJim Jagielski // signal that we have already waited once 226*b1cdbd2cSJim Jagielski waited = true; 227*b1cdbd2cSJim Jagielski } 228*b1cdbd2cSJim Jagielski 229*b1cdbd2cSJim Jagielski 230*b1cdbd2cSJim Jagielski if(_head != null) { 231*b1cdbd2cSJim Jagielski Job current = _head; 232*b1cdbd2cSJim Jagielski _head = _head._next; 233*b1cdbd2cSJim Jagielski 234*b1cdbd2cSJim Jagielski if(_head == null) 235*b1cdbd2cSJim Jagielski _tail = null; 236*b1cdbd2cSJim Jagielski 237*b1cdbd2cSJim Jagielski job = current; 238*b1cdbd2cSJim Jagielski _active = true; 239*b1cdbd2cSJim Jagielski } 240*b1cdbd2cSJim Jagielski } 241*b1cdbd2cSJim Jagielski 242*b1cdbd2cSJim Jagielski // always wait for asynchron jobqueue to be finished ! 243*b1cdbd2cSJim Jagielski if(job != null && _async_jobQueue != null) { 244*b1cdbd2cSJim Jagielski synchronized(_async_jobQueue) { 245*b1cdbd2cSJim Jagielski // wait for async queue to be empty and last job to be done 246*b1cdbd2cSJim Jagielski while(_async_jobQueue._active || _async_jobQueue._head != null) { 247*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " + _async_jobQueue._worker_thread); 248*b1cdbd2cSJim Jagielski 249*b1cdbd2cSJim Jagielski if(_doDispose == _disposeId) { 250*b1cdbd2cSJim Jagielski _doDispose = null; 251*b1cdbd2cSJim Jagielski throw (DisposedException) 252*b1cdbd2cSJim Jagielski new DisposedException().initCause(_throwable); 253*b1cdbd2cSJim Jagielski } 254*b1cdbd2cSJim Jagielski 255*b1cdbd2cSJim Jagielski try { 256*b1cdbd2cSJim Jagielski _async_jobQueue.wait(); 257*b1cdbd2cSJim Jagielski } 258*b1cdbd2cSJim Jagielski catch(InterruptedException interruptedException) { 259*b1cdbd2cSJim Jagielski throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); 260*b1cdbd2cSJim Jagielski } 261*b1cdbd2cSJim Jagielski } 262*b1cdbd2cSJim Jagielski } 263*b1cdbd2cSJim Jagielski } 264*b1cdbd2cSJim Jagielski 265*b1cdbd2cSJim Jagielski return job; 266*b1cdbd2cSJim Jagielski } 267*b1cdbd2cSJim Jagielski 268*b1cdbd2cSJim Jagielski /** 269*b1cdbd2cSJim Jagielski * Puts a job into the queue. 270*b1cdbd2cSJim Jagielski * <p> 271*b1cdbd2cSJim Jagielski * @param job the job 272*b1cdbd2cSJim Jagielski * @param disposeId a dispose id 273*b1cdbd2cSJim Jagielski */ putJob(Job job, Object disposeId)274*b1cdbd2cSJim Jagielski synchronized void putJob(Job job, Object disposeId) { 275*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job); 276*b1cdbd2cSJim Jagielski 277*b1cdbd2cSJim Jagielski if(_tail != null) 278*b1cdbd2cSJim Jagielski _tail._next = job; 279*b1cdbd2cSJim Jagielski else 280*b1cdbd2cSJim Jagielski _head = job; 281*b1cdbd2cSJim Jagielski 282*b1cdbd2cSJim Jagielski _tail = job; 283*b1cdbd2cSJim Jagielski 284*b1cdbd2cSJim Jagielski if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one 285*b1cdbd2cSJim Jagielski 286*b1cdbd2cSJim Jagielski acquire(); 287*b1cdbd2cSJim Jagielski 288*b1cdbd2cSJim Jagielski _createThread_now = false; 289*b1cdbd2cSJim Jagielski new JobDispatcher(disposeId).start(); 290*b1cdbd2cSJim Jagielski } 291*b1cdbd2cSJim Jagielski 292*b1cdbd2cSJim Jagielski // always notify possible waiters 293*b1cdbd2cSJim Jagielski notifyAll(); 294*b1cdbd2cSJim Jagielski } 295*b1cdbd2cSJim Jagielski 296*b1cdbd2cSJim Jagielski /** 297*b1cdbd2cSJim Jagielski * Enters the job queue. 298*b1cdbd2cSJim Jagielski * <p> 299*b1cdbd2cSJim Jagielski * @return the result of the final job (reply) 300*b1cdbd2cSJim Jagielski * @param disposeId a dispose id 301*b1cdbd2cSJim Jagielski */ enter(Object disposeId)302*b1cdbd2cSJim Jagielski Object enter(Object disposeId) throws Throwable { 303*b1cdbd2cSJim Jagielski return enter(0, disposeId); // wait infinitly 304*b1cdbd2cSJim Jagielski } 305*b1cdbd2cSJim Jagielski 306*b1cdbd2cSJim Jagielski /** 307*b1cdbd2cSJim Jagielski * Enters the job queue. 308*b1cdbd2cSJim Jagielski * <p> 309*b1cdbd2cSJim Jagielski * @return the result of the final job (reply) 310*b1cdbd2cSJim Jagielski * @param waitTime the maximum amount of time to wait for a job (0 means wait infinitly) 311*b1cdbd2cSJim Jagielski * @param disposeId a dispose id 312*b1cdbd2cSJim Jagielski */ enter(int waitTime, Object disposeId)313*b1cdbd2cSJim Jagielski Object enter(int waitTime, Object disposeId) throws Throwable { 314*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId); 315*b1cdbd2cSJim Jagielski 316*b1cdbd2cSJim Jagielski boolean quit = false; 317*b1cdbd2cSJim Jagielski 318*b1cdbd2cSJim Jagielski Object hold_disposeId = _disposeId; 319*b1cdbd2cSJim Jagielski _disposeId = disposeId; 320*b1cdbd2cSJim Jagielski 321*b1cdbd2cSJim Jagielski Object result = null; 322*b1cdbd2cSJim Jagielski 323*b1cdbd2cSJim Jagielski Thread hold_worker_thread = _worker_thread; 324*b1cdbd2cSJim Jagielski _worker_thread = Thread.currentThread(); 325*b1cdbd2cSJim Jagielski 326*b1cdbd2cSJim Jagielski while(!quit) { 327*b1cdbd2cSJim Jagielski Job job = null; 328*b1cdbd2cSJim Jagielski 329*b1cdbd2cSJim Jagielski try { 330*b1cdbd2cSJim Jagielski job = removeJob(waitTime); 331*b1cdbd2cSJim Jagielski 332*b1cdbd2cSJim Jagielski if(job != null) { 333*b1cdbd2cSJim Jagielski try { 334*b1cdbd2cSJim Jagielski result = job.execute(); 335*b1cdbd2cSJim Jagielski } 336*b1cdbd2cSJim Jagielski finally { 337*b1cdbd2cSJim Jagielski _active = false; 338*b1cdbd2cSJim Jagielski } 339*b1cdbd2cSJim Jagielski 340*b1cdbd2cSJim Jagielski if (!job.isRequest()) { 341*b1cdbd2cSJim Jagielski job.dispose(); 342*b1cdbd2cSJim Jagielski 343*b1cdbd2cSJim Jagielski quit = true; 344*b1cdbd2cSJim Jagielski } 345*b1cdbd2cSJim Jagielski 346*b1cdbd2cSJim Jagielski job = null; 347*b1cdbd2cSJim Jagielski } 348*b1cdbd2cSJim Jagielski else 349*b1cdbd2cSJim Jagielski quit = true; 350*b1cdbd2cSJim Jagielski 351*b1cdbd2cSJim Jagielski 352*b1cdbd2cSJim Jagielski } 353*b1cdbd2cSJim Jagielski finally { // ensure that this queue becomes disposed, if necessary 354*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result); 355*b1cdbd2cSJim Jagielski 356*b1cdbd2cSJim Jagielski synchronized(this) { 357*b1cdbd2cSJim Jagielski if(job != null || (quit && _head == null)) { 358*b1cdbd2cSJim Jagielski _worker_thread = hold_worker_thread; 359*b1cdbd2cSJim Jagielski 360*b1cdbd2cSJim Jagielski _createThread_now = true; 361*b1cdbd2cSJim Jagielski 362*b1cdbd2cSJim Jagielski _disposeId = hold_disposeId; 363*b1cdbd2cSJim Jagielski 364*b1cdbd2cSJim Jagielski if(_sync_jobQueue != null) 365*b1cdbd2cSJim Jagielski notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting) 366*b1cdbd2cSJim Jagielski } 367*b1cdbd2cSJim Jagielski else 368*b1cdbd2cSJim Jagielski quit = false; 369*b1cdbd2cSJim Jagielski 370*b1cdbd2cSJim Jagielski } 371*b1cdbd2cSJim Jagielski } 372*b1cdbd2cSJim Jagielski } 373*b1cdbd2cSJim Jagielski 374*b1cdbd2cSJim Jagielski return result; 375*b1cdbd2cSJim Jagielski } 376*b1cdbd2cSJim Jagielski 377*b1cdbd2cSJim Jagielski /** 378*b1cdbd2cSJim Jagielski * If the given disposeId is registered, 379*b1cdbd2cSJim Jagielski * interrups the worker thread. 380*b1cdbd2cSJim Jagielski * <p> 381*b1cdbd2cSJim Jagielski * @param disposeId the dispose id 382*b1cdbd2cSJim Jagielski */ dispose(Object disposeId, Throwable throwable)383*b1cdbd2cSJim Jagielski synchronized void dispose(Object disposeId, Throwable throwable) { 384*b1cdbd2cSJim Jagielski if(_sync_jobQueue == null) { // dispose only sync queues 385*b1cdbd2cSJim Jagielski _doDispose = disposeId; 386*b1cdbd2cSJim Jagielski _throwable = throwable; 387*b1cdbd2cSJim Jagielski 388*b1cdbd2cSJim Jagielski // get thread out of wait and let it throw the throwable 389*b1cdbd2cSJim Jagielski if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread"); 390*b1cdbd2cSJim Jagielski 391*b1cdbd2cSJim Jagielski notifyAll(); 392*b1cdbd2cSJim Jagielski } 393*b1cdbd2cSJim Jagielski } 394*b1cdbd2cSJim Jagielski } 395*b1cdbd2cSJim Jagielski 396