1#*************************************************************************
2#
3# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4#
5# Copyright 2000, 2010 Oracle and/or its affiliates.
6#
7# OpenOffice.org - a multi-platform office productivity suite
8#
9# This file is part of OpenOffice.org.
10#
11# OpenOffice.org is free software: you can redistribute it and/or modify
12# it under the terms of the GNU Lesser General Public License version 3
13# only, as published by the Free Software Foundation.
14#
15# OpenOffice.org is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18# GNU Lesser General Public License version 3 for more details
19# (a copy is included in the LICENSE file that accompanied this code).
20#
21# You should have received a copy of the GNU Lesser General Public License
22# version 3 along with OpenOffice.org.  If not, see
23# <http://www.openoffice.org/license.html>
24# for a copy of the LGPLv3 License.
25#
26#*************************************************************************
27"tests bridging python implementations of UNO objects"
28import unittest
29import uno
30import unohelper
31import os
32import sys
33
34from com.sun.star.io import XOutputStream, XInputStream, typeOfXOutputStream, typeOfXInputStream
35from com.sun.star.lang import XTypeProvider, typeOfXTypeProvider, XEventListener
36from com.sun.star.uno import XCurrentContext
37
class SequenceOutputStream( unohelper.Base, XOutputStream ):
    """Python implementation of XOutputStream that buffers in memory.

    Everything passed to writeBytes() is appended to one ByteSequence,
    which getSequence() hands back.
    """

    def __init__(self):
        # Start from an empty *bytes* literal: ByteSequence is a byte
        # container, and b"" is a plain str on Python 2.6+ while being
        # the required bytes type on Python 3.
        self.s = uno.ByteSequence(b"")
        self.closed = 0

    def closeOutput(self):
        """Record that the stream was closed; buffered data is kept."""
        self.closed = 1

    def writeBytes(self, seq):
        """Append the byte sequence *seq* to the buffered data."""
        self.s = self.s + seq

    def flush(self):
        """Do nothing: data is already held in memory."""
        pass

    def getSequence(self):
        """Return the ByteSequence holding everything written so far."""
        return self.s
54
55
class SequenceInputStream( XInputStream, unohelper.Base ):
    """Python implementation of XInputStream reading from a byte sequence.

    The stream keeps a read position (nIndex) into the sequence given to
    the constructor and never blocks.
    """

    def __init__(self, seq):
        """Wrap *seq*; it must support len() and expose its bytes as .value."""
        self.s = seq
        self.nIndex = 0
        self.closed = 0

    def closeInput(self):
        """Mark the stream closed and drop the reference to the data."""
        self.closed = 1
        self.s = None

    def skipBytes(self, nByteCount):
        """Advance the read position by *nByteCount*, stopping at the end."""
        remaining = len(self.s) - self.nIndex
        self.nIndex += min(nByteCount, remaining)

    def readBytes(self, retSeq, nByteCount):
        """Read up to *nByteCount* bytes from the current position.

        The incoming *retSeq* is ignored; a fresh ByteSequence is built.
        Returns a tuple (number of bytes read, ByteSequence with the data).
        """
        remaining = len(self.s) - self.nIndex
        nRet = min(nByteCount, remaining)
        retSeq = uno.ByteSequence(self.s.value[self.nIndex:self.nIndex + nRet])
        self.nIndex = self.nIndex + nRet
        return nRet, retSeq

    def readSomeBytes(self, retSeq, nByteCount):
        """Same as readBytes(), because this stream never blocks."""
        # Must go through self: a bare readBytes() is not a module-level name.
        return self.readBytes(retSeq, nByteCount)

    def available(self):
        """Return the number of bytes left between the read position and the end."""
        return len(self.s) - self.nIndex
87
class SequenceInputStream2( SequenceInputStream ):
    """Subclass of SequenceInputStream that adds no behavior of its own.

    testStandard hands an instance of this derived class to the UNO
    DataInputStream instead of the base class.
    """

    def __init__(self, seq):
        # Delegate straight to the base-class constructor.
        SequenceInputStream.__init__(self, seq)
