1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_cppu.hxx"
26 #include <hash_set>
27 #include <stdio.h>
28 
29 #include <osl/diagnose.h>
30 #include <osl/mutex.hxx>
31 #include <osl/thread.h>
32 #include <rtl/instance.hxx>
33 
34 #include <uno/threadpool.h>
35 
36 #include "threadpool.hxx"
37 #include "thread.hxx"
38 
39 using namespace ::std;
40 using namespace ::osl;
41 
42 namespace cppu_threadpool
43 {
44 	struct theDisposedCallerAdmin :
45 		public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
46 	{
operator ()cppu_threadpool::theDisposedCallerAdmin47 		DisposedCallerAdminHolder operator () () {
48 			return DisposedCallerAdminHolder(new DisposedCallerAdmin());
49 		}
50 	};
51 
getInstance()52 	DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
53 	{
54 		return theDisposedCallerAdmin::get();
55 	}
56 
~DisposedCallerAdmin()57 	DisposedCallerAdmin::~DisposedCallerAdmin()
58 	{
59 #if OSL_DEBUG_LEVEL > 1
60 		if( !m_lst.empty() )
61 		{
62 			printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
63 		}
64 #endif
65 	}
66 
dispose(sal_Int64 nDisposeId)67 	void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
68 	{
69 		MutexGuard guard( m_mutex );
70 		m_lst.push_back( nDisposeId );
71 	}
72 
stopDisposing(sal_Int64 nDisposeId)73 	void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId )
74 	{
75 		MutexGuard guard( m_mutex );
76 		for( DisposedCallerList::iterator ii = m_lst.begin() ;
77 			 ii != m_lst.end() ;
78 			 ++ ii )
79 		{
80 			if( (*ii) == nDisposeId )
81 			{
82 				m_lst.erase( ii );
83 				break;
84 			}
85 		}
86 	}
87 
isDisposed(sal_Int64 nDisposeId)88 	sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
89 	{
90 		MutexGuard guard( m_mutex );
91 		for( DisposedCallerList::iterator ii = m_lst.begin() ;
92 			 ii != m_lst.end() ;
93 			 ++ ii )
94 		{
95 			if( (*ii) == nDisposeId )
96 			{
97 				return sal_True;
98 			}
99 		}
100 		return sal_False;
101 	}
102 
103 
104 	//-------------------------------------------------------------------------------
105 
106 	struct theThreadPool :
107 		public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool >
108 	{
operator ()cppu_threadpool::theThreadPool109 		ThreadPoolHolder operator () () {
110 			ThreadPoolHolder aRet(new ThreadPool());
111 			return aRet;
112 		}
113 	};
114 
ThreadPool()115 	ThreadPool::ThreadPool()
116 	{
117         	m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
118 	}
119 
~ThreadPool()120 	ThreadPool::~ThreadPool()
121 	{
122 #if OSL_DEBUG_LEVEL > 1
123 		if( m_mapQueue.size() )
124 		{
125 			printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
126 		}
127 #endif
128 	}
getInstance()129 	ThreadPoolHolder ThreadPool::getInstance()
130 	{
131 		return theThreadPool::get();
132 	}
133 
134 
dispose(sal_Int64 nDisposeId)135 	void ThreadPool::dispose( sal_Int64 nDisposeId )
136 	{
137 		if( nDisposeId )
138 		{
139 			m_DisposedCallerAdmin->dispose( nDisposeId );
140 
141 			MutexGuard guard( m_mutex );
142 			for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
143 				 ii != m_mapQueue.end();
144 				 ++ii)
145 			{
146 				if( (*ii).second.first )
147 				{
148 					(*ii).second.first->dispose( nDisposeId );
149 				}
150 				if( (*ii).second.second )
151 				{
152 					(*ii).second.second->dispose( nDisposeId );
153 				}
154 			}
155 		}
156 		else
157 		{
158 			{
159 				MutexGuard guard( m_mutexWaitingThreadList );
160 				for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
161 					 ii != m_lstThreads.end() ;
162 					 ++ ii )
163 				{
164 					// wake the threads up
165 					osl_setCondition( (*ii)->condition );
166 				}
167 			}
168 			ThreadAdmin::getInstance()->join();
169 		}
170 	}
171 
	/**
	 * Forwards to DisposedCallerAdmin::stopDisposing() for the given id.
	 *
	 * @param nDisposeId id that shall no longer be treated as disposed
	 */
	void ThreadPool::stopDisposing( sal_Int64 nDisposeId )
	{
		m_DisposedCallerAdmin->stopDisposing( nDisposeId );
	}
