xref: /trunk/main/cppu/source/threadpool/thread.cxx (revision 129fa3d1)
1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_cppu.hxx"
26 #include <stdio.h>
27 #include <osl/diagnose.h>
28 #include <uno/threadpool.h>
29 
30 #include <rtl/instance.hxx>
31 
32 #include "thread.hxx"
33 #include "jobqueue.hxx"
34 #include "threadpool.hxx"
35 
36 
37 using namespace osl;
38 extern "C" {
39 
cppu_requestThreadWorker(void * pVoid)40 void SAL_CALL cppu_requestThreadWorker( void *pVoid )
41 {
42 	::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid;
43 
44 	pThread->run();
45 	pThread->onTerminated();
46 }
47 
48 }
49 
50 namespace cppu_threadpool {
51 
52 // ----------------------------------------------------------------------------------
~ThreadAdmin()53 	ThreadAdmin::~ThreadAdmin()
54 	{
55 #if OSL_DEBUG_LEVEL > 1
56 		if( m_lst.size() )
57 		{
58 			fprintf( stderr, "%lu Threads left\n" , static_cast<unsigned long>(m_lst.size()) );
59 		}
60 #endif
61 	}
62 
add(ORequestThread * p)63 	void ThreadAdmin::add( ORequestThread *p )
64 	{
65 		MutexGuard aGuard( m_mutex );
66 		m_lst.push_back( p );
67 	}
68 
remove(ORequestThread * p)69 	void ThreadAdmin::remove( ORequestThread * p )
70 	{
71 		MutexGuard aGuard( m_mutex );
72 		::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p );
73 		OSL_ASSERT( ii != m_lst.end() );
74 		m_lst.erase( ii );
75 	}
76 
join()77 	void ThreadAdmin::join()
78 	{
79 		ORequestThread *pCurrent;
80 		do
81 		{
82 			pCurrent = 0;
83 			{
84 				MutexGuard aGuard( m_mutex );
85 				if( ! m_lst.empty() )
86 				{
87 					pCurrent = m_lst.front();
88 					pCurrent->setDeleteSelf( sal_False );
89 				}
90 			}
91 			if ( pCurrent )
92 			{
93 				pCurrent->join();
94 				delete pCurrent;
95 			}
96 		} while( pCurrent );
97 	}
98 
99 	struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin >
100 	{
operator ()cppu_threadpool::theThreadAdmin101 		ThreadAdminHolder operator () () {
102 			ThreadAdminHolder aRet(new ThreadAdmin());
103 			return aRet;
104 		}
105 	};
106 
getInstance()107 	ThreadAdminHolder& ThreadAdmin::getInstance()
108 	{
109 		return theThreadAdmin::get();
110 	}
111 
112 // ----------------------------------------------------------------------------------
ORequestThread(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)113 	ORequestThread::ORequestThread( JobQueue *pQueue,
114 									const ByteSequence &aThreadId,
115 									sal_Bool bAsynchron )
116 		: m_thread( 0 )
117 		, m_aThreadAdmin( ThreadAdmin::getInstance() )
118 		, m_pQueue( pQueue )
119 		, m_aThreadId( aThreadId )
120 		, m_bAsynchron( bAsynchron )
121 		, m_bDeleteSelf( sal_True )
122 	{
123 		m_aThreadAdmin->add( this );
124 	}
125 
126 
~ORequestThread()127 	ORequestThread::~ORequestThread()
128 	{
129 		if (m_thread != 0)
130 		{
131 			osl_destroyThread(m_thread);
132 		}
133 	}
134 
135 
setTask(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)136 	void ORequestThread::setTask( JobQueue *pQueue,
137 								  const ByteSequence &aThreadId,
138 								  sal_Bool bAsynchron )
139 	{
140 		m_pQueue = pQueue;
141 		m_aThreadId = aThreadId;
142 		m_bAsynchron = bAsynchron;
143 	}
144 
create()145 	sal_Bool ORequestThread::create()
146 	{
147 		OSL_ASSERT(m_thread == 0);	// only one running thread per instance
148 
149 		m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this);
150 		if ( m_thread )
151 		{
152 			osl_resumeThread( m_thread );
153 		}
154 
155 		return m_thread != 0;
156 	}
157 
join()158 	void ORequestThread::join()
159 	{
160 		osl_joinWithThread( m_thread );
161 	}
162 
onTerminated()163 	void ORequestThread::onTerminated()
164 	{
165 		m_aThreadAdmin->remove( this );
166 		if( m_bDeleteSelf )
167 		{
168 			delete this;
169 		}
170 	}
171 
run()172 	void ORequestThread::run()
173 	{
174 		ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance();
175 
176 		while ( m_pQueue )
177 		{
178 			if( ! m_bAsynchron )
179 			{
180                 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
181                 {
182                     OSL_ASSERT( false );
183                 }
184 			}
185 
186 			while( ! m_pQueue->isEmpty() )
187 			{
188 				// Note : Oneways should not get a disposable disposeid,
189 				//        It does not make sense to dispose a call in this state.
190 				//        That's way we put it an disposeid, that can't be used otherwise.
191 				m_pQueue->enter(
192                     sal::static_int_cast< sal_Int64 >(
193                         reinterpret_cast< sal_IntPtr >(this)),
194                     sal_True );
195 
196 				if( m_pQueue->isEmpty() )
197 				{
198 					theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
199 					// Note : revokeQueue might have failed because m_pQueue.isEmpty()
200 					//        may be false (race).
201 				}
202 			}
203 
204 			delete m_pQueue;
205 			m_pQueue = 0;
206 
207 			if( ! m_bAsynchron )
208 			{
209 				uno_releaseIdFromCurrentThread();
210 			}
211 
212 			theThreadPool->waitInPool( this );
213 		}
214 	}
215 }
216