1 /*
2 * Copyright (c) 2004, Bruce Lowery
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * - Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * - Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * - Neither the name of JEGG nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29 package jegg;
30 import java.lang.reflect.InvocationTargetException;
31 import java.lang.reflect.Method;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.Vector;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39
40 import jegg.queue.*;
41 import jegg.registry.*;
42 import jegg.timer.Timeout;
43 import jegg.timer.Timer;
44 import jegg.timer.TimeoutListener;
45
46 /***
47 * Base class for implementing an egg. Concrete classes that extend this
48 * class will be plugged into the JEgg framework and be able to send and
49 * receive arbitrary messages. The Egg base class also provides
50 * convenience methods for the following:
51 * <ul>
52 * <li>Responding to a message (see {@link #respond(Object) respond})</li>
53 * <li>Publishing this egg's message port (see {@link #publishPort(String) publishPort})</li>
54 * <li>Looking up another egg's message port (@see {@link #requestPort(String) requestPort})</li>
55 * <li>Broadcasting a message (see {@link #send(Object) send})
56 * <li>'Binding' to another egg's port (see {@link #bindToPort(Port) bindToPort})</li>
57 * <li>Creating an egg timer (see {@link #createTimer(long, long, boolean) createTimer})</li>
58 * <li>Creation of a JEgg message from an application object (see {@link #createMessage(Object)})
59 * </ul>
60 * <p>
61 * The default constructor will assign this egg to the default thread. Use the
62 * constructor that takes a {@link jegg.Dispatcher Dispatcher} in order to
63 * assign the egg to a specific thread.
64 * <p>
65 * The derived class should implement handlers for the message types that it expects
66 * to receive. A message handler is implemented by overloading the {@link #handle(Object) handle}
67 * method with a version that accepts the expected message type. When the derived
68 * class is loaded, the <code>handle</code> methods are reflected and cached in
69 * a lookup table to improve the message delivery performance.
70 * <p>
71 * The message dispatcher assigned to the egg will only deliver one message at
72 * a time to the Egg subclass. If an egg is expected to take a long time to
73 * process some messages, then it should be assigned its own dispatcher (see
74 * above).
75 * <p>
76 * Messages are dispatched to an egg according to their priority, in the order
77 * they were sent. So, a more recent message with a high priority can be
78 * delivered to the egg before a message that was sent earlier but with a
79 * lower priority.
80 */
81 public abstract class Egg
82 {
83 /*** The name of the [overloaded] method that subclasses must implement. */
84 private static final String HANDLE_METHOD_NAME = "handle";
85 /*** Message queue */
86 private PriorityQueue _mqueue = new PriorityQueue();
87 /*** This class*/
88 private Class _class;
89 /*** The message dispatcher for this egg*/
90 private Dispatcher _dispatcher;
91 /*** Lookup table */
92 private Map _mtable = new HashMap();
93 /*** Port */
94 private Port _port;
95 /*** Name */
96 private Object _id;
97 /*** True if stopped */
98 private boolean _stopped = false;
99 /*** Current active message */
100 private Message _currentMessage;
101 /*** Logger */
102 private static final Log LOG = LogFactory.getLog(Egg.class);
103 /***
104 * Constructor. The default dispatcher is used to execute this egg.
105 * @param id a unique object that can be used to identify this egg.
106 * @throws IllegalStateException if the derived class doesn't
107 * implement any 'handle' methods.
108 */
109 public Egg(final Object id)
110 {
111 this(id, Dispatcher.getDefaultScheduler());
112 if (LOG.isDebugEnabled())
113 {
114 LOG.debug(
115 "Egg: class "
116 + this.getClass().getName()
117 + ": default constructed");
118 }
119 }
120
121 /***
122 * Construct an egg using a specific scheduler to execute the egg.
123 * @param id a unique object that can be used to identify this egg.
124 * @param s the scheduler to use to execute the egg.
125 */
126 public Egg(final Object id, final Dispatcher s)
127 {
128 super();
129 this._id = id;
130 _class = this.getClass();
131 this.fillLookupTable(_class);
132 if (_mtable.isEmpty())
133 {
134 throw new IllegalStateException(
135 "Class "
136 + _class.getName()
137 + " implements no '"
138 + HANDLE_METHOD_NAME
139 + "' methods");
140 }
141 _dispatcher = s;
142 _dispatcher.add(this);
143 _port = new Port(this);
144 if (LOG.isDebugEnabled())
145 {
146 LOG.debug(
147 "Egg: class "
148 + this.getClass().getName()
149 + ": constructed with scheduler");
150 }
151 }
152
153 /***
154 * Return the port used to deliver messages to this egg.
155 * @return this egg's message port.
156 */
157 public final Port getPort()
158 {
159 return _port;
160 }
161
162 /***
163 * Return the port of the egg that sent the current message.
164 * @return the sender's port
165 */
166 protected final Port getFromPort()
167 {
168 return _currentMessage.getFrom();
169 }
170
171 /***
172 * Return the current message being processed.
173 * @return the`current message.
174 */
175 protected final Message getCurrentMessage()
176 {
177 return _currentMessage;
178 }
179
180 /***
181 * Publish this egg's message port in the port registry. Other eggs
182 * can lookup the port in the registry and use it to send messages to
183 * this egg.
184 * @param name the name to register the port under.
185 */
186 protected final void publishPort(String n)
187 {
188 PortRegistry.getInstance().getPort().send(createMessage(new PublishPortMessage(n, getPort())));
189 }
190
191 /***
192 * Send a lookup request to the port registry. The port will be delivered
193 * to this egg using a {@link jegg.registry.LocatePortResponse LocatePortResponse}.
194 * <p>
195 * If the port is not registered in the registry when the registry receives
196 * the port request, the request will be saved until the target port is registered.
197 *
198 * @param name the name of the registered port.
199 */
200 protected final void requestPort(String n)
201 {
202 PortRegistry.getInstance().getPort().send(createMessage(new LocatePortMessage(n)));
203 }
204
205 /***
206 * Register to receive <i>all</i> messages broadcast by the egg that the
207 * specified port belongs to. A egg can broadcast a message using the
208 * {@link #send(Object) send} method.
209 *
210 * @param p the port to register for broadcast messages with.
211 */
212 protected final void bindToPort(Port p)
213 {
214 p.connect(getPort());
215 }
216
217 // TODO implement unbindFromPort(Port p)
218
219 /***
220 * Send a message to the egg that sent the current message.
221 * The message will be sent with a <i>medium</i> priority
222 * level assigned to it.
223 *
224 * @param message the message to send to the sending egg.
225 */
226 protected final void respond(Object message)
227 {
228 respond(message,Priority.MEDIUM);
229 }
230
231 /***
232 * Send a message to the egg that sent the current message.
233 * The message will be sent with the specified priority
234 * level assigned to it.
235 *
236 * @param message the message to send to the sending egg.
237 * @param priority the priority to assign to the message.
238 */
239 protected final void respond(Object message, Priority priority)
240 {
241 respond(getFromPort(),message,priority);
242 }
243
244 /***
245 * Send a message to the egg that the specified port
246 * belongs to. This method is useful when a message
247 * response has to be delayed until later (so the port
248 * is saved for use later). The message will be sent
249 * with a medium priority level assigned to it.
250 *
251 * @param port the port to send the message on.
252 * @param message the message to send to the egg that
253 * the specified port belongs to.
254 */
255 protected final void respond(Port port, Object message)
256 {
257 respond(port,message,Priority.MEDIUM);
258 }
259
260 /***
261 * Send a message to the egg that the specified port
262 * belongs to. This method is useful when a message
263 * response has to be delayed until later (so the port
264 * is saved for use later). The message will be sent
265 * with the specified priority level assigned to it.
266 *
267 * @param port the port to send the message on.
268 * @param message the message to send to the egg that
269 * the specified port belongs to.
270 * @param priority the message's priority assignment.
271 */
272 protected final void respond(Port port, Object message, Priority priority)
273 {
274 port.send(createMessage(message,priority));
275 }
276
277 /***
278 * Return the ID for this egg. The egg's ID is arbitrary and not used
279 * by the JEgg framework; this is purely for user convenience.
280 *
281 * @return the id assigned to this egg.
282 */
283 final Object getId()
284 {
285 return _id;
286 }
287
288 /***
289 * Process a message from another egg. Note that this method is
290 * declared public. This is because the dispatcher must have
291 * permission to invoke the handle methods in the derived class that
292 * will be in a different, application-defined package.
293 *
294 * @param message the message to process.
295 */
296 public abstract void handle(final Object message);
297
298 /***
299 * Broadcast a message to all eggs that have bound to this egg's port
300 * (see {@link #bindToPort(Port) bindToPort}).
301 *
302 * @param message the message to send.
303 */
304 protected final void send(Object msg)
305 {
306 getPort().broadcast(msg);
307 }
308
309 /***
310 * Create JEgg message from an application-defined message. The message
311 * will be created with a medium priority level assigned to it.
312 * Additionally, this egg's port will be saved in the JEgg message.
313 * <p>
314 * This is useful to do when the same message has to be sent to many
315 * different eggs.
316 * @param message the application message to wrap in a JEgg message.
317 * @return a JEgg message wrapping the application message.
318 */
319 protected final Message createMessage(Object m)
320 {
321 return createMessage(m,Priority.MEDIUM);
322 }
323
324 /***
325 * Create JEgg message from an application-defined message. The message
326 * will be created with the specified priority level assigned to it.
327 * Additionally, this egg's port will be saved in the JEgg message.
328 * <p>
329 * This is useful to do when the same message has to be sent to many
330 * different eggs.
331 * @param message the application message to wrap in a JEgg message.
332 * @return a JEgg message wrapping the application message.
333 */
334 protected final Message createMessage(Object m, Priority p)
335 {
336 return new Message(m,getPort(),p);
337 }
338
339 /***
340 * Create an egg timer. The timer will deliver {@link #jegg.timer.Timeout Timeout}
341 * messages to this egg until {@link #jegg.timer.Timer.cancel() Timer.cancel()}
342 * is called.
343 * <p>
344 * The timer is automatically started before being returned.
345 *
346 * @param interval_msec the number of milliseconds between timeout messages.
347 * @param delay_msec the initial delay that the timer will wait before it
348 * starts delivering timeout messages.
349 * @return A new timer instance.
350 */
351 protected final Timer createRepeatingTimer(long interval_msec, long delay_msec)
352 {
353 TimeoutListener tl = new TimeoutListener()
354 {
355 public void timeout(Timer tt)
356 {
357 emitTimeout(tt);
358 }
359 };
360 return Timer.createRepeatingTimer(tl, interval_msec, delay_msec);
361 }
362
363 /***
364 * Create an egg timer. The timer will deliver a single {@link #jegg.timer.Timeout Timeout}
365 * message to this egg.
366 * @param delay_msec the number of milliseconds after which the timeout
367 * message will be delivered.
368 * @return a new timer instance.
369 */
370 protected final Timer createSingleShotTimer(long delay_msec)
371 {
372 TimeoutListener tl = new TimeoutListener()
373 {
374 public void timeout(Timer tt)
375 {
376 emitTimeout(tt);
377 }
378 };
379 return Timer.createSingleShotTimer(tl, delay_msec);
380 }
381
382 /***
383 * Return this egg's message dispatcher.
384 * @return the dispatcher assigned to this egg.
385 */
386 protected Dispatcher getDispatcher()
387 {
388 return _dispatcher;
389 }
390
391 /***
392 * Send a timeout notification to the derived class. This method is
393 * used to deliver timeout messages from a running timer (see {@link
394 * #createTimer(long,long,boolean) createTimer}).
395 *
396 * @param t the timer that is sending the timeout message.
397 */
398 private final void emitTimeout(Timer t)
399 {
400 getPort().send(createMessage(new Timeout(t)));
401 }
402
403 /***
404 * Add a message to this egg's message queue. This method is only
405 * used by the egg's port when another egg sends a message to this egg.
406 *
407 * @param message the message to add to the queue.
408 */
409 final void enqueue(final Message message)
410 {
411 if (_stopped)
412 {
413 throw new IllegalStateException("stopped");
414 }
415
416 synchronized (_dispatcher)
417 {
418 if (LOG.isDebugEnabled())
419 {
420 LOG.debug(_id.toString() + ": enqueue: " + message.toString());
421 }
422
423 Priority p = message.getPriority();
424 _mqueue.add(p,message);
425
426 if (LOG.isDebugEnabled())
427 {
428 LOG.debug(
429 _id.toString()
430 + ": enqueue: has "
431 + _mqueue.size()
432 + " messages");
433 }
434
435 if (LOG.isDebugEnabled())
436 {
437 LOG.debug(_id.toString() + ": enqueue: notifying scheduler");
438 }
439
440 _dispatcher.notifyAll();
441
442 if (LOG.isDebugEnabled())
443 {
444 LOG.debug(_id.toString() + ": enqueue: done");
445 }
446 }
447 }
448
449 /***
450 * Return next message in order of priority.
451 * @return
452 */
453 final Message getNextMessage()
454 {
455 Message m = null;
456 try
457 {
458 m = (Message) _mqueue.next();
459 }
460 catch (Throwable t)
461 {
462 // EMPTY
463 }
464 return m;
465 }
466
467 /***
468 * Return the number of undelivered messages in this egg's
469 * message queue.
470 * @return the number of undelivered messages.
471 */
472 final long getNumPendingMessages()
473 {
474 long num = _mqueue.size();
475 if (LOG.isDebugEnabled())
476 {
477 LOG.debug(
478 _id.toString() + ": getNumPendingMessages: " + Long.toString(num));
479 }
480 return num;
481 }
482 /***
483 * Deliver a message to the derived class handler. The message is
484 * delivered to the most specific handler based on the concrete type
485 * of the message.
486 * <p>
487 * If no handler can be found, the message is dropped.
488 * <p>
489 * This method is used by a message dispatcher to deliver a message to
490 * the egg.
491 * @param message to handle.
492 */
493 final void dispatch(final Message incoming)
494 {
495 if (LOG.isDebugEnabled())
496 {
497 LOG.debug(_id.toString() + ": Dispatching: " + incoming.toString());
498 }
499
500 Object message = incoming.getMessage();
501 Class msgType = message.getClass();
502 Method method = lookup(msgType);
503
504 if (null == method)
505 {
506 LOG.error(
507 "No handler found - dropping message: " + incoming.toString());
508 return;
509 }
510 try
511 {
512 if (LOG.isDebugEnabled())
513 {
514 LOG.debug(
515 _id.toString()
516 + ": Invoking handler: "
517 + method.toString());
518 }
519 _currentMessage = incoming;
520 method.invoke(this, new Object[] {message});
521 }
522 catch (InvocationTargetException e)
523 {
524 LOG.error("Invocation error: "+e.getCause());
525 e.printStackTrace();
526 }
527 catch (Throwable t)
528 {
529 LOG.error(
530 this.getClass().getName()+": " +
531 "Error raised handling message ["
532 + incoming.toString()
533 + "]: "
534 + t);
535 }
536 finally
537 {
538 _currentMessage = null;
539 }
540 }
541 /***
542 * Return the derived class handler method for a message of the
543 * type specified by the argument.
544 * @param argType the argument type of the handler to return.
545 * @return the handler for messages of the specified type.
546 */
547 private final Method lookup(final Class argType)
548 {
549 // System.err.println("lookup ("+getId()+"): "+argType.getName());
550
551 // TODO cache the lookup result if a match is found so
552 // that the next time the same arg_type is passed in,
553 // the search doesn't have to be performed from scratch.
554
555 if (LOG.isDebugEnabled())
556 {
557 LOG.debug(_id.toString() + ": lookup(" + argType.getName() + ")");
558 }
559
560 if (argType.equals(Object.class))
561 {
562 // // System.err.println("lookup ("+getId()+"): returning default handler");
563 return (Method) _mtable.get(Object.class);
564 }
565 Method m = (Method) _mtable.get(argType);
566
567 if (null != m)
568 {
569 if (LOG.isDebugEnabled())
570 {
571 LOG.debug(
572 _id.toString() + ": lookup(): returning " + m.toString());
573 }
574 // // System.err.println("lookup ("+getId()+"): returning handler");
575 return m;
576 }
577
578 List iflist = new Vector();
579 iflist.add(argType);
580
581 while (!iflist.isEmpty())
582 {
583 Class iface = (Class) iflist.remove(0);
584
585 m = (Method) _mtable.get(iface);
586
587 if (null != m)
588 {
589 if (LOG.isDebugEnabled())
590 {
591 LOG.debug(
592 _id.toString()
593 + ": lookup(): returning "
594 + m.toString());
595 }
596 // System.err.println("lookup ("+getId()+"): returning handler for "+iface.getName());
597 return m;
598 }
599 Class[] interfaces = iface.getInterfaces();
600 for (int i = 0; i < interfaces.length; ++i)
601 {
602 iflist.add(interfaces[i]);
603 }
604 }
605 if (!argType.isInterface())
606 {
607 Class claz = argType;
608 while (null != claz)
609 {
610 Class superclass = claz.getSuperclass();
611 if (null != superclass)
612 {
613 m = (Method) _mtable.get(superclass);
614
615 if (null != m)
616 {
617 if (LOG.isDebugEnabled())
618 {
619 LOG.debug(
620 _id.toString()
621 + ": lookup(): returning "
622 + m.toString());
623 }
624 // System.err.println("lookup ("+getId()+"): returning handler for "+superclass.getName());
625 return m;
626 }
627 }
628 claz = superclass;
629 }
630 }
631
632 if (LOG.isDebugEnabled())
633 {
634 LOG.debug("Egg: lookup(): returning NULL");
635 }
636 // System.err.println("lookup ("+getId()+"): failed to find handler");
637 return null;
638 }
639 /***
640 * Populate the lookup table used to find message handlers in the
641 * derived class. This method is called once when the egg is created.
642 *
643 * @param fromClass the derived class to search for message handlers.
644 */
645 private final void fillLookupTable(final Class fromClass)
646 {
647 if (LOG.isDebugEnabled())
648 {
649 LOG.debug(
650 _id.toString()
651 + ": fillLookupTable("
652 + fromClass.getName()
653 + ")");
654 }
655
656 Method[] methods = fromClass.getDeclaredMethods();
657 for (int i = 0; i < methods.length; ++i)
658 {
659 Method m = methods[i];
660 String methodName = m.getName();
661 if (!methodName.equals(HANDLE_METHOD_NAME))
662 {
663 continue;
664 }
665 Class[] methodParameterList = m.getParameterTypes();
666 if (null == methodParameterList || 1 != methodParameterList.length)
667 {
668 continue;
669 }
670 if (LOG.isDebugEnabled())
671 {
672 LOG.debug(
673 _id.toString()
674 + ": fillLookupTable: adding handler method: "
675 + m.toString());
676 }
677 _mtable.put(methodParameterList[0], m);
678 }
679 }
680
681 final PriorityQueue getQueue() { return _mqueue; }
682
683 /***
684 * Stop this egg from execution. Messages will no longer
685 * be delivered to this egg.
686 * <p>
687 * Warning: this is an irreversible step. An egg can not be
688 * restarted after this method is called.
689 */
690 final synchronized void stop()
691 {
692 _stopped = true;
693 synchronized (_dispatcher)
694 {
695 _mqueue.clear();
696 }
697 _dispatcher.remove(this);
698 }
699 } // END OF CLASS Egg
This page was automatically generated by Maven