#**************************************************************
#  
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#  
#    http://www.apache.org/licenses/LICENSE-2.0
#  
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.
#  
#**************************************************************
"""tests bridging python implementations of UNO objects"""
import os
import sys
import unittest

# uno/unohelper stay ahead of the com.sun.star imports below, which pyuno
# resolves through the import hook that the uno module installs.
import uno
import unohelper

from com.sun.star.io import XOutputStream, XInputStream, typeOfXOutputStream, typeOfXInputStream
from com.sun.star.lang import XTypeProvider, typeOfXTypeProvider, XEventListener
from com.sun.star.uno import XCurrentContext


class SequenceOutputStream(unohelper.Base, XOutputStream):
    """Python XOutputStream that collects all written bytes in memory."""

    def __init__(self):
        # b"" is a plain str on Python 2 and bytes on Python 3, so the same
        # literal is accepted by ByteSequence on either interpreter.
        self.s = uno.ByteSequence(b"")
        self.closed = False

    def closeOutput(self):
        """Record that the stream was closed; the collected data is kept."""
        self.closed = True

    def writeBytes(self, seq):
        """Append the byte sequence *seq* to the collected data."""
        self.s = self.s + seq

    def flush(self):
        """Do nothing: every write is already stored in memory."""
        pass

    def getSequence(self):
        """Return everything written so far as one uno.ByteSequence."""
        return self.s


class SequenceInputStream(XInputStream, unohelper.Base):
    """Python XInputStream that serves bytes from a uno.ByteSequence."""

    def __init__(self, seq):
        self.s = seq
        self.nIndex = 0  # read position inside self.s
        self.closed = False

    def closeInput(self):
        """Mark the stream closed and drop the reference to the data."""
        self.closed = True
        self.s = None

    def skipBytes(self, nByteCount):
        """Advance the read position by *nByteCount*, clamped to the end."""
        remaining = len(self.s) - self.nIndex
        self.nIndex += min(nByteCount, remaining)

    def readBytes(self, retSeq, nByteCount):
        """Read up to *nByteCount* bytes from the current position.

        The incoming *retSeq* is ignored; a fresh ByteSequence holding the
        bytes read is returned together with the count, as the tuple
        ``(count, sequence)`` (return value first, then the out parameter).
        """
        nRet = min(nByteCount, len(self.s) - self.nIndex)
        retSeq = uno.ByteSequence(self.s.value[self.nIndex:self.nIndex + nRet])
        self.nIndex += nRet
        return nRet, retSeq

    def readSomeBytes(self, retSeq, nByteCount):
        """Behave exactly like readBytes, because this stream never blocks."""
        # Must go through self: the bare name readBytes is not defined at
        # module level and raised NameError.
        return self.readBytes(retSeq, nByteCount)

    def available(self):
        """Return the number of bytes not yet read."""
        return len(self.s) - self.nIndex


class SequenceInputStream2(SequenceInputStream):
    """Subclass adding nothing of its own; it only inherits the UNO methods."""

    def __init__(self, seq):
        SequenceInputStream.__init__(self, seq)


class TestCase(unittest.TestCase):
    """Tests that need the component context handed to suite()."""

    def __init__(self, method, ctx):
        unittest.TestCase.__init__(self, method)
        self.ctx = ctx

    def setUp(self):
        smgr = self.ctx.ServiceManager
        self.tobj = smgr.createInstanceWithContext(
            "com.sun.star.test.bridge.CppTestObject", self.ctx)
        self.pipe = smgr.createInstanceWithContext(
            "com.sun.star.io.Pipe", self.ctx)

    def testStandard(self):
        """Write through a Python output stream, read back through a Python input stream."""
        smgr = self.ctx.ServiceManager

        dataOut = smgr.createInstanceWithContext(
            "com.sun.star.io.DataOutputStream", self.ctx)
        streamOut = SequenceOutputStream()
        dataOut.setOutputStream(streamOut)
        dataOut.writeShort(42)
        dataOut.writeLong(43)
        dataOut.closeOutput()

        dataInput = smgr.createInstanceWithContext(
            "com.sun.star.io.DataInputStream", self.ctx)
        dataInput.setInputStream(SequenceInputStream2(streamOut.getSequence()))

        self.assertEqual(42, dataInput.readShort())
        self.assertEqual(43, dataInput.readLong())
        # The Python object passed to transportAny must compare equal to
        # what comes back.
        self.assertTrue(self.tobj.transportAny(streamOut) == streamOut)


