xref: /trunk/main/io/source/stm/opump.cxx (revision 3716f815)
1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_io.hxx"
26 
27 #include <stdio.h>
28 
29 #include <osl/diagnose.h>
30 
31 #include <com/sun/star/io/XActiveDataSource.hpp>
32 #include <com/sun/star/io/XActiveDataSink.hpp>
33 #include <com/sun/star/io/XActiveDataControl.hpp>
34 #include <com/sun/star/io/XConnectable.hpp>
35 #include <com/sun/star/lang/XSingleServiceFactory.hpp>
36 #include <com/sun/star/lang/XMultiServiceFactory.hpp>
37 #include <com/sun/star/lang/XServiceInfo.hpp>
38 #include <com/sun/star/registry/XRegistryKey.hpp>
39 
40 #include <uno/dispatcher.h>
41 #include <uno/mapping.hxx>
42 #include <cppuhelper/implbase5.hxx>
43 #include <cppuhelper/factory.hxx>
44 #include <cppuhelper/interfacecontainer.hxx>
45 #include <osl/mutex.hxx>
46 #include <osl/thread.h>
47 
48 
49 using namespace osl;
50 using namespace std;
51 using namespace rtl;
52 using namespace cppu;
53 using namespace com::sun::star::uno;
54 using namespace com::sun::star::lang;
55 using namespace com::sun::star::registry;
56 using namespace com::sun::star::io;
57 
58 #include "factreg.hxx"
59 
60 namespace io_stm {
61 
62 	class Pump : public WeakImplHelper5<
63 		  XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo >
64 	{
65 		Mutex									m_aMutex;
66 		oslThread								m_aThread;
67 
68 		Reference< XConnectable >				m_xPred;
69 		Reference< XConnectable >				m_xSucc;
70 		Reference< XInputStream >				m_xInput;
71 		Reference< XOutputStream >				m_xOutput;
72         OInterfaceContainerHelper               m_cnt;
73         sal_Bool                                m_closeFired;
74 
75 		void run();
76 		static void static_run( void* pObject );
77 
78         void close();
79 		void fireClose();
80 		void fireStarted();
81 		void fireTerminated();
82 		void fireError( const Any &a );
83 
84 	public:
85 		Pump();
86 		virtual ~Pump();
87 
88 		// XActiveDataSource
89 		virtual void SAL_CALL setOutputStream( const Reference< ::com::sun::star::io::XOutputStream >& xOutput ) throw();
90 		virtual Reference< ::com::sun::star::io::XOutputStream > SAL_CALL getOutputStream() throw();
91 
92 		// XActiveDataSink
93 		virtual void SAL_CALL setInputStream( const Reference< ::com::sun::star::io::XInputStream >& xStream ) throw();
94 		virtual Reference< ::com::sun::star::io::XInputStream > SAL_CALL getInputStream() throw();
95 
96 		// XActiveDataControl
97 		virtual void SAL_CALL addListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
98 		virtual void SAL_CALL removeListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
99 		virtual void SAL_CALL start() throw( RuntimeException );
100 		virtual void SAL_CALL terminate() throw();
101 
102 		// XConnectable
103 		virtual void SAL_CALL setPredecessor( const Reference< ::com::sun::star::io::XConnectable >& xPred ) throw();
104 		virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getPredecessor() throw();
105 		virtual void SAL_CALL setSuccessor( const Reference< ::com::sun::star::io::XConnectable >& xSucc ) throw();
106 		virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getSuccessor() throw();
107 
108 	public: // XServiceInfo
109 		virtual OUString    SAL_CALL getImplementationName() throw(  );
110 		virtual Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw(  );
111 		virtual sal_Bool     SAL_CALL supportsService(const OUString& ServiceName) throw(  );
112 	};
113 
Pump()114 Pump::Pump() : m_aThread( 0 ),
115                m_cnt( m_aMutex ),
116                m_closeFired( sal_False )
117 {
118 	g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
119 }
120 
~Pump()121 Pump::~Pump()
122 {
123 	// exit gracefully
124     if( m_aThread )
125     {
126         osl_joinWithThread( m_aThread );
127         osl_destroyThread( m_aThread );
128     }
129 	g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
130 }
131 
fireError(const Any & exception)132 void Pump::fireError( const  Any & exception )
133 {
134     OInterfaceIteratorHelper iter( m_cnt );
135     while( iter.hasMoreElements() )
136     {
137         try
138 		{
139 			static_cast< XStreamListener * > ( iter.next() )->error( exception );
140 		}
141 		catch ( RuntimeException &e )
142 		{
143 			OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
144 			OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
145 		}
146 	}
147 }
148 
fireClose()149 void Pump::fireClose()
150 {
151     sal_Bool bFire = sal_False;
152     {
153         MutexGuard guard( m_aMutex );
154         if( ! m_closeFired  )
155         {
156             m_closeFired = sal_True;
157             bFire = sal_True;
158         }
159     }
160 
161     if( bFire )
162     {
163         OInterfaceIteratorHelper iter( m_cnt );
164         while( iter.hasMoreElements() )
165         {
166             try
167             {
168                 static_cast< XStreamListener * > ( iter.next() )->closed( );
169             }
170             catch ( RuntimeException &e )
171             {
172                 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
173                 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
174             }
175         }
176     }
177 }
178 
fireStarted()179 void Pump::fireStarted()
180 {
181     OInterfaceIteratorHelper iter( m_cnt );
182     while( iter.hasMoreElements() )
183     {
184         try
185 		{
186 			static_cast< XStreamListener * > ( iter.next() )->started( );
187 		}
188 		catch ( RuntimeException &e )
189 		{
190 			OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
191 			OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
192 		}
193 	}
194 }
195 
fireTerminated()196 void Pump::fireTerminated()
197 {
198     OInterfaceIteratorHelper iter( m_cnt );
199     while( iter.hasMoreElements() )
200     {
201         try
202 		{
203 			static_cast< XStreamListener * > ( iter.next() )->terminated();
204 		}
205 		catch ( RuntimeException &e )
206 		{
207 			OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
208 			OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
209 		}
210 	}
211 }
212 
213 
214 
close()215 void Pump::close()
216 {
217 	// close streams and release references
218     Reference< XInputStream > rInput;
219     Reference< XOutputStream > rOutput;
220     {
221         MutexGuard guard( m_aMutex );
222         rInput = m_xInput;
223 		m_xInput.clear();
224 
225         rOutput = m_xOutput;
226         m_xOutput.clear();
227         m_xSucc.clear();
228         m_xPred.clear();
229     }
230 	if( rInput.is() )
231 	{
232 		try
233 		{
234 			rInput->closeInput();
235 		}
236 		catch( Exception & )
237 		{
238 			// go down calm
239 		}
240 	}
241 	if( rOutput.is() )
242 	{
243 		try
244 		{
245 			rOutput->closeOutput();
246 		}
247 		catch( Exception & )
248 		{
249 			// go down calm
250 		}
251 	}
252 }
253 
static_run(void * pObject)254 void Pump::static_run( void* pObject )
255 {
256 	((Pump*)pObject)->run();
257 	((Pump*)pObject)->release();
258 }
259 
run()260 void Pump::run()
261 {
262 	try
263 	{
264         fireStarted();
265 		try
266 		{
267             Reference< XInputStream > rInput;
268             Reference< XOutputStream > rOutput;
269             {
270                 Guard< Mutex > aGuard( m_aMutex );
271                 rInput = m_xInput;
272                 rOutput = m_xOutput;
273             }
274 
275 			if( ! rInput.is() )
276 			{
277 				NotConnectedException exception(
278 					OUString::createFromAscii( "no input stream set" ) , Reference<XInterface>((OWeakObject*)this) );
279 				throw exception;
280 			}
281 			Sequence< sal_Int8 > aData;
282 			while( rInput->readSomeBytes( aData, 65536 ) )
283 			{
284 				if( ! rOutput.is() )
285 				{
286 					NotConnectedException exception(
287 						OUString::createFromAscii( "no output stream set" ) , Reference<XInterface>( (OWeakObject*)this) );
288 					throw exception;
289 				}
290 				rOutput->writeBytes( aData );
291 				osl_yieldThread();
292 			}
293 		}
294 		catch ( IOException & e )
295 		{
296 			fireError( makeAny( e ) );
297 		}
298 		catch ( RuntimeException & e )
299 		{
300 			fireError( makeAny( e ) );
301 		}
302 		catch ( Exception & e )
303 		{
304 			fireError( makeAny( e ) );
305 		}
306 
307 		close();
308         fireClose();
309 	}
310 	catch ( com::sun::star::uno::Exception &e )
311 	{
312 		// we are the last on the stack.
313 		// this is to avoid crashing the program, when e.g. a bridge crashes
314 		OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
315 		OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception", sMessage.getStr() );
316 	}
317 }
318 
319 // ------------------------------------------------------------
320 
321 /*
322  * XConnectable
323  */
324 
setPredecessor(const Reference<XConnectable> & xPred)325 void Pump::setPredecessor( const Reference< XConnectable >& xPred ) throw()
326 {
327 	Guard< Mutex > aGuard( m_aMutex );
328 	m_xPred = xPred;
329 }
330 
331 // ------------------------------------------------------------
332 
getPredecessor()333 Reference< XConnectable > Pump::getPredecessor() throw()
334 {
335 	Guard< Mutex > aGuard( m_aMutex );
336 	return m_xPred;
337 }
338 
339 // ------------------------------------------------------------
340 
setSuccessor(const Reference<XConnectable> & xSucc)341 void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) throw()
342 {
343 	Guard< Mutex > aGuard( m_aMutex );
344 	m_xSucc = xSucc;
345 }
346 
347 // ------------------------------------------------------------
348 
getSuccessor()349 Reference< XConnectable > Pump::getSuccessor() throw()
350 {
351 	Guard< Mutex > aGuard( m_aMutex );
352 	return m_xSucc;
353 }
354 
355 // -----------------------------------------------------------------
356 
357 /*
358  * XActiveDataControl
359  */
360 
addListener(const Reference<XStreamListener> & xListener)361 void Pump::addListener( const Reference< XStreamListener >& xListener ) throw()
362 {
363     m_cnt.addInterface( xListener );
364 }
365 
366 // ------------------------------------------------------------
367 
removeListener(const Reference<XStreamListener> & xListener)368 void Pump::removeListener( const Reference< XStreamListener >& xListener ) throw()
369 {
370     m_cnt.removeInterface( xListener );
371 }
372 
373 // ------------------------------------------------------------
374 
start()375 void Pump::start() throw( RuntimeException )
376 {
377 	Guard< Mutex > aGuard( m_aMutex );
378 	m_aThread = osl_createSuspendedThread((oslWorkerFunction)Pump::static_run,this);
379     if( m_aThread )
380     {
381         // will be released by OPump::static_run
382         acquire();
383         osl_resumeThread( m_aThread );
384     }
385     else
386     {
387         throw RuntimeException(
388             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pump::start Couldn't create worker thread" )),
389             *this);
390     }
391 }
392 
393 // ------------------------------------------------------------
394 
terminate()395 void Pump::terminate() throw()
396 {
397 	close();
398 
399 	// wait for the worker to die
400     if( m_aThread )
401         osl_joinWithThread( m_aThread );
402 
403     fireTerminated();
404     fireClose();
405 }
406 
407 // ------------------------------------------------------------
408 
409 /*
410  * XActiveDataSink
411  */
412 
setInputStream(const Reference<XInputStream> & xStream)413 void Pump::setInputStream( const Reference< XInputStream >& xStream ) throw()
414 {
415 	Guard< Mutex > aGuard( m_aMutex );
416 	m_xInput = xStream;
417 	Reference< XConnectable > xConnect( xStream, UNO_QUERY );
418 	if( xConnect.is() )
419 		xConnect->setSuccessor( this );
420 	// data transfer starts in XActiveDataControl::start
421 }
422 
423 // ------------------------------------------------------------
424 
getInputStream()425 Reference< XInputStream > Pump::getInputStream() throw()
426 {
427 	Guard< Mutex > aGuard( m_aMutex );
428 	return m_xInput;
429 }
430 
431 // ------------------------------------------------------------
432 
433 /*
434  * XActiveDataSource
435  */
436 
setOutputStream(const Reference<XOutputStream> & xOut)437 void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) throw()
438 {
439 	Guard< Mutex > aGuard( m_aMutex );
440 	m_xOutput = xOut;
441 	Reference< XConnectable > xConnect( xOut, UNO_QUERY );
442 	if( xConnect.is() )
443 		xConnect->setPredecessor( this );
444 	// data transfer starts in XActiveDataControl::start
445 }
446 
447 // ------------------------------------------------------------
448 
getOutputStream()449 Reference< XOutputStream > Pump::getOutputStream() throw()
450 {
451 	Guard< Mutex > aGuard( m_aMutex );
452 	return m_xOutput;
453 }
454 
455 
456 // XServiceInfo
getImplementationName()457 OUString Pump::getImplementationName() throw(  )
458 {
459     return OPumpImpl_getImplementationName();
460 }
461 
462 // XServiceInfo
supportsService(const OUString & ServiceName)463 sal_Bool Pump::supportsService(const OUString& ServiceName) throw(  )
464 {
465     Sequence< OUString > aSNL = getSupportedServiceNames();
466     const OUString * pArray = aSNL.getConstArray();
467 
468     for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
469         if( pArray[i] == ServiceName )
470             return sal_True;
471 
472     return sal_False;
473 }
474 
475 // XServiceInfo
getSupportedServiceNames(void)476 Sequence< OUString > Pump::getSupportedServiceNames(void) throw(  )
477 {
478     return OPumpImpl_getSupportedServiceNames();
479 }
480 
481 
/**
 * Factory function: creates a new Pump and returns it as XInterface.
 * The component context argument is not used.
 */
Reference< XInterface > SAL_CALL OPumpImpl_CreateInstance( const Reference< XComponentContext > & ) throw (Exception)
{
	Pump * const pPump = new Pump;
	return Reference< XInterface >( *pPump );
}
486 
OPumpImpl_getImplementationName()487 OUString OPumpImpl_getImplementationName()
488 {
489 	return OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.comp.io.Pump") );
490 }
491 
OPumpImpl_getSupportedServiceNames(void)492 Sequence<OUString> OPumpImpl_getSupportedServiceNames(void)
493 {
494 	OUString s( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.io.Pump" ) );
495 	Sequence< OUString > seq( &s , 1 );
496 	return seq;
497 }
498 
499 }
500 
501