/* * JBoss, Home of Professional Open Source. * Copyright 2006, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.ha.hasessionstate.server; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; import java.util.Vector; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.Name; import javax.naming.NameNotFoundException; import javax.naming.Reference; import javax.naming.StringRefAddr; import org.jboss.ha.framework.interfaces.HAPartition; import org.jboss.ha.hasessionstate.interfaces.PackagedSession; import org.jboss.logging.Logger; import org.jboss.naming.NonSerializableFactory; import org.jboss.system.server.ServerConfigUtil; import EDU.oswego.cs.dl.util.concurrent.Mutex; /** * Default implementation of HASessionState * * @see org.jboss.ha.hasessionstate.interfaces.HASessionState * @author sacha.labourey@cogito-info.ch * @author Bill Burke * @version $Revision: 57188 $ * *

Revisions:
*

2002/01/09: billb *

    *
  1. ripped out sub partitioning stuff. It really belongs as a subclass of HAPartition *
* */ public class HASessionStateImpl implements org.jboss.ha.hasessionstate.interfaces.HASessionState, HAPartition.HAPartitionStateTransfer { protected String _sessionStateName; protected Logger log; protected HAPartition hapGeneral; protected String sessionStateIdentifier; protected String myNodeName; protected long beanCleaningDelay; protected String haPartitionName; protected String haPartitionJndiName; protected final String DEFAULT_PARTITION_JNDI_NAME = ServerConfigUtil.getDefaultPartitionName(); protected final String JNDI_FOLDER_NAME_FOR_HASESSIONSTATE = org.jboss.metadata.ClusterConfigMetaData.JNDI_PREFIX_FOR_SESSION_STATE; protected final String JNDI_FOLDER_NAME_FOR_HAPARTITION = "/HAPartition/"; protected final long MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE = 30L * 60L * 1000L; // 30 minutes... should be set externally or use cache settings protected static final String HA_SESSION_STATE_STATE_TRANSFER = "HASessionStateTransfer"; protected HashMap locks = new HashMap (); public HASessionStateImpl () {} public HASessionStateImpl (String sessionStateName, HAPartition partition, long beanCleaningDelay) { this(sessionStateName, partition.getPartitionName(), beanCleaningDelay); this.hapGeneral = partition; } public HASessionStateImpl (String sessionStateName, String mainHAPartitionName, long beanCleaningDelay) { if (sessionStateName == null) this._sessionStateName = org.jboss.metadata.ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME; else this._sessionStateName = sessionStateName; this.sessionStateIdentifier = "SessionState-'" + this._sessionStateName + "'"; if (mainHAPartitionName == null) haPartitionName = DEFAULT_PARTITION_JNDI_NAME; else haPartitionName = mainHAPartitionName; haPartitionJndiName = JNDI_FOLDER_NAME_FOR_HAPARTITION + haPartitionName; if (beanCleaningDelay > 0) this.beanCleaningDelay = beanCleaningDelay; else this.beanCleaningDelay = MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE; } public void init () throws Exception { this.log = Logger.getLogger(HASessionStateImpl.class.getName() + "." + this._sessionStateName); // BES 20060416 -- if people used an old config, we may not // have been passed the partition, so have to find in JNDI // JNDI work s/b done in start(), but we have no choice, as // we must register for state transfer in init if (this.hapGeneral == null) { Context ctx = new InitialContext (); this.hapGeneral = (HAPartition)ctx.lookup (haPartitionJndiName); } if (hapGeneral == null) log.error ("Unable to get default HAPartition under name '" + haPartitionJndiName + "'."); this.hapGeneral.registerRPCHandler (this.sessionStateIdentifier, this); this.hapGeneral.subscribeToStateTransferEvents (HA_SESSION_STATE_STATE_TRANSFER, this); } public void start () throws Exception { this.myNodeName = this.hapGeneral.getNodeName (); log.debug ("HASessionState node name : " + this.myNodeName ); // BES 4/7/06 clean up lifecycle; move this to start, as it can't be // called until startService due to JNDI dependency Context ctx = new InitialContext (); this.bind (this._sessionStateName, this, HASessionStateImpl.class, ctx); } protected void bind (String jndiName, Object who, Class classType, Context ctx) throws Exception { // Ah ! This service isn't serializable, so we use a helper class // NonSerializableFactory.bind (jndiName, who); Name n = ctx.getNameParser ("").parse (jndiName); while (n.size () > 1) { String ctxName = n.get (0); try { ctx = (Context)ctx.lookup (ctxName); } catch (NameNotFoundException e) { log.debug ("creating Subcontext" + ctxName); ctx = ctx.createSubcontext (ctxName); } n = n.getSuffix (1); } // The helper class NonSerializableFactory uses address type nns, we go on to // use the helper class to bind the service object in JNDI // StringRefAddr addr = new StringRefAddr ("nns", jndiName); Reference ref = new Reference ( classType.getName (), addr, NonSerializableFactory.class.getName (), null); ctx.bind (n.get (0), ref); } public void stop () throws Exception { purgeState(); // Unbind so we can rebind if restarted try { Context ctx = new InitialContext (); ctx.unbind (this._sessionStateName); NonSerializableFactory.unbind (this._sessionStateName); } catch (Exception ignored) {} } public void destroy() throws Exception { // Remove ref to ourself from HAPartition this.hapGeneral.unregisterRPCHandler(this.sessionStateIdentifier, this); this.hapGeneral.unsubscribeFromStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this); } public String getNodeName () { return this.myNodeName ; } // Used for Session state transfer // public Serializable getCurrentState () { log.debug ("Building and returning state of HASessionState"); if (this.appSessions == null) this.appSessions = new Hashtable (); Serializable result = null; synchronized (this.lockAppSession) { this.purgeState (); try { result = deflate (this.appSessions); } catch (Exception e) { log.error("operation failed", e); } } return result; } public void setCurrentState (Serializable newState) { log.debug ("Receiving state of HASessionState"); if (this.appSessions == null) this.appSessions = new Hashtable (); synchronized (this.lockAppSession) { try { this.appSessions.clear (); // hope to facilitate the job of the GC this.appSessions = (Hashtable)inflate ((byte[])newState); } catch (Exception e) { log.error("operation failed", e); } } } public void purgeState () { synchronized (this.lockAppSession) { for (Enumeration keyEnum = this.appSessions.keys (); keyEnum.hasMoreElements ();) { // trip in apps.. // Object key = keyEnum.nextElement (); Hashtable value = (Hashtable)this.appSessions.get (key); long currentTime = System.currentTimeMillis (); for (Iterator iterSessions = value.values ().iterator (); iterSessions.hasNext ();) { PackagedSession ps = (PackagedSession)iterSessions.next (); if ( (currentTime - ps.unmodifiedExistenceInVM ()) > beanCleaningDelay ) iterSessions.remove (); } } } } protected byte[] deflate (Object object) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream (); Deflater def = new Deflater (java.util.zip.Deflater.BEST_COMPRESSION); DeflaterOutputStream dos = new DeflaterOutputStream (baos, def); ObjectOutputStream out = new ObjectOutputStream (dos); out.writeObject (object); out.close (); dos.finish (); dos.close (); return baos.toByteArray (); } protected Object inflate (byte[] compressedContent) throws IOException { if (compressedContent==null) return null; try { ObjectInputStream in = new ObjectInputStream (new InflaterInputStream (new ByteArrayInputStream (compressedContent))); Object object = in.readObject (); in.close (); return object; } catch (Exception e) { throw new IOException (e.toString ()); } } protected Hashtable appSessions = new Hashtable (); protected Object lockAppSession = new Object (); protected Hashtable getHashtableForApp (String appName) { if (this.appSessions == null) this.appSessions = new Hashtable (); // should never happen though... Hashtable result = null; synchronized (this.lockAppSession) { result = (Hashtable)this.appSessions.get (appName); if (result == null) { result = new Hashtable (); this.appSessions.put (appName, result); } } return result; } public void createSession (String appName, Object keyId) { this._createSession (appName, keyId); } public PackagedSessionImpl _createSession (String appName, Object keyId) { Hashtable app = this.getHashtableForApp (appName); PackagedSessionImpl result = new PackagedSessionImpl ((Serializable)keyId, null, this.myNodeName); app.put (keyId, result); return result; } public void setState (String appName, Object keyId, byte[] state) throws java.rmi.RemoteException { Hashtable app = this.getHashtableForApp (appName); PackagedSession ps = (PackagedSession)app.get (keyId); if (ps == null) { ps = _createSession (appName, keyId); } boolean isStateIdentical = false; Mutex mtx = getLock (appName, keyId); try { if (!mtx.attempt (0)) throw new java.rmi.RemoteException ("Concurent calls on session object."); } catch (InterruptedException ie) { log.info (ie); return; } try { isStateIdentical = ps.setState(state); if (!isStateIdentical) { Object[] args = {appName, ps}; try { this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier, "_setState", args, new Class[]{String.class, PackagedSession.class}, true); } catch (Exception e) { log.error("operation failed", e); } } } finally { mtx.release (); } } /* public void _setStates (String appName, Hashtable packagedSessions) { synchronized (this.lockAppSession) { Hashtable app = this.getHashtableForApp (appName); if (app == null) { app = new Hashtable (packagedSessions.size ()); this.appSessions.put (appName, app); } app.putAll (packagedSessions); } }*/ public void _setState (String appName, PackagedSession session) { Hashtable app = this.getHashtableForApp (appName); PackagedSession ps = (PackagedSession)app.get (session.getKey ()); if (ps == null) { ps = session; synchronized (app) { app.put (ps.getKey (), ps); } } else { Mutex mtx = getLock (appName, session.getKey ()); try { mtx.acquire (); } catch (InterruptedException ie) { log.info (ie); return; } try { if (ps.getOwner ().equals (this.myNodeName)) { // a modification has occured externally while we were the owner // ownedObjectExternallyModified (appName, session.getKey (), ps, session); } ps.update (session); } finally { mtx.release (); } } } public PackagedSession getState (String appName, Object keyId) { Hashtable app = this.getHashtableForApp (appName); return (PackagedSession)app.get (keyId); } public PackagedSession getStateWithOwnership (String appName, Object keyId) throws java.rmi.RemoteException { return this.localTakeOwnership (appName, keyId); } public PackagedSession localTakeOwnership (String appName, Object keyId) throws java.rmi.RemoteException { Hashtable app = this.getHashtableForApp (appName); PackagedSession ps = (PackagedSession)app.get (keyId); // if the session is not yet available, we simply return null. The persistence manager // will have to take an action accordingly // if (ps == null) return null; Mutex mtx = getLock (appName, keyId); try { if (!mtx.attempt (0)) throw new java.rmi.RemoteException ("Concurent calls on session object."); } catch (InterruptedException ie) { log.info (ie); return null; } try { if (!ps.getOwner ().equals (this.myNodeName)) { Object[] args = {appName, keyId, this.myNodeName, new Long (ps.getVersion ())}; ArrayList answers = null; try { answers = this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier, "_setOwnership", args, new Class[]{String.class, Object.class, String.class, Long.class}, true); } catch (Exception e) { log.error("operation failed", e); } if (answers != null && answers.contains (Boolean.FALSE)) throw new java.rmi.RemoteException ("Concurent calls on session object."); else { ps.setOwner (this.myNodeName); return ps; } } else return ps; } finally { mtx.release (); } } public Boolean _setOwnership (String appName, Object keyId, String newOwner, Long remoteVersion) { Hashtable app = this.getHashtableForApp (appName); PackagedSession ps = (PackagedSession)app.get (keyId); Boolean answer = Boolean.TRUE; Mutex mtx = getLock (appName, keyId); try { if (!mtx.attempt (0)) return Boolean.FALSE; } catch (InterruptedException ie) { log.info (ie); return Boolean.FALSE; } try { if (!ps.getOwner ().equals (this.myNodeName)) { // this is not our business... we don't care // we do not update the owner of ps as another host may refuse the _setOwnership call // anyway, the update will be sent to us later if state is modified // //ps.setOwner (newOwner); answer = Boolean.TRUE; } else if (ps.getVersion () > remoteVersion.longValue ()) { // we are concerned and our version is more recent than the one of the remote host! // it means that we have concurrent calls on the same state that has not yet been updated // this means we will need to raise a java.rmi.RemoteException // answer = Boolean.FALSE; } else { // the remote host has the same version as us (or more recent? possible?) // we need to update the ownership. We can do this because we know that no other // node can refuse the _setOwnership call ps.setOwner (newOwner); ownedObjectExternallyModified (appName, keyId, ps, ps); answer = Boolean.TRUE; } } finally { mtx.release (); } return answer; } public void takeOwnership (String appName, Object keyId) throws java.rmi.RemoteException { this.localTakeOwnership (appName, keyId); } public void removeSession (String appName, Object keyId) { Hashtable app = this.getHashtableForApp (appName); if (app != null) { PackagedSession ps = (PackagedSession)app.remove (keyId); if (ps != null) { removeLock (appName, keyId); Object[] args = { appName, keyId }; try { this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier, "_removeSession", args, new Class[]{String.class, Object.class}, true); } catch (Exception e) { log.error("operation failed", e); } } } } public void _removeSession (String appName, Object keyId) { Hashtable app = this.getHashtableForApp (appName); PackagedSession ps = null; ps = (PackagedSession)app.remove (keyId); if (ps != null && ps.getOwner ().equals (this.myNodeName)) ownedObjectExternallyModified (appName, keyId, ps, ps); removeLock (appName, keyId); } protected Hashtable listeners = new Hashtable (); public synchronized void subscribe (String appName, HASessionStateListener listener) { Vector members = (Vector)listeners.get (appName); if (members == null) { members = new Vector (); listeners.put (appName, members); } if (!members.contains (listener)) { members.add (listener); } } public synchronized void unsubscribe (String appName, HASessionStateListener listener) { Vector members = (Vector)listeners.get (appName); if ((members != null) && members.contains (listener)) members.remove (listener); } public void ownedObjectExternallyModified (String appName, Object key, PackagedSession oldSession, PackagedSession newSession) { Vector members = (Vector)listeners.get (appName); if (members != null) for (int i=0; i