/* * JBoss, Home of Professional Open Source. * Copyright 2006, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.ha.framework.server; import java.util.Set; import java.util.Vector; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.io.Serializable; import javax.management.MBeanServer; import javax.management.ObjectName; import EDU.oswego.cs.dl.util.concurrent.Latch; import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; import org.jboss.logging.Logger; import org.jboss.ha.framework.interfaces.ClusterMergeStatus; import org.jboss.ha.framework.interfaces.ClusterNode; import org.jboss.ha.framework.interfaces.DistributedReplicantManager; import org.jboss.ha.framework.interfaces.HAPartition; /** * This class manages replicated objects. * * @author Bill Burke. * @author Sacha Labourey. * @author Scott.stark@jboss.org * @version $Revision: 61770 $ */ public class DistributedReplicantManagerImpl implements DistributedReplicantManagerImplMBean, HAPartition.HAMembershipExtendedListener, HAPartition.HAPartitionStateTransfer, AsynchEventHandler.AsynchEventProcessor { // Constants ----------------------------------------------------- protected final static String SERVICE_NAME = "DistributedReplicantManager"; // Attributes ---------------------------------------------------- protected static int threadID; protected ConcurrentReaderHashMap localReplicants = new ConcurrentReaderHashMap(); protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap(); protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap(); protected HashMap intraviewIdCache = new HashMap(); protected HAPartition partition; /** The handler used to send replicant change notifications asynchronously */ protected AsynchEventHandler asynchHandler; protected Logger log; protected MBeanServer mbeanserver; protected ObjectName jmxName; protected String nodeName = null; protected Latch partitionNameKnown = new Latch (); protected boolean trace; protected Class[] add_types=new Class[]{String.class, String.class, Serializable.class}; protected Class[] remove_types=new Class[]{String.class, String.class}; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- /** * This class manages replicated objects through the given partition * * @param partition {@link HAPartition} through which replicated objects will be exchanged */ public DistributedReplicantManagerImpl(HAPartition partition, MBeanServer server) { this.partition = partition; this.mbeanserver = server; this.log = Logger.getLogger(DistributedReplicantManagerImpl.class.getName() + "." + partition.getPartitionName()); this.trace = log.isTraceEnabled(); } // Public -------------------------------------------------------- public void init() throws Exception { log.debug("registerRPCHandler"); partition.registerRPCHandler(SERVICE_NAME, this); log.debug("subscribeToStateTransferEvents"); partition.subscribeToStateTransferEvents(SERVICE_NAME, this); log.debug("registerMembershipListener"); partition.registerMembershipListener(this); // subscribed this "sub-service" of HAPartition with JMX // TODO: In the future (when state transfer issues will be completed), // we will need to redesign the way HAPartitions and its sub-protocols are // registered with JMX. They will most probably be independant JMX services. // String name = "jboss:service=" + SERVICE_NAME + ",partitionName=" + this.partition.getPartitionName(); this.jmxName = new javax.management.ObjectName(name); this.mbeanserver.registerMBean(this, jmxName); } public void start() throws Exception { this.nodeName = this.partition.getNodeName (); // Create the asynch listener handler thread asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler"); asynchHandler.start(); partitionNameKnown.release (); // partition name is now known! //log.info("mergemembers"); //mergeMembers(); } public void stop() throws Exception { // BES 200604 -- implication of NR's JBLCUSTER-38 change. Moving to // destroy allows restart of HAPartition while local registrations // survive -- stopping partition does not stop all registered services // e.g. ejbs; if we maintain their registrations we can pass them to // the cluster when we restart. However, we are leaving all the remote // replicants we have registered around, so they will still be included // as targets if anyone contacts our EJB while partition is stopped. // Probably OK; if they aren't valid the client will find this out. // NR 200505 : [JBCLUSTER-38] move to destroy // if (localReplicants != null) // { // synchronized(localReplicants) // { // while (! localReplicants.isEmpty ()) // { // this.remove ((String)localReplicants.keySet().iterator().next ()); // } // } // } // Stop the asynch handler thread try { asynchHandler.stop(); } catch( Exception e) { log.warn("Failed to stop asynchHandler", e); } // NR 200505 : [JBCLUSTER-38] move to destroy // this.mbeanserver.unregisterMBean (this.jmxName); } // NR 200505 : [JBCLUSTER-38] unbind at destroy public void destroy() throws Exception { // now partition can't be resuscitated, so remove local replicants if (localReplicants != null) { synchronized(localReplicants) { String[] keys = new String[localReplicants.size()]; localReplicants.keySet().toArray(keys); for(int n = 0; n < keys.length; n ++) { this.removeLocal(keys[n]); // channel is disconnected, so // don't try to notify cluster } } } this.mbeanserver.unregisterMBean (this.jmxName); partition.unregisterRPCHandler(SERVICE_NAME, this); partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this); partition.unregisterMembershipListener(this); } public String listContent () throws Exception { // we merge all replicants services: local only or not // java.util.Collection services = this.getAllServices (); StringBuffer result = new StringBuffer (); java.util.Iterator catsIter = services.iterator (); result.append ("
"); while (catsIter.hasNext ()) { String category = (String)catsIter.next (); HashMap content = (HashMap)this.replicants.get (category); if (content == null) content = new HashMap (); java.util.Iterator keysIter = content.keySet ().iterator (); result.append ("-----------------------------------------------\n"); result.append ("Service : ").append (category).append ("\n\n"); Serializable local = lookupLocalReplicant(category); if (local == null) result.append ("\t- Service is *not* available locally\n"); else result.append ("\t- Service *is* also available locally\n"); while (keysIter.hasNext ()) { String location = (String)keysIter.next (); result.append ("\t- ").append(location).append ("\n"); } result.append ("\n"); } result.append (""); return result.toString (); } public String listXmlContent () throws Exception { // we merge all replicants services: local only or not // java.util.Collection services = this.getAllServices (); StringBuffer result = new StringBuffer (); result.append ("