View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activecluster.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.activecluster.Cluster;
23  import org.activecluster.ClusterEvent;
24  import org.activecluster.ClusterListener;
25  import org.activecluster.Node;
26  import org.activecluster.election.ElectionStrategy;
27  import org.activecluster.election.impl.BullyElectionStrategy;
28  
29  import javax.jms.Destination;
30  import javax.jms.JMSException;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Map.Entry;
38  import java.util.Timer;
39  import java.util.TimerTask;
40  
41  
42  /***
43   * Represents a node list
44   *
45   * @version $Revision: 1.1 $
46   */
47  public class StateServiceImpl implements StateService {
48  
49      private final static Log log = LogFactory.getLog(StateServiceImpl.class);
50      private Cluster cluster;
51      private Object clusterLock;
52      private Map nodes = new HashMap();
53      private long inactiveTime;
54      private List listeners = Collections.synchronizedList(new ArrayList());
55      private Destination localDestination;
56      private Runnable localNodePing;
57      private Timer timer;
58      private NodeImpl coordinator;
59      private ElectionStrategy electionStrategy;
60  
61      public StateServiceImpl(Cluster cluster, Object clusterLock, Runnable localNodePing, Timer timer, long inactiveTime) {
62          this.cluster = cluster;
63          this.clusterLock = clusterLock;
64          this.localDestination = cluster.getLocalNode().getDestination();
65          this.localNodePing = localNodePing;
66          this.timer = timer;
67          this.inactiveTime = inactiveTime;
68          long delay = inactiveTime / 3;
69          timer.scheduleAtFixedRate(createTimerTask(), delay, delay);
70          (this.coordinator = (NodeImpl) cluster.getLocalNode()).setCoordinator(true);
71          this.electionStrategy = new BullyElectionStrategy();
72      }
73  
74      /***
75       * @return the current election strategy
76       */
77      public ElectionStrategy getElectionStrategy() {
78          return electionStrategy;
79      }
80  
81      /***
82       * set the election strategy
83       *
84       * @param electionStrategy
85       */
86      public void setElectionStrategy(ElectionStrategy electionStrategy) {
87          this.electionStrategy = electionStrategy;
88      }
89  
90      public long getInactiveTime() {
91          return inactiveTime;
92      }
93  
94      public void setInactiveTime(long inactiveTime) {
95          this.inactiveTime = inactiveTime;
96      }
97  
98      public synchronized Map getNodes() {
99          HashMap answer = new HashMap(nodes.size());
100         for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
101             Map.Entry entry = (Map.Entry) iter.next();
102             Destination key = (Destination) entry.getKey();
103             NodeEntry nodeEntry = (NodeEntry) entry.getValue();
104             answer.put(key, nodeEntry.node);
105         }
106         return answer;
107     }
108 
109     public synchronized void keepAlive(Node node) {
110         Destination key = node.getDestination();
111         if (!localDestination.equals(key)) {
112             NodeEntry entry = (NodeEntry) nodes.get(key);
113             if (entry == null) {
114                 entry = new NodeEntry();
115                 entry.node = node;
116                 nodes.put(key, entry);
117                 nodeAdded(node);
118                 synchronized (clusterLock) {
119                     clusterLock.notifyAll();
120                 }
121             }
122             else {
123                 // has the data changed
124                 if (stateHasChanged(entry.node, node)) {
125                     entry.node = node;
126                     nodeUpdated(node);
127                 }
128             }
129 
130             // lets update the timer at which the node will be considered
131             // to be dead
132             entry.lastKeepAlive = getTimeMillis();
133         }
134     }
135 
136     public synchronized void shutdown(Node node) {
137         Destination key = node.getDestination();
138         nodes.remove(key);
139 
140         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
141         // lets take a copy to make contention easier
142         Object[] array = listeners.toArray();
143         for (int i = 0, size = array.length; i < size; i++) {
144             ClusterListener listener = (ClusterListener) array[i];
145             listener.onNodeRemoved(event);
146         }
147     }
148 
149     public synchronized void checkForTimeouts() {
150         localNodePing.run();
151         long time = getTimeMillis();
152         for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
153             Map.Entry entry = (Entry) iter.next();
154             NodeEntry nodeEntry = (NodeEntry) entry.getValue();
155             if (nodeEntry.lastKeepAlive + inactiveTime < time) {
156                 iter.remove();
157                 nodeFailed(nodeEntry.node);
158             }
159         }
160     }
161 
162     public TimerTask createTimerTask() {
163         return new TimerTask() {
164             public void run() {
165                 checkForTimeouts();
166             }
167         };
168     }
169 
170     public void addClusterListener(ClusterListener listener) {
171         listeners.add(listener);
172     }
173 
174     public void removeClusterListener(ClusterListener listener) {
175         listeners.remove(listener);
176     }
177 
178     protected void nodeAdded(Node node) {
179         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
180         // lets take a copy to make contention easier
181         Object[] array = listeners.toArray();
182         for (int i = 0, size = array.length; i < size; i++) {
183             ClusterListener listener = (ClusterListener) array[i];
184             listener.onNodeAdd(event);
185         }
186         doElection();
187     }
188 
189     protected void nodeUpdated(Node node) {
190         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.UPDATE_NODE);
191         // lets take a copy to make contention easier
192         Object[] array = listeners.toArray();
193         for (int i = 0, size = array.length; i < size; i++) {
194             ClusterListener listener = (ClusterListener) array[i];
195             listener.onNodeUpdate(event);
196         }
197     }
198 
199     protected void nodeFailed(Node node) {
200         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.REMOVE_NODE);
201         // lets take a copy to make contention easier
202         Object[] array = listeners.toArray();
203         for (int i = 0, size = array.length; i < size; i++) {
204             ClusterListener listener = (ClusterListener) array[i];
205             listener.onNodeFailed(event);
206         }
207         doElection();
208     }
209 
210     protected void coordinatorChanged(Node node) {
211         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ELECTED_COORDINATOR);
212         // lets take a copy to make contention easier
213         Object[] array = listeners.toArray();
214         for (int i = 0, size = array.length; i < size; i++) {
215             ClusterListener listener = (ClusterListener) array[i];
216             listener.onCoordinatorChanged(event);
217         }
218     }
219 
220     protected void doElection() {
221         if (electionStrategy != null) {
222             try {
223                 NodeImpl newElected = (NodeImpl) electionStrategy.doElection(cluster);
224                 if (newElected != null && !newElected.equals(coordinator)) {
225                     coordinator.setCoordinator(false);
226                     coordinator = newElected;
227                     coordinator.setCoordinator(true);
228                     coordinatorChanged(coordinator);
229                 }
230             }
231             catch (JMSException jmsEx) {
232                 log.error("do election failed", jmsEx);
233             }
234         }
235     }
236 
237     /***
238      * For performance we may wish to use a less granualar timing mechanism
239      * only updating the time every x millis since we're only using
240      * the time as a judge of when a node has not pinged for at least a few
241      * hundred millis etc.
242      */
243     protected long getTimeMillis() {
244         return System.currentTimeMillis();
245     }
246 
247     protected static class NodeEntry {
248         public Node node;
249         public long lastKeepAlive;
250     }
251 
252 
253     /***
254      * @return true if the node has changed state from the old in memory copy to the
255      *         newly arrived copy
256      */
257     protected boolean stateHasChanged(Node oldNode, Node newNode) {
258         Map oldState = oldNode.getState();
259         Map newState = newNode.getState();
260         if (oldState == newState) {
261             return false;
262         }
263         return oldState == null || newState == null || !oldState.equals(newState);
264     }
265 }