1   /*
2    * Copyright (c) 2004, Bruce Lowery
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions are met:
7    *
8    *    - Redistributions of source code must retain the above copyright notice,
9    *      this list of conditions and the following disclaimer.
10   *    - Redistributions in binary form must reproduce the above copyright
11   *      notice, this list of conditions and the following disclaimer in the
12   *      documentation and/or other materials provided with the distribution.
13   *    - Neither the name of JEGG nor the names of its contributors may be used
14   *      to endorse or promote products derived from this software without
15   *      specific prior written permission.
16   *
17   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18   * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19   * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21   * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22   * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23   * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24   * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25   * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27   * POSSIBILITY OF SUCH DAMAGE.
28   */
29  package jegg;
30  
31  import java.util.Collections;
32  import java.util.HashSet;
33  import java.util.Set;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  
37  /***
38   * Used to dispatch messages to eggs.  An instance of this
39   * class may be used to dispatch messages to multiple eggs, but it will
40   * always dispatch only a single message at a time.  
41   * <p>
42   * When this class is loaded, a special dispatcher is created that is used 
43   * to dispatch messages for eggs that were constructed without explicit
44   * specification of a dispatcher for the egg.
45   * <p>
46   * If a particular egg may take a long time to handle a message dispatched
47   * to it, then that egg should be constructed with its own dispatcher (see
48   * {@link jegg.Egg.Egg(Object,Dispatcher) Egg(Object,Dispatcher)}).
49   * <p>
50   * The Dispatcher class provides the fundamental mechanism that decouples
51   * an Egg implementation from its execution thread.  The Dispatcher wraps
52   * the thread, not the Egg.
53   */
54  public class Dispatcher extends Thread
55  {
56      // TODO If one egg has a low priority message and another egg has
57      // a high priority message pending at the same time, it is not
58      // guaranteed that the high priority message will be delivered to
59      // its target egg before the low priority message is delivered to
60      // the other egg.  
61      
62      /*** Class logger */
63      private static Log LOG = LogFactory.getLog(Dispatcher.class);
64  
65      /*** The default scheduler */
66      private static final Dispatcher DEFAULT_DISPATCHER =
67          new Dispatcher("default-dispatcher");
68  
69      /*** The set of eggs that this scheduler is executing. */
70      private Set _scheduled = Collections.synchronizedSet(new HashSet());
71  
72      /***
73       * Construct an anonymous dispatcher.
74       */
75      public Dispatcher()
76      {
77          super();
78          start();
79      }
80      /***
81       * Construct a named dispatcher.
82       * @param name the name of the dispatcher.
83       */
84      public Dispatcher(final String name)
85      {
86          super(name);
87          start();
88      }
89  
90      /***
91       * Construct a named scheduler that is a member of a specified
92       * thread group.
93       * @param group The group to bind the scheduler to.
94       * @param name The name of the scheduler.
95       */
96      public Dispatcher(final ThreadGroup group, final String name)
97      {
98          super(group, name);
99          start();
100     }
101 
102     /***
103      * Return a reference to the default dispatcher.  This is the
104      * dispatcher that dispatches messages for all eggs that are
105      * constructed without specifying a particular dispatcher.
106      * @return the default scheduler.
107      */
108     static Dispatcher getDefaultScheduler()
109     {
110         return DEFAULT_DISPATCHER;
111     }
112 
113     /***
114      * Add an egg to the set of eggs assigned to this dispatcher.
115      * @param e the egg to add to the dispatcher's set.
116      */
117     final void add(final Egg e)
118     {
119         LOG.trace("trace");
120         synchronized (_scheduled)
121         {
122             _scheduled.add(e);
123             _scheduled.notify();
124         }
125     }
126     /***
127      * Remove an egg from this dispatcher's set.  After the
128      * egg has been removed, no more messages will be dispatched to the egg.
129      * @param e the egg to remove.
130      */
131     final void remove(final Egg e)
132     {
133         LOG.trace("trace");
134         synchronized (_scheduled)
135         {
136             _scheduled.remove(e);
137             _scheduled.notify();
138         }
139     }
140 
141     /*** 
142      * Implementation of the Thread superclass run method.  This 
143      * implementation delivers messsages in turn to each egg.  Messages
144      * are dispatched based on priority.
145      */
146     public final void run()
147     {
148         LOG.trace("run(): starting");
149 
150         while (!interrupted())
151         {
152             Egg[] eggs = null;
153 
154             while (null == eggs || 0 == eggs.length)
155             {
156                 synchronized (this)
157                 {
158                     synchronized (_scheduled)
159                     {
160                         if (!_scheduled.isEmpty())
161                         {
162                             LOG.debug("Getting eggs");
163                             eggs =
164                                 (Egg[]) _scheduled.toArray(
165                                     new Egg[_scheduled.size()]);
166                         }
167                     }
168 
169                     if (null == eggs || 0 == eggs.length)
170                     {
171                         try
172                         {
173                             LOG.debug("Waiting for eggs");
174                             this.wait();
175                         }
176                         catch (InterruptedException e)
177                         {
178                             LOG.debug("Interrupted");
179                             return;
180                         }
181                     }
182                 }
183             }
184 
185             boolean startOver = false;
186 
187             synchronized (this)
188             {
189                 int num = 0;
190                 int numm = 0;
191                 while (0 == num)
192                 {
193                     for (int i = 0; i < eggs.length; ++i)
194                     {
195                         if (LOG.isDebugEnabled())
196                         {    
197                             LOG.debug(
198                                     "Checking messages on egg: " + eggs[i].getId());
199                         }
200                         num += eggs[i].getNumPendingMessages();
201                         numm += eggs[i].getQueue().size();
202                     }
203                     
204                     if (0 == num)
205                     {
206                         try
207                         {
208                             LOG.debug("Waiting for messages");
209                             this.wait();
210                             startOver = true;
211                             break;
212                         }
213                         catch (InterruptedException e)
214                         {
215                             LOG.debug("Interrupted");
216                             return;
217                         }
218                     }
219                 }
220 
221             }
222 
223             if (startOver)
224             {
225                 continue;
226             }
227 
228             for (int i = 0; i < eggs.length; ++i)
229             {
230                 if (interrupted())
231                 {
232                     LOG.debug("Interrupted");
233                     return;
234                 }
235 
236                 LOG.debug("Checking for a priority message");
237                 Message message = eggs[i].getNextMessage();
238 
239                 if (null != message)
240                 {
241                     LOG.debug("Dispatching message");
242                     eggs[i].dispatch(message);
243                 }
244             }
245         } //END while (!interrupted())
246 
247     } // END METHOD run()
248 
249 }
This page was automatically generated by Maven