Clover coverage report - ActiveCluster - 1.1-SNAPSHOT
Coverage timestamp: Tue May 24 2005 08:48:28 BST
file stats: LOC: 191   Methods: 23
NCLOC: 136   Classes: 1
30 day Evaluation Version distributed via the Maven Jar Repository. Clover is not free. You have 30 days to evaluate it. Please visit http://www.thecortex.net/clover to obtain a licensed version of Clover
 
 Source file Conditionals Statements Methods TOTAL
DefaultCluster.java 0% 0% 0% 0%
coverage
 1   
 /** 
 2   
  * 
 3   
  * Copyright 2004 Protique Ltd
 4   
  * 
 5   
  * Licensed under the Apache License, Version 2.0 (the "License"); 
 6   
  * you may not use this file except in compliance with the License. 
 7   
  * You may obtain a copy of the License at 
 8   
  * 
 9   
  * http://www.apache.org/licenses/LICENSE-2.0
 10   
  * 
 11   
  * Unless required by applicable law or agreed to in writing, software
 12   
  * distributed under the License is distributed on an "AS IS" BASIS, 
 13   
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14   
  * See the License for the specific language governing permissions and 
 15   
  * limitations under the License. 
 16   
  * 
 17   
  **/
 18   
 package org.activecluster.impl;
 19   
 
 20   
 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
 21   
 import org.apache.commons.logging.Log;
 22   
 import org.apache.commons.logging.LogFactory;
 23   
 import org.activecluster.Cluster;
 24   
 import org.activecluster.ClusterListener;
 25   
 import org.activecluster.LocalNode;
 26   
 import org.activecluster.Service;
 27   
 
 28   
 import javax.jms.*;
 29   
 import java.io.Serializable;
 30   
 import java.util.Map;
 31   
 import java.util.Timer;
 32   
 
 33   
 /**
 34   
  * A default implementation of ActiveCluster which uses standard JMS operations
 35   
  *
 36   
  * @version $Revision: 1.1 $
 37   
  */
 38   
 public class DefaultCluster implements Cluster {
 39   
 
 40   
     private final static Log log = LogFactory.getLog(DefaultCluster.class);
 41   
 
 42   
     private StateServiceImpl stateService;
 43   
     private LocalNode localNode;
 44   
     private Topic destination;
 45   
     private Connection connection;
 46   
     private Session session;
 47   
     private MessageProducer producer;
 48   
     private MessageConsumer consumer;
 49   
     private Timer timer;
 50   
     private SynchronizedBoolean started = new SynchronizedBoolean(false);
 51   
     private Object clusterLock = new Object();
 52   
 
 53  0
     public DefaultCluster(final LocalNode localNode, Topic dataTopic, Topic destination, Connection connection, Session session,
 54   
                           MessageProducer producer, Timer timer, long inactiveTime) throws JMSException {
 55  0
         this.localNode = localNode;
 56  0
         this.destination = destination;
 57  0
         this.connection = connection;
 58  0
         this.session = session;
 59  0
         this.producer = producer;
 60  0
         this.timer = timer;
 61   
 
 62  0
         if (producer == null) {
 63  0
             throw new IllegalArgumentException("No producer specified!");
 64   
         }
 65   
 
 66   
         // now lets subscribe the service to the updates from the data topic
 67  0
         consumer = session.createConsumer(dataTopic, null, true);
 68   
 
 69  0
         log.info("Creating data consumer on topic: " + dataTopic);
 70   
 
 71  0
         this.stateService = new StateServiceImpl(this, clusterLock, new Runnable() {
 72  0
             public void run() {
 73  0
                 if (localNode instanceof ReplicatedLocalNode) {
 74  0
                     ((ReplicatedLocalNode) localNode).pingRemoteNodes();
 75   
                 }
 76   
             }
 77   
         }, timer, inactiveTime);
 78  0
         consumer.setMessageListener(new StateConsumer(stateService));
 79   
     }
 80   
 
 81  0
     public synchronized void addClusterListener(ClusterListener listener) {
 82  0
         stateService.addClusterListener(listener);
 83   
     }
 84   
 
 85  0
     public synchronized void removeClusterListener(ClusterListener listener) {
 86  0
         stateService.removeClusterListener(listener);
 87   
     }
 88   
 
 89  0
     public Topic getDestination() {
 90  0
         return destination;
 91   
     }
 92   
 
 93  0
     public LocalNode getLocalNode() {
 94  0
         return localNode;
 95   
     }
 96   
 
 97  0
     public Map getNodes() {
 98  0
         return stateService.getNodes();
 99   
     }
 100   
 
 101  0
     public synchronized void send(Destination destination, Message message) throws JMSException {
 102  0
         producer.send(destination, message);
 103   
     }
 104   
 
 105  0
     public synchronized MessageConsumer createConsumer(Destination destination) throws JMSException {
 106  0
         return getSession().createConsumer(destination);
 107   
     }
 108   
 
 109  0
     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
 110  0
         return getSession().createConsumer(destination, selector);
 111   
     }
 112   
 
 113  0
     public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
 114  0
         return getSession().createConsumer(destination, selector, noLocal);
 115   
     }
 116   
 
 117  0
     public synchronized Message createMessage() throws JMSException {
 118  0
         return getSession().createMessage();
 119   
     }
 120   
 
 121  0
     public synchronized BytesMessage createBytesMessage() throws JMSException {
 122  0
         return getSession().createBytesMessage();
 123   
     }
 124   
 
 125  0
     public synchronized MapMessage createMapMessage() throws JMSException {
 126  0
         return getSession().createMapMessage();
 127   
     }
 128   
 
 129  0
     public synchronized ObjectMessage createObjectMessage() throws JMSException {
 130  0
         return getSession().createObjectMessage();
 131   
     }
 132   
 
 133  0
     public synchronized ObjectMessage createObjectMessage(Serializable object) throws JMSException {
 134  0
         return getSession().createObjectMessage(object);
 135   
     }
 136   
 
 137  0
     public synchronized StreamMessage createStreamMessage() throws JMSException {
 138  0
         return getSession().createStreamMessage();
 139   
     }
 140   
 
 141  0
     public synchronized TextMessage createTextMessage() throws JMSException {
 142  0
         return getSession().createTextMessage();
 143   
     }
 144   
 
 145  0
     public synchronized TextMessage createTextMessage(String text) throws JMSException {
 146  0
         return getSession().createTextMessage(text);
 147   
     }
 148   
 
 149  0
     public synchronized void start() throws JMSException {
 150  0
         if (started.commit(false, true)) {
 151  0
             connection.start();
 152   
         }
 153   
     }
 154   
 
 155  0
     public void stop() throws JMSException {
 156  0
         try {
 157  0
             if (localNode instanceof Service) {
 158  0
                 ((Service) localNode).stop();
 159   
             }
 160  0
             timer.cancel();
 161  0
             session.close();
 162  0
             connection.stop();
 163  0
             connection.close();
 164   
         }
 165   
         finally {
 166  0
             connection = null;
 167  0
             session = null;
 168   
         }
 169   
     }
 170   
 
 171  0
     public boolean waitForClusterToComplete(int expectedCount, long timeout) throws InterruptedException {
 172  0
         timeout = timeout > 0 ? timeout : Long.MAX_VALUE;
 173  0
         long waitTime = timeout;
 174  0
         long start = System.currentTimeMillis();
 175  0
         synchronized (clusterLock) {
 176  0
             while (stateService.getNodes().size() < expectedCount && started.get() && waitTime > 0) {
 177  0
                 clusterLock.wait(waitTime);
 178  0
                 waitTime = timeout - (System.currentTimeMillis() - start);
 179   
             }
 180   
         }
 181  0
         return stateService.getNodes().size() >= expectedCount;
 182   
     }
 183   
 
 184  0
     protected Session getSession() throws JMSException {
 185  0
         if (session == null) {
 186  0
             throw new JMSException("Cannot perform operation, this cluster connection is now closed");
 187   
         }
 188  0
         return session;
 189   
     }
 190   
 }
 191