/*
* JBoss, Home of Professional Open Source.
* Copyright 2006, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.mq.server;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.HashMap;
import javax.jms.JMSException;
import javax.management.MBeanRegistration;
import javax.management.ObjectName;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.pm.CacheStore;
import org.jboss.system.ServiceMBeanSupport;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
/**
* This class implements a Message cache so that larger amounts of messages
* can be processed without running out of memory. When memory starts getting tight
* it starts moving messages out of memory and into a file so that they can be recovered
* later.
*
* The locks should be obtained in the following order:
* mr, the relevent message we are working with
* lruCache, when maintaining the usage order
*
* @author Hiram Chirino
* @author David Maplesden
* @author Peter Antman
* @author Adrian Brock
* @version $Revision: 57198 $
*
* @jmx.mbean name="jboss.mq:service=MessageCache"
* extends="org.jboss.system.ServiceMBean"
*/
public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean, MBeanRegistration, Runnable
{
public static final long ONE_MEGABYTE = 1024L * 1000;
public static final long DEFAULT_HIGH_MEMORY_MARK = ONE_MEGABYTE * 50;
public static final long DEFAULT_MAX_MEMORY_MARK = ONE_MEGABYTE * 60;
// The cached messages are orded in a LRU linked list
private LRUCache lruCache = new LRUCache();
// Provides a Unique ID to MessageHanles
private SynchronizedLong messageCounter = new SynchronizedLong(0);
long cacheHits = 0;
long cacheMisses = 0;
protected CacheStore cacheStore;
ObjectName cacheStoreName;
private Thread referenceSoftner;
private long highMemoryMark = DEFAULT_HIGH_MEMORY_MARK;
private long maxMemoryMark = DEFAULT_MAX_MEMORY_MARK;
/** Whether to make soft references */
private boolean makeSoftReferences = true;
/** The last time we softened */
private long lastSoften = 0L;
/** soften no more than often */
private long softenNoMoreOftenThanMillis = 0L;
/** soften at least every */
private long softenAtLeastEveryMillis = 0L;
/** The length of time to wait before checking whether we should soften messages */
private long softenWaitMillis = 1000L;
/** The minimum number of hard messages */
private int minimumHard = 1;
/** The maximum number of hard messages */
private int maximumHard = 0;
int softRefCacheSize = 0;
int totalCacheSize = 0;
// Used to get notified when message are being deleted by GC
ReferenceQueue referenceQueue = new ReferenceQueue();
// The historical number of softenings
long softenedSize = 0;
// Check the soft reference depth
boolean checkSoftReferenceDepth = false;
/**
* The getInstance
method
*
* @return a MessageCache
value
*
* @jmx.managed-attribute
*/
public MessageCache getInstance()
{
return this;
}
/**
* Adds a message to the cache.
*/
public MessageReference add(SpyMessage message, BasicQueue queue, int stored) throws javax.jms.JMSException
{
DurableSubscriptionID id = message.header.durableSubscriberID;
return addInternal(message, queue, stored, id);
}
/**
* Adds a message to the cache.
*/
public MessageReference add(SpyMessage message, BasicQueue queue, int stored, DurableSubscriptionID id) throws javax.jms.JMSException
{
return addInternal(message, queue, stored, id);
}
/**
* Adds a message to the cache.
*/
public MessageReference addInternal(SpyMessage message, BasicQueue queue, int stored, DurableSubscriptionID id) throws javax.jms.JMSException
{
// Create the message reference
MessageReference mh = new MessageReference();
mh.init(this, messageCounter.increment(), message, queue, id);
mh.setStored(stored);
// Add it to the cache
synchronized (mh)
{
synchronized (lruCache)
{
lruCache.addMostRecent(mh);
totalCacheSize++;
}
}
validateSoftReferenceDepth();
return mh;
}
/**
* removes a message from the cache
*/
public void remove(MessageReference mr) throws JMSException
{
// Remove if not done already
removeInternal(mr, true, true);
}
/**
* removes a message from the cache without returning it to the pool
* used in two phase removes for joint cache/persistence
*/
public void removeDelayed(MessageReference mr) throws JMSException
{
// Remove from the cache
removeInternal(mr, true, false);
}
/**
* removes a message from the cache but does not clear it,
* used in softening
*/
void soften(MessageReference mr) throws JMSException
{
// Remove from the cache
removeInternal(mr, false, false);
if (makeSoftReferences)
softRefCacheSize++;
}
/**
* removes a message from the cache
*/
protected void removeInternal(MessageReference mr, boolean clear, boolean reset) throws JMSException
{
synchronized (mr)
{
if (mr.stored != MessageReference.REMOVED)
{
synchronized (lruCache)
{
if (mr.hardReference != null) //If message is not hard, dont do lru stuff
lruCache.remove(mr);
if (clear)
totalCacheSize--;
}
if (clear)
mr.clear();
//Will remove it from storage if stored
}
if (reset)
mr.reset();
//Return to the pool
}
}
/**
* The strategy is that we keep the most recently used messages as
* Hard references. Then we make the older ones soft references. Making
* something a soft reference stores it to disk so we need to avoid making
* soft references if we can avoid it. But once it is made a soft reference does
* not mean that it is removed from memory. Depending on how agressive the JVM's
* GC is, it may stay around long enough for it to be used by a client doing a read,
* saving us read from the file system. If memory gets tight the GC will remove
* the soft references. What we want to do is make sure there are at least some
* soft references available so that the GC can reclaim memory.
* @see Runnable#run()
*/
public void run()
{
try
{
while (true)
{
// Get the next soft reference that was canned by the GC
Reference r = null;
if (checkSoftReferenceDepth)
r = referenceQueue.poll();
else
r = referenceQueue.remove(softenWaitMillis);
if (r != null)
{
softRefCacheSize--;
// the GC will free a set of messages together, so we poll them
// all before we validate the soft reference depth.
while ((r = referenceQueue.poll()) != null)
{
softRefCacheSize--;
}
if (log.isTraceEnabled())
log.trace("soft reference cache size is now: " + softRefCacheSize);
checkSoftReferenceDepth = true;
}
long now = System.currentTimeMillis();
// Don't try to soften too often
if (softenNoMoreOftenThanMillis > 0 && (now - lastSoften < softenNoMoreOftenThanMillis))
checkSoftReferenceDepth = false;
// Is it a while since we last softened?
else if (softenAtLeastEveryMillis > 0 && (now - lastSoften > softenAtLeastEveryMillis))
checkSoftReferenceDepth = true;
// Should we check for softening
if (checkSoftReferenceDepth)
{
checkSoftReferenceDepth = validateSoftReferenceDepth();
// Did the softening complete?
if (checkSoftReferenceDepth == false)
lastSoften = now;
}
}
}
catch (InterruptedException e)
{
// Signal to exit the thread.
}
catch (Throwable t)
{
log.error("Message Cache Thread Stopped: ", t);
}
log.debug("Thread exiting.");
}
/**
* This method is in charge of determining if it time to convert some
* hard references over to soft references.
*/
boolean validateSoftReferenceDepth() throws JMSException
{
boolean trace = log.isTraceEnabled();
// Loop until softening is not required or we find a message we can soften
while (getState() == ServiceMBeanSupport.STARTED)
{
MessageReference messageToSoften = null;
synchronized (lruCache)
{
// howmany to change over to soft refs
int softenCount = 0;
int hardCount = getHardRefCacheSize();
int softCount = getSoftRefCacheSize();
// Only soften down to a minimum
if (hardCount <= minimumHard)
return false;
long currentMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
if (currentMem > highMemoryMark)
{
// we need to get more aggresive... how much?? lets get
// a mesurment from 0 to 1
float severity = ((float) (currentMem - highMemoryMark)) / (maxMemoryMark - highMemoryMark);
severity = Math.min(severity, 1.0F);
if (trace)
log.trace("Memory usage serverity=" + severity);
int totalMessageInMem = hardCount + softCount;
int howManyShouldBeSoft = (int) ((totalMessageInMem) * severity);
softenCount = howManyShouldBeSoft - softCount;
}
// Are there too many messages in memory?
if (maximumHard > 0)
{
int removeCount = hardCount - maximumHard;
if (removeCount > 0 && removeCount > softenCount)
softenCount = removeCount;
}
// We can only do so much, somebody else is using all the memory?
if (softenCount > hardCount)
{
if (trace)
log.trace("Soften count " + softenCount + " greater than hard references " + hardCount);
softenCount = hardCount;
}
// Ignore soften counts of 1 since this will happen too often even
// if the serverity is low since it will round up.
if (softenCount > 1 || (maximumHard > 0 && hardCount > maximumHard))
{
if (trace)
log.trace("Need to soften " + softenCount + " messages");
Node node = lruCache.getLeastRecent();
messageToSoften = (MessageReference) node.data;
}
}
// No softening required
if (messageToSoften == null)
return false;
synchronized (messageToSoften)
{
// Soften unless it was removed
if (messageToSoften.messageCache != null && messageToSoften.stored != MessageReference.REMOVED)
{
messageToSoften.makeSoft();
if (messageToSoften.stored == MessageReference.STORED)
{
softenedSize++;
return true;
}
else if (messageToSoften.isPersistent())
{
// Avoid going into a cpu loop if there are persistent
// messages just about to be persisted
return false;
}
}
else if (trace)
log.trace("not softening removed message " + messageToSoften);
}
}
return false;
}
/**
* This gets called when a MessageReference is de-referenced.
* We will pop it to the top of the RLU
*/
void messageReferenceUsedEvent(MessageReference mh, boolean wasHard) throws JMSException
{
synchronized (mh)
{
synchronized (lruCache)
{
if (wasHard)
lruCache.makeMostRecent(mh);
else
{
lruCache.addMostRecent(mh);
}
}
}
if (wasHard == false)
checkSoftReferenceDepth = true;
}
//////////////////////////////////////////////////////////////////////////////////
// Perisitence methods used by the MessageReference.
//////////////////////////////////////////////////////////////////////////////////
SpyMessage loadFromStorage(MessageReference mh) throws JMSException
{
return cacheStore.loadFromStorage(mh);
}
void saveToStorage(MessageReference mh, SpyMessage message) throws JMSException
{
cacheStore.saveToStorage(mh, message);
}
void removeFromStorage(MessageReference mh) throws JMSException
{
cacheStore.removeFromStorage(mh);
}
//////////////////////////////////////////////////////////////////////////////////
//
// The following section deals the the JMX interface to manage the Cache
//
//////////////////////////////////////////////////////////////////////////////////
/**
* This gets called to start the cache service. Synch. by start
*/
protected void startService() throws Exception
{
setupCacheStore();
referenceSoftner = new Thread(this, "JBossMQ Cache Reference Softner");
referenceSoftner.setDaemon(true);
referenceSoftner.start();
}
protected void setupCacheStore() throws Exception
{
cacheStore = (CacheStore) getServer().getAttribute(cacheStoreName, "Instance");
}
/**
* This gets called to stop the cache service.
*/
protected void stopService()
{
synchronized (lruCache)
{
referenceSoftner.interrupt();
referenceSoftner = null;
}
cacheStore = null;
}
/**
* Gets the hardRefCacheSize
* @return Returns a int
*
* @jmx.managed-attribute
*/
public int getHardRefCacheSize()
{
synchronized (lruCache)
{
return lruCache.size();
}
}
/**
* The getSoftenedSize
method
*
* @return a long
value
*
* @jmx.managed-attribute
*/
public long getSoftenedSize()
{
return softenedSize;
}
/**
* Gets the softRefCacheSize
* @return Returns a int
*
* @jmx.managed-attribute
*/
public int getSoftRefCacheSize()
{
return softRefCacheSize;
}
/**
* Gets the totalCacheSize
* @return Returns a int
*
* @jmx.managed-attribute
*/
public int getTotalCacheSize()
{
return totalCacheSize;
}
/**
* Gets the cacheMisses
* @return Returns a int
*
* @jmx.managed-attribute
*/
public long getCacheMisses()
{
return cacheMisses;
}
/**
* Gets the cacheHits
* @return Returns a long
*
* @jmx.managed-attribute
*/
public long getCacheHits()
{
return cacheHits;
}
/**
* Gets whether to make soft references
*
* @jmx.managed-attribute
* @return true when making soft references
*/
public boolean getMakeSoftReferences()
{
return makeSoftReferences;
}
/**
* Sets whether to make soft references
*
* @jmx.managed-attribute
* @param true to make soft references
*/
public void setMakeSoftReferences(boolean makeSoftReferences)
{
this.makeSoftReferences = makeSoftReferences;
}
/**
* Gets the minimum number of hard messages
*
* @jmx.managed-attribute
* @return the minimum number of hard messages
*/
public int getMinimumHard()
{
return minimumHard;
}
/**
* Sets the minimum number of hard messages
*
* @jmx.managed-attribute
* @param minimumHard the minimum number of hard messages
*/
public void setMinimumHard(int minimumHard)
{
if (minimumHard < 1)
this.minimumHard = 1;
else
this.minimumHard = minimumHard;
}
/**
* Gets the maximum number of hard messages
*
* @jmx.managed-attribute
* @return the minimum number of hard messages
*/
public int getMaximumHard()
{
return maximumHard;
}
/**
* Sets the maximum number of hard messages
*
* @jmx.managed-attribute
* @param maximumHard the maximum number of hard messages
*/
public void setMaximumHard(int maximumHard)
{
if (maximumHard < 0)
this.maximumHard = 0;
else
this.maximumHard = maximumHard;
}
/**
* Gets the length of time to wait before checking whether we should soften
*
* @jmx.managed-attribute
* @return the time to wait
*/
public long getSoftenWaitMillis()
{
return softenWaitMillis;
}
/**
* Sets the length of time to wait before checking whether we should soften
*
* @jmx.managed-attribute
* @param millis the time to wait in millis
*/
public void setSoftenWaitMillis(long millis)
{
if (millis < 1000)
softenWaitMillis = 1000;
else
softenWaitMillis = millis;
}
/**
* Gets the minimum length between softening checks
*
* @jmx.managed-attribute
* @return the time to wait
*/
public long getSoftenNoMoreOftenThanMillis()
{
return softenNoMoreOftenThanMillis;
}
/**
* Sets the minimum length between softening checks
*
* @jmx.managed-attribute
* @param wait the time between checks
*/
public void setSoftenNoMoreOftenThanMillis(long millis)
{
if (millis < 0)
softenNoMoreOftenThanMillis = 0;
else
softenNoMoreOftenThanMillis = millis;
}
/**
* Gets the maximum length between softening checks
*
* @jmx.managed-attribute
* @return the time
*/
public long getSoftenAtLeastEveryMillis()
{
return softenAtLeastEveryMillis;
}
/**
* Sets the minimum length between softening checks
*
* @jmx.managed-attribute
* @param wait the time between checks
*/
public void setSoftenAtLeastEveryMillis(long millis)
{
if (millis < 0)
softenAtLeastEveryMillis = 0;
else
softenAtLeastEveryMillis = millis;
}
/**
* Gets the highMemoryMark
* @return Returns a long
*
* @jmx.managed-attribute
*/
public long getHighMemoryMark()
{
return highMemoryMark / ONE_MEGABYTE;
}
/**
* Sets the highMemoryMark
* @param highMemoryMark The highMemoryMark to set
*
* @jmx.managed-attribute
*/
public void setHighMemoryMark(long highMemoryMark)
{
if (highMemoryMark > 0)
this.highMemoryMark = highMemoryMark * ONE_MEGABYTE;
else
this.highMemoryMark = 0;
}
/**
* Gets the maxMemoryMark
* @return Returns a long
*
* @jmx.managed-attribute
*/
public long getMaxMemoryMark()
{
return maxMemoryMark / ONE_MEGABYTE;
}
/**
* Sets the maxMemoryMark
* @param maxMemoryMark The maxMemoryMark to set
*
* @jmx.managed-attribute
*/
public void setMaxMemoryMark(long maxMemoryMark)
{
if (maxMemoryMark > 0)
this.maxMemoryMark = maxMemoryMark * ONE_MEGABYTE;
else
this.maxMemoryMark = 0;
}
/**
* Gets the CurrentMemoryUsage
* @return Returns a long
*
* @jmx.managed-attribute
*/
public long getCurrentMemoryUsage()
{
return (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / ONE_MEGABYTE;
}
/**
* @see ServiceMBeanSupport#getName()
*/
public String getName()
{
return "MessageCache";
}
/**
* @see MessageCacheMBean#setCacheStore(ObjectName)
*
* @jmx.managed-attribute
*/
public void setCacheStore(ObjectName cacheStoreName)
{
this.cacheStoreName = cacheStoreName;
}
/**
* The getCacheStore
method
*
* @return an ObjectName
value
*
* @jmx.managed-attribute
*/
public ObjectName getCacheStore()
{
return cacheStoreName;
}
/**
* This class implements a simple, efficient LRUCache. It is pretty much a
* cut down version of the code in org.jboss.pool.cache.LeastRecentlyUsedCache
*/
class LRUCache
{
int currentSize = 0;
//maps objects to their nodes
HashMap map = new HashMap();
Node mostRecent = null;
Node leastRecent = null;
public void addMostRecent(Object o)
{
Node newNode = new Node();
newNode.data = o;
//insert into map
Object oldNode = map.put(o, newNode);
if (oldNode != null)
{
map.put(o, oldNode);
throw new RuntimeException("Can't add object '" + o + "' to LRUCache that is already in cache.");
}
//insert into linked list
if (mostRecent == null)
{
//first element
mostRecent = newNode;
leastRecent = newNode;
}
else
{
newNode.lessRecent = mostRecent;
mostRecent.moreRecent = newNode;
mostRecent = newNode;
}
++currentSize;
}
// Not used anywhere!!
public void addLeastRecent(Object o)
{
Node newNode = new Node();
newNode.data = o;
//insert into map
Object oldNode = map.put(o, newNode);
if (oldNode != null)
{
map.put(o, oldNode);
throw new RuntimeException("Can't add object '" + o + "' to LRUCache that is already in cache.");
}
//insert into linked list
if (leastRecent == null)
{
//first element
mostRecent = newNode;
leastRecent = newNode;
}
else
{
newNode.moreRecent = leastRecent;
leastRecent.lessRecent = newNode;
leastRecent = newNode;
}
++currentSize;
}
public void remove(Object o)
{
//remove from map
Node node = (Node) map.remove(o);
if (node == null)
throw new RuntimeException("Can't remove object '" + o + "' that is not in cache.");
//remove from linked list
Node more = node.moreRecent;
Node less = node.lessRecent;
if (more == null)
{ //means node is mostRecent
mostRecent = less;
if (mostRecent != null)
{
mostRecent.moreRecent = null; //Mark it as beeing at the top of tree
}
}
else
{
more.lessRecent = less;
}
if (less == null)
{ //means node is leastRecent
leastRecent = more;
if (leastRecent != null)
{
leastRecent.lessRecent = null; //Mark it last in tree
}
}
else
{
less.moreRecent = more;
}
--currentSize;
}
public void makeMostRecent(Object o)
{
//get node from map
Node node = (Node) map.get(o);
if (node == null)
throw new RuntimeException("Can't make most recent object '" + o + "' that is not in cache.");
//reposition in linked list, first remove
Node more = node.moreRecent;
Node less = node.lessRecent;
if (more == null) //means node is mostRecent
return;
else
more.lessRecent = less;
if (less == null) //means node is leastRecent
leastRecent = more;
else
less.moreRecent = more;
//now add back in at most recent position
node.lessRecent = mostRecent;
node.moreRecent = null; //We are at the top
mostRecent.moreRecent = node;
mostRecent = node;
}
public int size()
{
return currentSize;
}
public Node getMostRecent()
{
return mostRecent;
}
public Node getLeastRecent()
{
return leastRecent;
}
}
static class Node
{
Node moreRecent = null;
Node lessRecent = null;
Object data = null;
}
}