View Javadoc
/*
 * Copyright (c) 2004, Bruce Lowery
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 * - Neither the name of JEGG nor the names of its contributors may be used
 *   to endorse or promote products derived from this software without
 *   specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
package jegg;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import jegg.queue.*;
import jegg.registry.*;
import jegg.timer.Timeout;
import jegg.timer.Timer;
import jegg.timer.TimeoutListener;

/**
 * Base class for implementing an egg.  Concrete classes that extend this
 * class will be plugged into the JEgg framework and be able to send and
 * receive arbitrary messages.  The Egg base class also provides
 * convenience methods for the following:
 * <ul>
 * <li>Responding to a message (see {@link #respond(Object) respond})</li>
 * <li>Publishing this egg's message port (see {@link #publishPort(String) publishPort})</li>
 * <li>Looking up another egg's message port (see {@link #requestPort(String) requestPort})</li>
 * <li>Broadcasting a message (see {@link #send(Object) send})</li>
 * <li>'Binding' to another egg's port (see {@link #bindToPort(Port) bindToPort})</li>
 * <li>Creating an egg timer (see
 *     {@link #createRepeatingTimer(long, long) createRepeatingTimer} and
 *     {@link #createSingleShotTimer(long) createSingleShotTimer})</li>
 * <li>Creation of a JEgg message from an application object (see {@link #createMessage(Object)})</li>
 * </ul>
 * <p>
 * The single-argument constructor assigns this egg to the default
 * dispatcher.  Use the constructor that takes a
 * {@link jegg.Dispatcher Dispatcher} in order to assign the egg to a
 * specific dispatcher.
 * <p>
 * The derived class should implement handlers for the message types that it
 * expects to receive.  A message handler is implemented by overloading the
 * {@link #handle(Object) handle} method with a version that accepts the
 * expected message type.  When the egg is constructed, the
 * <code>handle</code> methods declared by the concrete class and by every
 * class between it and <code>Egg</code> are reflected and cached in a lookup
 * table to improve the message delivery performance.  Handlers should be
 * declared <code>public</code> so that they can be invoked reflectively.
 * <p>
 * The message dispatcher assigned to the egg will only deliver one message at
 * a time to the Egg subclass.  If an egg is expected to take a long time to
 * process some messages, then it should be assigned its own dispatcher (see
 * above).
 * <p>
 * Messages are dispatched to an egg according to their priority, in the order
 * they were sent.  So, a more recent message with a high priority can be
 * delivered to the egg before a message that was sent earlier but with a
 * lower priority.
 */
public abstract class Egg
{
    /** The name of the [overloaded] method that subclasses must implement. */
    private static final String HANDLE_METHOD_NAME = "handle";
    /** Logger */
    private static final Log LOG = LogFactory.getLog(Egg.class);

    /** Message queue */
    private final PriorityQueue _mqueue = new PriorityQueue();
    /** The concrete class of this egg */
    private final Class _class;
    /** The message dispatcher for this egg */
    private final Dispatcher _dispatcher;
    /** Lookup table: handler parameter type -> handler method */
    private final Map _mtable = new HashMap();
    /** Port */
    private final Port _port;
    /** Name */
    private final Object _id;
    /** True if stopped.  Read and written only while holding the dispatcher's monitor. */
    private boolean _stopped = false;
    /** Current active message; non-null only while a handler is being invoked. */
    private Message _currentMessage;

    /**
     * Constructor.  The default dispatcher is used to execute this egg.
     *
     * @param id a unique object that can be used to identify this egg.
     * @throws IllegalStateException if the derived class doesn't
     * implement any 'handle' methods.
     */
    public Egg(final Object id)
    {
        this(id, Dispatcher.getDefaultScheduler());
        if (LOG.isDebugEnabled())
        {
            LOG.debug(
                "Egg: class "
                    + this.getClass().getName()
                    + ": default constructed");
        }
    }

