1 /**************************************************************
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 *************************************************************/
21
22
23
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_cppu.hxx"
26 #include <hash_set>
27 #include <stdio.h>
28
29 #include <osl/diagnose.h>
30 #include <osl/mutex.hxx>
31 #include <osl/thread.h>
32 #include <rtl/instance.hxx>
33
34 #include <uno/threadpool.h>
35
36 #include "threadpool.hxx"
37 #include "thread.hxx"
38
39 using namespace ::std;
40 using namespace ::osl;
41
42 namespace cppu_threadpool
43 {
44 struct theDisposedCallerAdmin :
45 public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
46 {
operator ()cppu_threadpool::theDisposedCallerAdmin47 DisposedCallerAdminHolder operator () () {
48 return DisposedCallerAdminHolder(new DisposedCallerAdmin());
49 }
50 };
51
getInstance()52 DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
53 {
54 return theDisposedCallerAdmin::get();
55 }
56
~DisposedCallerAdmin()57 DisposedCallerAdmin::~DisposedCallerAdmin()
58 {
59 #if OSL_DEBUG_LEVEL > 1
60 if( !m_lst.empty() )
61 {
62 printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
63 }
64 #endif
65 }
66
dispose(sal_Int64 nDisposeId)67 void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
68 {
69 MutexGuard guard( m_mutex );
70 m_lst.push_back( nDisposeId );
71 }
72
stopDisposing(sal_Int64 nDisposeId)73 void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId )
74 {
75 MutexGuard guard( m_mutex );
76 for( DisposedCallerList::iterator ii = m_lst.begin() ;
77 ii != m_lst.end() ;
78 ++ ii )
79 {
80 if( (*ii) == nDisposeId )
81 {
82 m_lst.erase( ii );
83 break;
84 }
85 }
86 }
87
isDisposed(sal_Int64 nDisposeId)88 sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
89 {
90 MutexGuard guard( m_mutex );
91 for( DisposedCallerList::iterator ii = m_lst.begin() ;
92 ii != m_lst.end() ;
93 ++ ii )
94 {
95 if( (*ii) == nDisposeId )
96 {
97 return sal_True;
98 }
99 }
100 return sal_False;
101 }
102
103
104 //-------------------------------------------------------------------------------
105
106 struct theThreadPool :
107 public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool >
108 {
operator ()cppu_threadpool::theThreadPool109 ThreadPoolHolder operator () () {
110 ThreadPoolHolder aRet(new ThreadPool());
111 return aRet;
112 }
113 };
114
ThreadPool()115 ThreadPool::ThreadPool()
116 {
117 m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
118 }
119
~ThreadPool()120 ThreadPool::~ThreadPool()
121 {
122 #if OSL_DEBUG_LEVEL > 1
123 if( m_mapQueue.size() )
124 {
125 printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
126 }
127 #endif
128 }
getInstance()129 ThreadPoolHolder ThreadPool::getInstance()
130 {
131 return theThreadPool::get();
132 }
133
134
dispose(sal_Int64 nDisposeId)135 void ThreadPool::dispose( sal_Int64 nDisposeId )
136 {
137 if( nDisposeId )
138 {
139 m_DisposedCallerAdmin->dispose( nDisposeId );
140
141 MutexGuard guard( m_mutex );
142 for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
143 ii != m_mapQueue.end();
144 ++ii)
145 {
146 if( (*ii).second.first )
147 {
148 (*ii).second.first->dispose( nDisposeId );
149 }
150 if( (*ii).second.second )
151 {
152 (*ii).second.second->dispose( nDisposeId );
153 }
154 }
155 }
156 else
157 {
158 {
159 MutexGuard guard( m_mutexWaitingThreadList );
160 for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
161 ii != m_lstThreads.end() ;
162 ++ ii )
163 {
164 // wake the threads up
165 osl_setCondition( (*ii)->condition );
166 }
167 }
168 ThreadAdmin::getInstance()->join();
169 }
170 }
171
stopDisposing(sal_Int64 nDisposeId)172 void ThreadPool::stopDisposing( sal_Int64 nDisposeId )
173 {
174 m_DisposedCallerAdmin->stopDisposing( nDisposeId );
175 }
176
177 /******************
178 * This methods lets the thread wait a certain amount of time. If within this timespan
179 * a new request comes in, this thread is reused. This is done only to improve performance,
180 * it is not required for threadpool functionality.
181 ******************/
waitInPool(ORequestThread * pThread)182 void ThreadPool::waitInPool( ORequestThread * pThread )
183 {
184 struct WaitingThread waitingThread;
185 waitingThread.condition = osl_createCondition();
186 waitingThread.thread = pThread;
187 {
188 MutexGuard guard( m_mutexWaitingThreadList );
189 m_lstThreads.push_front( &waitingThread );
190 }
191
192 // let the thread wait 2 seconds
193 TimeValue time = { 2 , 0 };
194 osl_waitCondition( waitingThread.condition , &time );
195
196 {
197 MutexGuard guard ( m_mutexWaitingThreadList );
198 if( waitingThread.thread )
199 {
200 // thread wasn't reused, remove it from the list
201 WaitingThreadList::iterator ii = find(
202 m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
203 OSL_ASSERT( ii != m_lstThreads.end() );
204 m_lstThreads.erase( ii );
205 }
206 }
207
208 osl_destroyCondition( waitingThread.condition );
209 }
210
createThread(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)211 void ThreadPool::createThread( JobQueue *pQueue ,
212 const ByteSequence &aThreadId,
213 sal_Bool bAsynchron )
214 {
215 sal_Bool bCreate = sal_True;
216 {
217 // Can a thread be reused ?
218 MutexGuard guard( m_mutexWaitingThreadList );
219 if( ! m_lstThreads.empty() )
220 {
221 // inform the thread and let it go
222 struct WaitingThread *pWaitingThread = m_lstThreads.back();
223 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
224 pWaitingThread->thread = 0;
225
226 // remove from list
227 m_lstThreads.pop_back();
228
229 // let the thread go
230 osl_setCondition( pWaitingThread->condition );
231 bCreate = sal_False;
232 }
233 }
234
235 if( bCreate )
236 {
237 ORequestThread *pThread =
238 new ORequestThread( pQueue , aThreadId, bAsynchron);
239 // deletes itself !
240 pThread->create();
241 }
242 }
243
revokeQueue(const ByteSequence & aThreadId,sal_Bool bAsynchron)244 sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron )
245 {
246 MutexGuard guard( m_mutex );
247
248 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
249 OSL_ASSERT( ii != m_mapQueue.end() );
250
251 if( bAsynchron )
252 {
253 if( ! (*ii).second.second->isEmpty() )
254 {
255 // another thread has put something into the queue
256 return sal_False;
257 }
258
259 (*ii).second.second = 0;
260 if( (*ii).second.first )
261 {
262 // all oneway request have been processed, now
263 // synchronus requests may go on
264 (*ii).second.first->resume();
265 }
266 }
267 else
268 {
269 if( ! (*ii).second.first->isEmpty() )
270 {
271 // another thread has put something into the queue
272 return sal_False;
273 }
274 (*ii).second.first = 0;
275 }
276
277 if( 0 == (*ii).second.first && 0 == (*ii).second.second )
278 {
279 m_mapQueue.erase( ii );
280 }
281
282 return sal_True;
283 }
284
285
addJob(const ByteSequence & aThreadId,sal_Bool bAsynchron,void * pThreadSpecificData,RequestFun * doRequest)286 void ThreadPool::addJob(
287 const ByteSequence &aThreadId ,
288 sal_Bool bAsynchron,
289 void *pThreadSpecificData,
290 RequestFun * doRequest )
291 {
292 sal_Bool bCreateThread = sal_False;
293 JobQueue *pQueue = 0;
294 {
295 MutexGuard guard( m_mutex );
296
297 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
298
299 if( ii == m_mapQueue.end() )
300 {
301 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( 0 , 0 );
302 ii = m_mapQueue.find( aThreadId );
303 OSL_ASSERT( ii != m_mapQueue.end() );
304 }
305
306 if( bAsynchron )
307 {
308 if( ! (*ii).second.second )
309 {
310 (*ii).second.second = new JobQueue();
311 bCreateThread = sal_True;
312 }
313 pQueue = (*ii).second.second;
314 }
315 else
316 {
317 if( ! (*ii).second.first )
318 {
319 (*ii).second.first = new JobQueue();
320 bCreateThread = sal_True;
321 }
322 pQueue = (*ii).second.first;
323
324 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
325 {
326 pQueue->suspend();
327 }
328 }
329 pQueue->add( pThreadSpecificData , doRequest );
330 }
331
332 if( bCreateThread )
333 {
334 createThread( pQueue , aThreadId , bAsynchron);
335 }
336 }
337
prepare(const ByteSequence & aThreadId)338 void ThreadPool::prepare( const ByteSequence &aThreadId )
339 {
340 MutexGuard guard( m_mutex );
341
342 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
343
344 if( ii == m_mapQueue.end() )
345 {
346 JobQueue *p = new JobQueue();
347 m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , 0 );
348 }
349 else if( 0 == (*ii).second.first )
350 {
351 (*ii).second.first = new JobQueue();
352 }
353 }
354
enter(const ByteSequence & aThreadId,sal_Int64 nDisposeId)355 void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
356 {
357 JobQueue *pQueue = 0;
358 {
359 MutexGuard guard( m_mutex );
360
361 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
362
363 OSL_ASSERT( ii != m_mapQueue.end() );
364 pQueue = (*ii).second.first;
365 }
366
367 OSL_ASSERT( pQueue );
368 void *pReturn = pQueue->enter( nDisposeId );
369
370 if( pQueue->isCallstackEmpty() )
371 {
372 if( revokeQueue( aThreadId , sal_False) )
373 {
374 // remove queue
375 delete pQueue;
376 }
377 }
378 return pReturn;
379 }
380 }
381
382
383 using namespace cppu_threadpool;
384
385 struct uno_ThreadPool_Equal
386 {
operator ()uno_ThreadPool_Equal387 sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
388 {
389 return a == b;
390 }
391 };
392
393 struct uno_ThreadPool_Hash
394 {
operator ()uno_ThreadPool_Hash395 sal_Size operator () ( const uno_ThreadPool &a ) const
396 {
397 return (sal_Size) a;
398 }
399 };
400
401
402
403 typedef ::std::hash_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
404
405 static ThreadpoolHashSet *g_pThreadpoolHashSet;
406
407 struct _uno_ThreadPool
408 {
409 sal_Int32 dummy;
410 };
411
412 extern "C" uno_ThreadPool SAL_CALL
uno_threadpool_create()413 uno_threadpool_create() SAL_THROW_EXTERN_C()
414 {
415 MutexGuard guard( Mutex::getGlobalMutex() );
416 if( ! g_pThreadpoolHashSet )
417 {
418 g_pThreadpoolHashSet = new ThreadpoolHashSet();
419 }
420
421 // Just ensure that the handle is unique in the process (via heap)
422 uno_ThreadPool h = new struct _uno_ThreadPool;
423 g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) );
424 return h;
425 }
426
427 extern "C" void SAL_CALL
uno_threadpool_attach(uno_ThreadPool)428 uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
429 {
430 sal_Sequence *pThreadId = 0;
431 uno_getIdOfCurrentThread( &pThreadId );
432 ThreadPool::getInstance()->prepare( pThreadId );
433 rtl_byte_sequence_release( pThreadId );
434 uno_releaseIdFromCurrentThread();
435 }
436
437 extern "C" void SAL_CALL
uno_threadpool_enter(uno_ThreadPool hPool,void ** ppJob)438 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
439 SAL_THROW_EXTERN_C()
440 {
441 sal_Sequence *pThreadId = 0;
442 uno_getIdOfCurrentThread( &pThreadId );
443 *ppJob =
444 ThreadPool::getInstance()->enter(
445 pThreadId,
446 sal::static_int_cast< sal_Int64 >(
447 reinterpret_cast< sal_IntPtr >(hPool)) );
448 rtl_byte_sequence_release( pThreadId );
449 uno_releaseIdFromCurrentThread();
450 }
451
452 extern "C" void SAL_CALL
uno_threadpool_detach(uno_ThreadPool)453 uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
454 {
455 // we might do here some tiding up in case a thread called attach but never detach
456 }
457
458 extern "C" void SAL_CALL
uno_threadpool_putJob(uno_ThreadPool,sal_Sequence * pThreadId,void * pJob,void (SAL_CALL * doRequest)(void * pThreadSpecificData),sal_Bool bIsOneway)459 uno_threadpool_putJob(
460 uno_ThreadPool,
461 sal_Sequence *pThreadId,
462 void *pJob,
463 void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
464 sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
465 {
466 ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest );
467 }
468
469 extern "C" void SAL_CALL
uno_threadpool_dispose(uno_ThreadPool hPool)470 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
471 {
472 ThreadPool::getInstance()->dispose(
473 sal::static_int_cast< sal_Int64 >(
474 reinterpret_cast< sal_IntPtr >(hPool)) );
475 }
476
477 extern "C" void SAL_CALL
uno_threadpool_destroy(uno_ThreadPool hPool)478 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
479 {
480 ThreadPool::getInstance()->stopDisposing(
481 sal::static_int_cast< sal_Int64 >(
482 reinterpret_cast< sal_IntPtr >(hPool)) );
483
484 if( hPool )
485 {
486 // special treatment for 0 !
487 OSL_ASSERT( g_pThreadpoolHashSet );
488
489 MutexGuard guard( Mutex::getGlobalMutex() );
490
491 ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
492 OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
493 g_pThreadpoolHashSet->erase( ii );
494 delete hPool;
495
496 if( g_pThreadpoolHashSet->empty() )
497 {
498 delete g_pThreadpoolHashSet;
499 g_pThreadpoolHashSet = 0;
500 }
501 }
502 }
503