1*b1cdbd2cSJim Jagielski /**************************************************************
2*b1cdbd2cSJim Jagielski  *
3*b1cdbd2cSJim Jagielski  * Licensed to the Apache Software Foundation (ASF) under one
4*b1cdbd2cSJim Jagielski  * or more contributor license agreements.  See the NOTICE file
5*b1cdbd2cSJim Jagielski  * distributed with this work for additional information
6*b1cdbd2cSJim Jagielski  * regarding copyright ownership.  The ASF licenses this file
7*b1cdbd2cSJim Jagielski  * to you under the Apache License, Version 2.0 (the
8*b1cdbd2cSJim Jagielski  * "License"); you may not use this file except in compliance
9*b1cdbd2cSJim Jagielski  * with the License.  You may obtain a copy of the License at
10*b1cdbd2cSJim Jagielski  *
11*b1cdbd2cSJim Jagielski  *   http://www.apache.org/licenses/LICENSE-2.0
12*b1cdbd2cSJim Jagielski  *
13*b1cdbd2cSJim Jagielski  * Unless required by applicable law or agreed to in writing,
14*b1cdbd2cSJim Jagielski  * software distributed under the License is distributed on an
15*b1cdbd2cSJim Jagielski  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16*b1cdbd2cSJim Jagielski  * KIND, either express or implied.  See the License for the
17*b1cdbd2cSJim Jagielski  * specific language governing permissions and limitations
18*b1cdbd2cSJim Jagielski  * under the License.
19*b1cdbd2cSJim Jagielski  *
20*b1cdbd2cSJim Jagielski  *************************************************************/
21*b1cdbd2cSJim Jagielski 
22*b1cdbd2cSJim Jagielski 
23*b1cdbd2cSJim Jagielski 
24*b1cdbd2cSJim Jagielski package com.sun.star.lib.uno.environments.remote;
25*b1cdbd2cSJim Jagielski 
26*b1cdbd2cSJim Jagielski import com.sun.star.lang.DisposedException;
27*b1cdbd2cSJim Jagielski 
28*b1cdbd2cSJim Jagielski /**
29*b1cdbd2cSJim Jagielski  * The <code>JobQueue</code> implements a queue for jobs.
30*b1cdbd2cSJim Jagielski  * For every jobs thread id exists a job queue which is registered
31*b1cdbd2cSJim Jagielski  * at the <code>ThreadPool</code>.
32*b1cdbd2cSJim Jagielski  * A JobQueue is splitted in a sync job queue and an async job queue.
33*b1cdbd2cSJim Jagielski  * The sync job queue is the registerd queue, it delegates async jobs
34*b1cdbd2cSJim Jagielski  * (put by <code>putjob</code>) into the async queue, which is only
35*b1cdbd2cSJim Jagielski  * known by the sync queue.
36*b1cdbd2cSJim Jagielski  * <p>
37*b1cdbd2cSJim Jagielski  * @version 	$Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $
38*b1cdbd2cSJim Jagielski  * @author 	    Kay Ramme
39*b1cdbd2cSJim Jagielski  * @see         com.sun.star.lib.uno.environments.remote.ThreadPool
40*b1cdbd2cSJim Jagielski  * @see         com.sun.star.lib.uno.environments.remote.Job
41*b1cdbd2cSJim Jagielski  * @see         com.sun.star.lib.uno.environments.remote.ThreadId
42*b1cdbd2cSJim Jagielski  * @since       UDK1.0
43*b1cdbd2cSJim Jagielski  */
44*b1cdbd2cSJim Jagielski public class JobQueue {
45*b1cdbd2cSJim Jagielski 	/**
46*b1cdbd2cSJim Jagielski 	 * When set to true, enables various debugging output.
47*b1cdbd2cSJim Jagielski 	 */
48*b1cdbd2cSJim Jagielski 	private static final boolean DEBUG = false;
49*b1cdbd2cSJim Jagielski 
50*b1cdbd2cSJim Jagielski 	protected Job _head;                 // the head of the job list
51*b1cdbd2cSJim Jagielski 	protected Job _tail;                 // the tail of the job list
52*b1cdbd2cSJim Jagielski 
53*b1cdbd2cSJim Jagielski 	protected ThreadId  _threadId;       // the thread id of the queue
54*b1cdbd2cSJim Jagielski 	protected int       _ref_count = 0;  // the stack deepness
55*b1cdbd2cSJim Jagielski 	protected boolean   _createThread;   // create a worker thread, if needed
56*b1cdbd2cSJim Jagielski 	protected boolean   _createThread_now;   // create a worker thread, if needed
57*b1cdbd2cSJim Jagielski 	protected Thread    _worker_thread;  // the thread that does the jobs
58*b1cdbd2cSJim Jagielski 
59*b1cdbd2cSJim Jagielski 	protected Object    _disposeId; // the active dispose id
60*b1cdbd2cSJim Jagielski 	protected Object    _doDispose = null;
61*b1cdbd2cSJim Jagielski 	protected Throwable _throwable;
62*b1cdbd2cSJim Jagielski 
63*b1cdbd2cSJim Jagielski 	protected JobQueue  _async_jobQueue; // chaining job qeueus for asyncs
64*b1cdbd2cSJim Jagielski 	protected JobQueue  _sync_jobQueue;  // chaining job qeueus for syncs
65*b1cdbd2cSJim Jagielski 
66*b1cdbd2cSJim Jagielski 	protected boolean _active = false;
67*b1cdbd2cSJim Jagielski 
68*b1cdbd2cSJim Jagielski 	protected JavaThreadPoolFactory _javaThreadPoolFactory;
69*b1cdbd2cSJim Jagielski 
70*b1cdbd2cSJim Jagielski 	/**
71*b1cdbd2cSJim Jagielski 	 * A thread for dispatching jobs
72*b1cdbd2cSJim Jagielski 	 */
73*b1cdbd2cSJim Jagielski 	class JobDispatcher extends Thread {
74*b1cdbd2cSJim Jagielski 		Object _disposeId;
75*b1cdbd2cSJim Jagielski 
JobDispatcher(Object disposeId)76*b1cdbd2cSJim Jagielski 		JobDispatcher(Object disposeId) {
77*b1cdbd2cSJim Jagielski 			if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId);
78*b1cdbd2cSJim Jagielski 
79*b1cdbd2cSJim Jagielski 			_disposeId = disposeId;
80*b1cdbd2cSJim Jagielski 		}
81*b1cdbd2cSJim Jagielski 
getThreadId()82*b1cdbd2cSJim Jagielski 		ThreadId getThreadId() {
83*b1cdbd2cSJim Jagielski 			return _threadId;
84*b1cdbd2cSJim Jagielski 		}
85*b1cdbd2cSJim Jagielski 
run()86*b1cdbd2cSJim Jagielski 		public void run() {
87*b1cdbd2cSJim Jagielski 			if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread());
88*b1cdbd2cSJim Jagielski 
89*b1cdbd2cSJim Jagielski 			try {
90*b1cdbd2cSJim Jagielski   				enter(2000, _disposeId);
91*b1cdbd2cSJim Jagielski 			}
92*b1cdbd2cSJim Jagielski 			catch(Throwable throwable) {
93*b1cdbd2cSJim Jagielski 				if(_head != null || _active) { // there was a job in progress, so give a stack
94*b1cdbd2cSJim Jagielski 					System.err.println(getClass().getName() + " - exception occurred:" + throwable);
95*b1cdbd2cSJim Jagielski 					throwable.printStackTrace(System.err);
96*b1cdbd2cSJim Jagielski 				}
97*b1cdbd2cSJim Jagielski 			}
98*b1cdbd2cSJim Jagielski 			finally {
99*b1cdbd2cSJim Jagielski 				release();
100*b1cdbd2cSJim Jagielski 			}
101*b1cdbd2cSJim Jagielski 
102*b1cdbd2cSJim Jagielski 			if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId);
103*b1cdbd2cSJim Jagielski 
104*b1cdbd2cSJim Jagielski //  			try {
105*b1cdbd2cSJim Jagielski //  				Object object = new Object();
106*b1cdbd2cSJim Jagielski //  				synchronized(object) {
107*b1cdbd2cSJim Jagielski //  					object.wait();
108*b1cdbd2cSJim Jagielski //  				}
109*b1cdbd2cSJim Jagielski //  			}
110*b1cdbd2cSJim Jagielski //  			catch(InterruptedException interruptedException) {
111*b1cdbd2cSJim Jagielski //  			}
112*b1cdbd2cSJim Jagielski 		}
113*b1cdbd2cSJim Jagielski 	}
114*b1cdbd2cSJim Jagielski 
115*b1cdbd2cSJim Jagielski 
116*b1cdbd2cSJim Jagielski 	/**
117*b1cdbd2cSJim Jagielski 	 * Constructs a async job queue with the given thread id
118*b1cdbd2cSJim Jagielski 	 * which belongs to the given sync job queue.
119*b1cdbd2cSJim Jagielski 	 * <p>
120*b1cdbd2cSJim Jagielski 	 * @param threadId         the thread id
121*b1cdbd2cSJim Jagielski 	 * @param sync_jobQueue    the sync queue this async queue belongs to
122*b1cdbd2cSJim Jagielski 	 * @see                    com.sun.star.lib.uno.environments.remote.ThreadID
123*b1cdbd2cSJim Jagielski 	 */
JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId)124*b1cdbd2cSJim Jagielski 	JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) {
125*b1cdbd2cSJim Jagielski 		_javaThreadPoolFactory = javaThreadPoolFactory;
126*b1cdbd2cSJim Jagielski         _threadId = ThreadId.createFresh();
127*b1cdbd2cSJim Jagielski 
128*b1cdbd2cSJim Jagielski 		_sync_jobQueue    = javaThreadPoolFactory.getJobQueue(threadId);
129*b1cdbd2cSJim Jagielski 		if(_sync_jobQueue == null) {
130*b1cdbd2cSJim Jagielski 			_sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true);
131*b1cdbd2cSJim Jagielski 			_sync_jobQueue.acquire();
132*b1cdbd2cSJim Jagielski 		}
133*b1cdbd2cSJim Jagielski 
134*b1cdbd2cSJim Jagielski 		_sync_jobQueue._async_jobQueue = this;
135*b1cdbd2cSJim Jagielski 
136*b1cdbd2cSJim Jagielski 		_createThread     = true;
137*b1cdbd2cSJim Jagielski 		_createThread_now = true;
138*b1cdbd2cSJim Jagielski 
139*b1cdbd2cSJim Jagielski 		acquire();
140*b1cdbd2cSJim Jagielski 
141*b1cdbd2cSJim Jagielski 		if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId);
142*b1cdbd2cSJim Jagielski 	}
143*b1cdbd2cSJim Jagielski 
144*b1cdbd2cSJim Jagielski 	/**
145*b1cdbd2cSJim Jagielski 	 * Constructs a sync job queue with the given thread id and the given thread.
146*b1cdbd2cSJim Jagielski 	 * <p>
147*b1cdbd2cSJim Jagielski 	 * @param threadId        the thread id
148*b1cdbd2cSJim Jagielski 	 * @param createThread    if true, the queue creates a worker thread if needed
149*b1cdbd2cSJim Jagielski 	 * @see             com.sun.star.lib.uno.environments.remote.ThreadID
150*b1cdbd2cSJim Jagielski 	 */
JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread)151*b1cdbd2cSJim Jagielski 	JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){
152*b1cdbd2cSJim Jagielski 		_javaThreadPoolFactory   = javaThreadPoolFactory;
153*b1cdbd2cSJim Jagielski 		_threadId         = threadId;
154*b1cdbd2cSJim Jagielski 		_createThread     = createThread;
155*b1cdbd2cSJim Jagielski 		_createThread_now = createThread;
156*b1cdbd2cSJim Jagielski 
157*b1cdbd2cSJim Jagielski 		if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId + " " + createThread);
158*b1cdbd2cSJim Jagielski 	}
159*b1cdbd2cSJim Jagielski 
160*b1cdbd2cSJim Jagielski 	/**
161*b1cdbd2cSJim Jagielski 	 * Gives the thread id of this queue
162*b1cdbd2cSJim Jagielski 	 * <p>
163*b1cdbd2cSJim Jagielski 	 * @return  the thread id
164*b1cdbd2cSJim Jagielski 	 * @see     com.sun.star.lib.uno.environments.remote.ThreadID
165*b1cdbd2cSJim Jagielski 	 */
getThreadId()166*b1cdbd2cSJim Jagielski 	ThreadId getThreadId() {
167*b1cdbd2cSJim Jagielski 		return _threadId;
168*b1cdbd2cSJim Jagielski 	}
169*b1cdbd2cSJim Jagielski 
acquire()170*b1cdbd2cSJim Jagielski 	synchronized void acquire() {
171*b1cdbd2cSJim Jagielski         // add only synchronous queues .
172*b1cdbd2cSJim Jagielski 		if(_ref_count <= 0 && _sync_jobQueue == null )
173*b1cdbd2cSJim Jagielski 			_javaThreadPoolFactory.addJobQueue(this);
174*b1cdbd2cSJim Jagielski 
175*b1cdbd2cSJim Jagielski 		++ _ref_count;
176*b1cdbd2cSJim Jagielski 	}
177*b1cdbd2cSJim Jagielski 
release()178*b1cdbd2cSJim Jagielski 	synchronized void release() {
179*b1cdbd2cSJim Jagielski 		-- _ref_count;
180*b1cdbd2cSJim Jagielski 
181*b1cdbd2cSJim Jagielski 		if(_ref_count <= 0) {
182*b1cdbd2cSJim Jagielski             // only synchronous queues needs to be removed .
183*b1cdbd2cSJim Jagielski             if( _sync_jobQueue == null )
184*b1cdbd2cSJim Jagielski                 _javaThreadPoolFactory.removeJobQueue(this);
185*b1cdbd2cSJim Jagielski 
186*b1cdbd2cSJim Jagielski 
187*b1cdbd2cSJim Jagielski 			if(_sync_jobQueue != null) {
188*b1cdbd2cSJim Jagielski 				_sync_jobQueue._async_jobQueue = null;
189*b1cdbd2cSJim Jagielski 				_sync_jobQueue.release();
190*b1cdbd2cSJim Jagielski 			}
191*b1cdbd2cSJim Jagielski 		}
192*b1cdbd2cSJim Jagielski 	}
193*b1cdbd2cSJim Jagielski 
194*b1cdbd2cSJim Jagielski 	/**
195*b1cdbd2cSJim Jagielski 	 * Removes a job from the queue.
196*b1cdbd2cSJim Jagielski 	 * <p>
197*b1cdbd2cSJim Jagielski 	 * @return a job or null if timed out
198*b1cdbd2cSJim Jagielski 	 * @param  waitTime        the maximum amount of time to wait for a job
199*b1cdbd2cSJim Jagielski 	 */
removeJob(int waitTime)200*b1cdbd2cSJim Jagielski 	private Job removeJob(int waitTime) {
201*b1cdbd2cSJim Jagielski 		if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId);
202*b1cdbd2cSJim Jagielski 
203*b1cdbd2cSJim Jagielski 		Job job = null;
204*b1cdbd2cSJim Jagielski 		synchronized (this) {
205*b1cdbd2cSJim Jagielski 			// wait max. waitTime time for a job to enter the queue
206*b1cdbd2cSJim Jagielski 			boolean waited = false;
207*b1cdbd2cSJim Jagielski 			while(_head == null && (waitTime == 0 || !waited)) {
208*b1cdbd2cSJim Jagielski 				if(_doDispose == _disposeId) {
209*b1cdbd2cSJim Jagielski 					_doDispose = null;
210*b1cdbd2cSJim Jagielski 					throw (DisposedException)
211*b1cdbd2cSJim Jagielski                         new DisposedException().initCause(_throwable);
212*b1cdbd2cSJim Jagielski 				}
213*b1cdbd2cSJim Jagielski 
214*b1cdbd2cSJim Jagielski 				// notify sync queues
215*b1cdbd2cSJim Jagielski 				notifyAll();
216*b1cdbd2cSJim Jagielski 
217*b1cdbd2cSJim Jagielski 				try {
218*b1cdbd2cSJim Jagielski 					// wait for new job
219*b1cdbd2cSJim Jagielski 					wait(waitTime);
220*b1cdbd2cSJim Jagielski 				}
221*b1cdbd2cSJim Jagielski 				catch(InterruptedException interruptedException) {
222*b1cdbd2cSJim Jagielski   					throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
223*b1cdbd2cSJim Jagielski 				}
224*b1cdbd2cSJim Jagielski 
225*b1cdbd2cSJim Jagielski 				// signal that we have already waited once
226*b1cdbd2cSJim Jagielski 				waited = true;
227*b1cdbd2cSJim Jagielski 			}
228*b1cdbd2cSJim Jagielski 
229*b1cdbd2cSJim Jagielski 
230*b1cdbd2cSJim Jagielski 			if(_head != null) {
231*b1cdbd2cSJim Jagielski 				Job current = _head;
232*b1cdbd2cSJim Jagielski 				_head    = _head._next;
233*b1cdbd2cSJim Jagielski 
234*b1cdbd2cSJim Jagielski 				if(_head == null)
235*b1cdbd2cSJim Jagielski 					_tail = null;
236*b1cdbd2cSJim Jagielski 
237*b1cdbd2cSJim Jagielski 				job = current;
238*b1cdbd2cSJim Jagielski 				_active = true;
239*b1cdbd2cSJim Jagielski 			}
240*b1cdbd2cSJim Jagielski 		}
241*b1cdbd2cSJim Jagielski 
242*b1cdbd2cSJim Jagielski 		// always wait for asynchron jobqueue to be finished !
243*b1cdbd2cSJim Jagielski 		if(job != null && _async_jobQueue != null) {
244*b1cdbd2cSJim Jagielski 			synchronized(_async_jobQueue) {
245*b1cdbd2cSJim Jagielski 				// wait for async queue to be empty and last job to be done
246*b1cdbd2cSJim Jagielski 				while(_async_jobQueue._active || _async_jobQueue._head != null) {
247*b1cdbd2cSJim Jagielski 					if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " +  _async_jobQueue._worker_thread);
248*b1cdbd2cSJim Jagielski 
249*b1cdbd2cSJim Jagielski 					if(_doDispose == _disposeId) {
250*b1cdbd2cSJim Jagielski 						_doDispose = null;
251*b1cdbd2cSJim Jagielski 						throw (DisposedException)
252*b1cdbd2cSJim Jagielski                             new DisposedException().initCause(_throwable);
253*b1cdbd2cSJim Jagielski 					}
254*b1cdbd2cSJim Jagielski 
255*b1cdbd2cSJim Jagielski 					try {
256*b1cdbd2cSJim Jagielski 						_async_jobQueue.wait();
257*b1cdbd2cSJim Jagielski 					}
258*b1cdbd2cSJim Jagielski 					catch(InterruptedException interruptedException) {
259*b1cdbd2cSJim Jagielski 						throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
260*b1cdbd2cSJim Jagielski 					}
261*b1cdbd2cSJim Jagielski 				}
262*b1cdbd2cSJim Jagielski 			}
263*b1cdbd2cSJim Jagielski 		}
264*b1cdbd2cSJim Jagielski 
265*b1cdbd2cSJim Jagielski 		return job;
266*b1cdbd2cSJim Jagielski 	}
267*b1cdbd2cSJim Jagielski 
268*b1cdbd2cSJim Jagielski 	/**
269*b1cdbd2cSJim Jagielski 	 * Puts a job into the queue.
270*b1cdbd2cSJim Jagielski 	 * <p>
271*b1cdbd2cSJim Jagielski 	 * @param  job        the job
272*b1cdbd2cSJim Jagielski 	 * @param  disposeId  a dispose id
273*b1cdbd2cSJim Jagielski 	 */
putJob(Job job, Object disposeId)274*b1cdbd2cSJim Jagielski 	synchronized void putJob(Job job, Object disposeId) {
275*b1cdbd2cSJim Jagielski 		if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job);
276*b1cdbd2cSJim Jagielski 
277*b1cdbd2cSJim Jagielski 		if(_tail != null)
278*b1cdbd2cSJim Jagielski 			_tail._next = job;
279*b1cdbd2cSJim Jagielski 		else
280*b1cdbd2cSJim Jagielski 			_head = job;
281*b1cdbd2cSJim Jagielski 
282*b1cdbd2cSJim Jagielski 		_tail = job;
283*b1cdbd2cSJim Jagielski 
284*b1cdbd2cSJim Jagielski 		if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one
285*b1cdbd2cSJim Jagielski 
286*b1cdbd2cSJim Jagielski 			acquire();
287*b1cdbd2cSJim Jagielski 
288*b1cdbd2cSJim Jagielski 			_createThread_now = false;
289*b1cdbd2cSJim Jagielski 			new JobDispatcher(disposeId).start();
290*b1cdbd2cSJim Jagielski 		}
291*b1cdbd2cSJim Jagielski 
292*b1cdbd2cSJim Jagielski 		// always notify possible waiters
293*b1cdbd2cSJim Jagielski 		notifyAll();
294*b1cdbd2cSJim Jagielski 	}
295*b1cdbd2cSJim Jagielski 
296*b1cdbd2cSJim Jagielski 	/**
297*b1cdbd2cSJim Jagielski 	 * Enters the job queue.
298*b1cdbd2cSJim Jagielski 	 * <p>
299*b1cdbd2cSJim Jagielski 	 * @return the result of the final job (reply)
300*b1cdbd2cSJim Jagielski 	 * @param  disposeId  a dispose id
301*b1cdbd2cSJim Jagielski 	 */
enter(Object disposeId)302*b1cdbd2cSJim Jagielski 	Object enter(Object disposeId) throws Throwable {
303*b1cdbd2cSJim Jagielski 		return enter(0, disposeId); // wait infinitly
304*b1cdbd2cSJim Jagielski 	}
305*b1cdbd2cSJim Jagielski 
306*b1cdbd2cSJim Jagielski 	/**
307*b1cdbd2cSJim Jagielski 	 * Enters the job queue.
308*b1cdbd2cSJim Jagielski 	 * <p>
309*b1cdbd2cSJim Jagielski 	 * @return the result of the final job (reply)
310*b1cdbd2cSJim Jagielski 	 * @param  waitTime   the maximum amount of time to wait for a job (0 means wait infinitly)
311*b1cdbd2cSJim Jagielski 	 * @param  disposeId  a dispose id
312*b1cdbd2cSJim Jagielski 	 */
enter(int waitTime, Object disposeId)313*b1cdbd2cSJim Jagielski 	Object enter(int waitTime, Object disposeId) throws Throwable {
314*b1cdbd2cSJim Jagielski 		if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId);
315*b1cdbd2cSJim Jagielski 
316*b1cdbd2cSJim Jagielski 		boolean quit = false;
317*b1cdbd2cSJim Jagielski 
318*b1cdbd2cSJim Jagielski 		Object hold_disposeId = _disposeId;
319*b1cdbd2cSJim Jagielski 		_disposeId = disposeId;
320*b1cdbd2cSJim Jagielski 
321*b1cdbd2cSJim Jagielski 		Object result = null;
322*b1cdbd2cSJim Jagielski 
323*b1cdbd2cSJim Jagielski 		Thread hold_worker_thread = _worker_thread;
324*b1cdbd2cSJim Jagielski 		_worker_thread = Thread.currentThread();
325*b1cdbd2cSJim Jagielski 
326*b1cdbd2cSJim Jagielski 		while(!quit) {
327*b1cdbd2cSJim Jagielski 			Job job = null;
328*b1cdbd2cSJim Jagielski 
329*b1cdbd2cSJim Jagielski 			try {
330*b1cdbd2cSJim Jagielski 				job = removeJob(waitTime);
331*b1cdbd2cSJim Jagielski 
332*b1cdbd2cSJim Jagielski 				if(job != null) {
333*b1cdbd2cSJim Jagielski 					try {
334*b1cdbd2cSJim Jagielski 						result = job.execute();
335*b1cdbd2cSJim Jagielski 					}
336*b1cdbd2cSJim Jagielski 					finally {
337*b1cdbd2cSJim Jagielski 						_active = false;
338*b1cdbd2cSJim Jagielski 					}
339*b1cdbd2cSJim Jagielski 
340*b1cdbd2cSJim Jagielski 					if (!job.isRequest()) {
341*b1cdbd2cSJim Jagielski 						job.dispose();
342*b1cdbd2cSJim Jagielski 
343*b1cdbd2cSJim Jagielski 						quit = true;
344*b1cdbd2cSJim Jagielski 					}
345*b1cdbd2cSJim Jagielski 
346*b1cdbd2cSJim Jagielski 					job = null;
347*b1cdbd2cSJim Jagielski 				}
348*b1cdbd2cSJim Jagielski 				else
349*b1cdbd2cSJim Jagielski 					quit = true;
350*b1cdbd2cSJim Jagielski 
351*b1cdbd2cSJim Jagielski 
352*b1cdbd2cSJim Jagielski 			}
353*b1cdbd2cSJim Jagielski 			finally { // ensure that this queue becomes disposed, if necessary
354*b1cdbd2cSJim Jagielski 				if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result);
355*b1cdbd2cSJim Jagielski 
356*b1cdbd2cSJim Jagielski 				synchronized(this) {
357*b1cdbd2cSJim Jagielski 					if(job != null || (quit && _head == null)) {
358*b1cdbd2cSJim Jagielski 						_worker_thread = hold_worker_thread;
359*b1cdbd2cSJim Jagielski 
360*b1cdbd2cSJim Jagielski 						_createThread_now = true;
361*b1cdbd2cSJim Jagielski 
362*b1cdbd2cSJim Jagielski 						_disposeId = hold_disposeId;
363*b1cdbd2cSJim Jagielski 
364*b1cdbd2cSJim Jagielski 						if(_sync_jobQueue != null)
365*b1cdbd2cSJim Jagielski 							notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
366*b1cdbd2cSJim Jagielski 					}
367*b1cdbd2cSJim Jagielski 					else
368*b1cdbd2cSJim Jagielski 						quit = false;
369*b1cdbd2cSJim Jagielski 
370*b1cdbd2cSJim Jagielski 				}
371*b1cdbd2cSJim Jagielski 			}
372*b1cdbd2cSJim Jagielski 		}
373*b1cdbd2cSJim Jagielski 
374*b1cdbd2cSJim Jagielski 		return result;
375*b1cdbd2cSJim Jagielski 	}
376*b1cdbd2cSJim Jagielski 
377*b1cdbd2cSJim Jagielski 	/**
378*b1cdbd2cSJim Jagielski 	 * If the given disposeId is registered,
379*b1cdbd2cSJim Jagielski 	 * interrups the worker thread.
380*b1cdbd2cSJim Jagielski 	 * <p>
381*b1cdbd2cSJim Jagielski 	 * @param disposeId    the dispose id
382*b1cdbd2cSJim Jagielski 	 */
dispose(Object disposeId, Throwable throwable)383*b1cdbd2cSJim Jagielski 	synchronized void dispose(Object disposeId, Throwable throwable) {
384*b1cdbd2cSJim Jagielski 		if(_sync_jobQueue == null) { // dispose only sync queues
385*b1cdbd2cSJim Jagielski 			_doDispose = disposeId;
386*b1cdbd2cSJim Jagielski 			_throwable = throwable;
387*b1cdbd2cSJim Jagielski 
388*b1cdbd2cSJim Jagielski 			// get thread out of wait and let it throw the throwable
389*b1cdbd2cSJim Jagielski 			if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread");
390*b1cdbd2cSJim Jagielski 
391*b1cdbd2cSJim Jagielski 			notifyAll();
392*b1cdbd2cSJim Jagielski 		}
393*b1cdbd2cSJim Jagielski 	}
394*b1cdbd2cSJim Jagielski }
395*b1cdbd2cSJim Jagielski 
396