    /**
     * Construct an egg using a specific scheduler to execute the egg.
     *
     * @param id a unique object that can be used to identify this egg.
     * @param s the scheduler to use to execute the egg.
     * @throws IllegalStateException if the derived class doesn't
     * implement any 'handle' methods.
     */
    public Egg(final Object id, final Dispatcher s)
    {
        super();
        this._id = id;
        _class = this.getClass();
        this.fillLookupTable(_class);
        if (_mtable.isEmpty())
        {
            throw new IllegalStateException(
                "Class "
                    + _class.getName()
                    + " implements no '"
                    + HANDLE_METHOD_NAME
                    + "' methods");
        }
        _dispatcher = s;
        // Create the port before handing 'this' to the dispatcher so that the
        // dispatcher never observes an egg whose port is still null.
        _port = new Port(this);
        _dispatcher.add(this);
        if (LOG.isDebugEnabled())
        {
            LOG.debug(
                "Egg: class "
                    + this.getClass().getName()
                    + ": constructed with scheduler");
        }
    }

    /**
     * Return the port used to deliver messages to this egg.
     *
     * @return this egg's message port.
     */
    public final Port getPort()
    {
        return _port;
    }

    /**
     * Return the port of the egg that sent the current message.  The current
     * message is only set while a handler is being invoked, so this method
     * must only be called from within a <code>handle</code> method.
     *
     * @return the sender's port
     */
    protected final Port getFromPort()
    {
        return _currentMessage.getFrom();
    }

    /**
     * Return the current message being processed.
     *
     * @return the current message, or <code>null</code> if no handler is
     * being invoked.
     */
    protected final Message getCurrentMessage()
    {
        return _currentMessage;
    }

    /**
     * Publish this egg's message port in the port registry.  Other eggs
     * can lookup the port in the registry and use it to send messages to
     * this egg.
     *
     * @param n the name to register the port under.
     */
    protected final void publishPort(String n)
    {
        PortRegistry.getInstance().getPort().send(
            createMessage(new PublishPortMessage(n, getPort())));
    }

    /**
     * Send a lookup request to the port registry.  The port will be delivered
     * to this egg using a {@link jegg.registry.LocatePortResponse LocatePortResponse}.
     * <p>
     * If the port is not registered in the registry when the registry receives
     * the port request, the request will be saved until the target port is registered.
     *
     * @param n the name of the registered port.
     */
    protected final void requestPort(String n)
    {
        PortRegistry.getInstance().getPort().send(
            createMessage(new LocatePortMessage(n)));
    }

    /**
     * Register to receive <i>all</i> messages broadcast by the egg that the
     * specified port belongs to.  An egg can broadcast a message using the
     * {@link #send(Object) send} method.
     *
     * @param p the port to register for broadcast messages with.
     */
    protected final void bindToPort(Port p)
    {
        p.connect(getPort());
    }

    // TODO implement unbindFromPort(Port p)

    /**
     * Send a message to the egg that sent the current message.
     * The message will be sent with a <i>medium</i> priority
     * level assigned to it.
     *
     * @param message the message to send to the sending egg.
     */
    protected final void respond(Object message)
    {
        respond(message, Priority.MEDIUM);
    }

    /**
     * Send a message to the egg that sent the current message.
     * The message will be sent with the specified priority
     * level assigned to it.
     *
     * @param message the message to send to the sending egg.
     * @param priority the priority to assign to the message.
     */
    protected final void respond(Object message, Priority priority)
    {
        respond(getFromPort(), message, priority);
    }

    /**
     * Send a message to the egg that the specified port
     * belongs to.  This method is useful when a message
     * response has to be delayed until later (so the port
     * is saved for use later).  The message will be sent
     * with a medium priority level assigned to it.
     *
     * @param port the port to send the message on.
     * @param message the message to send to the egg that
     * the specified port belongs to.
     */
    protected final void respond(Port port, Object message)
    {
        respond(port, message, Priority.MEDIUM);
    }

    /**
     * Send a message to the egg that the specified port
     * belongs to.  This method is useful when a message
     * response has to be delayed until later (so the port
     * is saved for use later).  The message will be sent
     * with the specified priority level assigned to it.
     *
     * @param port the port to send the message on.
     * @param message the message to send to the egg that
     * the specified port belongs to.
     * @param priority the message's priority assignment.
     */
    protected final void respond(Port port, Object message, Priority priority)
    {
        port.send(createMessage(message, priority));
    }

