/* * JBoss, Home of Professional Open Source. * Copyright 2006, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.ha.framework.server; import java.lang.reflect.Method; import java.net.InetAddress; import java.rmi.dgc.VMID; import java.rmi.server.UID; import java.util.Iterator; import java.util.Set; import java.util.Vector; import javax.management.Attribute; import javax.management.AttributeList; import javax.management.InstanceNotFoundException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.management.ReflectionException; import org.jboss.ha.framework.interfaces.HAPartition; import org.jboss.naming.NamingServiceMBean; import org.jboss.system.ServiceMBean; import org.jboss.system.ServiceMBeanSupport; import org.jboss.system.server.ServerConfigUtil; import org.jgroups.Channel; import org.jgroups.JChannel; import org.jgroups.Version; import org.jgroups.debug.Debugger; import org.jgroups.jmx.JChannelFactoryMBean; import org.jgroups.jmx.JmxConfigurator; import org.w3c.dom.Attr; import org.w3c.dom.Element; import org.w3c.dom.NamedNodeMap; import org.w3c.dom.Node; import org.w3c.dom.NodeList; /** * Management Bean for Cluster HAPartitions. It will start a JGroups * channel and initialize the ReplicantManager and DistributedStateService. * * @author Bill Burke. * @author Sacha Labourey. * @version $Revision: 62947 $ */ public class ClusterPartition extends ServiceMBeanSupport implements ClusterPartitionMBean { // Constants ----------------------------------------------------- public static final String JGROUPS_JMX_DOMAIN = "jboss.jgroups"; public static final String CHANNEL_JMX_ATTRIBUTES = "type=channel,cluster="; public static final String PROTOCOL_JMX_ATTRIBUTES = "type=protocol,cluster="; // Attributes ---------------------------------------------------- protected String partitionName = ServerConfigUtil.getDefaultPartitionName(); protected String jgProps = "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=64;" + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):" + "MERGE2(min_interval=5000;max_interval=10000):" + "FD:" + "VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):" + "UNICAST(timeout=600,1200,2400):" + "pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(down_thread=false;up_thread=false;frag_size=8192):" + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + "shun=false;print_local_addr=true):" + "pbcast.STATE_TRANSFER"; protected HAPartitionImpl partition; protected boolean deadlock_detection = false; protected boolean allow_sync_events = false; protected JChannelFactoryMBean multiplexer = null; protected String stackName = null; protected org.jgroups.JChannel channel; protected Debugger debugger=null; protected boolean use_debugger=false; protected String nodeName = null; protected InetAddress nodeAddress = null; /** Number of milliseconds to wait until state has been transferred. Increase this value for large states * 0 = wait forever */ protected long state_transfer_timeout=60000; protected long method_call_timeout=60000; protected boolean channelRegistered; protected boolean protocolsRegistered; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- // Public -------------------------------------------------------- // ClusterPartitionMBean implementation ---------------------------------------------- public String getPartitionName() { return partitionName; } public void setPartitionName(String newName) { partitionName = newName; } public String getPartitionProperties() { // The channel knows best if (channel != null) return channel.getProperties(); if (multiplexer == null && stackName == null) return jgProps; // We are configured for the multiplexer but don't know // the details of the stack yet return null; } public void setPartitionProperties(String newProps) { jgProps = newProps; } /** Convert a list of elements to the JG property string */ public void setPartitionConfig(Element config) { StringBuffer buffer = new StringBuffer(); NodeList stack = config.getChildNodes(); int length = stack.getLength(); for(int s = 0; s < length; s ++) { Node node = stack.item(s); if( node.getNodeType() != Node.ELEMENT_NODE ) continue; Element tag = (Element) node; String protocol = tag.getTagName(); buffer.append(protocol); NamedNodeMap attrs = tag.getAttributes(); int attrLength = attrs.getLength(); if( attrLength > 0 ) buffer.append('('); for(int a = 0; a < attrLength; a ++) { Attr attr = (Attr) attrs.item(a); String name = attr.getName(); String value = attr.getValue(); buffer.append(name); buffer.append('='); buffer.append(value); if( a < attrLength-1 ) buffer.append(';'); } if( attrLength > 0 ) buffer.append(')'); buffer.append(':'); } // Remove the trailing ':' buffer.setLength(buffer.length()-1); this.jgProps = buffer.toString(); log.debug("Setting JGProps from xml to: "+jgProps); } /** * Uniquely identifies this node. MUST be unique accros the whole cluster! * Cannot be changed once the partition has been started */ public String getNodeName() { return this.nodeName; } public void setNodeName(String node) throws Exception { if (this.getState() == ServiceMBean.CREATED || this.getState() == ServiceMBean.STARTED || this.getState() == ServiceMBean.STARTING) { throw new Exception ("Node name cannot be changed once the partition has been started"); } else { this.nodeName = node; } } public InetAddress getNodeAddress() { return nodeAddress; } public void setNodeAddress(InetAddress address) { this.nodeAddress = address; } public String getJGroupsVersion() { return Version.version + "( " + Version.cvs + ")"; } public long getStateTransferTimeout() { return state_transfer_timeout; } public void setStateTransferTimeout(long timeout) { this.state_transfer_timeout=timeout; } public long getMethodCallTimeout() { return method_call_timeout; } public void setMethodCallTimeout(long timeout) { this.method_call_timeout=timeout; } public JChannelFactoryMBean getMultiplexer() { return multiplexer; } public void setMultiplexer(JChannelFactoryMBean muxFactory) { this.multiplexer = muxFactory; } public String getMultiplexerStack() { return stackName; } public void setMultiplexerStack(String stackName) { this.stackName = stackName; } // public boolean getChannelDebugger() // { // return this.use_debugger; // } // // public void setChannelDebugger(boolean flag) // { // this.use_debugger=flag; // } public boolean getDeadlockDetection() { return deadlock_detection; } public void setDeadlockDetection(boolean doit) { deadlock_detection = doit; } public boolean getAllowSynchronousMembershipNotifications() { return allow_sync_events; } public void setAllowSynchronousMembershipNotifications(boolean allowSync) { this.allow_sync_events = allowSync; } protected ObjectName getObjectName(MBeanServer server, ObjectName name) throws MalformedObjectNameException { return name == null ? OBJECT_NAME : name; } public HAPartition getHAPartition () { return this.partition; } /** Return the list of member nodes that built from the current view * @return A Vector Strings representing the host:port values of the nodes */ public Vector getCurrentView() { return partition.getCurrentView(); } // ServiceMBeanSupport overrides --------------------------------------------------- public String getName() { return partitionName; } protected void createService() throws Exception { log.debug("Creating JGroups JChannel"); if (stackName != null && multiplexer != null) { this.channel = (JChannel) multiplexer.createMultiplexerChannel(stackName, getPartitionName()); } else { this.channel = new org.jgroups.JChannel(jgProps); // JBAS-4406 Hack to register the channel registerChannelInJmx(); } if(use_debugger && debugger == null) { debugger=new Debugger(channel); debugger.start(); } channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE); channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE); log.debug("Creating HAPartition"); partition = createPartition(); // JBAS-2769 Init partition in create log.debug("Initing HAPartition: " + partition); partition.init(); log.debug("HAPartition initialized"); } /** * Extension point meant for test cases; instantiates the HAPartitionImpl. * Test cases can instantiate their own subclass of HAPartitionImpl. */ protected HAPartitionImpl createPartition() throws Exception { HAPartitionImpl result = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer()); result.setStateTransferTimeout(this.state_transfer_timeout); result.setMethodCallTimeout(this.method_call_timeout); return result; } protected void startService() throws Exception { // We push the independant name in the protocol stack // before it is connected to the cluster // if (this.nodeName == null || "".equals(this.nodeName)) this.nodeName = generateUniqueNodeName (); java.util.HashMap staticNodeName = new java.util.HashMap(); staticNodeName.put("additional_data", this.nodeName.getBytes()); // JBAS-4258 -- invoke via reflection to allow upgrade to JGroups 2.5 Class[] paramTypes = new Class[]{org.jgroups.Event.class}; Method downMethod = JChannel.class.getDeclaredMethod("down", paramTypes); Object[] params = { new org.jgroups.Event(org.jgroups.Event.CONFIG, staticNodeName) }; downMethod.invoke(channel, params); this.channel.getProtocolStack().flushEvents(); // temporary fix for JG bug (808170) TODO: REMOVE ONCE JGROUPS IS FIXED log.debug("Starting ClusterPartition: " + partitionName); channel.connect(partitionName); try { log.debug("Starting channel"); partition.startPartition(); log.debug("Started ClusterPartition: " + partitionName); } catch (Exception e) { log.debug("Caught exception after channel connected; closing channel -- " + e.getLocalizedMessage()); channel.disconnect(); throw e; } } protected void stopService() throws Exception { stopChannelDebugger(); log.debug("Stopping ClusterPartition: " + partitionName); partition.closePartition(); log.debug("Stopped ClusterPartition: " + partitionName); } // NR 200505 : [JBCLUSTER-38] close partition just disconnect from channel // destroy close it. protected void destroyService() throws Exception { log.debug("Destroying ClusterPartition: " + partitionName); partition.destroyPartition(); // JBAS-4406 Hack unregisterChannelFromJmx(); log.debug("Destroyed ClusterPartition: " + partitionName); } protected String generateUniqueNodeName () throws Exception { // we first try to find a simple meaningful name: // 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine // 2nd) "local-IP:JMV_GUID" otherwise // 3rd) return a fully GUID-based representation // // Before anything we determine the local host IP (and NOT name as this could be // resolved differently by other nodes...) // But use the specified node address for multi-homing String hostIP = null; InetAddress address = ServerConfigUtil.fixRemoteAddress(nodeAddress); if (address == null) { log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)"); log.debug ("using a full GUID strategy"); return new VMID().toString(); } else { hostIP = address.getHostAddress(); } // 1st: is JNDI up and running? // try { AttributeList al = this.server.getAttributes(NamingServiceMBean.OBJECT_NAME, new String[] {"State", "Port"}); int status = ((Integer)((Attribute)al.get(0)).getValue()).intValue(); if (status == ServiceMBean.STARTED) { // we can proceed with the JNDI trick! int port = ((Integer)((Attribute)al.get(1)).getValue()).intValue(); return hostIP + ":" + port; } else { log.debug("JNDI has been found but the service wasn't started so we cannot " + "be entirely sure we are the only one that wants to use this PORT " + "as a GUID on this host."); } } catch (InstanceNotFoundException e) { log.debug ("JNDI not running here, cannot use this strategy to find a node GUID for the cluster"); } catch (ReflectionException e) { log.debug ("JNDI querying has returned an exception, cannot use this strategy to find a node GUID for the cluster"); } // 2nd: host-GUID strategy // String uid = new UID().toString(); return hostIP + ":" + uid; } public String showHistory () { StringBuffer buff = new StringBuffer(); Vector data = new Vector (this.partition.history); for (java.util.Iterator row = data.iterator(); row.hasNext();) { String info = (String) row.next(); buff.append(info).append("\n"); } return buff.toString(); } public String showHistoryAsXML () { StringBuffer buff = new StringBuffer(); buff.append("\n"); Vector data = new Vector (this.partition.history); for (java.util.Iterator row = data.iterator(); row.hasNext();) { buff.append(" \n "); String info = (String) row.next(); buff.append(info); buff.append("\n \n"); } buff.append("\n"); return buff.toString(); } public void startChannelDebugger() { startChannelDebugger(false); } public void startChannelDebugger(boolean accumulative) { if(debugger == null) { debugger=new Debugger(this.channel, accumulative); debugger.start(); } } public void stopChannelDebugger() { if(debugger != null) { // debugger.stop(); // uncomment when new JGroups version is available debugger=null; } } protected void registerChannelInJmx() { if (server != null) { try { String protocolPrefix = JGROUPS_JMX_DOMAIN + ":" + PROTOCOL_JMX_ATTRIBUTES + getPartitionName(); JmxConfigurator.registerProtocols(server, channel, protocolPrefix); protocolsRegistered = true; String name = JGROUPS_JMX_DOMAIN + ":" + CHANNEL_JMX_ATTRIBUTES + getPartitionName(); JmxConfigurator.registerChannel(channel, server, name); channelRegistered = true; } catch (Exception e) { log.error("Caught exception registering channel in JXM", e); } } } protected void unregisterChannelFromJmx() { ObjectName on = null; if (channelRegistered) { // Unregister the channel itself try { on = new ObjectName(JGROUPS_JMX_DOMAIN + ":" + CHANNEL_JMX_ATTRIBUTES + getPartitionName()); server.unregisterMBean(on); } catch (Exception e) { if (on != null) log.error("Caught exception unregistering channel at " + on, e); else log.error("Caught exception unregistering channel", e); } } if (protocolsRegistered) { // Unregister the protocols try { on = new ObjectName(JGROUPS_JMX_DOMAIN + ":*," + PROTOCOL_JMX_ATTRIBUTES + getPartitionName()); Set mbeans=server.queryNames(on, null); if(mbeans != null) { for(Iterator it=mbeans.iterator(); it.hasNext();) { server.unregisterMBean((ObjectName)it.next()); } } } catch (Exception e) { if (on != null) log.error("Caught exception unregistering protocols at " + on, e); else log.error("Caught exception unregistering protocols", e); } } } }