91
class TestCase(unittest.TestCase):
    """Tests that run against a UNO component context passed in by the caller."""

    def __init__(self, method, ctx):
        """Create the test for *method*, using the UNO component context *ctx*."""
        unittest.TestCase.__init__(self, method)
        self.ctx = ctx

    def setUp(self):
        """Instantiate the C++ test object and a pipe service for each test."""
        self.tobj = self.ctx.ServiceManager.createInstanceWithContext(
            "com.sun.star.test.bridge.CppTestObject", self.ctx)
        self.pipe = self.ctx.ServiceManager.createInstanceWithContext(
            "com.sun.star.io.Pipe", self.ctx)

    def testStandard(self):
        """Round-trip data through the Python stream implementations.

        A DataOutputStream writes a short and a long into a
        SequenceOutputStream; a DataInputStream then reads them back
        through a SequenceInputStream2.
        """
        dataOut = self.ctx.ServiceManager.createInstanceWithContext(
            "com.sun.star.io.DataOutputStream", self.ctx)
        streamOut = SequenceOutputStream()
        dataOut.setOutputStream(streamOut)
        dataOut.writeShort(42)
        dataOut.writeLong(43)
        dataOut.closeOutput()

        dataInput = self.ctx.ServiceManager.createInstanceWithContext(
            "com.sun.star.io.DataInputStream", self.ctx)

        dataInput.setInputStream(SequenceInputStream2(streamOut.getSequence()))

        # assertEqual instead of the failUnless alias: the alias is gone
        # from recent unittest versions, and a failure reports both values.
        self.assertEqual(42, dataInput.readShort())
        self.assertEqual(43, dataInput.readLong())
        # The object passed through transportAny must compare equal to
        # the original Python object.
        self.assertEqual(self.tobj.transportAny(streamOut), streamOut)
120
121
class NullDevice:
    """Write-only sink that discards everything handed to it."""

    def write(self, string):
        """Accept *string* and ignore it; always returns None."""
        return None
125
126
class EventListener( unohelper.Base, XEventListener ):
    """XEventListener implementation that records whether disposing() ran."""

    def __init__(self):
        # Stays False until disposing() is invoked.
        self.disposingCalled = False

    def disposing(self, eventObject):
        """Note that the notification arrived; *eventObject* is not inspected."""
        self.disposingCalled = True
133
class TestHelperCase( unittest.TestCase ):
    """Tests for the unohelper utilities and uno current-context functions."""

    def __init__(self, method):
        """Create the test for the test method named *method*."""
        unittest.TestCase.__init__(self, method)

    def testUrlHelper(self):
        """Check file-URL <-> system-path conversion and absolutize()."""
        systemPath = os.getcwd()
        # A working directory starting with "/" selects the POSIX-style
        # expectations; anything else gets the drive-letter expectations.
        if systemPath.startswith("/"):
            self.assertEqual("/tmp", unohelper.fileUrlToSystemPath("file:///tmp"))
            self.assertEqual("file:///tmp", unohelper.systemPathToFileUrl("/tmp"))
        else:
            self.assertEqual("c:\\temp", unohelper.fileUrlToSystemPath("file:///c:/temp"))
            self.assertEqual("file:///c:/temp", unohelper.systemPathToFileUrl("c:\\temp"))

        systemPath = unohelper.systemPathToFileUrl(systemPath)
        self.assertEqual(systemPath + "/a", unohelper.absolutize(systemPath, "a"))

    def testInspect(self):
        """Run unohelper.inspect() on several objects; output is discarded."""
        # Use sys.stdout instead of NullDevice() to see the inspect output.
        dev = NullDevice()
        unohelper.inspect(uno.getComponentContext(), dev)
        unohelper.inspect(uno.getComponentContext().ServiceManager, dev)
        unohelper.inspect(uno.getTypeByName("com.sun.star.lang.XComponent"), dev)

    def testListener(self):
        """Check that Python event listeners are notified and can be removed."""
        smgr = uno.getComponentContext().ServiceManager.createInstance(
            "com.sun.star.lang.ServiceManager")

        # check whether a registered listener is notified on dispose
        listener = EventListener()
        smgr.addEventListener(listener)
        smgr.dispose()
        self.assertTrue(listener.disposingCalled)

        # check whether listeners can be removed
        smgr = uno.getComponentContext().ServiceManager.createInstance(
            "com.sun.star.lang.ServiceManager")
        listener = EventListener()
        smgr.addEventListener(listener)
        smgr.removeEventListener(listener)
        smgr.dispose()
        self.assertFalse(listener.disposingCalled)

    def testCurrentContext(self):
        """Install a temporary current context and look values up in it."""
        oldContext = uno.getCurrentContext()
        try:
            uno.setCurrentContext(
                unohelper.CurrentContext(oldContext, {"My42": 42}))
            self.assertEqual(42, uno.getCurrentContext().getValueByName("My42"))
            # A name that was never set must come back as None.
            self.assertTrue(uno.getCurrentContext().getValueByName("My43") is None)
        finally:
            # Always restore the previous context, even if an assertion failed.
            uno.setCurrentContext(oldContext)
185
186
187
def suite( ctx ):
    """Assemble the tests of this module into one unittest.TestSuite.

    *ctx* is handed to TestCase; the TestHelperCase tests are created
    without it.
    """
    collected = unittest.TestSuite()
    collected.addTest(TestCase("testStandard", ctx))
    helper_tests = (
        "testUrlHelper",
        "testInspect",
        "testListener",
        "testCurrentContext",
    )
    for method_name in helper_tests:
        collected.addTest(TestHelperCase(method_name))
    return collected
196
197