    /**
     * Return the ID for this egg.  The egg's ID is arbitrary and not used
     * by the JEgg framework; this is purely for user convenience.
     *
     * @return the id assigned to this egg.
     */
    final Object getId()
    {
        return _id;
    }

    /**
     * Process a message from another egg.  Note that this method is
     * declared public.  This is because the dispatcher must have
     * permission to invoke the handle methods in the derived class that
     * will be in a different, application-defined package.
     *
     * @param message the message to process.
     */
    public abstract void handle(final Object message);

    /**
     * Broadcast a message to all eggs that have bound to this egg's port
     * (see {@link #bindToPort(Port) bindToPort}).
     *
     * @param msg the message to send.
     */
    protected final void send(Object msg)
    {
        getPort().broadcast(msg);
    }

    /**
     * Create JEgg message from an application-defined message.  The message
     * will be created with a medium priority level assigned to it.
     * Additionally, this egg's port will be saved in the JEgg message.
     * <p>
     * This is useful to do when the same message has to be sent to many
     * different eggs.
     *
     * @param m the application message to wrap in a JEgg message.
     * @return a JEgg message wrapping the application message.
     */
    protected final Message createMessage(Object m)
    {
        return createMessage(m, Priority.MEDIUM);
    }

    /**
     * Create JEgg message from an application-defined message.  The message
     * will be created with the specified priority level assigned to it.
     * Additionally, this egg's port will be saved in the JEgg message.
     * <p>
     * This is useful to do when the same message has to be sent to many
     * different eggs.
     *
     * @param m the application message to wrap in a JEgg message.
     * @param p the priority to assign to the JEgg message.
     * @return a JEgg message wrapping the application message.
     */
    protected final Message createMessage(Object m, Priority p)
    {
        return new Message(m, getPort(), p);
    }

    /**
     * Create an egg timer.  The timer will deliver
     * {@link jegg.timer.Timeout Timeout} messages to this egg until
     * {@link jegg.timer.Timer#cancel() Timer.cancel()} is called.
     * <p>
     * The timer is automatically started before being returned.
     *
     * @param interval_msec the number of milliseconds between timeout messages.
     * @param delay_msec the initial delay that the timer will wait before it
     * starts delivering timeout messages.
     * @return A new timer instance.
     */
    protected final Timer createRepeatingTimer(long interval_msec, long delay_msec)
    {
        return Timer.createRepeatingTimer(
            newTimeoutListener(), interval_msec, delay_msec);
    }

    /**
     * Create an egg timer.  The timer will deliver a single
     * {@link jegg.timer.Timeout Timeout} message to this egg.
     *
     * @param delay_msec the number of milliseconds after which the timeout
     * message will be delivered.
     * @return a new timer instance.
     */
    protected final Timer createSingleShotTimer(long delay_msec)
    {
        return Timer.createSingleShotTimer(newTimeoutListener(), delay_msec);
    }

    /**
     * Return this egg's message dispatcher.
     *
     * @return the dispatcher assigned to this egg.
     */
    protected Dispatcher getDispatcher()
    {
        return _dispatcher;
    }

    /**
     * Build a timer listener that forwards every timeout to this egg as a
     * {@link jegg.timer.Timeout Timeout} message.
     *
     * @return a new listener bound to this egg.
     */
    private TimeoutListener newTimeoutListener()
    {
        return new TimeoutListener()
        {
            public void timeout(Timer tt)
            {
                emitTimeout(tt);
            }
        };
    }

    /**
     * Send a timeout notification to the derived class.  This method is
     * used to deliver timeout messages from a running timer (see
     * {@link #createRepeatingTimer(long, long) createRepeatingTimer} and
     * {@link #createSingleShotTimer(long) createSingleShotTimer}).
     *
     * @param t the timer that is sending the timeout message.
     */
    private final void emitTimeout(Timer t)
    {
        getPort().send(createMessage(new Timeout(t)));
    }

