View Javadoc
/*
 * Copyright (c) 2004, Bruce Lowery
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 * - Neither the name of JEGG nor the names of its contributors may be used
 *   to endorse or promote products derived from this software without
 *   specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
package jegg;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Used to dispatch messages to eggs.  An instance of this
 * class may be used to dispatch messages to multiple eggs, but it will
 * always dispatch only a single message at a time.
 * <p>
 * When this class is loaded, a special dispatcher is created that is used
 * to dispatch messages for eggs that were constructed without explicit
 * specification of a dispatcher for the egg.
 * <p>
 * If a particular egg may take a long time to handle a message dispatched
 * to it, then that egg should be constructed with its own dispatcher (see
 * {@link jegg.Egg#Egg(Object,Dispatcher) Egg(Object,Dispatcher)}).
 * <p>
 * The Dispatcher class provides the fundamental mechanism that decouples
 * an Egg implementation from its execution thread.  The Dispatcher wraps
 * the thread, not the Egg.
 * <p>
 * Every constructor starts the thread immediately.  The dispatch loop
 * sleeps on this object's own monitor whenever it has no eggs or no pending
 * messages, and {@link #add(Egg)} / {@link #remove(Egg)} wake it through
 * that same monitor.
 */
public class Dispatcher extends Thread
{
    // TODO If one egg has a low priority message and another egg has
    // a high priority message pending at the same time, it is not
    // guaranteed that the high priority message will be delivered to
    // its target egg before the low priority message is delivered to
    // the other egg.

    /** Class logger. Declared before the default dispatcher so that it is
     *  initialized before that dispatcher's thread is started. */
    private static final Log LOG = LogFactory.getLog(Dispatcher.class);

    /** The default dispatcher. */
    private static final Dispatcher DEFAULT_DISPATCHER =
        new Dispatcher("default-dispatcher");

    /** The set of eggs that this dispatcher is executing. */
    private final Set _scheduled = Collections.synchronizedSet(new HashSet());

    /**
     * Construct an anonymous dispatcher and start its thread.
     */
    public Dispatcher()
    {
        super();
        start();
    }

    /**
     * Construct a named dispatcher and start its thread.
     * @param name the name of the dispatcher.
     */
    public Dispatcher(final String name)
    {
        super(name);
        start();
    }

    /**
     * Construct a named dispatcher that is a member of a specified
     * thread group, and start its thread.
     * @param group The group to bind the dispatcher to.
     * @param name The name of the dispatcher.
     */
    public Dispatcher(final ThreadGroup group, final String name)
    {
        super(group, name);
        start();
    }

    /**
     * Return a reference to the default dispatcher.  This is the
     * dispatcher that dispatches messages for all eggs that are
     * constructed without specifying a particular dispatcher.
     * @return the default dispatcher.
     */
    static Dispatcher getDefaultScheduler()
    {
        return DEFAULT_DISPATCHER;
    }

    /**
     * Add an egg to the set of eggs assigned to this dispatcher and wake
     * the dispatch loop so that it picks the egg up.
     * @param e the egg to add to the dispatcher's set.
     */
    final void add(final Egg e)
    {
        LOG.trace("trace");
        // The synchronized set guards the mutation on its own.
        _scheduled.add(e);
        wakeDispatcher();
    }

    /**
     * Remove an egg from this dispatcher's set and wake the dispatch loop
     * so that it refreshes its view of the set.  Removal takes effect the
     * next time the loop takes a snapshot of the set; an egg already in
     * the snapshot being serviced may still receive a message.
     * @param e the egg to remove.
     */
    final void remove(final Egg e)
    {
        LOG.trace("trace");
        _scheduled.remove(e);
        wakeDispatcher();
    }

    /**
     * Implementation of the Thread superclass run method.  Repeatedly
     * takes a snapshot of the assigned eggs and offers each egg the chance
     * to have its next message dispatched.  The loop sleeps while there
     * are no eggs or no pending messages, and ends when the thread is
     * interrupted.
     */
    public final void run()
    {
        LOG.trace("run(): starting");

        while (!interrupted())
        {
            final Egg[] eggs;

            // Snapshot and idle checks share one critical section so that a
            // concurrent add()/remove() cannot slip its wake-up in between
            // the check and the wait.
            synchronized (this)
            {
                eggs = snapshotEggs();

                if (0 == eggs.length)
                {
                    LOG.debug("Waiting for eggs");
                    if (!awaitWakeup())
                    {
                        return;
                    }
                    continue;
                }

                if (0 == countPendingMessages(eggs))
                {
                    // NOTE(review): presumably whatever enqueues a message
                    // on an egg notifies this dispatcher's monitor; that
                    // code is not visible here -- confirm.
                    LOG.debug("Waiting for messages");
                    if (!awaitWakeup())
                    {
                        return;
                    }
                    continue;
                }
            }

            // Dispatch outside the monitor so that add()/remove() are not
            // blocked while an egg handles its message.
            for (int i = 0; i < eggs.length; ++i)
            {
                if (interrupted())
                {
                    LOG.debug("Interrupted");
                    return;
                }

                LOG.debug("Checking for a priority message");
                Message message = eggs[i].getNextMessage();

                if (null != message)
                {
                    LOG.debug("Dispatching message");
                    eggs[i].dispatch(message);
                }
            }
        } //END while (!interrupted())

    } // END METHOD run()

    /**
     * Wake the dispatch loop if it is sleeping.  The loop waits on this
     * object's monitor, so the notification must be issued there.
     * notifyAll() is used rather than notify() because this object is a
     * Thread: callers of join() wait on the same monitor and could
     * otherwise consume the single notification.
     */
    private void wakeDispatcher()
    {
        synchronized (this)
        {
            notifyAll();
        }
    }

    /**
     * Copy the currently scheduled eggs into an array.
     * @return the scheduled eggs; an empty array when there are none.
     */
    private Egg[] snapshotEggs()
    {
        // Hold the set's lock so that size() and toArray() see the same
        // contents.
        synchronized (_scheduled)
        {
            if (_scheduled.isEmpty())
            {
                return new Egg[0];
            }
            LOG.debug("Getting eggs");
            return (Egg[]) _scheduled.toArray(new Egg[_scheduled.size()]);
        }
    }

    /**
     * Sum the pending-message counts reported by the given eggs.
     * @param eggs the eggs to inspect.
     * @return the total number of pending messages.
     */
    private int countPendingMessages(final Egg[] eggs)
    {
        int pending = 0;
        for (int i = 0; i < eggs.length; ++i)
        {
            if (LOG.isDebugEnabled())
            {
                LOG.debug("Checking messages on egg: " + eggs[i].getId());
            }
            pending += eggs[i].getNumPendingMessages();
        }
        return pending;
    }

    /**
     * Block on this object's monitor until notified.  The caller must
     * already hold the monitor.
     * @return true if the wait ended normally, false if the thread was
     *         interrupted and the dispatch loop should end.
     */
    private boolean awaitWakeup()
    {
        try
        {
            this.wait();
            return true;
        }
        catch (InterruptedException e)
        {
            LOG.debug("Interrupted");
            return false;
        }
    }

}

This page was automatically generated by Maven