1 /************************************************************** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, 14 * software distributed under the License is distributed on an 15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16 * KIND, either express or implied. See the License for the 17 * specific language governing permissions and limitations 18 * under the License. 19 * 20 *************************************************************/ 21 22 23 24 package com.sun.star.lib.uno.environments.remote; 25 26 import com.sun.star.lib.uno.typedesc.MethodDescription; 27 import com.sun.star.lib.uno.typedesc.TypeDescription; 28 import complexlib.ComplexTestCase; 29 30 public class ThreadPool_Test extends ComplexTestCase { getTestObjectName()31 public String getTestObjectName() { 32 return getClass().getName(); 33 } 34 getTestMethodNames()35 public String[] getTestMethodNames() { 36 return new String[] { "testDispose", 37 "testThreadAsync", 38 "testDynamicThreadSync", 39 "testStaticThreadSync", 40 "testDynamicThreadAsyncSyncOrder", 41 "testStaticThreadAsyncSyncOrder", 42 "testStress", 43 "testAsyncSync" }; 44 } 45 testDispose()46 public void testDispose() throws InterruptedException { 47 IThreadPool iThreadPool = ThreadPoolManager.create(); 48 TestThread testThread = new TestThread(iThreadPool); 49 50 ThreadId threadId = null; 51 52 // start the test thread 53 synchronized(testThread) { 54 testThread.start(); 55 56 testThread.wait(); 57 58 threadId = testThread._threadId; 59 60 // let the thread attach and enter the threadpool 61 testThread.notifyAll(); 62 } 63 64 String message = "blabla"; 65 66 // terminate the test thread 67 synchronized(testThread) { 68 // put reply job 69 iThreadPool.dispose(new RuntimeException(message)); 70 71 testThread.wait(); 72 } 73 74 testThread.join(); 75 76 assure("", testThread._message.equals(message)); 77 } 78 testThreadAsync()79 public void testThreadAsync() throws InterruptedException { 80 TestWorkAt workAt = new TestWorkAt(); 81 82 ThreadId threadId = ThreadId.createFresh(); 83 84 // queue asyncs 85 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 86 Thread.yield(); // force scheduling 87 putJob(workAt, false, threadId, "increment"); 88 } 89 90 synchronized(workAt) { 91 putJob(workAt, false, threadId, "notifyme"); 92 93 while(!workAt._notified) 94 workAt.wait(); 95 } 96 97 assure("", workAt._counter == TestWorkAt.MESSAGES); 98 } 99 testDynamicThreadSync()100 public void testDynamicThreadSync() throws InterruptedException { 101 TestWorkAt workAt = new TestWorkAt(); 102 103 ThreadId threadId = ThreadId.createFresh(); 104 105 // queue asyncs 106 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 107 Thread.yield(); // force scheduling 108 putJob(workAt, true, threadId, "increment"); 109 } 110 111 synchronized(workAt) { 112 putJob(workAt, true, threadId, "notifyme"); 113 114 while(!workAt._notified) 115 workAt.wait(); 116 } 117 118 assure("", workAt._counter == TestWorkAt.MESSAGES); 119 } 120 testStaticThreadSync()121 public void testStaticThreadSync() throws InterruptedException { 122 TestWorkAt workAt = new TestWorkAt(); 123 124 TestThread testThread = new TestThread(); 125 126 ThreadId threadId = null; 127 128 // start the test thread 129 synchronized(testThread) { 130 testThread.start(); 131 132 testThread.wait(); 133 134 threadId = testThread._threadId; 135 136 // let the thread attach and enter the threadpool 137 testThread.notifyAll(); 138 } 139 140 // queue syncs 141 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 142 Thread.yield(); // force scheduling 143 putJob(workAt, true, threadId, "increment"); 144 } 145 146 // terminate the test thread 147 synchronized(testThread) { 148 // put reply job 149 putJob(workAt, true, threadId, null); 150 151 testThread.wait(); 152 } 153 154 testThread.join(); 155 156 assure("", workAt._counter == TestWorkAt.MESSAGES); 157 } 158 testDynamicThreadAsyncSyncOrder()159 public void testDynamicThreadAsyncSyncOrder() throws InterruptedException { 160 TestWorkAt workAt = new TestWorkAt(); 161 162 ThreadId threadId = ThreadId.createFresh(); 163 164 // queue asyncs 165 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 166 Thread.yield(); // force scheduling 167 putJob(workAt, false, threadId, "asyncCall"); 168 } 169 170 // queue syncs 171 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 172 Thread.yield(); // force scheduling 173 putJob(workAt, true, threadId, "syncCall"); 174 } 175 176 synchronized(workAt) { 177 putJob(workAt, true, threadId, "notifyme"); 178 179 while(!workAt._notified) 180 workAt.wait(); 181 } 182 183 assure("", workAt.passedAsyncTest()); 184 } 185 testStaticThreadAsyncSyncOrder()186 public void testStaticThreadAsyncSyncOrder() throws InterruptedException { 187 TestWorkAt workAt = new TestWorkAt(); 188 189 TestThread testThread = new TestThread(); 190 191 // start the test thread 192 synchronized(testThread) { 193 testThread.start(); 194 195 testThread.wait(); 196 } 197 198 ThreadId threadId = testThread._threadId; 199 200 // queue asyncs 201 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 202 Thread.yield(); // force scheduling 203 putJob(workAt, false, threadId, "asyncCall"); 204 } 205 206 // let the thread attach and enter the threadpool 207 synchronized(testThread) { 208 testThread.notifyAll(); 209 } 210 211 // queue syncs 212 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 213 Thread.yield(); // force scheduling 214 putJob(workAt, true, threadId, "syncCall"); 215 } 216 217 // terminate the test thread 218 synchronized(testThread) { 219 // put reply job 220 putJob(workAt, true, threadId, null); 221 222 testThread.wait(); 223 } 224 225 testThread.join(); 226 227 assure("", workAt.passedAsyncTest()); 228 } 229 testStress()230 public void testStress() throws InterruptedException { 231 TestWorkAt workAt = new TestWorkAt(); 232 for (int i = 0; i < TestWorkAt.MESSAGES; ++i) { 233 Thread.yield(); // force scheduling 234 ThreadId threadID = ThreadId.createFresh(); 235 putJob(workAt, true, threadID, "increment"); 236 putJob(workAt, false, threadID, "increment"); 237 } 238 synchronized (workAt) { 239 while (workAt._counter < 2 * TestWorkAt.MESSAGES) { 240 workAt.wait(); 241 } 242 } 243 244 abstract class Stress extends Thread { 245 public Stress(int count) { 246 this.count = count; 247 } 248 249 public void run() { 250 try { 251 for (int i = 0; i < count; ++i) { 252 runTest(); 253 } 254 } catch (Throwable e) { 255 e.printStackTrace(System.err); 256 } 257 } 258 259 protected abstract void runTest() throws InterruptedException; 260 261 private final int count; 262 } 263 264 Stress stress1 = new Stress(50) { 265 protected void runTest() throws InterruptedException { 266 testThreadAsync(); 267 } 268 }; 269 stress1.start(); 270 271 Stress stress2 = new Stress(50) { 272 protected void runTest() throws InterruptedException { 273 testDynamicThreadSync(); 274 } 275 }; 276 stress2.start(); 277 278 Stress stress3 = new Stress(50) { 279 protected void runTest() throws InterruptedException { 280 testStaticThreadSync(); 281 } 282 }; 283 stress3.start(); 284 285 Stress stress4 = new Stress(50) { 286 protected void runTest() throws InterruptedException { 287 testDynamicThreadAsyncSyncOrder(); 288 } 289 }; 290 stress4.start(); 291 292 Stress stress5 = new Stress(50) { 293 protected void runTest() throws InterruptedException { 294 testStaticThreadAsyncSyncOrder(); 295 } 296 }; 297 stress5.start(); 298 299 Stress stress6 = new Stress(500) { 300 protected void runTest() throws InterruptedException { 301 testDispose(); 302 } 303 }; 304 stress6.start(); 305 306 stress1.join(); 307 stress2.join(); 308 stress3.join(); 309 stress4.join(); 310 stress5.join(); 311 stress6.join(); 312 } 313 testAsyncSync()314 public void testAsyncSync() throws InterruptedException { 315 TestWorkAt workAt = new TestWorkAt(); 316 ThreadId threadId = ThreadId.createFresh(); 317 MyWorkAt myWorkAt = new MyWorkAt( workAt ); 318 319 // queue asyncs 320 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 321 if( i == 2 ) 322 { 323 putJob( myWorkAt, false , threadId, "asyncCall" ); 324 } 325 putJob(workAt, false, threadId, "asyncCall"); 326 } 327 328 synchronized(workAt) { 329 putJob(workAt, false, threadId, "notifyme"); 330 331 while(!workAt._notified) 332 workAt.wait(); 333 } 334 335 assure("", 336 workAt._async_counter == TestWorkAt.MESSAGES 337 && myWorkAt._success); 338 } 339 putJob(TestIWorkAt iWorkAt, boolean synchron, ThreadId threadId, String operation)340 private static void putJob(TestIWorkAt iWorkAt, boolean synchron, 341 ThreadId threadId, String operation) { 342 __iThreadPool.putJob( 343 new Job(iWorkAt, __iReceiver, 344 new Message( 345 threadId, operation != null, "oid", __workAt_td, 346 (operation == null 347 ? null 348 : ((MethodDescription) 349 __workAt_td.getMethodDescription(operation))), 350 synchron, null, false, null, null))); 351 } 352 353 private static final class TestThread extends Thread { 354 ThreadId _threadId; 355 Object _disposeId = new Object(); 356 String _message; 357 IThreadPool _iThreadPool; 358 TestThread()359 TestThread() { 360 this(__iThreadPool); 361 } 362 TestThread(IThreadPool iThreadPool)363 TestThread(IThreadPool iThreadPool) { 364 _iThreadPool = iThreadPool; 365 } 366 run()367 public void run() { 368 _threadId = _iThreadPool.getThreadId(); 369 370 371 try { 372 synchronized(this) { 373 // notify that we are running 374 notify(); 375 376 _iThreadPool.attach(); 377 378 // wait until we should continue 379 wait(); 380 } 381 382 _iThreadPool.enter(); 383 } 384 catch(Throwable throwable) { 385 _message = throwable.getMessage(); 386 } 387 388 _iThreadPool.detach(); 389 390 synchronized(this) { 391 // notify the listeners that we are dying 392 notifyAll(); 393 } 394 } 395 } 396 397 private static final class MyWorkAt implements TestIWorkAt { MyWorkAt( TestWorkAt async_WorkAt )398 public MyWorkAt( TestWorkAt async_WorkAt ) { 399 _async_WorkAt = async_WorkAt; 400 } 401 syncCall()402 public void syncCall() throws Throwable 403 { 404 Message iMessage = new Message( 405 __iThreadPool.getThreadId(), false, "oid", __workAt_td, null, 406 false, null, false, null, null); 407 408 // marshal reply 409 ThreadPool_Test.__iThreadPool.putJob( 410 new Job(this, ThreadPool_Test. __iReceiver, iMessage)); 411 } 412 asyncCall()413 public void asyncCall() throws Throwable { 414 for (int i = 0 ; i < 5 ; ++i) { 415 ThreadPool_Test.__iThreadPool.attach(); 416 ThreadPool_Test.putJob(this, true, __iThreadPool.getThreadId(), 417 "syncCall"); 418 // wait for reply 419 ThreadPool_Test.__iThreadPool.enter(); 420 ThreadPool_Test.__iThreadPool.detach(); 421 } 422 // async must have waited for this call 423 _success = _async_WorkAt._async_counter == 2; 424 } 425 increment()426 public void increment() throws Throwable {} 427 notifyme()428 public void notifyme() {} 429 430 public boolean _success = false; 431 432 private final TestWorkAt _async_WorkAt; 433 } 434 435 private static final IThreadPool __iThreadPool = ThreadPoolManager.create(); 436 private static final IReceiver __iReceiver = new TestReceiver(); 437 private static final TypeDescription __workAt_td 438 = TypeDescription.getTypeDescription(TestIWorkAt.class); 439 } 440