    /**
     * Add a message to this egg's message queue and wake up any thread
     * waiting on the dispatcher's monitor.  This method is only used by the
     * egg's port when another egg sends a message to this egg.
     *
     * @param message the message to add to the queue.
     * @throws IllegalStateException if this egg has been stopped.
     */
    final void enqueue(final Message message)
    {
        synchronized (_dispatcher)
        {
            // Checked under the same lock stop() uses, so a message can never
            // slip into the queue of an egg that has just been stopped.
            if (_stopped)
            {
                throw new IllegalStateException("stopped");
            }

            if (LOG.isDebugEnabled())
            {
                LOG.debug(_id + ": enqueue: " + message);
            }

            Priority p = message.getPriority();
            _mqueue.add(p, message);

            if (LOG.isDebugEnabled())
            {
                LOG.debug(
                    _id
                        + ": enqueue: has "
                        + _mqueue.size()
                        + " messages");
                LOG.debug(_id + ": enqueue: notifying scheduler");
            }

            _dispatcher.notifyAll();

            if (LOG.isDebugEnabled())
            {
                LOG.debug(_id + ": enqueue: done");
            }
        }
    }

    /**
     * Return next message in order of priority.
     *
     * @return the next queued message, or <code>null</code> if the queue
     * could not supply one (any failure raised by the queue is treated as
     * "no message available").
     */
    final Message getNextMessage()
    {
        Message m = null;
        try
        {
            m = (Message) _mqueue.next();
        }
        catch (Throwable t)
        {
            // Deliberately best-effort: the caller only sees 'null'.  Leave a
            // trace so that unexpected queue failures can still be diagnosed.
            if (LOG.isDebugEnabled())
            {
                LOG.debug(_id + ": getNextMessage: no message available: " + t);
            }
        }
        return m;
    }

    /**
     * Return the number of undelivered messages in this egg's
     * message queue.
     *
     * @return the number of undelivered messages.
     */
    final long getNumPendingMessages()
    {
        long num = _mqueue.size();
        if (LOG.isDebugEnabled())
        {
            LOG.debug(_id + ": getNumPendingMessages: " + Long.toString(num));
        }
        return num;
    }

    /**
     * Deliver a message to the derived class handler.  The message is
     * delivered to the most specific handler based on the concrete type
     * of the message.
     * <p>
     * If the message carries no payload, or no handler can be found, the
     * message is dropped and an error is logged.  Anything thrown by the
     * handler is logged and not propagated to the caller.
     * <p>
     * This method is used by a message dispatcher to deliver a message to
     * the egg.
     *
     * @param incoming the message to handle.
     */
    final void dispatch(final Message incoming)
    {
        if (LOG.isDebugEnabled())
        {
            LOG.debug(_id + ": Dispatching: " + incoming);
        }

        final Object message = incoming.getMessage();
        if (null == message)
        {
            // Without a payload there is no type to select a handler with.
            LOG.error("No payload - dropping message: " + incoming);
            return;
        }

        final Method method = lookup(message.getClass());
        if (null == method)
        {
            LOG.error("No handler found - dropping message: " + incoming);
            return;
        }

        try
        {
            if (LOG.isDebugEnabled())
            {
                LOG.debug(_id + ": Invoking handler: " + method);
            }
            _currentMessage = incoming;
            method.invoke(this, new Object[] {message});
        }
        catch (InvocationTargetException e)
        {
            // The handler itself threw; report the real cause with its trace.
            final Throwable cause = e.getCause();
            LOG.error("Invocation error: " + cause, (null != cause) ? cause : e);
        }
        catch (Throwable t)
        {
            LOG.error(
                this.getClass().getName()
                    + ": "
                    + "Error raised handling message ["
                    + incoming
                    + "]: "
                    + t,
                t);
        }
        finally
        {
            _currentMessage = null;
        }
    }

