Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 90 additions & 22 deletions src/utils/ltfs_ordered_copy
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ import threading
from logging import getLogger, basicConfig, NOTSET, CRITICAL, ERROR, WARNING, INFO, DEBUG
from collections import deque

try:
from tqdm import tqdm
USE_TQDM = True
except ImportError:
USE_TQDM = False

class CopyItem:
""""""
def __init__(self, src, dst, vea_pre, cp_attr, cp_xattr, logger): #initialization
Expand Down Expand Up @@ -77,15 +83,20 @@ class CopyItem:

return (self.vuuid, self.part, self.start)

def run(self):
# Copy the contents of self.src into self.dst.
# Both files are opened in binary mode; opening the destination with 'wb'
# creates it or truncates an existing file. The with-statement closes both
# files even when the copy raises.
# progress.update is passed to copyfileobj() as its callback argument.
def _run_copy(self, progress):
with open(self.src, 'rb') as srcf, open(self.dst, 'wb') as dstf:
copyfileobj(srcf, dstf, progress.update)

def run(self, progress):
try:
if len(self.vuuid):
logger.debug('"{0}" ({2}) -> "{1}"'.format(self.src, self.dst, str(self.start)))
else:
logger.debug('"{0}" -> "{1}"'.format(self.src, self.dst))

progress.update_file(self.src)
if self.cp_attr: #Copy data and metadata
shutil.copy2(self.src, self.dst)
self._run_copy(progress)
shutil.copystat(self.src, self.dst)
if self.cp_xattr:
# Capture EAs of the source file
src_attributes = {}
Expand All @@ -96,7 +107,8 @@ class CopyItem:
for key in src_attributes:
xattr.set(self.dst, key, src_attributes[key])
else: #Only copy data
shutil.copy(self.src, self.dst)
self._run_copy(progress)
shutil.copymode(self.src, self.dst)
except Exception as e:
self.logger.error('Failed to copy "{0}" to "{1}": {2}'.format(self.src, self.dst, str(str(e))))
return False
Expand All @@ -116,9 +128,11 @@ class CopyQueue:
self.items = 0
self.logger = logger
self.sort_files = sort_files
self.total_bytes = 0

def add_copy_item(self, c):
(u, p, s) = c.eval()
self.total_bytes += os.path.getsize(c.src)
if u == '':
# Source is not on LTFS
self.direct.append(c)
Expand Down Expand Up @@ -206,24 +220,81 @@ class CopyQueue:
# Return the current value of the self.items counter.
# NOTE(review): what exactly is counted is not visible in this view --
# presumably the number of queued items; confirm against add_copy_item().
def get_size(self):
return self.items

RESULT_LOCK = threading.Lock()

# Based on shutil's code
def copyfileobj(fsrc, fdst, callback, length=shutil.COPY_BUFSIZE):
    """Copy data from file object *fsrc* to *fdst*, reporting progress.

    Works like shutil.copyfileobj(), with one addition: after every chunk
    has been written, ``callback`` is called with the size of that chunk.

    Args:
        fsrc: Source file-like object; must provide read(). When it also
            provides ``mode`` and ``readinto`` and both files are binary,
            the readinto()-based fast path is used.
        fdst: Destination file-like object; must provide write().
        callback: Callable taking one int, the size of the chunk just
            written (len() of the chunk on the fallback path).
        length: Chunk size handed to read(), or the buffer size of the
            fast path.
    """
    # Probe for the fast path *before* copying. Only the probe is guarded:
    # an AttributeError raised later by write() or by the callback must
    # propagate instead of silently restarting the copy in the fallback loop.
    try:
        use_readinto = (
            "b" in fsrc.mode and "b" in fdst.mode and bool(fsrc.readinto)
        )
    except (AttributeError, TypeError):
        # Missing .mode/.readinto, or a .mode that is not a string.
        use_readinto = False

    if use_readinto:
        return _copyfileobj_readinto(fsrc, fdst, callback, length)

    # Localize attribute lookups used on every iteration.
    fsrc_read = fsrc.read
    fdst_write = fdst.write

    while True:
        buf = fsrc_read(length)
        if not buf:
            break
        fdst_write(buf)
        callback(len(buf))

def _copyfileobj_readinto(fsrc, fdst, callback, length=shutil.COPY_BUFSIZE):
"""readinto()/memoryview() based variant of copyfileobj().
*fsrc* must support readinto() method and both files must be
open in binary mode.
"""
# Localize variable access to minimize overhead.
fsrc_readinto = fsrc.readinto
fdst_write = fdst.write
with memoryview(bytearray(length)) as mv:
while True:
n = fsrc_readinto(mv)
if not n:
break
elif n < length:
with mv[:n] as smv:
fdst.write(smv)
else:
fdst_write(mv)
callback(n)

class Progress:
    """Report copy progress as a tqdm byte bar or as a plain file counter.

    Progress output is only produced while the logger's effective level is
    INFO. When tqdm is importable (USE_TQDM) a byte-based bar is shown;
    otherwise a "title: current/total" line is rewritten on stderr.
    """

    def __init__(self, logger, title, num_f, num_b): #initialization
        """
        Args:
            logger: Logger whose effective level decides whether to report.
            title: Prefix of the plain-text counter line.
            num_f: Total number of files to copy.
            num_b: Total number of bytes to copy (the bar's total).
        """
        self.logger = logger
        self.title = title
        self.num_f = num_f
        self.num_b = num_b
        self.cur_f = 0
        self.tqdm = None
        # Serializes update_file(): presumably one instance is shared by
        # several writer threads, so bar creation and the file counter must
        # not race.
        self._lock = threading.Lock()

    def update_file(self, name):
        """Record that copying of the file *name* is starting."""
        with self._lock:
            at_info = self.logger.getEffectiveLevel() == INFO

            # Delay the initialization of tqdm to prevent console spam
            if at_info and self.tqdm is None and USE_TQDM:
                self.tqdm = tqdm(total=self.num_b, unit='B', unit_scale=True, unit_divisor=1024)

            self.cur_f += 1
            if not at_info:
                return

            if self.tqdm is not None:
                self.tqdm.set_description(f'{name} [{self.cur_f} / {self.num_f}]')
            else:
                sys.stderr.write('\r{}: {}/{}'.format(self.title, self.cur_f, self.num_f))
                sys.stderr.flush()

    def update(self, bytes_add):
        """Advance the byte bar by *bytes_add*; no-op when no bar exists."""
        # Compare against None explicitly: a tqdm bar is falsy when its
        # total is 0, which would otherwise suppress the update.
        if self.tqdm is not None:
            self.tqdm.update(bytes_add)

    def finish(self):
        """Close the bar, or terminate the plain counter line."""
        if self.tqdm is not None:
            self.tqdm.close()
        elif self.logger.getEffectiveLevel() == INFO:
            # Use the instance's logger rather than a module-level global.
            self.logger.info("")

RESULT_LOCK = threading.Lock()

def writer(logger, prog, q, r):
while True:
Expand All @@ -236,9 +307,7 @@ def writer(logger, prog, q, r):
logger.error('writer thread error: ' + str(e))
exit(1)

prog.update()

result = ci.run()
result = ci.run(prog)

with RESULT_LOCK:
if result:
Expand Down Expand Up @@ -363,7 +432,7 @@ direct_write_threads = 8
try:
sig = xattr.get(args.DEST, VEA_PREFIX + LTFS_SIG_VEA)

if sig.startswith("LTFS"):
if sig.startswith(b"LTFS"):
logger.info("Destination {0} is LTFS".format(args.DEST))
direct_write_threads = 1
else:
Expand Down Expand Up @@ -425,7 +494,7 @@ success = 0
fail = 0

direct = copyq.pop_direct()
prog_disk = Progress(logger, 'File copy from disk is on going', len(direct))
prog_disk = Progress(logger, 'File copy from disk is on going', len(direct), copyq.total_bytes)
if len(direct):
logger.info("Copying on {} disk files with {} threads".format(len(direct), direct_write_threads))
writers = []
Expand All @@ -444,7 +513,7 @@ if len(direct):
prog_disk.finish()

# Copy files on LTFS
prog_tape = Progress(logger, 'File copy from tape is on going', copyq.get_size())
prog_tape = Progress(logger, 'File copy from tape is on going', copyq.get_size(), copyq.total_bytes)
(tape_key, tape) = copyq.pop_tape()
while tape != None:
logger.log(NOTSET + 1, "Processing {}".format(len(tape)))
Expand All @@ -461,8 +530,7 @@ while tape != None:
for start_block_key in start_block_list:
file_ind = partition[start_block_key]
for cp in file_ind:
prog_tape.update()
result = cp.run()
result = cp.run(prog_tape)
if result:
success = success + 1
else:
Expand Down