diff --git a/src/utils/ltfs_ordered_copy b/src/utils/ltfs_ordered_copy index 5537964e..f962b7fc 100755 --- a/src/utils/ltfs_ordered_copy +++ b/src/utils/ltfs_ordered_copy @@ -44,6 +44,12 @@ import threading from logging import getLogger, basicConfig, NOTSET, CRITICAL, ERROR, WARNING, INFO, DEBUG from collections import deque +try: + from tqdm import tqdm + USE_TQDM = True +except ImportError: + USE_TQDM = False + class CopyItem: """""" def __init__(self, src, dst, vea_pre, cp_attr, cp_xattr, logger): #initialization @@ -77,15 +83,20 @@ class CopyItem: return (self.vuuid, self.part, self.start) - def run(self): + def _run_copy(self, progress): + with open(self.src, 'rb') as srcf, open(self.dst, 'wb') as dstf: + copyfileobj(srcf, dstf, progress.update) + + def run(self, progress): try: if len(self.vuuid): logger.debug('"{0}" ({2}) -> "{1}"'.format(self.src, self.dst, str(self.start))) else: logger.debug('"{0}" -> "{1}"'.format(self.src, self.dst)) - + progress.update_file(self.src) if self.cp_attr: #Copy data and metadata - shutil.copy2(self.src, self.dst) + self._run_copy(progress) + shutil.copystat(self.src, self.dst) if self.cp_xattr: # Capture EAs of the source file src_attributes = {} @@ -96,7 +107,8 @@ class CopyItem: for key in src_attributes: xattr.set(self.dst, key, src_attributes[key]) else: #Only copy data - shutil.copy(self.src, self.dst) + self._run_copy(progress) + shutil.copymode(self.src, self.dst) except Exception as e: self.logger.error('Failed to copy "{0}" to "{1}": {2}'.format(self.src, self.dst, str(str(e)))) return False @@ -116,9 +128,11 @@ class CopyQueue: self.items = 0 self.logger = logger self.sort_files = sort_files + self.total_bytes = 0 def add_copy_item(self, c): (u, p, s) = c.eval() + self.total_bytes += os.path.getsize(c.src) if u == '': # Source is not on LTFS self.direct.append(c) @@ -206,24 +220,81 @@ class CopyQueue: def get_size(self): return self.items +RESULT_LOCK = threading.Lock() + +# Based on shutil's code +def copyfileobj(fsrc, fdst, callback, length=shutil.COPY_BUFSIZE): + try: + # check for optimisation opportunity + if "b" in fsrc.mode and "b" in fdst.mode and fsrc.readinto: + return _copyfileobj_readinto(fsrc, fdst, callback, length) + except AttributeError: + # one or both file objects do not support a .mode or .readinto attribute + pass + + fsrc_read = fsrc.read + fdst_write = fdst.write + + while True: + buf = fsrc_read(length) + if not buf: + break + fdst_write(buf) + callback(len(buf)) + +def _copyfileobj_readinto(fsrc, fdst, callback, length=shutil.COPY_BUFSIZE): + """readinto()/memoryview() based variant of copyfileobj(). + *fsrc* must support readinto() method and both files must be + open in binary mode. + """ + # Localize variable access to minimize overhead. + fsrc_readinto = fsrc.readinto + fdst_write = fdst.write + with memoryview(bytearray(length)) as mv: + while True: + n = fsrc_readinto(mv) + if not n: + break + elif n < length: + with mv[:n] as smv: + fdst.write(smv) + else: + fdst_write(mv) + callback(n) + class Progress: - def __init__(self, logger, title, num): #initialization + def __init__(self, logger, title, num_f, num_b): #initialization self.logger = logger self.title = title - self.num = num - self.cur = 0 + self.num_f = num_f + self.num_b = num_b + self.cur_f = 0 + self.tqdm = None + + def update_file(self, name): + # Delay the initialization of tqdm to prevent console spam + if self.logger.getEffectiveLevel() == INFO and USE_TQDM and self.tqdm is None: + self.tqdm = tqdm(total=self.num_b, unit='B', unit_scale=True, unit_divisor=1024) - def update(self, step = 1): + self.cur_f += 1 if self.logger.getEffectiveLevel() == INFO: - self.cur = self.cur + 1 - sys.stderr.write('\r{}: {}/{}'.format(self.title, self.cur, self.num)) - sys.stderr.flush() + if self.tqdm is not None: + self.tqdm.set_description(f'{name} [{self.cur_f} / {self.num_f}]') + else: + sys.stderr.write('\r{}: {}/{}'.format(self.title, self.cur_f, self.num_f)) + sys.stderr.flush() + + def update(self, bytes_add): + if self.tqdm: + self.tqdm.update(bytes_add) def finish(self): - if self.logger.getEffectiveLevel() == INFO: - logger.info("") + if self.tqdm is not None: + self.tqdm.close() + else: + if self.logger.getEffectiveLevel() == INFO: + logger.info("") -RESULT_LOCK = threading.Lock() def writer(logger, prog, q, r): while True: @@ -236,9 +307,7 @@ def writer(logger, prog, q, r): logger.error('writer thread error: ' + str(e)) exit(1) - prog.update() - - result = ci.run() + result = ci.run(prog) with RESULT_LOCK: if result: @@ -363,7 +432,7 @@ direct_write_threads = 8 try: sig = xattr.get(args.DEST, VEA_PREFIX + LTFS_SIG_VEA) - if sig.startswith("LTFS"): + if sig.startswith(b"LTFS"): logger.info("Destination {0} is LTFS".format(args.DEST)) direct_write_threads = 1 else: @@ -425,7 +494,7 @@ success = 0 fail = 0 direct = copyq.pop_direct() -prog_disk = Progress(logger, 'File copy from disk is on going', len(direct)) +prog_disk = Progress(logger, 'File copy from disk is on going', len(direct), copyq.total_bytes) if len(direct): logger.info("Copying on {} disk files with {} threads".format(len(direct), direct_write_threads)) writers = [] @@ -444,7 +513,7 @@ if len(direct): prog_disk.finish() # Copy files on LTFS -prog_tape = Progress(logger, 'File copy from tape is on going', copyq.get_size()) +prog_tape = Progress(logger, 'File copy from tape is on going', copyq.get_size(), copyq.total_bytes) (tape_key, tape) = copyq.pop_tape() while tape != None: logger.log(NOTSET + 1, "Processing {}".format(len(tape))) @@ -461,8 +530,7 @@ while tape != None: for start_block_key in start_block_list: file_ind = partition[start_block_key] for cp in file_ind: - prog_tape.update() - result = cp.run() + result = cp.run(prog_tape) if result: success = success + 1 else: