1 /*
2 * Copyright (c) 2004, Bruce Lowery
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * - Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * - Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * - Neither the name of JEGG nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29 package jegg;
30
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Set;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36
37 /***
38 * Used to dispatch messages to eggs. An instance of this
39 * class may be used to dispatch messages to multiple eggs, but it will
40 * always dispatch only a single message at a time.
41 * <p>
42 * When this class is loaded, a special dispatcher is created that is used
43 * to dispatch messages for eggs that were constructed without explicit
44 * specification of a dispatcher for the egg.
45 * <p>
46 * If a particular egg may take a long time to handle a message dispatched
47 * to it, then that egg should be constructed with its own dispatcher (see
48 * {@link jegg.Egg.Egg(Object,Dispatcher) Egg(Object,Dispatcher)}).
49 * <p>
50 * The Dispatcher class provides the fundamental mechanism that decouples
51 * an Egg implementation from its execution thread. The Dispatcher wraps
52 * the thread, not the Egg.
53 */
54 public class Dispatcher extends Thread
55 {
56 // TODO If one egg has a low priority message and another egg has
57 // a high priority message pending at the same time, it is not
58 // guaranteed that the high priority message will be delivered to
59 // its target egg before the low priority message is delivered to
60 // the other egg.
61
62 /*** Class logger */
63 private static Log LOG = LogFactory.getLog(Dispatcher.class);
64
65 /*** The default scheduler */
66 private static final Dispatcher DEFAULT_DISPATCHER =
67 new Dispatcher("default-dispatcher");
68
69 /*** The set of eggs that this scheduler is executing. */
70 private Set _scheduled = Collections.synchronizedSet(new HashSet());
71
72 /***
73 * Construct an anonymous dispatcher.
74 */
75 public Dispatcher()
76 {
77 super();
78 start();
79 }
80 /***
81 * Construct a named dispatcher.
82 * @param name the name of the dispatcher.
83 */
84 public Dispatcher(final String name)
85 {
86 super(name);
87 start();
88 }
89
90 /***
91 * Construct a named scheduler that is a member of a specified
92 * thread group.
93 * @param group The group to bind the scheduler to.
94 * @param name The name of the scheduler.
95 */
96 public Dispatcher(final ThreadGroup group, final String name)
97 {
98 super(group, name);
99 start();
100 }
101
102 /***
103 * Return a reference to the default dispatcher. This is the
104 * dispatcher that dispatches messages for all eggs that are
105 * constructed without specifying a particular dispatcher.
106 * @return the default scheduler.
107 */
108 static Dispatcher getDefaultScheduler()
109 {
110 return DEFAULT_DISPATCHER;
111 }
112
113 /***
114 * Add an egg to the set of eggs assigned to this dispatcher.
115 * @param e the egg to add to the dispatcher's set.
116 */
117 final void add(final Egg e)
118 {
119 LOG.trace("trace");
120 synchronized (_scheduled)
121 {
122 _scheduled.add(e);
123 _scheduled.notify();
124 }
125 }
126 /***
127 * Remove an egg from this dispatcher's set. After the
128 * egg has been removed, no more messages will be dispatched to the egg.
129 * @param e the egg to remove.
130 */
131 final void remove(final Egg e)
132 {
133 LOG.trace("trace");
134 synchronized (_scheduled)
135 {
136 _scheduled.remove(e);
137 _scheduled.notify();
138 }
139 }
140
141 /***
142 * Implementation of the Thread superclass run method. This
143 * implementation delivers messsages in turn to each egg. Messages
144 * are dispatched based on priority.
145 */
146 public final void run()
147 {
148 LOG.trace("run(): starting");
149
150 while (!interrupted())
151 {
152 Egg[] eggs = null;
153
154 while (null == eggs || 0 == eggs.length)
155 {
156 synchronized (this)
157 {
158 synchronized (_scheduled)
159 {
160 if (!_scheduled.isEmpty())
161 {
162 LOG.debug("Getting eggs");
163 eggs =
164 (Egg[]) _scheduled.toArray(
165 new Egg[_scheduled.size()]);
166 }
167 }
168
169 if (null == eggs || 0 == eggs.length)
170 {
171 try
172 {
173 LOG.debug("Waiting for eggs");
174 this.wait();
175 }
176 catch (InterruptedException e)
177 {
178 LOG.debug("Interrupted");
179 return;
180 }
181 }
182 }
183 }
184
185 boolean startOver = false;
186
187 synchronized (this)
188 {
189 int num = 0;
190 int numm = 0;
191 while (0 == num)
192 {
193 for (int i = 0; i < eggs.length; ++i)
194 {
195 if (LOG.isDebugEnabled())
196 {
197 LOG.debug(
198 "Checking messages on egg: " + eggs[i].getId());
199 }
200 num += eggs[i].getNumPendingMessages();
201 numm += eggs[i].getQueue().size();
202 }
203
204 if (0 == num)
205 {
206 try
207 {
208 LOG.debug("Waiting for messages");
209 this.wait();
210 startOver = true;
211 break;
212 }
213 catch (InterruptedException e)
214 {
215 LOG.debug("Interrupted");
216 return;
217 }
218 }
219 }
220
221 }
222
223 if (startOver)
224 {
225 continue;
226 }
227
228 for (int i = 0; i < eggs.length; ++i)
229 {
230 if (interrupted())
231 {
232 LOG.debug("Interrupted");
233 return;
234 }
235
236 LOG.debug("Checking for a priority message");
237 Message message = eggs[i].getNextMessage();
238
239 if (null != message)
240 {
241 LOG.debug("Dispatching message");
242 eggs[i].dispatch(message);
243 }
244 }
245 } //END while (!interrupted())
246
247 } // END METHOD run()
248
249 }
This page was automatically generated by Maven