    /**
     * Return the derived class handler method for a message of the
     * type specified by the argument.
     * <p>
     * The search starts at <code>argType</code> and walks up its superclass
     * chain.  At each step the class itself is tried first, followed by the
     * interfaces it declares (and their super-interfaces, breadth first).
     * The first registered handler found is returned.
     *
     * @param argType the argument type of the handler to return.
     * @return the handler for messages of the specified type, or
     * <code>null</code> if no handler matches.
     */
    private final Method lookup(final Class argType)
    {
        // TODO cache the lookup result if a match is found so
        // that the next time the same arg_type is passed in,
        // the search doesn't have to be performed from scratch.

        if (LOG.isDebugEnabled())
        {
            LOG.debug(_id + ": lookup(" + argType.getName() + ")");
        }

        // getSuperclass() returns null for Object and for interfaces, which
        // terminates the walk.
        for (Class claz = argType; null != claz; claz = claz.getSuperclass())
        {
            final Method m = findHandlerFor(claz);
            if (null != m)
            {
                if (LOG.isDebugEnabled())
                {
                    LOG.debug(_id + ": lookup(): returning " + m);
                }
                return m;
            }
        }

        if (LOG.isDebugEnabled())
        {
            LOG.debug("Egg: lookup(): returning NULL");
        }
        return null;
    }

    /**
     * Search the lookup table for a handler registered for the given type
     * or for any interface reachable from it, breadth first.
     *
     * @param type the class or interface to start from.
     * @return the matching handler, or <code>null</code> if none is registered.
     */
    private Method findHandlerFor(final Class type)
    {
        final List pending = new Vector();
        pending.add(type);

        // 'pending' grows while it is being scanned; the index walks it in
        // breadth-first order without removing elements.
        for (int i = 0; i < pending.size(); ++i)
        {
            final Class candidate = (Class) pending.get(i);
            final Method m = (Method) _mtable.get(candidate);
            if (null != m)
            {
                return m;
            }
            final Class[] interfaces = candidate.getInterfaces();
            for (int j = 0; j < interfaces.length; ++j)
            {
                // Skip interfaces already queued via another path.
                if (!pending.contains(interfaces[j]))
                {
                    pending.add(interfaces[j]);
                }
            }
        }
        return null;
    }

    /**
     * Populate the lookup table used to find message handlers in the
     * derived class.  This method is called once when the egg is created.
     * <p>
     * Every single-argument method named <code>handle</code> declared by
     * <code>fromClass</code>, or by any class between it and
     * <code>Egg</code>, is registered under its parameter type.  When the
     * same parameter type is handled at several levels of the hierarchy the
     * most derived declaration is kept.
     *
     * @param fromClass the derived class to search for message handlers.
     */
    private final void fillLookupTable(final Class fromClass)
    {
        if (LOG.isDebugEnabled())
        {
            LOG.debug(_id + ": fillLookupTable(" + fromClass.getName() + ")");
        }

        // Stop before Egg itself: its abstract handle(Object) is not an
        // implementation and must not satisfy the "has handlers" check.
        for (Class claz = fromClass;
             null != claz && !Egg.class.equals(claz);
             claz = claz.getSuperclass())
        {
            final Method[] methods = claz.getDeclaredMethods();
            for (int i = 0; i < methods.length; ++i)
            {
                final Method m = methods[i];
                if (!HANDLE_METHOD_NAME.equals(m.getName()))
                {
                    continue;
                }
                final Class[] parameterTypes = m.getParameterTypes();
                if (1 != parameterTypes.length)
                {
                    continue;
                }
                if (_mtable.containsKey(parameterTypes[0]))
                {
                    // A more derived class already supplied this handler.
                    continue;
                }
                if (LOG.isDebugEnabled())
                {
                    LOG.debug(
                        _id
                            + ": fillLookupTable: adding handler method: "
                            + m);
                }
                _mtable.put(parameterTypes[0], m);
            }
        }
    }

    /**
     * Return this egg's message queue.
     *
     * @return the queue holding this egg's undelivered messages.
     */
    final PriorityQueue getQueue()
    {
        return _mqueue;
    }

    /**
     * Stop this egg from execution.  Pending messages are discarded, further
     * calls to {@link #enqueue(Message) enqueue} are rejected, and the egg is
     * removed from its dispatcher.
     * <p>
     * Warning: this is an irreversible step.  An egg can not be
     * restarted after this method is called.
     */
    final synchronized void stop()
    {
        synchronized (_dispatcher)
        {
            // Flag and queue are updated together under the lock enqueue()
            // takes, so no message can be added after the queue is cleared.
            _stopped = true;
            _mqueue.clear();
        }
        _dispatcher.remove(this);
    }
} // END OF CLASS Egg

This page was automatically generated by Maven