1 /************************************************************************* 2 * 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * Copyright 2000, 2010 Oracle and/or its affiliates. 6 * 7 * OpenOffice.org - a multi-platform office productivity suite 8 * 9 * This file is part of OpenOffice.org. 10 * 11 * OpenOffice.org is free software: you can redistribute it and/or modify 12 * it under the terms of the GNU Lesser General Public License version 3 13 * only, as published by the Free Software Foundation. 14 * 15 * OpenOffice.org is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 * GNU Lesser General Public License version 3 for more details 19 * (a copy is included in the LICENSE file that accompanied this code). 20 * 21 * You should have received a copy of the GNU Lesser General Public License 22 * version 3 along with OpenOffice.org. If not, see 23 * <http://www.openoffice.org/license.html> 24 * for a copy of the LGPLv3 License. 25 * 26 ************************************************************************/ 27 28 // MARKER(update_precomp.py): autogen include statement, do not remove 29 #include "precompiled_io.hxx" 30 31 #include <stdio.h> 32 33 #include <osl/diagnose.h> 34 35 #include <com/sun/star/io/XActiveDataSource.hpp> 36 #include <com/sun/star/io/XActiveDataSink.hpp> 37 #include <com/sun/star/io/XActiveDataControl.hpp> 38 #include <com/sun/star/io/XConnectable.hpp> 39 #include <com/sun/star/lang/XSingleServiceFactory.hpp> 40 #include <com/sun/star/lang/XMultiServiceFactory.hpp> 41 #include <com/sun/star/lang/XServiceInfo.hpp> 42 #include <com/sun/star/registry/XRegistryKey.hpp> 43 44 #include <uno/dispatcher.h> 45 #include <uno/mapping.hxx> 46 #include <cppuhelper/implbase5.hxx> 47 #include <cppuhelper/factory.hxx> 48 #include <cppuhelper/interfacecontainer.hxx> 49 #include <osl/mutex.hxx> 50 #include <osl/thread.h> 51 52 53 using namespace osl; 54 using namespace std; 55 using namespace rtl; 56 using namespace cppu; 57 using namespace com::sun::star::uno; 58 using namespace com::sun::star::lang; 59 using namespace com::sun::star::registry; 60 using namespace com::sun::star::io; 61 62 #include "factreg.hxx" 63 64 namespace io_stm { 65 66 class Pump : public WeakImplHelper5< 67 XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo > 68 { 69 Mutex m_aMutex; 70 oslThread m_aThread; 71 72 Reference< XConnectable > m_xPred; 73 Reference< XConnectable > m_xSucc; 74 Reference< XInputStream > m_xInput; 75 Reference< XOutputStream > m_xOutput; 76 OInterfaceContainerHelper m_cnt; 77 sal_Bool m_closeFired; 78 79 void run(); 80 static void static_run( void* pObject ); 81 82 void close(); 83 void fireClose(); 84 void fireStarted(); 85 void fireTerminated(); 86 void fireError( const Any &a ); 87 88 public: 89 Pump(); 90 virtual ~Pump(); 91 92 // XActiveDataSource 93 virtual void SAL_CALL setOutputStream( const Reference< ::com::sun::star::io::XOutputStream >& xOutput ) throw(); 94 virtual Reference< ::com::sun::star::io::XOutputStream > SAL_CALL getOutputStream() throw(); 95 96 // XActiveDataSink 97 virtual void SAL_CALL setInputStream( const Reference< ::com::sun::star::io::XInputStream >& xStream ) throw(); 98 virtual Reference< ::com::sun::star::io::XInputStream > SAL_CALL getInputStream() throw(); 99 100 // XActiveDataControl 101 virtual void SAL_CALL addListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw(); 102 virtual void SAL_CALL removeListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw(); 103 virtual void SAL_CALL start() throw( RuntimeException ); 104 virtual void SAL_CALL terminate() throw(); 105 106 // XConnectable 107 virtual void SAL_CALL setPredecessor( const Reference< ::com::sun::star::io::XConnectable >& xPred ) throw(); 108 virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getPredecessor() throw(); 109 virtual void SAL_CALL setSuccessor( const Reference< ::com::sun::star::io::XConnectable >& xSucc ) throw(); 110 virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getSuccessor() throw(); 111 112 public: // XServiceInfo 113 virtual OUString SAL_CALL getImplementationName() throw( ); 114 virtual Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw( ); 115 virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw( ); 116 }; 117 118 Pump::Pump() : m_aThread( 0 ), 119 m_cnt( m_aMutex ), 120 m_closeFired( sal_False ) 121 { 122 g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt ); 123 } 124 125 Pump::~Pump() 126 { 127 // exit gracefully 128 if( m_aThread ) 129 { 130 osl_joinWithThread( m_aThread ); 131 osl_destroyThread( m_aThread ); 132 } 133 g_moduleCount.modCnt.release( &g_moduleCount.modCnt ); 134 } 135 136 void Pump::fireError( const Any & exception ) 137 { 138 OInterfaceIteratorHelper iter( m_cnt ); 139 while( iter.hasMoreElements() ) 140 { 141 try 142 { 143 static_cast< XStreamListener * > ( iter.next() )->error( exception ); 144 } 145 catch ( RuntimeException &e ) 146 { 147 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 148 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); 149 } 150 } 151 } 152 153 void Pump::fireClose() 154 { 155 sal_Bool bFire = sal_False; 156 { 157 MutexGuard guard( m_aMutex ); 158 if( ! m_closeFired ) 159 { 160 m_closeFired = sal_True; 161 bFire = sal_True; 162 } 163 } 164 165 if( bFire ) 166 { 167 OInterfaceIteratorHelper iter( m_cnt ); 168 while( iter.hasMoreElements() ) 169 { 170 try 171 { 172 static_cast< XStreamListener * > ( iter.next() )->closed( ); 173 } 174 catch ( RuntimeException &e ) 175 { 176 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 177 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); 178 } 179 } 180 } 181 } 182 183 void Pump::fireStarted() 184 { 185 OInterfaceIteratorHelper iter( m_cnt ); 186 while( iter.hasMoreElements() ) 187 { 188 try 189 { 190 static_cast< XStreamListener * > ( iter.next() )->started( ); 191 } 192 catch ( RuntimeException &e ) 193 { 194 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 195 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); 196 } 197 } 198 } 199 200 void Pump::fireTerminated() 201 { 202 OInterfaceIteratorHelper iter( m_cnt ); 203 while( iter.hasMoreElements() ) 204 { 205 try 206 { 207 static_cast< XStreamListener * > ( iter.next() )->terminated(); 208 } 209 catch ( RuntimeException &e ) 210 { 211 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 212 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); 213 } 214 } 215 } 216 217 218 219 void Pump::close() 220 { 221 // close streams and release references 222 Reference< XInputStream > rInput; 223 Reference< XOutputStream > rOutput; 224 { 225 MutexGuard guard( m_aMutex ); 226 rInput = m_xInput; 227 m_xInput.clear(); 228 229 rOutput = m_xOutput; 230 m_xOutput.clear(); 231 m_xSucc.clear(); 232 m_xPred.clear(); 233 } 234 if( rInput.is() ) 235 { 236 try 237 { 238 rInput->closeInput(); 239 } 240 catch( Exception & ) 241 { 242 // go down calm 243 } 244 } 245 if( rOutput.is() ) 246 { 247 try 248 { 249 rOutput->closeOutput(); 250 } 251 catch( Exception & ) 252 { 253 // go down calm 254 } 255 } 256 } 257 258 void Pump::static_run( void* pObject ) 259 { 260 ((Pump*)pObject)->run(); 261 ((Pump*)pObject)->release(); 262 } 263 264 void Pump::run() 265 { 266 try 267 { 268 fireStarted(); 269 try 270 { 271 Reference< XInputStream > rInput; 272 Reference< XOutputStream > rOutput; 273 { 274 Guard< Mutex > aGuard( m_aMutex ); 275 rInput = m_xInput; 276 rOutput = m_xOutput; 277 } 278 279 if( ! rInput.is() ) 280 { 281 NotConnectedException exception( 282 OUString::createFromAscii( "no input stream set" ) , Reference<XInterface>((OWeakObject*)this) ); 283 throw exception; 284 } 285 Sequence< sal_Int8 > aData; 286 while( rInput->readSomeBytes( aData, 65536 ) ) 287 { 288 if( ! rOutput.is() ) 289 { 290 NotConnectedException exception( 291 OUString::createFromAscii( "no output stream set" ) , Reference<XInterface>( (OWeakObject*)this) ); 292 throw exception; 293 } 294 rOutput->writeBytes( aData ); 295 osl_yieldThread(); 296 } 297 } 298 catch ( IOException & e ) 299 { 300 fireError( makeAny( e ) ); 301 } 302 catch ( RuntimeException & e ) 303 { 304 fireError( makeAny( e ) ); 305 } 306 catch ( Exception & e ) 307 { 308 fireError( makeAny( e ) ); 309 } 310 311 close(); 312 fireClose(); 313 } 314 catch ( com::sun::star::uno::Exception &e ) 315 { 316 // we are the last on the stack. 317 // this is to avoid crashing the program, when e.g. a bridge crashes 318 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 319 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception", sMessage.getStr() ); 320 } 321 } 322 323 // ------------------------------------------------------------ 324 325 /* 326 * XConnectable 327 */ 328 329 void Pump::setPredecessor( const Reference< XConnectable >& xPred ) throw() 330 { 331 Guard< Mutex > aGuard( m_aMutex ); 332 m_xPred = xPred; 333 } 334 335 // ------------------------------------------------------------ 336 337 Reference< XConnectable > Pump::getPredecessor() throw() 338 { 339 Guard< Mutex > aGuard( m_aMutex ); 340 return m_xPred; 341 } 342 343 // ------------------------------------------------------------ 344 345 void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) throw() 346 { 347 Guard< Mutex > aGuard( m_aMutex ); 348 m_xSucc = xSucc; 349 } 350 351 // ------------------------------------------------------------ 352 353 Reference< XConnectable > Pump::getSuccessor() throw() 354 { 355 Guard< Mutex > aGuard( m_aMutex ); 356 return m_xSucc; 357 } 358 359 // ----------------------------------------------------------------- 360 361 /* 362 * XActiveDataControl 363 */ 364 365 void Pump::addListener( const Reference< XStreamListener >& xListener ) throw() 366 { 367 m_cnt.addInterface( xListener ); 368 } 369 370 // ------------------------------------------------------------ 371 372 void Pump::removeListener( const Reference< XStreamListener >& xListener ) throw() 373 { 374 m_cnt.removeInterface( xListener ); 375 } 376 377 // ------------------------------------------------------------ 378 379 void Pump::start() throw( RuntimeException ) 380 { 381 Guard< Mutex > aGuard( m_aMutex ); 382 m_aThread = osl_createSuspendedThread((oslWorkerFunction)Pump::static_run,this); 383 if( m_aThread ) 384 { 385 // will be released by OPump::static_run 386 acquire(); 387 osl_resumeThread( m_aThread ); 388 } 389 else 390 { 391 throw RuntimeException( 392 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pump::start Couldn't create worker thread" )), 393 *this); 394 } 395 } 396 397 // ------------------------------------------------------------ 398 399 void Pump::terminate() throw() 400 { 401 close(); 402 403 // wait for the worker to die 404 if( m_aThread ) 405 osl_joinWithThread( m_aThread ); 406 407 fireTerminated(); 408 fireClose(); 409 } 410 411 // ------------------------------------------------------------ 412 413 /* 414 * XActiveDataSink 415 */ 416 417 void Pump::setInputStream( const Reference< XInputStream >& xStream ) throw() 418 { 419 Guard< Mutex > aGuard( m_aMutex ); 420 m_xInput = xStream; 421 Reference< XConnectable > xConnect( xStream, UNO_QUERY ); 422 if( xConnect.is() ) 423 xConnect->setSuccessor( this ); 424 // data transfer starts in XActiveDataControl::start 425 } 426 427 // ------------------------------------------------------------ 428 429 Reference< XInputStream > Pump::getInputStream() throw() 430 { 431 Guard< Mutex > aGuard( m_aMutex ); 432 return m_xInput; 433 } 434 435 // ------------------------------------------------------------ 436 437 /* 438 * XActiveDataSource 439 */ 440 441 void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) throw() 442 { 443 Guard< Mutex > aGuard( m_aMutex ); 444 m_xOutput = xOut; 445 Reference< XConnectable > xConnect( xOut, UNO_QUERY ); 446 if( xConnect.is() ) 447 xConnect->setPredecessor( this ); 448 // data transfer starts in XActiveDataControl::start 449 } 450 451 // ------------------------------------------------------------ 452 453 Reference< XOutputStream > Pump::getOutputStream() throw() 454 { 455 Guard< Mutex > aGuard( m_aMutex ); 456 return m_xOutput; 457 } 458 459 460 // XServiceInfo 461 OUString Pump::getImplementationName() throw( ) 462 { 463 return OPumpImpl_getImplementationName(); 464 } 465 466 // XServiceInfo 467 sal_Bool Pump::supportsService(const OUString& ServiceName) throw( ) 468 { 469 Sequence< OUString > aSNL = getSupportedServiceNames(); 470 const OUString * pArray = aSNL.getConstArray(); 471 472 for( sal_Int32 i = 0; i < aSNL.getLength(); i++ ) 473 if( pArray[i] == ServiceName ) 474 return sal_True; 475 476 return sal_False; 477 } 478 479 // XServiceInfo 480 Sequence< OUString > Pump::getSupportedServiceNames(void) throw( ) 481 { 482 return OPumpImpl_getSupportedServiceNames(); 483 } 484 485 486 Reference< XInterface > SAL_CALL OPumpImpl_CreateInstance( const Reference< XComponentContext > & ) throw (Exception) 487 { 488 return Reference< XInterface >( *new Pump ); 489 } 490 491 OUString OPumpImpl_getImplementationName() 492 { 493 return OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.comp.io.Pump") ); 494 } 495 496 Sequence<OUString> OPumpImpl_getSupportedServiceNames(void) 497 { 498 OUString s( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.io.Pump" ) ); 499 Sequence< OUString > seq( &s , 1 ); 500 return seq; 501 } 502 503 } 504 505