xref: /trunk/main/io/source/stm/opump.cxx (revision cdf0e10c)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_io.hxx"
30 
31 #include <stdio.h>
32 
33 #include <osl/diagnose.h>
34 
35 #include <com/sun/star/io/XActiveDataSource.hpp>
36 #include <com/sun/star/io/XActiveDataSink.hpp>
37 #include <com/sun/star/io/XActiveDataControl.hpp>
38 #include <com/sun/star/io/XConnectable.hpp>
39 #include <com/sun/star/lang/XSingleServiceFactory.hpp>
40 #include <com/sun/star/lang/XMultiServiceFactory.hpp>
41 #include <com/sun/star/lang/XServiceInfo.hpp>
42 #include <com/sun/star/registry/XRegistryKey.hpp>
43 
44 #include <uno/dispatcher.h>
45 #include <uno/mapping.hxx>
46 #include <cppuhelper/implbase5.hxx>
47 #include <cppuhelper/factory.hxx>
48 #include <cppuhelper/interfacecontainer.hxx>
49 #include <osl/mutex.hxx>
50 #include <osl/thread.h>
51 
52 
53 using namespace osl;
54 using namespace std;
55 using namespace rtl;
56 using namespace cppu;
57 using namespace com::sun::star::uno;
58 using namespace com::sun::star::lang;
59 using namespace com::sun::star::registry;
60 using namespace com::sun::star::io;
61 
62 #include "factreg.hxx"
63 
64 namespace io_stm {
65 
66 	class Pump : public WeakImplHelper5<
67 		  XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo >
68 	{
69 		Mutex									m_aMutex;
70 		oslThread								m_aThread;
71 
72 		Reference< XConnectable >				m_xPred;
73 		Reference< XConnectable >				m_xSucc;
74 		Reference< XInputStream >				m_xInput;
75 		Reference< XOutputStream >				m_xOutput;
76         OInterfaceContainerHelper               m_cnt;
77         sal_Bool                                m_closeFired;
78 
79 		void run();
80 		static void static_run( void* pObject );
81 
82         void close();
83 		void fireClose();
84 		void fireStarted();
85 		void fireTerminated();
86 		void fireError( const Any &a );
87 
88 	public:
89 		Pump();
90 		virtual ~Pump();
91 
92 		// XActiveDataSource
93 		virtual void SAL_CALL setOutputStream( const Reference< ::com::sun::star::io::XOutputStream >& xOutput ) throw();
94 		virtual Reference< ::com::sun::star::io::XOutputStream > SAL_CALL getOutputStream() throw();
95 
96 		// XActiveDataSink
97 		virtual void SAL_CALL setInputStream( const Reference< ::com::sun::star::io::XInputStream >& xStream ) throw();
98 		virtual Reference< ::com::sun::star::io::XInputStream > SAL_CALL getInputStream() throw();
99 
100 		// XActiveDataControl
101 		virtual void SAL_CALL addListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
102 		virtual void SAL_CALL removeListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
103 		virtual void SAL_CALL start() throw( RuntimeException );
104 		virtual void SAL_CALL terminate() throw();
105 
106 		// XConnectable
107 		virtual void SAL_CALL setPredecessor( const Reference< ::com::sun::star::io::XConnectable >& xPred ) throw();
108 		virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getPredecessor() throw();
109 		virtual void SAL_CALL setSuccessor( const Reference< ::com::sun::star::io::XConnectable >& xSucc ) throw();
110 		virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getSuccessor() throw();
111 
112 	public: // XServiceInfo
113 		virtual OUString    SAL_CALL getImplementationName() throw(  );
114 		virtual Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw(  );
115 		virtual sal_Bool     SAL_CALL supportsService(const OUString& ServiceName) throw(  );
116 	};
117 
118 Pump::Pump() : m_aThread( 0 ),
119                m_cnt( m_aMutex ),
120                m_closeFired( sal_False )
121 {
122 	g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
123 }
124 
125 Pump::~Pump()
126 {
127 	// exit gracefully
128     if( m_aThread )
129     {
130         osl_joinWithThread( m_aThread );
131         osl_destroyThread( m_aThread );
132     }
133 	g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
134 }
135 
136 void Pump::fireError( const  Any & exception )
137 {
138     OInterfaceIteratorHelper iter( m_cnt );
139     while( iter.hasMoreElements() )
140     {
141         try
142 		{
143 			static_cast< XStreamListener * > ( iter.next() )->error( exception );
144 		}
145 		catch ( RuntimeException &e )
146 		{
147 			OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
148 			OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
149 		}
150 	}
151 }
152 
153 void Pump::fireClose()
154 {
155     sal_Bool bFire = sal_False;
156     {
157         MutexGuard guard( m_aMutex );
158         if( ! m_closeFired  )
159         {
160             m_closeFired = sal_True;
161             bFire = sal_True;
162         }
163     }
164 
165     if( bFire )
166     {
167         OInterfaceIteratorHelper iter( m_cnt );
168         while( iter.hasMoreElements() )
169         {
170             try
171             {
172                 static_cast< XStreamListener * > ( iter.next() )->closed( );
173             }
174             catch ( RuntimeException &e )
175             {
176                 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
177                 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
178             }
179         }
180     }
181 }
182 
183 void Pump::fireStarted()
184 {
185     OInterfaceIteratorHelper iter( m_cnt );
186     while( iter.hasMoreElements() )
187     {
188         try
189 		{
190 			static_cast< XStreamListener * > ( iter.next() )->started( );
191 		}
192 		catch ( RuntimeException &e )
193 		{
194 			OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
195 			OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
196 		}
197 	}
198 }
199 
200 void Pump::fireTerminated()
201 {
202     OInterfaceIteratorHelper iter( m_cnt );
203     while( iter.hasMoreElements() )
204     {
205         try
206 		{
207 			static_cast< XStreamListener * > ( iter.next() )->terminated();
208 		}
209 		catch ( RuntimeException &e )
210 		{
211 			OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
212 			OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
213 		}
214 	}
215 }
216 
217 
218 
219 void Pump::close()
220 {
221 	// close streams and release references
222     Reference< XInputStream > rInput;
223     Reference< XOutputStream > rOutput;
224     {
225         MutexGuard guard( m_aMutex );
226         rInput = m_xInput;
227 		m_xInput.clear();
228 
229         rOutput = m_xOutput;
230         m_xOutput.clear();
231         m_xSucc.clear();
232         m_xPred.clear();
233     }
234 	if( rInput.is() )
235 	{
236 		try
237 		{
238 			rInput->closeInput();
239 		}
240 		catch( Exception & )
241 		{
242 			// go down calm
243 		}
244 	}
245 	if( rOutput.is() )
246 	{
247 		try
248 		{
249 			rOutput->closeOutput();
250 		}
251 		catch( Exception & )
252 		{
253 			// go down calm
254 		}
255 	}
256 }
257 
258 void Pump::static_run( void* pObject )
259 {
260 	((Pump*)pObject)->run();
261 	((Pump*)pObject)->release();
262 }
263 
264 void Pump::run()
265 {
266 	try
267 	{
268         fireStarted();
269 		try
270 		{
271             Reference< XInputStream > rInput;
272             Reference< XOutputStream > rOutput;
273             {
274                 Guard< Mutex > aGuard( m_aMutex );
275                 rInput = m_xInput;
276                 rOutput = m_xOutput;
277             }
278 
279 			if( ! rInput.is() )
280 			{
281 				NotConnectedException exception(
282 					OUString::createFromAscii( "no input stream set" ) , Reference<XInterface>((OWeakObject*)this) );
283 				throw exception;
284 			}
285 			Sequence< sal_Int8 > aData;
286 			while( rInput->readSomeBytes( aData, 65536 ) )
287 			{
288 				if( ! rOutput.is() )
289 				{
290 					NotConnectedException exception(
291 						OUString::createFromAscii( "no output stream set" ) , Reference<XInterface>( (OWeakObject*)this) );
292 					throw exception;
293 				}
294 				rOutput->writeBytes( aData );
295 				osl_yieldThread();
296 			}
297 		}
298 		catch ( IOException & e )
299 		{
300 			fireError( makeAny( e ) );
301 		}
302 		catch ( RuntimeException & e )
303 		{
304 			fireError( makeAny( e ) );
305 		}
306 		catch ( Exception & e )
307 		{
308 			fireError( makeAny( e ) );
309 		}
310 
311 		close();
312         fireClose();
313 	}
314 	catch ( com::sun::star::uno::Exception &e )
315 	{
316 		// we are the last on the stack.
317 		// this is to avoid crashing the program, when e.g. a bridge crashes
318 		OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
319 		OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception", sMessage.getStr() );
320 	}
321 }
322 
323 // ------------------------------------------------------------
324 
325 /*
326  * XConnectable
327  */
328 
329 void Pump::setPredecessor( const Reference< XConnectable >& xPred ) throw()
330 {
331 	Guard< Mutex > aGuard( m_aMutex );
332 	m_xPred = xPred;
333 }
334 
335 // ------------------------------------------------------------
336 
337 Reference< XConnectable > Pump::getPredecessor() throw()
338 {
339 	Guard< Mutex > aGuard( m_aMutex );
340 	return m_xPred;
341 }
342 
343 // ------------------------------------------------------------
344 
345 void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) throw()
346 {
347 	Guard< Mutex > aGuard( m_aMutex );
348 	m_xSucc = xSucc;
349 }
350 
351 // ------------------------------------------------------------
352 
353 Reference< XConnectable > Pump::getSuccessor() throw()
354 {
355 	Guard< Mutex > aGuard( m_aMutex );
356 	return m_xSucc;
357 }
358 
359 // -----------------------------------------------------------------
360 
361 /*
362  * XActiveDataControl
363  */
364 
365 void Pump::addListener( const Reference< XStreamListener >& xListener ) throw()
366 {
367     m_cnt.addInterface( xListener );
368 }
369 
370 // ------------------------------------------------------------
371 
372 void Pump::removeListener( const Reference< XStreamListener >& xListener ) throw()
373 {
374     m_cnt.removeInterface( xListener );
375 }
376 
377 // ------------------------------------------------------------
378 
379 void Pump::start() throw( RuntimeException )
380 {
381 	Guard< Mutex > aGuard( m_aMutex );
382 	m_aThread = osl_createSuspendedThread((oslWorkerFunction)Pump::static_run,this);
383     if( m_aThread )
384     {
385         // will be released by OPump::static_run
386         acquire();
387         osl_resumeThread( m_aThread );
388     }
389     else
390     {
391         throw RuntimeException(
392             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pump::start Couldn't create worker thread" )),
393             *this);
394     }
395 }
396 
397 // ------------------------------------------------------------
398 
399 void Pump::terminate() throw()
400 {
401 	close();
402 
403 	// wait for the worker to die
404     if( m_aThread )
405         osl_joinWithThread( m_aThread );
406 
407     fireTerminated();
408     fireClose();
409 }
410 
411 // ------------------------------------------------------------
412 
413 /*
414  * XActiveDataSink
415  */
416 
417 void Pump::setInputStream( const Reference< XInputStream >& xStream ) throw()
418 {
419 	Guard< Mutex > aGuard( m_aMutex );
420 	m_xInput = xStream;
421 	Reference< XConnectable > xConnect( xStream, UNO_QUERY );
422 	if( xConnect.is() )
423 		xConnect->setSuccessor( this );
424 	// data transfer starts in XActiveDataControl::start
425 }
426 
427 // ------------------------------------------------------------
428 
429 Reference< XInputStream > Pump::getInputStream() throw()
430 {
431 	Guard< Mutex > aGuard( m_aMutex );
432 	return m_xInput;
433 }
434 
435 // ------------------------------------------------------------
436 
437 /*
438  * XActiveDataSource
439  */
440 
441 void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) throw()
442 {
443 	Guard< Mutex > aGuard( m_aMutex );
444 	m_xOutput = xOut;
445 	Reference< XConnectable > xConnect( xOut, UNO_QUERY );
446 	if( xConnect.is() )
447 		xConnect->setPredecessor( this );
448 	// data transfer starts in XActiveDataControl::start
449 }
450 
451 // ------------------------------------------------------------
452 
453 Reference< XOutputStream > Pump::getOutputStream() throw()
454 {
455 	Guard< Mutex > aGuard( m_aMutex );
456 	return m_xOutput;
457 }
458 
459 
460 // XServiceInfo
461 OUString Pump::getImplementationName() throw(  )
462 {
463     return OPumpImpl_getImplementationName();
464 }
465 
466 // XServiceInfo
467 sal_Bool Pump::supportsService(const OUString& ServiceName) throw(  )
468 {
469     Sequence< OUString > aSNL = getSupportedServiceNames();
470     const OUString * pArray = aSNL.getConstArray();
471 
472     for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
473         if( pArray[i] == ServiceName )
474             return sal_True;
475 
476     return sal_False;
477 }
478 
479 // XServiceInfo
480 Sequence< OUString > Pump::getSupportedServiceNames(void) throw(  )
481 {
482     return OPumpImpl_getSupportedServiceNames();
483 }
484 
485 
486 Reference< XInterface > SAL_CALL OPumpImpl_CreateInstance( const Reference< XComponentContext > & ) throw (Exception)
487 {
488 	return Reference< XInterface >( *new Pump );
489 }
490 
491 OUString OPumpImpl_getImplementationName()
492 {
493 	return OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.comp.io.Pump") );
494 }
495 
496 Sequence<OUString> OPumpImpl_getSupportedServiceNames(void)
497 {
498 	OUString s( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.io.Pump" ) );
499 	Sequence< OUString > seq( &s , 1 );
500 	return seq;
501 }
502 
503 }
504 
505