176 
177 	/******************
178 	 * This methods lets the thread wait a certain amount of time. If within this timespan
179 	 * a new request comes in, this thread is reused. This is done only to improve performance,
180 	 * it is not required for threadpool functionality.
181 	 ******************/
waitInPool(ORequestThread * pThread)182 	void ThreadPool::waitInPool( ORequestThread * pThread )
183 	{
184 		struct WaitingThread waitingThread;
185 		waitingThread.condition = osl_createCondition();
186 		waitingThread.thread = pThread;
187 		{
188 			MutexGuard guard( m_mutexWaitingThreadList );
189 			m_lstThreads.push_front( &waitingThread );
190 		}
191 
192 		// let the thread wait 2 seconds
193 		TimeValue time = { 2 , 0 };
194 		osl_waitCondition( waitingThread.condition , &time );
195 
196 		{
197 			MutexGuard guard ( m_mutexWaitingThreadList );
198 			if( waitingThread.thread )
199 			{
200 				// thread wasn't reused, remove it from the list
201 				WaitingThreadList::iterator ii = find(
202 					m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
203 				OSL_ASSERT( ii != m_lstThreads.end() );
204 				m_lstThreads.erase( ii );
205 			}
206 		}
207 
208 		osl_destroyCondition( waitingThread.condition );
209 	}
210 
createThread(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)211 	void ThreadPool::createThread( JobQueue *pQueue ,
212 								   const ByteSequence &aThreadId,
213 								   sal_Bool bAsynchron )
214 	{
215 		sal_Bool bCreate = sal_True;
216 		{
217 			// Can a thread be reused ?
218 			MutexGuard guard( m_mutexWaitingThreadList );
219 			if( ! m_lstThreads.empty() )
220 			{
221 				// inform the thread and let it go
222 				struct WaitingThread *pWaitingThread = m_lstThreads.back();
223 				pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
224 				pWaitingThread->thread = 0;
225 
226 				// remove from list
227 				m_lstThreads.pop_back();
228 
229 				// let the thread go
230 				osl_setCondition( pWaitingThread->condition );
231 				bCreate = sal_False;
232 			}
233 		}
234 
235 		if( bCreate )
236 		{
237 			ORequestThread *pThread =
238 				new ORequestThread( pQueue , aThreadId, bAsynchron);
239 			// deletes itself !
240 			pThread->create();
241 		}
242 	}
243 
revokeQueue(const ByteSequence & aThreadId,sal_Bool bAsynchron)244 	sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron )
245 	{
246 		MutexGuard guard( m_mutex );
247 
248 		ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
249 		OSL_ASSERT( ii != m_mapQueue.end() );
250 
251 		if( bAsynchron )
252 		{
253 			if( ! (*ii).second.second->isEmpty() )
254 			{
255 				// another thread has put something into the queue
256 				return sal_False;
257 			}
258 
259 			(*ii).second.second = 0;
260 			if( (*ii).second.first )
261 			{
262 				// all oneway request have been processed, now
263 				// synchronus requests may go on
264 				(*ii).second.first->resume();
265 			}
266 		}
267 		else
268 		{
269 			if( ! (*ii).second.first->isEmpty() )
270 			{
271 				// another thread has put something into the queue
272 				return sal_False;
273 			}
274 			(*ii).second.first = 0;
275 		}
276 
277 		if( 0 == (*ii).second.first && 0 == (*ii).second.second )
278 		{
279 			m_mapQueue.erase( ii );
280 		}
281 
282 		return sal_True;
283 	}
284 
285 
addJob(const ByteSequence & aThreadId,sal_Bool bAsynchron,void * pThreadSpecificData,RequestFun * doRequest)286 	void ThreadPool::addJob(
287 		const ByteSequence &aThreadId ,
288 		sal_Bool bAsynchron,
289 		void *pThreadSpecificData,
290 		RequestFun * doRequest )
291 	{
292 		sal_Bool bCreateThread = sal_False;
293 		JobQueue *pQueue = 0;
294 		{
295 			MutexGuard guard( m_mutex );
296 
297 			ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
298 
299 			if( ii == m_mapQueue.end() )
300 			{
301 				m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( 0 , 0 );
302 				ii = m_mapQueue.find( aThreadId );
303 				OSL_ASSERT( ii != m_mapQueue.end() );
304 			}
305 
306 			if( bAsynchron )
307 			{
308 				if( ! (*ii).second.second )
309 				{
310 					(*ii).second.second = new JobQueue();
311 					bCreateThread = sal_True;
312 				}
313 				pQueue = (*ii).second.second;
314 			}
315 			else
316 			{
317 				if( ! (*ii).second.first )
318 				{
319 					(*ii).second.first = new JobQueue();
320 					bCreateThread = sal_True;
321 				}
322 				pQueue = (*ii).second.first;
323 
324 				if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
325 				{
326 					pQueue->suspend();
327 				}
328 			}
329 			pQueue->add( pThreadSpecificData , doRequest );
330 		}
331 
332 		if( bCreateThread )
333 		{
334 			createThread( pQueue , aThreadId , bAsynchron);
335 		}
336 	}
337 
prepare(const ByteSequence & aThreadId)338 	void ThreadPool::prepare( const ByteSequence &aThreadId )
339 	{
340 		MutexGuard guard( m_mutex );
341 
342 		ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
343 
344 		if( ii == m_mapQueue.end() )
345 		{
346 			JobQueue *p = new JobQueue();
347 			m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , 0 );
348 		}
349 		else if( 0 == (*ii).second.first )
350 		{
351 			(*ii).second.first = new JobQueue();
352 		}
353 	}
354 
enter(const ByteSequence & aThreadId,sal_Int64 nDisposeId)355 	void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
356 	{
357 		JobQueue *pQueue = 0;
358 		{
359 			MutexGuard guard( m_mutex );
360 
361 			ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
362 
363 			OSL_ASSERT( ii != m_mapQueue.end() );
364 			pQueue = (*ii).second.first;
365 		}
366 
367 		OSL_ASSERT( pQueue );
368 		void *pReturn = pQueue->enter( nDisposeId );
369 
370 		if( pQueue->isCallstackEmpty() )
371 		{
372 			if( revokeQueue( aThreadId , sal_False) )
373 			{
374 				// remove queue
375 				delete pQueue;
376 			}
377 		}
378 		return pReturn;
379 	}
380 }
381 
382 
383 using namespace cppu_threadpool;
384 
385 struct uno_ThreadPool_Equal
386 {
operator ()uno_ThreadPool_Equal387 	sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
388 		{
389 			return a == b;
390 		}
391 };
392 
393 struct uno_ThreadPool_Hash
394 {
operator ()uno_ThreadPool_Hash395 	sal_Size operator () ( const uno_ThreadPool &a  )  const
396 		{
397 			return (sal_Size) a;
398 		}
399 };
400 
401 
402 
// Maps a uno_ThreadPool handle to a holder of the ThreadPool instance
// (despite the "HashSet" in its name this is a hash map).
typedef ::std::hash_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;

