/* * JBoss, Home of Professional Open Source. * Copyright 2006, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.ha.framework.server; import java.util.Set; import java.util.Vector; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.io.Serializable; import javax.management.MBeanServer; import javax.management.ObjectName; import EDU.oswego.cs.dl.util.concurrent.Latch; import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; import org.jboss.logging.Logger; import org.jboss.ha.framework.interfaces.ClusterMergeStatus; import org.jboss.ha.framework.interfaces.ClusterNode; import org.jboss.ha.framework.interfaces.DistributedReplicantManager; import org.jboss.ha.framework.interfaces.HAPartition; /** * This class manages replicated objects. * * @author Bill Burke. * @author Sacha Labourey. 
* @author Scott.stark@jboss.org * @version $Revision: 61770 $ */ public class DistributedReplicantManagerImpl implements DistributedReplicantManagerImplMBean, HAPartition.HAMembershipExtendedListener, HAPartition.HAPartitionStateTransfer, AsynchEventHandler.AsynchEventProcessor { // Constants ----------------------------------------------------- protected final static String SERVICE_NAME = "DistributedReplicantManager"; // Attributes ---------------------------------------------------- protected static int threadID; protected ConcurrentReaderHashMap localReplicants = new ConcurrentReaderHashMap(); protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap(); protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap(); protected HashMap intraviewIdCache = new HashMap(); protected HAPartition partition; /** The handler used to send replicant change notifications asynchronously */ protected AsynchEventHandler asynchHandler; protected Logger log; protected MBeanServer mbeanserver; protected ObjectName jmxName; protected String nodeName = null; protected Latch partitionNameKnown = new Latch (); protected boolean trace; protected Class[] add_types=new Class[]{String.class, String.class, Serializable.class}; protected Class[] remove_types=new Class[]{String.class, String.class}; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- /** * This class manages replicated objects through the given partition * * @param partition {@link HAPartition} through which replicated objects will be exchanged */ public DistributedReplicantManagerImpl(HAPartition partition, MBeanServer server) { this.partition = partition; this.mbeanserver = server; this.log = Logger.getLogger(DistributedReplicantManagerImpl.class.getName() + "." 
+ partition.getPartitionName()); this.trace = log.isTraceEnabled(); } // Public -------------------------------------------------------- public void init() throws Exception { log.debug("registerRPCHandler"); partition.registerRPCHandler(SERVICE_NAME, this); log.debug("subscribeToStateTransferEvents"); partition.subscribeToStateTransferEvents(SERVICE_NAME, this); log.debug("registerMembershipListener"); partition.registerMembershipListener(this); // subscribed this "sub-service" of HAPartition with JMX // TODO: In the future (when state transfer issues will be completed), // we will need to redesign the way HAPartitions and its sub-protocols are // registered with JMX. They will most probably be independant JMX services. // String name = "jboss:service=" + SERVICE_NAME + ",partitionName=" + this.partition.getPartitionName(); this.jmxName = new javax.management.ObjectName(name); this.mbeanserver.registerMBean(this, jmxName); } public void start() throws Exception { this.nodeName = this.partition.getNodeName (); // Create the asynch listener handler thread asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler"); asynchHandler.start(); partitionNameKnown.release (); // partition name is now known! //log.info("mergemembers"); //mergeMembers(); } public void stop() throws Exception { // BES 200604 -- implication of NR's JBLCUSTER-38 change. Moving to // destroy allows restart of HAPartition while local registrations // survive -- stopping partition does not stop all registered services // e.g. ejbs; if we maintain their registrations we can pass them to // the cluster when we restart. However, we are leaving all the remote // replicants we have registered around, so they will still be included // as targets if anyone contacts our EJB while partition is stopped. // Probably OK; if they aren't valid the client will find this out. 
// NR 200505 : [JBCLUSTER-38] move to destroy // if (localReplicants != null) // { // synchronized(localReplicants) // { // while (! localReplicants.isEmpty ()) // { // this.remove ((String)localReplicants.keySet().iterator().next ()); // } // } // } // Stop the asynch handler thread try { asynchHandler.stop(); } catch( Exception e) { log.warn("Failed to stop asynchHandler", e); } // NR 200505 : [JBCLUSTER-38] move to destroy // this.mbeanserver.unregisterMBean (this.jmxName); } // NR 200505 : [JBCLUSTER-38] unbind at destroy public void destroy() throws Exception { // now partition can't be resuscitated, so remove local replicants if (localReplicants != null) { synchronized(localReplicants) { String[] keys = new String[localReplicants.size()]; localReplicants.keySet().toArray(keys); for(int n = 0; n < keys.length; n ++) { this.removeLocal(keys[n]); // channel is disconnected, so // don't try to notify cluster } } } this.mbeanserver.unregisterMBean (this.jmxName); partition.unregisterRPCHandler(SERVICE_NAME, this); partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this); partition.unregisterMembershipListener(this); } public String listContent () throws Exception { // we merge all replicants services: local only or not // java.util.Collection services = this.getAllServices (); StringBuffer result = new StringBuffer (); java.util.Iterator catsIter = services.iterator (); result.append ("
");
      
      // One report section per service key, covering both remote and
      // local registrations.
      while (catsIter.hasNext ())
      {
         String category = (String)catsIter.next ();
         // Remote replicants for this key; may be absent if the service
         // is only registered locally.
         HashMap content = (HashMap)this.replicants.get (category);
         if (content == null)
            content = new HashMap ();
         java.util.Iterator keysIter = content.keySet ().iterator ();
                  
         result.append ("-----------------------------------------------\n");
         result.append ("Service : ").append (category).append ("\n\n");
         
         // Report whether this node itself holds a replicant for the key.
         Serializable local = lookupLocalReplicant(category);
         if (local == null)
            result.append ("\t- Service is *not* available locally\n");
         else
            result.append ("\t- Service *is* also available locally\n");

         // List every remote node name holding a replicant for the key.
         while (keysIter.hasNext ())
         {
            String location = (String)keysIter.next ();            
            result.append ("\t- ").append(location).append ("\n");
         }
         
         result.append ("\n");
         
      }
      
      result.append ("
"); return result.toString (); } public String listXmlContent () throws Exception { // we merge all replicants services: local only or not // java.util.Collection services = this.getAllServices (); StringBuffer result = new StringBuffer (); result.append ("\n"); java.util.Iterator catsIter = services.iterator (); while (catsIter.hasNext ()) { String category = (String)catsIter.next (); HashMap content = (HashMap)this.replicants.get (category); if (content == null) content = new HashMap (); java.util.Iterator keysIter = content.keySet ().iterator (); result.append ("\t\n"); result.append ("\t\t").append (category).append ("\n"); Serializable local = lookupLocalReplicant(category); if (local != null) { result.append ("\t\t\n"); result.append ("\t\t\t").append (this.nodeName).append ("\n"); result.append ("\t\t\n"); } while (keysIter.hasNext ()) { String location = (String)keysIter.next (); result.append ("\t\t\n"); result.append ("\t\t\t").append (location).append ("\n"); result.append ("\t\t\n"); } result.append ("\t\n"); } result.append ("\n"); return result.toString (); } // HAPartition.HAPartitionStateTransfer implementation ---------------------------------------------- public Serializable getCurrentState () { java.util.Collection services = this.getAllServices (); HashMap result = new HashMap (); java.util.Iterator catsIter = services.iterator (); while (catsIter.hasNext ()) { String category = (String)catsIter.next (); HashMap content = (HashMap)this.replicants.get (category); if (content == null) content = new HashMap (); else content = (HashMap)content.clone (); Serializable local = lookupLocalReplicant(category); if (local != null) content.put (this.nodeName, local); result.put (category, content); } // we add the intraviewid cache to the global result // Object[] globalResult = new Object[] {result, intraviewIdCache}; return globalResult; } public void setCurrentState(Serializable newState) { Object[] globalState = (Object[])newState; HashMap map = 
(HashMap)globalState[0]; this.replicants.putAll(map); this.intraviewIdCache = (HashMap)globalState[1]; if( trace ) { log.trace(nodeName + ": received new state, will republish local replicants"); } MembersPublisher publisher = new MembersPublisher(); publisher.start(); } public Collection getAllServices () { HashSet services = new HashSet(); services.addAll (localReplicants.keySet ()); services.addAll (replicants.keySet ()); return services; } // HAPartition.HAMembershipListener implementation ---------------------------------------------- public void membershipChangedDuringMerge(Vector deadMembers, Vector newMembers, Vector allMembers, Vector originatingGroups) { // Here we only care about deadMembers. Purge all replicant lists of deadMembers // and then notify all listening nodes. // log.info("Merging partitions..."); log.info("Dead members: " + deadMembers.size()); log.info("Originating groups: " + originatingGroups); purgeDeadMembers(deadMembers); if (newMembers.size() > 0) { new MergeMembers().start(); } } public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers) { // Here we only care about deadMembers. Purge all replicant lists of deadMembers // and then notify all listening nodes. 
// log.info("I am (" + nodeName + ") received membershipChanged event:"); log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")"); log.info("New Members : " + newMembers.size() + " (" + newMembers + ")"); log.info("All Members : " + allMembers.size() + " (" + allMembers + ")"); purgeDeadMembers(deadMembers); // we don't need to merge members anymore } // AsynchEventHandler.AsynchEventProcessor implementation ----------------- public void processEvent(Object event) { KeyChangeEvent kce = (KeyChangeEvent) event; notifyKeyListeners(kce.key, kce.replicants); } static class KeyChangeEvent { String key; List replicants; } // DistributedReplicantManager implementation ---------------------------------------------- public void add(String key, Serializable replicant) throws Exception { if( trace ) log.trace("add, key="+key+", value="+replicant); partitionNameKnown.acquire (); // we don't propagate until our name is known Object[] args = {key, this.nodeName, replicant}; partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true); synchronized(localReplicants) { localReplicants.put(key, replicant); notifyKeyListeners(key, lookupReplicants(key)); } } public void remove(String key) throws Exception { partitionNameKnown.acquire (); // we don't propagate until our name is known // optimisation: we don't make a costly network call // if there is nothing to remove if (localReplicants.containsKey(key)) { Object[] args = {key, this.nodeName}; partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true); removeLocal(key); } } protected void removeLocal(String key) { synchronized(localReplicants) { localReplicants.remove(key); List result = lookupReplicants(key); if (result == null) result = new ArrayList (); // don't pass null but an empty list notifyKeyListeners(key, result); } } public Serializable lookupLocalReplicant(String key) { return (Serializable)localReplicants.get(key); } public List lookupReplicants(String key) { 
Serializable local = lookupLocalReplicant(key); HashMap replicant = (HashMap)replicants.get(key); if (replicant == null && local == null) return null; ArrayList rtn = new ArrayList(); if (replicant == null) { if (local != null) rtn.add(local); } else { // JBAS-2677. Put the replicants in view order. ClusterNode[] nodes = partition.getClusterNodes(); String replNode; Object replVal; for (int i = 0; i < nodes.length; i++) { replNode = nodes[i].getName(); if (local != null && nodeName.equals(replNode)) { rtn.add(local); continue; } replVal = replicant.get(replNode); if (replVal != null) rtn.add(replVal); } } return rtn; } public List lookupReplicantsNodeNames(String key) { boolean locallyReplicated = localReplicants.containsKey (key); HashMap replicant = (HashMap)replicants.get(key); if (replicant == null && !locallyReplicated) return null; ArrayList rtn = new ArrayList(); if (replicant == null) { if (locallyReplicated) rtn.add(this.nodeName); } else { // JBAS-2677. Put the replicants in view order. 
Set keys = replicant.keySet(); ClusterNode[] nodes = partition.getClusterNodes(); String keyOwner; for (int i = 0; i < nodes.length; i++) { keyOwner = nodes[i].getName(); if (locallyReplicated && nodeName.equals(keyOwner)) { rtn.add(this.nodeName); continue; } if (keys.contains(keyOwner)) rtn.add(keyOwner); } } return rtn; } public void registerListener(String key, DistributedReplicantManager.ReplicantListener subscriber) { synchronized(keyListeners) { ArrayList listeners = (ArrayList)keyListeners.get(key); if (listeners == null) { listeners = new ArrayList(); keyListeners.put(key, listeners); } listeners.add(subscriber); } } public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber) { synchronized(keyListeners) { ArrayList listeners = (ArrayList)keyListeners.get (key); if (listeners == null) return; listeners.remove(subscriber); if (listeners.size() == 0) keyListeners.remove(key); } } public int getReplicantsViewId(String key) { Integer result = (Integer)this.intraviewIdCache.get (key); if (result == null) return 0; else return result.intValue (); } public boolean isMasterReplica (String key) { if( trace ) log.trace("isMasterReplica, key="+key); // if I am not a replicat, I cannot be the master... // if (!localReplicants.containsKey (key)) { if( trace ) log.trace("no localReplicants, key="+key+", isMasterReplica=false"); return false; } Vector allNodes = this.partition.getCurrentView (); HashMap repForKey = (HashMap)replicants.get(key); if (repForKey==null) { if( trace ) log.trace("no replicants, key="+key+", isMasterReplica=true"); return true; } Vector replicaNodes = new Vector ((repForKey).keySet ()); boolean isMasterReplica = false; for (int i=0; i replicants.size ()) { // The merge process needs to remove some (now) // unexisting keys // for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId