1 /**************************************************************
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 *************************************************************/
21
22
23
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_io.hxx"
26
27 #include <stdio.h>
28
29 #include <osl/diagnose.h>
30
31 #include <com/sun/star/io/XActiveDataSource.hpp>
32 #include <com/sun/star/io/XActiveDataSink.hpp>
33 #include <com/sun/star/io/XActiveDataControl.hpp>
34 #include <com/sun/star/io/XConnectable.hpp>
35 #include <com/sun/star/lang/XSingleServiceFactory.hpp>
36 #include <com/sun/star/lang/XMultiServiceFactory.hpp>
37 #include <com/sun/star/lang/XServiceInfo.hpp>
38 #include <com/sun/star/registry/XRegistryKey.hpp>
39
40 #include <uno/dispatcher.h>
41 #include <uno/mapping.hxx>
42 #include <cppuhelper/implbase5.hxx>
43 #include <cppuhelper/factory.hxx>
44 #include <cppuhelper/interfacecontainer.hxx>
45 #include <osl/mutex.hxx>
46 #include <osl/thread.h>
47
48
49 using namespace osl;
50 using namespace std;
51 using namespace rtl;
52 using namespace cppu;
53 using namespace com::sun::star::uno;
54 using namespace com::sun::star::lang;
55 using namespace com::sun::star::registry;
56 using namespace com::sun::star::io;
57
58 #include "factreg.hxx"
59
60 namespace io_stm {
61
62 class Pump : public WeakImplHelper5<
63 XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo >
64 {
65 Mutex m_aMutex;
66 oslThread m_aThread;
67
68 Reference< XConnectable > m_xPred;
69 Reference< XConnectable > m_xSucc;
70 Reference< XInputStream > m_xInput;
71 Reference< XOutputStream > m_xOutput;
72 OInterfaceContainerHelper m_cnt;
73 sal_Bool m_closeFired;
74
75 void run();
76 static void static_run( void* pObject );
77
78 void close();
79 void fireClose();
80 void fireStarted();
81 void fireTerminated();
82 void fireError( const Any &a );
83
84 public:
85 Pump();
86 virtual ~Pump();
87
88 // XActiveDataSource
89 virtual void SAL_CALL setOutputStream( const Reference< ::com::sun::star::io::XOutputStream >& xOutput ) throw();
90 virtual Reference< ::com::sun::star::io::XOutputStream > SAL_CALL getOutputStream() throw();
91
92 // XActiveDataSink
93 virtual void SAL_CALL setInputStream( const Reference< ::com::sun::star::io::XInputStream >& xStream ) throw();
94 virtual Reference< ::com::sun::star::io::XInputStream > SAL_CALL getInputStream() throw();
95
96 // XActiveDataControl
97 virtual void SAL_CALL addListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
98 virtual void SAL_CALL removeListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
99 virtual void SAL_CALL start() throw( RuntimeException );
100 virtual void SAL_CALL terminate() throw();
101
102 // XConnectable
103 virtual void SAL_CALL setPredecessor( const Reference< ::com::sun::star::io::XConnectable >& xPred ) throw();
104 virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getPredecessor() throw();
105 virtual void SAL_CALL setSuccessor( const Reference< ::com::sun::star::io::XConnectable >& xSucc ) throw();
106 virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getSuccessor() throw();
107
108 public: // XServiceInfo
109 virtual OUString SAL_CALL getImplementationName() throw( );
110 virtual Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw( );
111 virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw( );
112 };
113
Pump()114 Pump::Pump() : m_aThread( 0 ),
115 m_cnt( m_aMutex ),
116 m_closeFired( sal_False )
117 {
118 g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
119 }
120
~Pump()121 Pump::~Pump()
122 {
123 // exit gracefully
124 if( m_aThread )
125 {
126 osl_joinWithThread( m_aThread );
127 osl_destroyThread( m_aThread );
128 }
129 g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
130 }
131
fireError(const Any & exception)132 void Pump::fireError( const Any & exception )
133 {
134 OInterfaceIteratorHelper iter( m_cnt );
135 while( iter.hasMoreElements() )
136 {
137 try
138 {
139 static_cast< XStreamListener * > ( iter.next() )->error( exception );
140 }
141 catch ( RuntimeException &e )
142 {
143 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
144 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
145 }
146 }
147 }
148
fireClose()149 void Pump::fireClose()
150 {
151 sal_Bool bFire = sal_False;
152 {
153 MutexGuard guard( m_aMutex );
154 if( ! m_closeFired )
155 {
156 m_closeFired = sal_True;
157 bFire = sal_True;
158 }
159 }
160
161 if( bFire )
162 {
163 OInterfaceIteratorHelper iter( m_cnt );
164 while( iter.hasMoreElements() )
165 {
166 try
167 {
168 static_cast< XStreamListener * > ( iter.next() )->closed( );
169 }
170 catch ( RuntimeException &e )
171 {
172 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
173 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
174 }
175 }
176 }
177 }
178
fireStarted()179 void Pump::fireStarted()
180 {
181 OInterfaceIteratorHelper iter( m_cnt );
182 while( iter.hasMoreElements() )
183 {
184 try
185 {
186 static_cast< XStreamListener * > ( iter.next() )->started( );
187 }
188 catch ( RuntimeException &e )
189 {
190 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
191 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
192 }
193 }
194 }
195
fireTerminated()196 void Pump::fireTerminated()
197 {
198 OInterfaceIteratorHelper iter( m_cnt );
199 while( iter.hasMoreElements() )
200 {
201 try
202 {
203 static_cast< XStreamListener * > ( iter.next() )->terminated();
204 }
205 catch ( RuntimeException &e )
206 {
207 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
208 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
209 }
210 }
211 }
212
213
214
close()215 void Pump::close()
216 {
217 // close streams and release references
218 Reference< XInputStream > rInput;
219 Reference< XOutputStream > rOutput;
220 {
221 MutexGuard guard( m_aMutex );
222 rInput = m_xInput;
223 m_xInput.clear();
224
225 rOutput = m_xOutput;
226 m_xOutput.clear();
227 m_xSucc.clear();
228 m_xPred.clear();
229 }
230 if( rInput.is() )
231 {
232 try
233 {
234 rInput->closeInput();
235 }
236 catch( Exception & )
237 {
238 // go down calm
239 }
240 }
241 if( rOutput.is() )
242 {
243 try
244 {
245 rOutput->closeOutput();
246 }
247 catch( Exception & )
248 {
249 // go down calm
250 }
251 }
252 }
253
static_run(void * pObject)254 void Pump::static_run( void* pObject )
255 {
256 ((Pump*)pObject)->run();
257 ((Pump*)pObject)->release();
258 }
259
run()260 void Pump::run()
261 {
262 try
263 {
264 fireStarted();
265 try
266 {
267 Reference< XInputStream > rInput;
268 Reference< XOutputStream > rOutput;
269 {
270 Guard< Mutex > aGuard( m_aMutex );
271 rInput = m_xInput;
272 rOutput = m_xOutput;
273 }
274
275 if( ! rInput.is() )
276 {
277 NotConnectedException exception(
278 OUString::createFromAscii( "no input stream set" ) , Reference<XInterface>((OWeakObject*)this) );
279 throw exception;
280 }
281 Sequence< sal_Int8 > aData;
282 while( rInput->readSomeBytes( aData, 65536 ) )
283 {
284 if( ! rOutput.is() )
285 {
286 NotConnectedException exception(
287 OUString::createFromAscii( "no output stream set" ) , Reference<XInterface>( (OWeakObject*)this) );
288 throw exception;
289 }
290 rOutput->writeBytes( aData );
291 osl_yieldThread();
292 }
293 }
294 catch ( IOException & e )
295 {
296 fireError( makeAny( e ) );
297 }
298 catch ( RuntimeException & e )
299 {
300 fireError( makeAny( e ) );
301 }
302 catch ( Exception & e )
303 {
304 fireError( makeAny( e ) );
305 }
306
307 close();
308 fireClose();
309 }
310 catch ( com::sun::star::uno::Exception &e )
311 {
312 // we are the last on the stack.
313 // this is to avoid crashing the program, when e.g. a bridge crashes
314 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
315 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception", sMessage.getStr() );
316 }
317 }
318
319 // ------------------------------------------------------------
320
321 /*
322 * XConnectable
323 */
324
setPredecessor(const Reference<XConnectable> & xPred)325 void Pump::setPredecessor( const Reference< XConnectable >& xPred ) throw()
326 {
327 Guard< Mutex > aGuard( m_aMutex );
328 m_xPred = xPred;
329 }
330
331 // ------------------------------------------------------------
332
getPredecessor()333 Reference< XConnectable > Pump::getPredecessor() throw()
334 {
335 Guard< Mutex > aGuard( m_aMutex );
336 return m_xPred;
337 }
338
339 // ------------------------------------------------------------
340
setSuccessor(const Reference<XConnectable> & xSucc)341 void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) throw()
342 {
343 Guard< Mutex > aGuard( m_aMutex );
344 m_xSucc = xSucc;
345 }
346
347 // ------------------------------------------------------------
348
getSuccessor()349 Reference< XConnectable > Pump::getSuccessor() throw()
350 {
351 Guard< Mutex > aGuard( m_aMutex );
352 return m_xSucc;
353 }
354
355 // -----------------------------------------------------------------
356
357 /*
358 * XActiveDataControl
359 */
360
addListener(const Reference<XStreamListener> & xListener)361 void Pump::addListener( const Reference< XStreamListener >& xListener ) throw()
362 {
363 m_cnt.addInterface( xListener );
364 }
365
366 // ------------------------------------------------------------
367
removeListener(const Reference<XStreamListener> & xListener)368 void Pump::removeListener( const Reference< XStreamListener >& xListener ) throw()
369 {
370 m_cnt.removeInterface( xListener );
371 }
372
373 // ------------------------------------------------------------
374
start()375 void Pump::start() throw( RuntimeException )
376 {
377 Guard< Mutex > aGuard( m_aMutex );
378 m_aThread = osl_createSuspendedThread((oslWorkerFunction)Pump::static_run,this);
379 if( m_aThread )
380 {
381 // will be released by OPump::static_run
382 acquire();
383 osl_resumeThread( m_aThread );
384 }
385 else
386 {
387 throw RuntimeException(
388 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pump::start Couldn't create worker thread" )),
389 *this);
390 }
391 }
392
393 // ------------------------------------------------------------
394
terminate()395 void Pump::terminate() throw()
396 {
397 close();
398
399 // wait for the worker to die
400 if( m_aThread )
401 osl_joinWithThread( m_aThread );
402
403 fireTerminated();
404 fireClose();
405 }
406
407 // ------------------------------------------------------------
408
409 /*
410 * XActiveDataSink
411 */
412
setInputStream(const Reference<XInputStream> & xStream)413 void Pump::setInputStream( const Reference< XInputStream >& xStream ) throw()
414 {
415 Guard< Mutex > aGuard( m_aMutex );
416 m_xInput = xStream;
417 Reference< XConnectable > xConnect( xStream, UNO_QUERY );
418 if( xConnect.is() )
419 xConnect->setSuccessor( this );
420 // data transfer starts in XActiveDataControl::start
421 }
422
423 // ------------------------------------------------------------
424
getInputStream()425 Reference< XInputStream > Pump::getInputStream() throw()
426 {
427 Guard< Mutex > aGuard( m_aMutex );
428 return m_xInput;
429 }
430
431 // ------------------------------------------------------------
432
433 /*
434 * XActiveDataSource
435 */
436
setOutputStream(const Reference<XOutputStream> & xOut)437 void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) throw()
438 {
439 Guard< Mutex > aGuard( m_aMutex );
440 m_xOutput = xOut;
441 Reference< XConnectable > xConnect( xOut, UNO_QUERY );
442 if( xConnect.is() )
443 xConnect->setPredecessor( this );
444 // data transfer starts in XActiveDataControl::start
445 }
446
447 // ------------------------------------------------------------
448
getOutputStream()449 Reference< XOutputStream > Pump::getOutputStream() throw()
450 {
451 Guard< Mutex > aGuard( m_aMutex );
452 return m_xOutput;
453 }
454
455
456 // XServiceInfo
getImplementationName()457 OUString Pump::getImplementationName() throw( )
458 {
459 return OPumpImpl_getImplementationName();
460 }
461
462 // XServiceInfo
supportsService(const OUString & ServiceName)463 sal_Bool Pump::supportsService(const OUString& ServiceName) throw( )
464 {
465 Sequence< OUString > aSNL = getSupportedServiceNames();
466 const OUString * pArray = aSNL.getConstArray();
467
468 for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
469 if( pArray[i] == ServiceName )
470 return sal_True;
471
472 return sal_False;
473 }
474
475 // XServiceInfo
getSupportedServiceNames(void)476 Sequence< OUString > Pump::getSupportedServiceNames(void) throw( )
477 {
478 return OPumpImpl_getSupportedServiceNames();
479 }
480
481
/**
 * Factory function: creates a new Pump and returns it as XInterface.
 * The component context argument is not used.
 */
Reference< XInterface > SAL_CALL OPumpImpl_CreateInstance( const Reference< XComponentContext > & ) throw (Exception)
{
    Pump * const pPump = new Pump;
    return Reference< XInterface >( *pPump );
}
486
OPumpImpl_getImplementationName()487 OUString OPumpImpl_getImplementationName()
488 {
489 return OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.comp.io.Pump") );
490 }
491
OPumpImpl_getSupportedServiceNames(void)492 Sequence<OUString> OPumpImpl_getSupportedServiceNames(void)
493 {
494 OUString s( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.io.Pump" ) );
495 Sequence< OUString > seq( &s , 1 );
496 return seq;
497 }
498
499 }
500
501