xref: /trunk/main/io/source/acceptor/acc_socket.cxx (revision 3716f815)
1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_io.hxx"
26 #include "acceptor.hxx"
27 
28 #include <hash_set>
29 #include <algorithm>
30 
31 #include <rtl/ustrbuf.hxx>
32 #include <com/sun/star/connection/XConnectionBroadcaster.hpp>
33 #include <com/sun/star/connection/ConnectionSetupException.hpp>
34 
35 #include <cppuhelper/implbase2.hxx>
36 
37 using namespace ::osl;
38 using namespace ::rtl;
39 using namespace ::cppu;
40 using namespace ::com::sun::star::uno;
41 using namespace ::com::sun::star::io;
42 using namespace ::com::sun::star::connection;
43 
44 
45 namespace io_acceptor {
46 	template<class T>
47 	struct ReferenceHash
48 	{
49 		size_t operator () (const ::com::sun::star::uno::Reference<T> & ref) const
50         {
51 			return (size_t)ref.get();
52 		}
53 	};
54 
55 	template<class T>
56 	struct ReferenceEqual
57 	{
58 		sal_Bool operator () (const ::com::sun::star::uno::Reference<T> & op1,
59 							  const ::com::sun::star::uno::Reference<T> & op2) const
60         {
61 			return op1.get() == op2.get();
62 		}
63 	};
64 
65 
66 	typedef ::std::hash_set< ::com::sun::star::uno::Reference< ::com::sun::star::io::XStreamListener>,
67                              ReferenceHash< ::com::sun::star::io::XStreamListener>,
68                              ReferenceEqual< ::com::sun::star::io::XStreamListener> >
69 	        XStreamListener_hash_set;
70 
71 
72 	class SocketConnection : public ::cppu::WeakImplHelper2<
73         ::com::sun::star::connection::XConnection,
74 		::com::sun::star::connection::XConnectionBroadcaster>
75 
76 	{
77 	public:
78 		SocketConnection( const OUString & sConnectionDescription );
79 		~SocketConnection();
80 
81 		virtual sal_Int32 SAL_CALL read( ::com::sun::star::uno::Sequence< sal_Int8 >& aReadBytes,
82 										 sal_Int32 nBytesToRead )
83 			throw(::com::sun::star::io::IOException,
84 				  ::com::sun::star::uno::RuntimeException);
85 		virtual void SAL_CALL write( const ::com::sun::star::uno::Sequence< sal_Int8 >& aData )
86 			throw(::com::sun::star::io::IOException,
87 				  ::com::sun::star::uno::RuntimeException);
88 		virtual void SAL_CALL flush(  ) throw(
89 			::com::sun::star::io::IOException,
90 			::com::sun::star::uno::RuntimeException);
91 		virtual void SAL_CALL close(  )
92 			throw(::com::sun::star::io::IOException,
93 				  ::com::sun::star::uno::RuntimeException);
94 		virtual ::rtl::OUString SAL_CALL getDescription(  )
95 			throw(::com::sun::star::uno::RuntimeException);
96 
97 		// XConnectionBroadcaster
98 		virtual void SAL_CALL addStreamListener(const ::com::sun::star::uno::Reference< ::com::sun::star::io::XStreamListener>& aListener)
99 			throw(::com::sun::star::uno::RuntimeException);
100 		virtual void SAL_CALL removeStreamListener(const ::com::sun::star::uno::Reference< ::com::sun::star::io::XStreamListener>& aListener)
101 			throw(::com::sun::star::uno::RuntimeException);
102 
103 	public:
104 		void completeConnectionString();
105 
106 		::osl::StreamSocket m_socket;
107 		::osl::SocketAddr m_addr;
108 		oslInterlockedCount m_nStatus;
109 		::rtl::OUString m_sDescription;
110 
111 		::osl::Mutex _mutex;
112 		sal_Bool     _started;
113 		sal_Bool     _closed;
114 		sal_Bool     _error;
115 		XStreamListener_hash_set _listeners;
116 	};
117 
118 	template<class T>
119 	void notifyListeners(SocketConnection * pCon, sal_Bool * notified, T t)
120 	{
121   		XStreamListener_hash_set listeners;
122 
123 		{
124 			::osl::MutexGuard guard(pCon->_mutex);
125 			if(!*notified)
126 			{
127 				*notified = sal_True;
128 				listeners = pCon->_listeners;
129 			}
130 		}
131 
132 		::std::for_each(listeners.begin(), listeners.end(), t);
133 	}
134 
135 	static void callStarted(Reference<XStreamListener> xStreamListener)
136 	{
137 		xStreamListener->started();
138 	}
139 
140 	struct callError {
141 		const Any & any;
142 
143 		callError(const Any & any);
144 
145 		void operator () (Reference<XStreamListener> xStreamListener);
146 	};
147 
148 	callError::callError(const Any & aAny)
149 		: any(aAny)
150 	{
151 	}
152 
153 	void callError::operator () (Reference<XStreamListener> xStreamListener)
154 	{
155 		xStreamListener->error(any);
156 	}
157 
158 	static void callClosed(Reference<XStreamListener> xStreamListener)
159 	{
160 		xStreamListener->closed();
161 	}
162 
163 
164 	SocketConnection::SocketConnection( const OUString &sConnectionDescription) :
165 		m_nStatus( 0 ),
166 		m_sDescription( sConnectionDescription ),
167 		_started(sal_False),
168 		_closed(sal_False),
169 		_error(sal_False)
170 	{
171 		g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
172 		// make it unique
173 		m_sDescription += OUString( RTL_CONSTASCII_USTRINGPARAM( ",uniqueValue=" ) );
174 		m_sDescription += OUString::valueOf(
175             sal::static_int_cast< sal_Int64 >(
176                 reinterpret_cast< sal_IntPtr >(&m_socket)),
177             10 );
178 	}
179 
	// Destructor: balances the module-counter acquire() done in the constructor.
	SocketConnection::~SocketConnection()
	{
		g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
	}
184 
185 	void SocketConnection::completeConnectionString()
186 	{
187 		OUStringBuffer buf( 256 );
188 		buf.appendAscii( ",peerPort=" );
189 		buf.append( (sal_Int32) m_socket.getPeerPort() );
190 		buf.appendAscii( ",peerHost=" );
191 		buf.append( m_socket.getPeerHost( ) );
192 
193 		buf.appendAscii( ",localPort=" );
194 		buf.append( (sal_Int32) m_socket.getLocalPort() );
195 		buf.appendAscii( ",localHost=" );
196 		buf.append( m_socket.getLocalHost() );
197 
198 		m_sDescription += buf.makeStringAndClear();
199 	}
200 
201 	sal_Int32 SocketConnection::read( Sequence < sal_Int8 > & aReadBytes , sal_Int32 nBytesToRead )
202 			throw(::com::sun::star::io::IOException,
203 				  ::com::sun::star::uno::RuntimeException)
204 	{
205 		if( ! m_nStatus )
206 		{
207 			notifyListeners(this, &_started, callStarted);
208 
209 			if( aReadBytes.getLength() != nBytesToRead )
210 			{
211 				aReadBytes.realloc( nBytesToRead );
212 			}
213 
214 			sal_Int32 i = 0;
215 			i = m_socket.read( aReadBytes.getArray()  , aReadBytes.getLength() );
216 
217 			if(i != nBytesToRead)
218 			{
219 				OUString message(RTL_CONSTASCII_USTRINGPARAM("acc_socket.cxx:SocketConnection::read: error - "));
220 				message +=	m_socket.getErrorAsString();
221 
222 				IOException ioException(message, Reference<XInterface>(static_cast<XConnection *>(this)));
223 
224 				Any any;
225 				any <<= ioException;
226 
227 				notifyListeners(this, &_error, callError(any));
228 
229 				throw ioException;
230 			}
231 
232 			return i;
233 		}
234 		else
235 		{
236 			OUString message(RTL_CONSTASCII_USTRINGPARAM("acc_socket.cxx:SocketConnection::read: error - connection already closed"));
237 
238 			IOException ioException(message, Reference<XInterface>(static_cast<XConnection *>(this)));
239 
240 			Any any;
241 			any <<= ioException;
242 
243 			notifyListeners(this, &_error, callError(any));
244 
245 			throw ioException;
246 		}
247 	}
248 
249 	void SocketConnection::write( const Sequence < sal_Int8 > &seq )
250 			throw(::com::sun::star::io::IOException,
251 				  ::com::sun::star::uno::RuntimeException)
252 	{
253 		if( ! m_nStatus )
254 		{
255 			if( m_socket.write( seq.getConstArray() , seq.getLength() ) != seq.getLength() )
256 			{
257 				OUString message(RTL_CONSTASCII_USTRINGPARAM("acc_socket.cxx:SocketConnection::write: error - "));
258 				message += m_socket.getErrorAsString();
259 
260 				IOException ioException(message, Reference<XInterface>(static_cast<XConnection *>(this)));
261 
262 				Any any;
263 				any <<= ioException;
264 
265 				notifyListeners(this, &_error, callError(any));
266 
267 				throw ioException;
268 			}
269 		}
270 		else
271 		{
272 			OUString message(RTL_CONSTASCII_USTRINGPARAM("acc_socket.cxx:SocketConnection::write: error - connection already closed"));
273 
274 			IOException ioException(message, Reference<XInterface>(static_cast<XConnection *>(this)));
275 
276 			Any any;
277 			any <<= ioException;
278 
279 			notifyListeners(this, &_error, callError(any));
280 
281 			throw ioException;
282 		}
283 	}
284 
	// XConnection::flush - intentionally empty: write() hands its data
	// directly to m_socket.write(), so this class has nothing buffered.
	void SocketConnection::flush( )
			throw(::com::sun::star::io::IOException,
				  ::com::sun::star::uno::RuntimeException)
	{

	}
291 
292 	void SocketConnection::close()
293 			throw(::com::sun::star::io::IOException,
294 				  ::com::sun::star::uno::RuntimeException)
295 	{
296 		// enshure close is called only once
297 		if(  1 == osl_incrementInterlockedCount( (&m_nStatus) ) )
298 		{
299 			m_socket.shutdown();
300 			notifyListeners(this, &_closed, callClosed);
301 		}
302 	}
303 
	// XConnection::getDescription - returns the description passed to the
	// constructor plus the ",uniqueValue=..." suffix added there and, once
	// completeConnectionString() has run, the peer/local host and port parts.
	OUString SocketConnection::getDescription()
			throw( ::com::sun::star::uno::RuntimeException)
	{
		return m_sDescription;
	}
309 
310 
311 	// XConnectionBroadcaster
312 	void SAL_CALL SocketConnection::addStreamListener(const Reference<XStreamListener> & aListener) throw(RuntimeException)
313 	{
314 		MutexGuard guard(_mutex);
315 
316 		_listeners.insert(aListener);
317 	}
318 
319 	void SAL_CALL SocketConnection::removeStreamListener(const Reference<XStreamListener> & aListener) throw(RuntimeException)
320 	{
321 		MutexGuard guard(_mutex);
322 
323 		_listeners.erase(aListener);
324 	}
325 
	// Stores the acceptor configuration only; no socket call is made here.
	// Address setup, bind and listen are done by init().
	//   sSocketName            host name / address to bind to
	//   nPort                  port to bind to
	//   bTcpNoDelay            whether accept() sets the TcpNoDelay option on
	//                          accepted sockets
	//   sConnectionDescription description handed to every SocketConnection
	//                          created by accept()
	SocketAcceptor::SocketAcceptor( const OUString &sSocketName,
									sal_uInt16 nPort,
									sal_Bool bTcpNoDelay,
									const OUString &sConnectionDescription) :
		m_sSocketName( sSocketName ),
		m_sConnectionDescription( sConnectionDescription ),
		m_nPort( nPort ),
		m_bTcpNoDelay( bTcpNoDelay ),
		m_bClosed( sal_False )
	{
	}
337 
338 
339 	void SocketAcceptor::init()
340 	{
341 		if( ! m_addr.setPort( m_nPort ) )
342 		{
343 			OUStringBuffer message( 128 );
344 			message.appendAscii( "acc_socket.cxx:SocketAcceptor::init - error - invalid tcp/ip port " );
345 			message.append( (sal_Int32) m_nPort );
346 			throw ConnectionSetupException(
347 				message.makeStringAndClear() , Reference< XInterface> () );
348 		}
349 		if( ! m_addr.setHostname( m_sSocketName.pData ) )
350 		{
351 			OUStringBuffer message( 128 );
352 			message.appendAscii( "acc_socket.cxx:SocketAcceptor::init - error - invalid host " );
353 			message.append( m_sSocketName );
354 			throw ConnectionSetupException(
355 				message.makeStringAndClear(), Reference< XInterface > () );
356 		}
357 		m_socket.setOption( osl_Socket_OptionReuseAddr, 1);
358 
359 		if(! m_socket.bind(m_addr) )
360 		{
361 			OUStringBuffer message( 128 );
362 			message.appendAscii( "acc_socket.cxx:SocketAcceptor::init - error - couldn't bind on " );
363 			message.append( m_sSocketName ).appendAscii( ":" ).append((sal_Int32)m_nPort);
364 			throw ConnectionSetupException(
365 				message.makeStringAndClear(),
366 				Reference<XInterface>());
367 		}
368 
369 		if(! m_socket.listen() )
370 		{
371 			OUStringBuffer message( 128 );
372 			message.appendAscii( "acc_socket.cxx:SocketAcceptor::init - error - can't listen on " );
373 			message.append( m_sSocketName ).appendAscii( ":" ).append( (sal_Int32) m_nPort);
374 			throw ConnectionSetupException(	message.makeStringAndClear(),Reference<XInterface>() );
375 		}
376 	}
377 
378 	Reference< XConnection > SocketAcceptor::accept( )
379 	{
380 		SocketConnection *pConn = new SocketConnection( m_sConnectionDescription );
381 
382 		if( m_socket.acceptConnection( pConn->m_socket )!= osl_Socket_Ok )
383 		{
384 			// stopAccepting was called
385 			delete pConn;
386 			return Reference < XConnection > ();
387 		}
388 		if( m_bClosed )
389 		{
390 			delete pConn;
391 			return Reference < XConnection > ();
392 		}
393 
394 		pConn->completeConnectionString();
395 		if( m_bTcpNoDelay )
396 		{
397 			sal_Int32 nTcpNoDelay = sal_True;
398 			pConn->m_socket.setOption( osl_Socket_OptionTcpNoDelay , &nTcpNoDelay,
399 									   sizeof( nTcpNoDelay ) , osl_Socket_LevelTcp );
400 		}
401 
402 		return Reference < XConnection > ( (XConnection * ) pConn );
403 	}
404 
	// Stops the acceptor: m_bClosed is set before the listening socket is
	// closed. accept() returns an empty reference when acceptConnection()
	// fails, and also checks m_bClosed after a successful accept.
	void SocketAcceptor::stopAccepting()
	{
		m_bClosed = sal_True;
		m_socket.close();
	}
410 }
411 
412 
413