1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activecluster.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.activecluster.Cluster;
23 import org.activecluster.ClusterEvent;
24 import org.activecluster.ClusterListener;
25 import org.activecluster.Node;
26 import org.activecluster.election.ElectionStrategy;
27 import org.activecluster.election.impl.BullyElectionStrategy;
28
29 import javax.jms.Destination;
30 import javax.jms.JMSException;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Map.Entry;
38 import java.util.Timer;
39 import java.util.TimerTask;
40
41
42 /***
43 * Represents a node list
44 *
45 * @version $Revision: 1.1 $
46 */
47 public class StateServiceImpl implements StateService {
48
49 private final static Log log = LogFactory.getLog(StateServiceImpl.class);
50 private Cluster cluster;
51 private Object clusterLock;
52 private Map nodes = new HashMap();
53 private long inactiveTime;
54 private List listeners = Collections.synchronizedList(new ArrayList());
55 private Destination localDestination;
56 private Runnable localNodePing;
57 private Timer timer;
58 private NodeImpl coordinator;
59 private ElectionStrategy electionStrategy;
60
61 public StateServiceImpl(Cluster cluster, Object clusterLock, Runnable localNodePing, Timer timer, long inactiveTime) {
62 this.cluster = cluster;
63 this.clusterLock = clusterLock;
64 this.localDestination = cluster.getLocalNode().getDestination();
65 this.localNodePing = localNodePing;
66 this.timer = timer;
67 this.inactiveTime = inactiveTime;
68 long delay = inactiveTime / 3;
69 timer.scheduleAtFixedRate(createTimerTask(), delay, delay);
70 (this.coordinator = (NodeImpl) cluster.getLocalNode()).setCoordinator(true);
71 this.electionStrategy = new BullyElectionStrategy();
72 }
73
74 /***
75 * @return the current election strategy
76 */
77 public ElectionStrategy getElectionStrategy() {
78 return electionStrategy;
79 }
80
81 /***
82 * set the election strategy
83 *
84 * @param electionStrategy
85 */
86 public void setElectionStrategy(ElectionStrategy electionStrategy) {
87 this.electionStrategy = electionStrategy;
88 }
89
90 public long getInactiveTime() {
91 return inactiveTime;
92 }
93
94 public void setInactiveTime(long inactiveTime) {
95 this.inactiveTime = inactiveTime;
96 }
97
98 public synchronized Map getNodes() {
99 HashMap answer = new HashMap(nodes.size());
100 for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
101 Map.Entry entry = (Map.Entry) iter.next();
102 Destination key = (Destination) entry.getKey();
103 NodeEntry nodeEntry = (NodeEntry) entry.getValue();
104 answer.put(key, nodeEntry.node);
105 }
106 return answer;
107 }
108
109 public synchronized void keepAlive(Node node) {
110 Destination key = node.getDestination();
111 if (!localDestination.equals(key)) {
112 NodeEntry entry = (NodeEntry) nodes.get(key);
113 if (entry == null) {
114 entry = new NodeEntry();
115 entry.node = node;
116 nodes.put(key, entry);
117 nodeAdded(node);
118 synchronized (clusterLock) {
119 clusterLock.notifyAll();
120 }
121 }
122 else {
123
124 if (stateHasChanged(entry.node, node)) {
125 entry.node = node;
126 nodeUpdated(node);
127 }
128 }
129
130
131
132 entry.lastKeepAlive = getTimeMillis();
133 }
134 }
135
136 public synchronized void shutdown(Node node) {
137 Destination key = node.getDestination();
138 nodes.remove(key);
139
140 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
141
142 Object[] array = listeners.toArray();
143 for (int i = 0, size = array.length; i < size; i++) {
144 ClusterListener listener = (ClusterListener) array[i];
145 listener.onNodeRemoved(event);
146 }
147 }
148
149 public synchronized void checkForTimeouts() {
150 localNodePing.run();
151 long time = getTimeMillis();
152 for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
153 Map.Entry entry = (Entry) iter.next();
154 NodeEntry nodeEntry = (NodeEntry) entry.getValue();
155 if (nodeEntry.lastKeepAlive + inactiveTime < time) {
156 iter.remove();
157 nodeFailed(nodeEntry.node);
158 }
159 }
160 }
161
162 public TimerTask createTimerTask() {
163 return new TimerTask() {
164 public void run() {
165 checkForTimeouts();
166 }
167 };
168 }
169
170 public void addClusterListener(ClusterListener listener) {
171 listeners.add(listener);
172 }
173
174 public void removeClusterListener(ClusterListener listener) {
175 listeners.remove(listener);
176 }
177
178 protected void nodeAdded(Node node) {
179 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
180
181 Object[] array = listeners.toArray();
182 for (int i = 0, size = array.length; i < size; i++) {
183 ClusterListener listener = (ClusterListener) array[i];
184 listener.onNodeAdd(event);
185 }
186 doElection();
187 }
188
189 protected void nodeUpdated(Node node) {
190 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.UPDATE_NODE);
191
192 Object[] array = listeners.toArray();
193 for (int i = 0, size = array.length; i < size; i++) {
194 ClusterListener listener = (ClusterListener) array[i];
195 listener.onNodeUpdate(event);
196 }
197 }
198
199 protected void nodeFailed(Node node) {
200 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.REMOVE_NODE);
201
202 Object[] array = listeners.toArray();
203 for (int i = 0, size = array.length; i < size; i++) {
204 ClusterListener listener = (ClusterListener) array[i];
205 listener.onNodeFailed(event);
206 }
207 doElection();
208 }
209
210 protected void coordinatorChanged(Node node) {
211 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ELECTED_COORDINATOR);
212
213 Object[] array = listeners.toArray();
214 for (int i = 0, size = array.length; i < size; i++) {
215 ClusterListener listener = (ClusterListener) array[i];
216 listener.onCoordinatorChanged(event);
217 }
218 }
219
220 protected void doElection() {
221 if (electionStrategy != null) {
222 try {
223 NodeImpl newElected = (NodeImpl) electionStrategy.doElection(cluster);
224 if (newElected != null && !newElected.equals(coordinator)) {
225 coordinator.setCoordinator(false);
226 coordinator = newElected;
227 coordinator.setCoordinator(true);
228 coordinatorChanged(coordinator);
229 }
230 }
231 catch (JMSException jmsEx) {
232 log.error("do election failed", jmsEx);
233 }
234 }
235 }
236
237 /***
238 * For performance we may wish to use a less granualar timing mechanism
239 * only updating the time every x millis since we're only using
240 * the time as a judge of when a node has not pinged for at least a few
241 * hundred millis etc.
242 */
243 protected long getTimeMillis() {
244 return System.currentTimeMillis();
245 }
246
247 protected static class NodeEntry {
248 public Node node;
249 public long lastKeepAlive;
250 }
251
252
253 /***
254 * @return true if the node has changed state from the old in memory copy to the
255 * newly arrived copy
256 */
257 protected boolean stateHasChanged(Node oldNode, Node newNode) {
258 Map oldState = oldNode.getState();
259 Map newState = newNode.getState();
260 if (oldState == newState) {
261 return false;
262 }
263 return oldState == null || newState == null || !oldState.equals(newState);
264 }
265 }