class NullDevice:
    """File-like sink whose write() discards its argument."""

    def write(self, string):
        pass


class EventListener(unohelper.Base, XEventListener):
    """XEventListener that remembers whether disposing() was called."""

    def __init__(self):
        self.disposingCalled = False

    def disposing(self, eventObject):
        self.disposingCalled = True


class TestHelperCase(unittest.TestCase):
    """Tests for unohelper and for module-level functions of uno."""

    def __init__(self, method):
        unittest.TestCase.__init__(self, method)

    def testUrlHelper(self):
        """Check file URL <-> system path conversion and absolutize()."""
        systemPath = os.getcwd()
        if systemPath.startswith("/"):
            self.assertEqual("/tmp", unohelper.fileUrlToSystemPath("file:///tmp"))
            self.assertEqual("file:///tmp", unohelper.systemPathToFileUrl("/tmp"))
        else:
            self.assertEqual("c:\\temp", unohelper.fileUrlToSystemPath("file:///c:/temp"))
            self.assertEqual("file:///c:/temp", unohelper.systemPathToFileUrl("c:\\temp"))

        systemPath = unohelper.systemPathToFileUrl(systemPath)
        self.assertEqual(systemPath + "/a", unohelper.absolutize(systemPath, "a"))

    def testInspect(self):
        """Run unohelper.inspect on a few objects, discarding the output."""
        # Replace NullDevice() with sys.stdout to see what inspect() prints.
        dev = NullDevice()
        unohelper.inspect(uno.getComponentContext(), dev)
        unohelper.inspect(uno.getComponentContext().ServiceManager, dev)
        unohelper.inspect(uno.getTypeByName("com.sun.star.lang.XComponent"), dev)

    def testListener(self):
        """Check that Python event listeners are notified and can be removed."""
        # check whether listeners are notified when the component is disposed
        smgr = uno.getComponentContext().ServiceManager.createInstance(
            "com.sun.star.lang.ServiceManager")
        listener = EventListener()
        smgr.addEventListener(listener)
        smgr.dispose()
        self.assertTrue(listener.disposingCalled)

        # check whether listeners can be removed
        smgr = uno.getComponentContext().ServiceManager.createInstance(
            "com.sun.star.lang.ServiceManager")
        listener = EventListener()
        smgr.addEventListener(listener)
        smgr.removeEventListener(listener)
        smgr.dispose()
        self.assertFalse(listener.disposingCalled)

    def testCurrentContext(self):
        """Install a layered current context and look values up in it."""
        oldContext = uno.getCurrentContext()
        try:
            uno.setCurrentContext(
                unohelper.CurrentContext(oldContext, {"My42": 42}))
            self.assertEqual(42, uno.getCurrentContext().getValueByName("My42"))
            self.assertIsNone(uno.getCurrentContext().getValueByName("My43"))
        finally:
            # always restore the context that was active before the test
            uno.setCurrentContext(oldContext)


def suite(ctx):
    """Build the TestSuite for this module.

    *ctx* is the component context passed on to TestCase; the
    TestHelperCase tests obtain theirs via uno.getComponentContext().
    """
    tests = unittest.TestSuite()
    tests.addTest(TestCase("testStandard", ctx))
    tests.addTest(TestHelperCase("testUrlHelper"))
    tests.addTest(TestHelperCase("testInspect"))
    tests.addTest(TestHelperCase("testListener"))
    tests.addTest(TestHelperCase("testCurrentContext"))
    return tests