Source code for XRootD.client.copyprocess

#-------------------------------------------------------------------------------
# Copyright (c) 2012-2014 by European Organization for Nuclear Research (CERN)
# Author: Justin Salmon <jsalmon@cern.ch>
#-------------------------------------------------------------------------------
# This file is part of the XRootD software suite.
#
# XRootD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# XRootD is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
#
# In applying this licence, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
#-------------------------------------------------------------------------------
from __future__ import absolute_import, division, print_function

from pyxrootd import client
from XRootD.client.url import URL
from XRootD.client.responses import XRootDStatus

class ProgressHandlerWrapper(object):
  """Internal progress handler wrapper to convert parameters to friendly 
  types"""
  def __init__(self, handler):
    self.handler = handler

  def begin(self, jobId, total, source, target):
    if self.handler:
      self.handler.begin(jobId, total, URL(source), URL(target))

  def end(self, jobId, results):
    if 'status' in results:
      results['status'] = XRootDStatus(results['status'])
    if self.handler:
      self.handler.end(jobId, results)

  def update(self, jobId, processed, total):
    if self.handler:
      self.handler.update(jobId, processed, total)

  def should_cancel(self, jobId):
    if self.handler:
      return self.handler.should_cancel(jobId)
    else:
      return False
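
#-------------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: a duck-typed progress
# handler whose method names mirror the calls made by ProgressHandlerWrapper
# above. Any object exposing these four methods can be passed to run(); a
# subclass of XRootD.client.utils.CopyProgressHandler (as suggested by the
# run() docstring below) works the same way. The class name is hypothetical.
#-------------------------------------------------------------------------------
class ExampleProgressHandler(object):
  def begin(self, jobId, total, source, target):
    # called once per job; source/target arrive already wrapped in URL objects
    print('job %s/%s started: %s -> %s' % (jobId, total, source, target))

  def end(self, jobId, results):
    # results['status'] has been converted to an XRootDStatus by the wrapper
    print('job %s finished: %s' % (jobId, results['status']))

  def update(self, jobId, processed, total):
    # periodic progress report, in bytes
    print('job %s: %s/%s bytes copied' % (jobId, processed, total))

  def should_cancel(self, jobId):
    # returning True would ask the copy process to abort this job
    return False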

class CopyProcess(object):
  """Add multiple individually-configurable copy jobs to a "copy process" and
  run them in parallel (yes, in parallel, because ``xrootd`` isn't limited by
  the `GIL`)."""

  def __init__(self):
    self.__process = client.CopyProcess()

  def parallel(self, parallel):
    """Add a config job to the copy process in order to set the number of
    parallel copy jobs.

    :param parallel: number of parallel copy jobs
    :type  parallel: integer
    """
    self.__process.parallel(parallel)
  def add_job(self,
              source,
              target,
              sourcelimit    = 1,
              force          = False,
              posc           = False,
              coerce         = False,
              mkdir          = False,
              thirdparty     = 'none',
              checksummode   = 'none',
              checksumtype   = '',
              checksumpreset = '',
              dynamicsource  = False,
              chunksize      = 8388608,
              parallelchunks = 4,
              inittimeout    = 600,
              tpctimeout     = 1800,
              rmBadCksum     = False):
    """Add a job to the copy process.

    :param source: original source URL
    :type  source: string
    :param target: target directory or file
    :type  target: string
    :param sourcelimit: max number of download sources
    :type  sourcelimit: integer
    :param force: overwrite target if it exists
    :type  force: boolean
    :param posc: persist on successful close
    :type  posc: boolean
    :param coerce: ignore file usage rules, i.e. apply the `FORCE` flag to
                   ``open()``
    :type  coerce: boolean
    :param mkdir: create the parent directories when creating a file
    :type  mkdir: boolean
    :param thirdparty: third party copy mode
    :type  thirdparty: string
    :param checksummode: checksum mode to be used
    :type  checksummode: string
    :param checksumtype: type of the checksum to be computed
    :type  checksumtype: string
    :param checksumpreset: pre-set checksum instead of computing it
    :type  checksumpreset: string
    :param dynamicsource: read as much data from the source as is available,
                          without checking the size
    :type  dynamicsource: boolean
    :param chunksize: chunk size for remote transfers
    :type  chunksize: integer
    :param parallelchunks: number of chunks that should be requested in parallel
    :type  parallelchunks: integer
    :param inittimeout: copy initialization timeout
    :type  inittimeout: integer
    :param tpctimeout: timeout for a third-party copy to finish
    :type  tpctimeout: integer
    :param rmBadCksum: remove the target file on bad checksum
    :type  rmBadCksum: boolean
    """
    self.__process.add_job(source, target, sourcelimit, force, posc, coerce,
                           mkdir, thirdparty, checksummode, checksumtype,
                           checksumpreset, dynamicsource, chunksize,
                           parallelchunks, inittimeout, tpctimeout, rmBadCksum)
  def prepare(self):
    """Prepare the copy jobs. **Must be called before** ``run()``."""
    status = self.__process.prepare()
    return XRootDStatus(status)
  def run(self, handler=None):
    """Run the copy jobs with an optional progress handler.

    :param handler: a copy progress handler. You can subclass
                    :class:`XRootD.client.utils.CopyProgressHandler` and
                    implement the four methods (``begin()``, ``end()``,
                    ``update()`` and ``should_cancel()``) to get regular
                    progress updates for your copy jobs.
    """
    status, results = self.__process.run(ProgressHandlerWrapper(handler))
    for x in results:
      if 'status' in x:
        x['status'] = XRootDStatus(x['status'])
    return XRootDStatus(status), results
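
#-------------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original module. The URLs are
# hypothetical placeholders; the call sequence (add_job -> prepare -> run)
# follows the methods defined above, and the handler is the example class
# sketched earlier in this listing.
#-------------------------------------------------------------------------------
if __name__ == '__main__':
  process = CopyProcess()
  process.parallel(2)                                         # run up to two jobs concurrently
  process.add_job('root://source.example.com//tmp/file.dat',  # hypothetical source URL
                  'root://target.example.com//tmp/file.dat',  # hypothetical target URL
                  force=True)                                 # overwrite the target if it exists
  status = process.prepare()                                  # must be called before run()
  if status.ok:
    status, results = process.run(ExampleProgressHandler())
    for result in results:
      if 'status' in result:
        print(result['status'])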