/*
* JBoss, Home of Professional Open Source.
* Copyright 2006, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.mq.pm;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.jms.JMSException;
import javax.transaction.xa.Xid;
import org.jboss.logging.Logger;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.Recoverable;
import org.jboss.mq.SpyJMSException;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
/**
* This class allows provides the base for user supplied persistence packages.
*
* @author Hiram Chirino (Cojonudo14@hotmail.com)
* @author Paul Kendall (paul.kendall@orion.co.nz)
* @author Adrian Brock
* @version $Revision: 57198 $
*/
public class TxManager implements Recoverable
{
/** The log */
private static final Logger log = Logger.getLogger(TxManager.class);
/** The persistence manager */
PersistenceManager persistenceManager;
/** Maps Global transactions to local transactions */
ConcurrentHashMap globalToLocal = new ConcurrentHashMap();
/** Prepared Transactions Map> */
ConcurrentHashMap prepared = new ConcurrentHashMap();
/**
* Create a new TxManager
*
* @param pm the persistence manager
*/
public TxManager(PersistenceManager pm)
{
persistenceManager = pm;
}
/**
* Return the local transaction id for a distributed transaction id.
*
* @deprecated
* @param dc the connection
* @param xid the transaction id
* @return The Prepared transaction
* @exception javax.jms.JMSException Description of Exception
*/
public final Tx getPrepared(ConnectionToken dc, Object xid) throws JMSException
{
GlobalXID gxid = new GlobalXID(dc, xid);
Tx txid = (Tx) globalToLocal.get(gxid);
if (txid == null)
throw new SpyJMSException("Transaction does not exist from: " + dc.getClientID() + " xid=" + xid);
return txid;
}
/**
* Create and return a unique transaction id.
*
* @return the transaction id
* @exception JMSException for any error
*/
public final Tx createTx() throws JMSException
{
Tx txId = persistenceManager.createPersistentTx();
return txId;
}
/**
* Commit the transaction to the persistent store.
*
* @param txId the transaction
* @exception JMSException for any error
*/
public final void commitTx(Tx txId) throws JMSException
{
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("Commit branch=" + txId.longValue());
txId.commit(persistenceManager);
}
/**
* Commit the transaction to the persistent store.
*
* @param dc the connection token
* @param xid the transaction
* @exception JMSException for any error
*/
public final void commitTx(ConnectionToken dc, Object xid) throws JMSException
{
boolean trace = log.isTraceEnabled();
GlobalXID gxid = new GlobalXID(dc, xid);
Tx txid = (Tx) globalToLocal.get(gxid);
if (txid == null)
{
PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid);
if (preparedInfo == null)
throw new SpyJMSException("Transaction does not exist from: " + dc.getClientID() + " xid=" + xid);
Set txids = preparedInfo.getTxids();
for (Iterator i = txids.iterator(); i.hasNext();)
{
txid = (Tx) i.next();
if (trace)
log.trace("Commit xid=" + xid + " branch=" + txid.longValue());
txid.commit(persistenceManager);
}
prepared.remove(xid);
}
else
{
if (trace)
log.trace("Commit xid=" + xid + " branch=" + txid.longValue());
txid.commit(persistenceManager);
}
}
/**
* Add an operation for after a commit
*
* @param txId the transaction
* @param task the task
* @throws JMSException for any error
*/
public void addPostCommitTask(Tx txId, Runnable task) throws JMSException
{
if (txId == null)
{
task.run();
return;
}
txId.addPostCommitTask(task);
}
/**
* Rollback the transaction.
*
* @param txId the transaction
* @exception JMSException for any error
*/
public void rollbackTx(Tx txId) throws JMSException
{
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("Rollback branch=" + txId.longValue());
txId.rollback(persistenceManager);
}
/**
* Rollback the transaction
*
* @param dc the connection token
* @param xid the transaction
* @exception JMSException for any error
*/
public final void rollbackTx(ConnectionToken dc, Object xid) throws JMSException
{
boolean trace = log.isTraceEnabled();
GlobalXID gxid = new GlobalXID(dc, xid);
Tx txid = (Tx) globalToLocal.get(gxid);
if (txid == null)
{
PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid);
if (preparedInfo == null)
throw new SpyJMSException("Transaction does not exist from: " + dc.getClientID() + " xid=" + xid);
Set txids = preparedInfo.getTxids();
for (Iterator i = txids.iterator(); i.hasNext();)
{
txid = (Tx) i.next();
if (trace)
log.trace("Rolling back xid=" + xid + " branch=" + txid.longValue());
txid.rollback(persistenceManager);
}
prepared.remove(xid);
}
else
{
if (trace)
log.trace("Rolling back xid=" + xid + " branch=" + txid.longValue());
txid.rollback(persistenceManager);
}
}
/**
* Add an operation for after a rollback
*
* @param txId the transaction
* @param task the task
* @throws JMSException for any error
*/
public void addPostRollbackTask(Tx txId, Runnable task) throws JMSException
{
if (txId == null)
return;
txId.addPostRollbackTask(task);
}
/**
* Create and return a unique transaction id. Given a distributed connection
* and a transaction id object, allocate a unique local transaction id if
* the remote id is not already known.
*
* @param dc the connection token
* @param xid the xid
* @return the transaction
* @exception JMSException for any error
*/
public Tx createTx(ConnectionToken dc, Object xid) throws JMSException
{
GlobalXID gxid = new GlobalXID(dc, xid);
if (globalToLocal.containsKey(gxid))
throw new SpyJMSException("Duplicate transaction from: " + dc.getClientID() + " xid=" + xid);
Tx txId = createTx();
if (xid != null && xid instanceof Xid)
txId.setXid((Xid) xid);
globalToLocal.put(gxid, txId);
//Tasks to remove the global to local mappings on commit/rollback
txId.addPostCommitTask(gxid);
txId.addPostRollbackTask(gxid);
return txId;
}
/**
* Restore a prepared transaction
*
* @param txId the transaction id
* @throws JMSException for any error
*/
public void restoreTx(Tx txId) throws JMSException
{
addPreparedTx(txId, txId.getXid(), true);
}
/**
* Add a prepared transactions
*
* @param txId the transaction id
* @param xid the xid
* @param inDoubt whether it is in doubt
* @throws JMSException for any error
*/
void addPreparedTx(Tx txId, Xid xid, boolean inDoubt) throws JMSException
{
PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid);
if (preparedInfo == null)
{
preparedInfo = new PreparedInfo(xid, false);
prepared.put(xid, preparedInfo);
}
preparedInfo.add(txId);
if (inDoubt)
preparedInfo.setInDoubt(true);
}
/**
* Mark the transaction branch as prepared
*
* @param dc the connection token
* @param xid the xid
* @param txId the transaction
* @throws JMSException for any error
*/
public void markPrepared(ConnectionToken dc, Object xid, Tx txId) throws JMSException
{
try
{
if (xid instanceof Xid)
addPreparedTx(txId, (Xid) xid, false);
}
catch (Throwable t)
{
SpyJMSException.rethrowAsJMSException("Error marking transaction as prepared xid=" + xid + " tx=" + txId, t);
}
}
public Xid[] recover(ConnectionToken dc, int flags) throws Exception
{
Set preparedXids = prepared.keySet();
Xid[] xids = (Xid[]) preparedXids.toArray(new Xid[preparedXids.size()]);
return xids;
}
/**
* Get the prepared transactions
*
* @return
*/
public Map getPreparedTransactions()
{
return prepared;
}
/**
* A global transaction
*/
class GlobalXID implements Runnable
{
ConnectionToken dc;
Object xid;
GlobalXID(ConnectionToken dc, Object xid)
{
this.dc = dc;
this.xid = xid;
}
public boolean equals(Object obj)
{
if (obj == null)
{
return false;
}
if (obj.getClass() != GlobalXID.class)
{
return false;
}
return ((GlobalXID) obj).xid.equals(xid) && ((GlobalXID) obj).dc.equals(dc);
}
public int hashCode()
{
return xid.hashCode();
}
public void run()
{
Tx txId = (Tx) globalToLocal.remove(this);
// Tidyup the prepared transactions
if (txId != null)
{
PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid);
if (preparedInfo != null)
{
preparedInfo.remove(txId);
if (preparedInfo.isEmpty())
prepared.remove(xid);
}
}
}
}
/**
* Information about a prepared global transaction
*
* @author Adrian Brock
* @version $Revision: 57198 $
*/
public static class PreparedInfo
{
/** The XID */
private Xid xid;
/** Whether the transaction is in doubt */
private boolean inDoubt;
/** The local transaction branches */
private Set txids = new CopyOnWriteArraySet();
/**
* Create a new PreparedInfo.
*
* @param xid the xid
* @param indoubt whether the transaction is in doubt
*/
public PreparedInfo(Xid xid, boolean inDoubt)
{
this.xid = xid;
this.inDoubt = inDoubt;
}
/**
* Whether the transaction is in doubt
*
* @return true when in doubt
*/
public boolean isInDoubt()
{
return inDoubt;
}
/**
* Set the in doubt
*
* @param inDoubt the in doubt value
*/
public void setInDoubt(boolean inDoubt)
{
this.inDoubt = inDoubt;
}
/**
* Get the XID
*
* @return
*/
public Xid getXid()
{
return xid;
}
/**
* Get the local branches
*
* @return the local branches
*/
public Set getTxids()
{
return txids;
}
/**
* Add a local branch
*
* @param txid the local branch
*/
public void add(Tx txid)
{
txids.add(txid);
}
/**
* Remove a local branch
*
* @param txid the local branch
*/
public void remove(Tx txid)
{
txids.remove(txid);
}
/**
* Whether there are no local branches
*
* @return true when there no local branches
*/
public boolean isEmpty()
{
return txids.isEmpty();
}
}
}