// Registry of the handles handed out by uno_threadpool_create(). It is
// allocated there on first use and deleted by uno_threadpool_destroy() when
// it becomes empty; both modify it while holding Mutex::getGlobalMutex().
static ThreadpoolHashSet *g_pThreadpoolHashSet;

// Type behind the uno_ThreadPool handle. Instances are heap-allocated by
// uno_threadpool_create() only to get a handle that is unique in the process.
struct _uno_ThreadPool
{
	sal_Int32 dummy;
};
411 
412 extern "C" uno_ThreadPool SAL_CALL
uno_threadpool_create()413 uno_threadpool_create() SAL_THROW_EXTERN_C()
414 {
415 	MutexGuard guard( Mutex::getGlobalMutex() );
416 	if( ! g_pThreadpoolHashSet )
417 	{
418 		g_pThreadpoolHashSet = new ThreadpoolHashSet();
419 	}
420 
421 	// Just ensure that the handle is unique in the process (via heap)
422 	uno_ThreadPool h = new struct _uno_ThreadPool;
423 	g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) );
424 	return h;
425 }
426 
427 extern "C" void SAL_CALL
uno_threadpool_attach(uno_ThreadPool)428 uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
429 {
430 	sal_Sequence *pThreadId = 0;
431 	uno_getIdOfCurrentThread( &pThreadId );
432 	ThreadPool::getInstance()->prepare( pThreadId );
433 	rtl_byte_sequence_release( pThreadId );
434 	uno_releaseIdFromCurrentThread();
435 }
436 
437 extern "C" void SAL_CALL
uno_threadpool_enter(uno_ThreadPool hPool,void ** ppJob)438 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
439 	SAL_THROW_EXTERN_C()
440 {
441 	sal_Sequence *pThreadId = 0;
442 	uno_getIdOfCurrentThread( &pThreadId );
443 	*ppJob =
444 		ThreadPool::getInstance()->enter(
445             pThreadId,
446             sal::static_int_cast< sal_Int64 >(
447                 reinterpret_cast< sal_IntPtr >(hPool)) );
448 	rtl_byte_sequence_release( pThreadId );
449 	uno_releaseIdFromCurrentThread();
450 }
451 
/**
 * Currently does nothing; the handle argument is not used.
 */
