xref: /trunk/main/io/source/acceptor/acc_socket.cxx (revision cdf0e10c)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_io.hxx"
30 #include "acceptor.hxx"
31 
32 #include <hash_set>
33 #include <algorithm>
34 
35 #include <rtl/ustrbuf.hxx>
36 #include <com/sun/star/connection/XConnectionBroadcaster.hpp>
37 #include <com/sun/star/connection/ConnectionSetupException.hpp>
38 
39 #include <cppuhelper/implbase2.hxx>
40 
41 using namespace ::osl;
42 using namespace ::rtl;
43 using namespace ::cppu;
44 using namespace ::com::sun::star::uno;
45 using namespace ::com::sun::star::io;
46 using namespace ::com::sun::star::connection;
47 
48 
49 namespace io_acceptor {
50 	template<class T>
51 	struct ReferenceHash
52 	{
53 		size_t operator () (const ::com::sun::star::uno::Reference<T> & ref) const
54         {
55 			return (size_t)ref.get();
56 		}
57 	};
58 
59 	template<class T>
60 	struct ReferenceEqual
61 	{
62 		sal_Bool operator () (const ::com::sun::star::uno::Reference<T> & op1,
63 							  const ::com::sun::star::uno::Reference<T> & op2) const
64         {
65 			return op1.get() == op2.get();
66 		}
67 	};
68 
69 
70 	typedef ::std::hash_set< ::com::sun::star::uno::Reference< ::com::sun::star::io::XStreamListener>,
71                              ReferenceHash< ::com::sun::star::io::XStreamListener>,
72                              ReferenceEqual< ::com::sun::star::io::XStreamListener> >
73 	        XStreamListener_hash_set;
74 
75 
76 	class SocketConnection : public ::cppu::WeakImplHelper2<
77         ::com::sun::star::connection::XConnection,
78 		::com::sun::star::connection::XConnectionBroadcaster>
79 
80 	{
81 	public:
82 		SocketConnection( const OUString & sConnectionDescription );
83 		~SocketConnection();
84 
85 		virtual sal_Int32 SAL_CALL read( ::com::sun::star::uno::Sequence< sal_Int8 >& aReadBytes,
86 										 sal_Int32 nBytesToRead )
87 			throw(::com::sun::star::io::IOException,
88 				  ::com::sun::star::uno::RuntimeException);
89 		virtual void SAL_CALL write( const ::com::sun::star::uno::Sequence< sal_Int8 >& aData )
90 			throw(::com::sun::star::io::IOException,
91 				  ::com::sun::star::uno::RuntimeException);
92 		virtual void SAL_CALL flush(  ) throw(
93 			::com::sun::star::io::IOException,
94 			::com::sun::star::uno::RuntimeException);
95 		virtual void SAL_CALL close(  )
96 			throw(::com::sun::star::io::IOException,
97 				  ::com::sun::star::uno::RuntimeException);
98 		virtual ::rtl::OUString SAL_CALL getDescription(  )
99 			throw(::com::sun::star::uno::RuntimeException);
100 
101 		// XConnectionBroadcaster
102 		virtual void SAL_CALL addStreamListener(const ::com::sun::star::uno::Reference< ::com::sun::star::io::XStreamListener>& aListener)
103 			throw(::com::sun::star::uno::RuntimeException);
104 		virtual void SAL_CALL removeStreamListener(const ::com::sun::star::uno::Reference< ::com::sun::star::io::XStreamListener>& aListener)
105 			throw(::com::sun::star::uno::RuntimeException);
106 
107 	public:
108 		void completeConnectionString();
109 
110 		::osl::StreamSocket m_socket;
111 		::osl::SocketAddr m_addr;
112 		oslInterlockedCount m_nStatus;
113 		::rtl::OUString m_sDescription;
114 
115 		::osl::Mutex _mutex;
116 		sal_Bool     _started;
117 		sal_Bool     _closed;
118 		sal_Bool     _error;
119 		XStreamListener_hash_set _listeners;
120 	};
121 
122 	template<class T>
123 	void notifyListeners(SocketConnection * pCon, sal_Bool * notified, T t)
124 	{
125   		XStreamListener_hash_set listeners;
126 
127 		{
128 			::osl::MutexGuard guard(pCon->_mutex);
129 			if(!*notified)
130 			{
131 				*notified = sal_True;
132 				listeners = pCon->_listeners;
133 			}
134 		}
135 
136 		::std::for_each(listeners.begin(), listeners.end(), t);
137 	}
138 
139 	static void callStarted(Reference<XStreamListener> xStreamListener)
140 	{
141 		xStreamListener->started();
142 	}
143 
144 	struct callError {
145 		const Any & any;
146 
147 		callError(const Any & any);
148 
149 		void operator () (Reference<XStreamListener> xStreamListener);
150 	};
151 
152 	callError::callError(const Any & aAny)
153 		: any(aAny)
154 	{
155 	}
156 
157 	void callError::operator () (Reference<XStreamListener> xStreamListener)
158 	{
159 		xStreamListener->error(any);
160 	}
161 
162 	static void callClosed(Reference<XStreamListener> xStreamListener)
163 	{
164 		xStreamListener->closed();
165 	}
166 
167 
168 	SocketConnection::SocketConnection( const OUString &sConnectionDescription) :
169 		m_nStatus( 0 ),
170 		m_sDescription( sConnectionDescription ),
171 		_started(sal_False),
172 		_closed(sal_False),
173 		_error(sal_False)
174 	{
175 		g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
176 		// make it unique
177 		m_sDescription += OUString( RTL_CONSTASCII_USTRINGPARAM( ",uniqueValue=" ) );
178 		m_sDescription += OUString::valueOf(
179             sal::static_int_cast< sal_Int64 >(
180                 reinterpret_cast< sal_IntPtr >(&m_socket)),
181             10 );
182 	}
183 
184 	SocketConnection::~SocketConnection()
185 	{
186 		g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
187 	}
188 
189 	void SocketConnection::completeConnectionString()
190 	{
191 		OUStringBuffer buf( 256 );
192 		buf.appendAscii( ",peerPort=" );
193 		buf.append( (sal_Int32) m_socket.getPeerPort() );
194 		buf.appendAscii( ",peerHost=" );
195 		buf.append( m_socket.getPeerHost( ) );
196 
197 		buf.appendAscii( ",localPort=" );
198 		buf.append( (sal_Int32) m_socket.getLocalPort() );
199 		buf.appendAscii( ",localHost=" );
200 		buf.append( m_socket.getLocalHost() );
201 
202 		m_sDescription += buf.makeStringAndClear();
203 	}
204 
205 	sal_Int32 SocketConnection::read( Sequence < sal_Int8 > & aReadBytes , sal_Int32 nBytesToRead )
206 			throw(::com::sun::star::io::IOException,
207 				  ::com::sun::star::uno::RuntimeException)
208 	{
209 		if( ! m_nStatus )
210 		{
211 			notifyListeners(this, &_started, callStarted);
212 
213 			if( aReadBytes.getLength() != nBytesToRead )
214 			{
215 				aReadBytes.realloc( nBytesToRead );
216 			}
217 
218 			sal_Int32 i = 0;
219 			i = m_socket.read( aReadBytes.getArray()  , aReadBytes.getLength() );
220 
221 			if(i != nBytesToRead)
222 			{
223 				OUString message(RTL_CONSTASCII_USTRINGPARAM("acc_socket.cxx:SocketConnection::read: error - "));
224 				message +=	m_socket.getErrorAsString();
225 
226 				IOException ioException(message, Reference<XInterface>(static_cast<XConnection *>(this)));
227 
228 				Any any;
229 				any <<= ioException;
230 
231 				notifyListeners(this, &_error, callError(any));
232 
233 				throw ioException;
234 			}
235 
236 			return i;
237 		}
238 		else
239 		{
240 			OUString message(RTL_CONSTASCII_USTRINGPARAM("acc_socket.cxx:SocketConnection::read: error - connection already closed"));
241 
242 			IOException ioException(message, Reference<XInterface>(static_cast<XConnection *>(this)));
243 
244 			Any any;
245 			any <<= ioException;
246 
247 			notifyListeners(this, &_error, callError(any));
248 
249 			throw ioException;
250 		}
251 	}
252 
253 	void SocketConnection::write( const Sequence < sal_Int8 > &seq )
254 			throw(::com::sun::star::io::IOException,
255 				  ::com::sun::star::uno::RuntimeException)
256 	{
257 		if( ! m_nStatus )
258 		{
259 			if( m_socket.write( seq.getConstArray() , seq.getLength() ) != seq.getLength() )
260 			{
261 				OUString message(RTL_CONSTASCII_USTRINGPARAM("acc_socket.cxx:SocketConnection::write: error - "));
262 				message += m_socket.getErrorAsString();
263 
264 				IOException ioException(message, Reference<XInterface>(static_cast<XConnection *>(this)));
265 
266 				Any any;
267 				any <<= ioException;
268 
269 				notifyListeners(this, &_error, callError(any));
270 
271 				throw ioException;
272 			}
273 		}
274 		else
275 		{
276 			OUString message(RTL_CONSTASCII_USTRINGPARAM("acc_socket.cxx:SocketConnection::write: error - connection already closed"));
277 
278 			IOException ioException(message, Reference<XInterface>(static_cast<XConnection *>(this)));
279 
280 			Any any;
281 			any <<= ioException;
282 
283 			notifyListeners(this, &_error, callError(any));
284 
285 			throw ioException;
286 		}
287 	}
288 
289 	void SocketConnection::flush( )
290 			throw(::com::sun::star::io::IOException,
291 				  ::com::sun::star::uno::RuntimeException)
292 	{
293 
294 	}
295 
296 	void SocketConnection::close()
297 			throw(::com::sun::star::io::IOException,
298 				  ::com::sun::star::uno::RuntimeException)
299 	{
300 		// enshure close is called only once
301 		if(  1 == osl_incrementInterlockedCount( (&m_nStatus) ) )
302 		{
303 			m_socket.shutdown();
304 			notifyListeners(this, &_closed, callClosed);
305 		}
306 	}
307 
308 	OUString SocketConnection::getDescription()
309 			throw( ::com::sun::star::uno::RuntimeException)
310 	{
311 		return m_sDescription;
312 	}
313 
314 
315 	// XConnectionBroadcaster
316 	void SAL_CALL SocketConnection::addStreamListener(const Reference<XStreamListener> & aListener) throw(RuntimeException)
317 	{
318 		MutexGuard guard(_mutex);
319 
320 		_listeners.insert(aListener);
321 	}
322 
323 	void SAL_CALL SocketConnection::removeStreamListener(const Reference<XStreamListener> & aListener) throw(RuntimeException)
324 	{
325 		MutexGuard guard(_mutex);
326 
327 		_listeners.erase(aListener);
328 	}
329 
330 	SocketAcceptor::SocketAcceptor( const OUString &sSocketName,
331 									sal_uInt16 nPort,
332 									sal_Bool bTcpNoDelay,
333 									const OUString &sConnectionDescription) :
334 		m_sSocketName( sSocketName ),
335 		m_sConnectionDescription( sConnectionDescription ),
336 		m_nPort( nPort ),
337 		m_bTcpNoDelay( bTcpNoDelay ),
338 		m_bClosed( sal_False )
339 	{
340 	}
341 
342 
343 	void SocketAcceptor::init()
344 	{
345 		if( ! m_addr.setPort( m_nPort ) )
346 		{
347 			OUStringBuffer message( 128 );
348 			message.appendAscii( "acc_socket.cxx:SocketAcceptor::init - error - invalid tcp/ip port " );
349 			message.append( (sal_Int32) m_nPort );
350 			throw ConnectionSetupException(
351 				message.makeStringAndClear() , Reference< XInterface> () );
352 		}
353 		if( ! m_addr.setHostname( m_sSocketName.pData ) )
354 		{
355 			OUStringBuffer message( 128 );
356 			message.appendAscii( "acc_socket.cxx:SocketAcceptor::init - error - invalid host " );
357 			message.append( m_sSocketName );
358 			throw ConnectionSetupException(
359 				message.makeStringAndClear(), Reference< XInterface > () );
360 		}
361 		m_socket.setOption( osl_Socket_OptionReuseAddr, 1);
362 
363 		if(! m_socket.bind(m_addr) )
364 		{
365 			OUStringBuffer message( 128 );
366 			message.appendAscii( "acc_socket.cxx:SocketAcceptor::init - error - couldn't bind on " );
367 			message.append( m_sSocketName ).appendAscii( ":" ).append((sal_Int32)m_nPort);
368 			throw ConnectionSetupException(
369 				message.makeStringAndClear(),
370 				Reference<XInterface>());
371 		}
372 
373 		if(! m_socket.listen() )
374 		{
375 			OUStringBuffer message( 128 );
376 			message.appendAscii( "acc_socket.cxx:SocketAcceptor::init - error - can't listen on " );
377 			message.append( m_sSocketName ).appendAscii( ":" ).append( (sal_Int32) m_nPort);
378 			throw ConnectionSetupException(	message.makeStringAndClear(),Reference<XInterface>() );
379 		}
380 	}
381 
382 	Reference< XConnection > SocketAcceptor::accept( )
383 	{
384 		SocketConnection *pConn = new SocketConnection( m_sConnectionDescription );
385 
386 		if( m_socket.acceptConnection( pConn->m_socket )!= osl_Socket_Ok )
387 		{
388 			// stopAccepting was called
389 			delete pConn;
390 			return Reference < XConnection > ();
391 		}
392 		if( m_bClosed )
393 		{
394 			delete pConn;
395 			return Reference < XConnection > ();
396 		}
397 
398 		pConn->completeConnectionString();
399 		if( m_bTcpNoDelay )
400 		{
401 			sal_Int32 nTcpNoDelay = sal_True;
402 			pConn->m_socket.setOption( osl_Socket_OptionTcpNoDelay , &nTcpNoDelay,
403 									   sizeof( nTcpNoDelay ) , osl_Socket_LevelTcp );
404 		}
405 
406 		return Reference < XConnection > ( (XConnection * ) pConn );
407 	}
408 
409 	void SocketAcceptor::stopAccepting()
410 	{
411 		m_bClosed = sal_True;
412 		m_socket.close();
413 	}
414 }
415 
416 
417