xref: /trunk/main/cppu/source/threadpool/thread.cxx (revision cdf0e10c)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_cppu.hxx"
30 #include <stdio.h>
31 #include <osl/diagnose.h>
32 #include <uno/threadpool.h>
33 
34 #include <rtl/instance.hxx>
35 
36 #include "thread.hxx"
37 #include "jobqueue.hxx"
38 #include "threadpool.hxx"
39 
40 
41 using namespace osl;
42 extern "C" {
43 
44 void SAL_CALL cppu_requestThreadWorker( void *pVoid )
45 {
46 	::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid;
47 
48 	pThread->run();
49 	pThread->onTerminated();
50 }
51 
52 }
53 
54 namespace cppu_threadpool {
55 
56 // ----------------------------------------------------------------------------------
57 	ThreadAdmin::~ThreadAdmin()
58 	{
59 #if OSL_DEBUG_LEVEL > 1
60 		if( m_lst.size() )
61 		{
62 			fprintf( stderr, "%lu Threads left\n" , static_cast<unsigned long>(m_lst.size()) );
63 		}
64 #endif
65 	}
66 
67 	void ThreadAdmin::add( ORequestThread *p )
68 	{
69 		MutexGuard aGuard( m_mutex );
70 		m_lst.push_back( p );
71 	}
72 
73 	void ThreadAdmin::remove( ORequestThread * p )
74 	{
75 		MutexGuard aGuard( m_mutex );
76 		::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p );
77 		OSL_ASSERT( ii != m_lst.end() );
78 		m_lst.erase( ii );
79 	}
80 
81 	void ThreadAdmin::join()
82 	{
83 		ORequestThread *pCurrent;
84 		do
85 		{
86 			pCurrent = 0;
87 			{
88 				MutexGuard aGuard( m_mutex );
89 				if( ! m_lst.empty() )
90 				{
91 					pCurrent = m_lst.front();
92 					pCurrent->setDeleteSelf( sal_False );
93 				}
94 			}
95 			if ( pCurrent )
96 			{
97 				pCurrent->join();
98 				delete pCurrent;
99 			}
100 		} while( pCurrent );
101 	}
102 
103 	struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin >
104 	{
105 		ThreadAdminHolder operator () () {
106 			ThreadAdminHolder aRet(new ThreadAdmin());
107 			return aRet;
108 		}
109 	};
110 
111 	ThreadAdminHolder& ThreadAdmin::getInstance()
112 	{
113 		return theThreadAdmin::get();
114 	}
115 
116 // ----------------------------------------------------------------------------------
117 	ORequestThread::ORequestThread( JobQueue *pQueue,
118 									const ByteSequence &aThreadId,
119 									sal_Bool bAsynchron )
120 		: m_thread( 0 )
121 		, m_aThreadAdmin( ThreadAdmin::getInstance() )
122 		, m_pQueue( pQueue )
123 		, m_aThreadId( aThreadId )
124 		, m_bAsynchron( bAsynchron )
125 		, m_bDeleteSelf( sal_True )
126 	{
127 		m_aThreadAdmin->add( this );
128 	}
129 
130 
131 	ORequestThread::~ORequestThread()
132 	{
133 		if (m_thread != 0)
134 		{
135 			osl_destroyThread(m_thread);
136 		}
137 	}
138 
139 
140 	void ORequestThread::setTask( JobQueue *pQueue,
141 								  const ByteSequence &aThreadId,
142 								  sal_Bool bAsynchron )
143 	{
144 		m_pQueue = pQueue;
145 		m_aThreadId = aThreadId;
146 		m_bAsynchron = bAsynchron;
147 	}
148 
149 	sal_Bool ORequestThread::create()
150 	{
151 		OSL_ASSERT(m_thread == 0);	// only one running thread per instance
152 
153 		m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this);
154 		if ( m_thread )
155 		{
156 			osl_resumeThread( m_thread );
157 		}
158 
159 		return m_thread != 0;
160 	}
161 
162 	void ORequestThread::join()
163 	{
164 		osl_joinWithThread( m_thread );
165 	}
166 
167 	void ORequestThread::onTerminated()
168 	{
169 		m_aThreadAdmin->remove( this );
170 		if( m_bDeleteSelf )
171 		{
172 			delete this;
173 		}
174 	}
175 
176 	void ORequestThread::run()
177 	{
178 		ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance();
179 
180 		while ( m_pQueue )
181 		{
182 			if( ! m_bAsynchron )
183 			{
184                 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
185                 {
186                     OSL_ASSERT( false );
187                 }
188 			}
189 
190 			while( ! m_pQueue->isEmpty() )
191 			{
192 				// Note : Oneways should not get a disposable disposeid,
193 				//        It does not make sense to dispose a call in this state.
194 				//        That's way we put it an disposeid, that can't be used otherwise.
195 				m_pQueue->enter(
196                     sal::static_int_cast< sal_Int64 >(
197                         reinterpret_cast< sal_IntPtr >(this)),
198                     sal_True );
199 
200 				if( m_pQueue->isEmpty() )
201 				{
202 					theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
203 					// Note : revokeQueue might have failed because m_pQueue.isEmpty()
204 					//        may be false (race).
205 				}
206 			}
207 
208 			delete m_pQueue;
209 			m_pQueue = 0;
210 
211 			if( ! m_bAsynchron )
212 			{
213 				uno_releaseIdFromCurrentThread();
214 			}
215 
216 			theThreadPool->waitInPool( this );
217 		}
218 	}
219 }
220