extern "C" void SAL_CALL
uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
{
	// we might do some tidying up here in case a thread called attach but never detach
}
457 
458 extern "C" void SAL_CALL
uno_threadpool_putJob(uno_ThreadPool,sal_Sequence * pThreadId,void * pJob,void (SAL_CALL * doRequest)(void * pThreadSpecificData),sal_Bool bIsOneway)459 uno_threadpool_putJob(
460 	uno_ThreadPool,
461 	sal_Sequence *pThreadId,
462 	void *pJob,
463 	void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
464 	sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
465 {
466 	ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest );
467 }
468 
469 extern "C" void SAL_CALL
uno_threadpool_dispose(uno_ThreadPool hPool)470 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
471 {
472 	ThreadPool::getInstance()->dispose(
473         sal::static_int_cast< sal_Int64 >(
474             reinterpret_cast< sal_IntPtr >(hPool)) );
475 }
476 
477 extern "C" void SAL_CALL
uno_threadpool_destroy(uno_ThreadPool hPool)478 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
479 {
480 	ThreadPool::getInstance()->stopDisposing(
481         sal::static_int_cast< sal_Int64 >(
482             reinterpret_cast< sal_IntPtr >(hPool)) );
483 
484 	if( hPool )
485 	{
486 		// special treatment for 0 !
487 		OSL_ASSERT( g_pThreadpoolHashSet );
488 
489 		MutexGuard guard( Mutex::getGlobalMutex() );
490 
491 		ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
492 		OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
493 		g_pThreadpoolHashSet->erase( ii );
494 		delete hPool;
495 
496 		if( g_pThreadpoolHashSet->empty() )
497 		{
498 			delete g_pThreadpoolHashSet;
499 			g_pThreadpoolHashSet = 0;
500 		}
501 	}
502 }
503