/* * JBoss, Home of Professional Open Source. * Copyright 2006, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.ha.framework.server; import org.jboss.ha.framework.interfaces.HAPartition; import org.jboss.ha.framework.interfaces.ClusterNode; import org.jboss.system.server.ServerConfigLocator; import org.jboss.logging.Logger; import org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListener; import java.util.*; import java.io.*; /** * Handles transfering files on the cluster. Files are sent in small chunks at a time (up to MAX_CHUNK_BUFFER_SIZE bytes per * Cluster call). * * @author Scott Marlow. * @version $Revision: 62037 $ */ public class ClusterFileTransfer implements AsynchHAMembershipListener { // Specify max file transfer buffer size that we read and write at a time. // This influences the number of times that we will invoke disk read/write file // operations versus how much memory we will consume for a file transfer. private static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024; // collection of in-progress file push operations private Map mPushsInProcess = Collections.synchronizedMap(new HashMap()); // collection of in-progress file pull operations private Map mPullsInProcess = Collections.synchronizedMap(new HashMap()); private HAPartition mPartition; private static final File TEMP_DIRECTORY = ServerConfigLocator.locate().getServerTempDir(); // Mapping between parent folder name and target destination folder // the search key is the parent folder name and value is the java.io.File. // We don't synchronize on the mParentFolders as we assume its safe to read it. private Map mParentFolders = null; private static final String SERVICE_NAME = ClusterFileTransfer.class.getName() + "Service"; private static final Logger log = Logger.getLogger(ClusterFileTransfer.class.getName()); /** * Constructor needs the cluster partition and the mapping of server folder names to the java.io.File instance * representing the physical folder. * * @param partition represents the cluster. * @param destinationDirectoryMap is the mapping between server folder name and physical folder representation. */ public ClusterFileTransfer(HAPartition partition, Map destinationDirectoryMap) { this.mPartition = partition; this.mPartition.registerRPCHandler(SERVICE_NAME, this); this.mPartition.registerMembershipListener(this); mParentFolders = destinationDirectoryMap; } /** * Get specified file from the cluster. * * @param file identifies the file to get from the cluster. * @param parentName is the parent folder name for the file on both source and destination nodes. * @throws ClusterFileTransferException */ public void pull(File file, String parentName) throws ClusterFileTransferException { String myNodeName = this.mPartition.getNodeName(); ClusterNode myNodeAddress = this.mPartition.getClusterNode(); FileOutputStream output = null; try { log.info("Start pull of file " + file.getName() + " from cluster."); ArrayList response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME, "remotePullOpenFile", new Object[]{file, myNodeName, myNodeAddress, parentName}, new Class[]{java.io.File.class, java.lang.String.class,ClusterNode.class, java.lang.String.class}, true); if (response == null || response.size() < 1) { throw new ClusterFileTransferException("Did not receive response from remote machine trying to open file '" + file + "'. Check remote machine error log."); } FileContentChunk fileChunk = (FileContentChunk) response.get(0); if(null == fileChunk) { throw new ClusterFileTransferException("An error occured on remote machine trying to open file '" + file + "'. Check remote machine error log."); } File tempFile = new File(ClusterFileTransfer.getServerTempDir(), file.getName()); output = new FileOutputStream(tempFile); // get the remote file modification time and change our local copy to have the same time. long lastModification = fileChunk.lastModified(); while (fileChunk.mByteCount > 0) { output.write(fileChunk.mChunk, 0, fileChunk.mByteCount); response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME, "remotePullReadFile", new Object[]{file, myNodeName}, new Class[]{java.io.File.class, java.lang.String.class}, true); if (response.size() < 1) { if(! tempFile.delete()) throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Is remote still running? Also, we couldn't delete temp file "+ tempFile.getName()); throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Is remote still running?"); } fileChunk = (FileContentChunk) response.get(0); if (null == fileChunk) { if( !tempFile.delete()) throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Check remote machine error log. Also, we couldn't delete temp file "+ tempFile.getName()); throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Check remote machine error log."); } } output.close(); output = null; File target = new File(getParentFile(parentName), file.getName()); if (target.exists()) { if(!target.delete()) throw new ClusterFileTransferException("The destination file "+ target + " couldn't be deleted, the updated application will not be copied to this node"); } tempFile.setLastModified(lastModification); if (!localMove(tempFile,target)) { throw new ClusterFileTransferException("Could not move " + tempFile + " to " + target); } log.info("Finished cluster pull of file " + file.getName() + " to "+ target.getName()); } catch(IOException e) { throw new ClusterFileTransferException(e); } catch(ClusterFileTransferException e) { throw e; } catch(Exception e) { throw new ClusterFileTransferException(e); } finally { if( output != null) { try { output.close(); } catch(IOException e) {logException(e);} // we are already in the middle of a throw if output isn't null. } } } /** * This is remotely called by {@link #pull(File , String )} to open the file on the machine that * the file is being copied from. * * @param file is the file to pull. * @param originNodeName is the cluster node that is requesting the file. * @param parentName is the parent folder name for the file on both source and destination nodes. * @return FileContentChunk containing the first part of the file read after opening it. */ public FileContentChunk remotePullOpenFile(File file, String originNodeName, ClusterNode originNode, String parentName) { try { File target = new File(getParentFile(parentName), file.getName()); FileContentChunk fileChunk = new FileContentChunk(target, originNodeName,originNode); FilePullOperation filePullOperation = new FilePullOperation(fileChunk); // save the operation for the next call to remoteReadFile this.mPullsInProcess.put(CompositeKey(originNodeName, file.getName()), filePullOperation); filePullOperation.openInputFile(); fileChunk.readNext(filePullOperation.getInputStream()); return fileChunk; } catch (IOException e) { logException(e); } catch (Exception e) { logException(e); } return null; } /** * This is remotely called by {@link #pull(File, String )} to read the file on the machine that the file is being * copied from. * * @param file is the file to pull. * @param originNodeName is the cluster node that is requesting the file. * @return FileContentChunk containing the next part of the file read. */ public FileContentChunk remotePullReadFile(File file, String originNodeName) { try { FilePullOperation filePullOperation = (FilePullOperation) this.mPullsInProcess.get(CompositeKey(originNodeName, file.getName())); filePullOperation.getFileChunk().readNext(filePullOperation.getInputStream()); if (filePullOperation.getFileChunk().mByteCount < 1) { // last call to read, so clean up filePullOperation.getInputStream().close(); this.mPullsInProcess.remove(CompositeKey(originNodeName, file.getName())); } return filePullOperation.getFileChunk(); } catch (IOException e) { logException(e); } return null; } /** * Send specified file to cluster. * * @param file is the file to send. * @param leaveInTempFolder is true if the file should be left in the server temp folder. * @throws ClusterFileTransferException */ public void push(File file, String parentName, boolean leaveInTempFolder) throws ClusterFileTransferException { File target = new File(getParentFile(parentName), file.getName()); log.info("Start push of file " + file.getName() + " to cluster."); // check if trying to send explored archive (cannot send subdirectories) if (target.isDirectory()) { // let the user know why we are skipping this file and return. logMessage("You cannot send the contents of directories, consider archiving folder containing" + target.getName() + " instead."); return; } ClusterNode myNodeAddress = this.mPartition.getClusterNode(); FileContentChunk fileChunk = new FileContentChunk(target, this.mPartition.getNodeName(), myNodeAddress); try { InputStream input = fileChunk.openInputFile(); while (fileChunk.readNext(input) >= 0) { mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushWriteFile", new Object[]{fileChunk, parentName}, new Class[]{fileChunk.getClass(), java.lang.String.class}, true); } // tell remote(s) to close the output file mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushCloseFile", new Object[]{fileChunk, new Boolean(leaveInTempFolder), parentName}, new Class[]{fileChunk.getClass(), Boolean.class, java.lang.String.class}, true); input.close(); log.info("Finished push of file " + file.getName() + " to cluster."); } catch(FileNotFoundException e) { throw new ClusterFileTransferException(e); } catch(IOException e) { throw new ClusterFileTransferException(e); } catch(Exception e) { throw new ClusterFileTransferException(e); } } /** * Remote method for writing file a fragment at a time. * * @param fileChunk */ public void remotePushWriteFile(FileContentChunk fileChunk, String parentName) { try { String key = CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName()); FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.get(key); // handle first call to write if (filePushOperation == null) { if (fileChunk.mChunkNumber != FileContentChunk.FIRST_CHUNK) { // we joined the cluster after the file transfer started logMessage("Ignoring file transfer of '" + fileChunk.getDestinationFile().getName() + "' from " + fileChunk.getOriginatingNodeName() + ", we missed the start of it."); return; } filePushOperation = new FilePushOperation(fileChunk.getOriginatingNodeName(), fileChunk.getOriginatingNode()); File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName()); filePushOperation.openOutputFile(tempFile); mPushsInProcess.put(key, filePushOperation); } filePushOperation.getOutputStream().write(fileChunk.mChunk, 0, fileChunk.mByteCount); } catch (FileNotFoundException e) { logException(e); } catch (IOException e) { logException(e); } } /** * Remote method for closing the file just transmitted. * * @param fileChunk * @param leaveInTempFolder is true if we should leave the file in the server temp folder */ public void remotePushCloseFile(FileContentChunk fileChunk, Boolean leaveInTempFolder, String parentName) { try { FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.remove(CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName())); if ((filePushOperation != null) && (filePushOperation.getOutputStream() != null)) { filePushOperation.getOutputStream().close(); if (!leaveInTempFolder.booleanValue()) { File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName()); File target = new File(getParentFile(parentName), fileChunk.getDestinationFile().getName()); if (target.exists()) if(!target.delete()) logMessage("Could not delete target file " + target); tempFile.setLastModified(fileChunk.lastModified()); if (!localMove(tempFile,target)) { logMessage("Could not move " + tempFile + " to " + target); } } } } catch (IOException e) { logException(e); } } /** Called when a new partition topology occurs. see HAPartition.AsynchHAMembershipListener * * @param deadMembers A list of nodes that have died since the previous view * @param newMembers A list of nodes that have joined the partition since the previous view * @param allMembers A list of nodes that built the current view */ public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers) { // Are there any deadMembers contained in mPushsInProcess or in mPullsInProcess. // If so, cancel operations for them. // If contained in mPushsInProcess, then we can stop waiting for the rest of the file transfer. // If contained in mPullsInProcess, then we can stop supplying for the rest of the file transfer. if (mPushsInProcess.size() > 0) { synchronized(mPushsInProcess) { Collection values = mPushsInProcess.values(); Iterator iter = values.iterator(); while (iter.hasNext()) { FilePushOperation push = (FilePushOperation)iter.next(); if (deadMembers.contains(push.getOriginatingNode())) { // cancel the operation and remove the operation from mPushsInProcess push.cancel(); iter.remove(); } } } } if (mPullsInProcess.size() > 0) { synchronized(mPullsInProcess) { Collection values = mPullsInProcess.values(); Iterator iter = values.iterator(); while(iter.hasNext()) { FilePullOperation pull = (FilePullOperation)iter.next(); if (deadMembers.contains(pull.getFileChunk().getOriginatingNode())) { // cancel the operation and remove the operation from mPullsInProcess pull.cancel(); iter.remove(); } } } } } private static File getServerTempDir() { return TEMP_DIRECTORY; } private File getParentFile(String parentName) { return (File) mParentFolders.get(parentName); } private String CompositeKey(String originNodeName, String fileName) { return originNodeName + "#" + fileName; } private static void logMessage(String message) { log.info(message); } private static void logException(Throwable e) { //e.printStackTrace(); log.error(e); } /** * Represents file push operation. */ private static class FilePushOperation { public FilePushOperation(String originNodeName, ClusterNode originNode) { mOriginNodeName =originNodeName; mOriginNode = originNode; } public void openOutputFile(File file) throws FileNotFoundException { mOutput = new FileOutputStream(file); mOutputFile = file; } /** * Cancel the file push operation. To be called locally on each machine that is * receiving the file. */ public void cancel() { ClusterFileTransfer.logMessage("Canceling receive of file " + mOutputFile + " as remote server "+mOriginNodeName+" left the cluster. Partial results will be deleted."); try { // close the output stream and delete the file. mOutput.close(); if(!mOutputFile.delete()) logMessage("Could not delete output file " + mOutputFile); } catch(IOException e) { logException(e); } } /** * Get the IPAddress of the cluster node that is pushing file to the cluster. * @return IPAddress */ public ClusterNode getOriginatingNode() { return mOriginNode; } public OutputStream getOutputStream() { return mOutput; } private OutputStream mOutput; private String mOriginNodeName; private ClusterNode mOriginNode; private File mOutputFile; } /** * Represents file pull operation. */ private static class FilePullOperation { public FilePullOperation(FileContentChunk fileChunk) { mFileChunk = fileChunk; } public void openInputFile() throws FileNotFoundException { mInput = mFileChunk.openInputFile(); } public InputStream getInputStream() { return mInput; } /** * Cancel the file pull operation. To be called locally on the machine that is supplying the file. */ public void cancel() { logMessage("Canceling send of file " + mFileChunk.getDestinationFile() + " as remote server "+mFileChunk.getOriginatingNodeName()+" left the cluster."); try { mInput.close(); } catch(IOException e) { logException(e); } } public FileContentChunk getFileChunk() { return mFileChunk; } private FileContentChunk mFileChunk; private InputStream mInput; } /** * For representing filetransfer state on the wire. * The inputStream or OutputStream is expected to be maintained by the sender/receiver. */ private static class FileContentChunk implements Serializable { public FileContentChunk(File file, String originNodeName, ClusterNode originNode) { this.mDestinationFile = file; this.mLastModified = file.lastModified(); this.mOriginNode = originNode; this.mOriginNodeName = originNodeName; mChunkNumber = 0; long size = file.length(); if (size > MAX_CHUNK_BUFFER_SIZE) size = MAX_CHUNK_BUFFER_SIZE; else if (size <= 0) size = 1; mChunk = new byte[(int) size]; // set amount transferred at a time mByteCount = 0; } /** * Get the name of the cluster node that started the file transfer operation * * @return node name */ public String getOriginatingNodeName() { return this.mOriginNodeName; } /** * Get the address of the cluster node that started the file transfer operation. * @return ClusterNode */ public ClusterNode getOriginatingNode() { return mOriginNode; } public File getDestinationFile() { return this.mDestinationFile; } /** * Open input file * * @throws FileNotFoundException */ public InputStream openInputFile() throws FileNotFoundException { return new FileInputStream(this.mDestinationFile); } /** * Open output file * * @return * @throws FileNotFoundException */ public OutputStream openOutputFile() throws FileNotFoundException { File lFile = new File(ClusterFileTransfer.getServerTempDir(), this.mDestinationFile.getName()); FileOutputStream output = new FileOutputStream(lFile); return output; } /** * @return number of bytes read * @throws IOException */ public int readNext(InputStream input) throws IOException { this.mChunkNumber++; this.mByteCount = input.read(this.mChunk); return this.mByteCount; } public long lastModified() { return mLastModified; } static final long serialVersionUID = 3546447481674749363L; private File mDestinationFile; private long mLastModified; private String mOriginNodeName; private ClusterNode mOriginNode; private int mChunkNumber; private static final int FIRST_CHUNK = 1; private byte[] mChunk; private int mByteCount; } public static boolean localMove(File source, File destination) throws FileNotFoundException, IOException { if(source.renameTo(destination)) // if we can simply rename the file return true; // return success // otherwise, copy source to destination OutputStream out = new FileOutputStream(destination); InputStream in = new FileInputStream(source); byte buffer[] = new byte[32*1024]; int bytesRead = 0; while(bytesRead > -1) { // until we hit end of source file bytesRead = in.read(buffer); if(bytesRead > 0) { out.write(buffer,0, bytesRead); } } in.close(); out.close(); if(!source.delete()) logMessage("Could not delete file "+ source); return true; } /** * Exception wrapper class */ public static class ClusterFileTransferException extends Exception { public ClusterFileTransferException(String message) { super(message); } public ClusterFileTransferException(String message, Throwable cause) { super(message, cause); } public ClusterFileTransferException(Throwable cause) { super(cause); } } }