take into account pool features and supported options (fixes regressions). also automaticly enable resume if its supported
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@ -7,3 +7,5 @@ zfs_autobackup.egg-info
|
|||||||
.eggs/
|
.eggs/
|
||||||
__pycache__
|
__pycache__
|
||||||
.coverage
|
.coverage
|
||||||
|
*.pyc
|
||||||
|
|
||||||
|
|||||||
@ -461,6 +461,72 @@ class ExecuteNode(Logger):
|
|||||||
return(output_lines)
|
return(output_lines)
|
||||||
|
|
||||||
|
|
||||||
|
class ZfsPool():
|
||||||
|
"""a zfs pool"""
|
||||||
|
|
||||||
|
def __init__(self, zfs_node, name):
|
||||||
|
"""name: name of the pool
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.zfs_node=zfs_node
|
||||||
|
self.name=name
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return("{}: {}".format(self.zfs_node, self.name))
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return(self.name)
|
||||||
|
|
||||||
|
def __eq__(self, obj):
|
||||||
|
if not isinstance(obj, ZfsPool):
|
||||||
|
return(False)
|
||||||
|
|
||||||
|
return(self.name == obj.name)
|
||||||
|
|
||||||
|
def verbose(self,txt):
|
||||||
|
self.zfs_node.verbose("zpool {}: {}".format(self.name, txt))
|
||||||
|
|
||||||
|
def error(self,txt):
|
||||||
|
self.zfs_node.error("zpool {}: {}".format(self.name, txt))
|
||||||
|
|
||||||
|
def debug(self,txt):
|
||||||
|
self.zfs_node.debug("zpool {}: {}".format(self.name, txt))
|
||||||
|
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def properties(self):
|
||||||
|
"""all zpool properties"""
|
||||||
|
|
||||||
|
self.debug("Getting zpool properties")
|
||||||
|
|
||||||
|
cmd=[
|
||||||
|
"zpool", "get", "-H", "-o", "property,value", "-p", "all", self.name
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
ret={}
|
||||||
|
|
||||||
|
for pair in self.zfs_node.run(tab_split=True, cmd=cmd, readonly=True, valid_exitcodes=[ 0 ]):
|
||||||
|
if len(pair)==2:
|
||||||
|
ret[pair[0]]=pair[1]
|
||||||
|
|
||||||
|
return(ret)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def features(self):
|
||||||
|
"""get list of active zpool features"""
|
||||||
|
|
||||||
|
ret=[]
|
||||||
|
for (key,value) in self.properties.items():
|
||||||
|
if key.startswith("feature@"):
|
||||||
|
feature=key.split("@")[1]
|
||||||
|
if value=='enabled' or value=='active':
|
||||||
|
ret.append(feature)
|
||||||
|
|
||||||
|
return(ret)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -842,10 +908,9 @@ class ZfsDataset():
|
|||||||
return(self.from_names(names[1:]))
|
return(self.from_names(names[1:]))
|
||||||
|
|
||||||
|
|
||||||
def send_pipe(self, prev_snapshot=None, resume=True, resume_token=None, show_progress=False, raw=False):
|
def send_pipe(self, features, prev_snapshot=None, resume_token=None, show_progress=False, raw=False):
|
||||||
"""returns a pipe with zfs send output for this snapshot
|
"""returns a pipe with zfs send output for this snapshot
|
||||||
|
|
||||||
resume: Use resuming (both sides need to support it)
|
|
||||||
resume_token: resume sending from this token. (in that case we don't need to know snapshot names)
|
resume_token: resume sending from this token. (in that case we don't need to know snapshot names)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@ -854,16 +919,17 @@ class ZfsDataset():
|
|||||||
|
|
||||||
cmd.extend(["zfs", "send", ])
|
cmd.extend(["zfs", "send", ])
|
||||||
|
|
||||||
# #all kind of performance options:
|
#all kind of performance options:
|
||||||
# if "-L" in self.zfs_node.supported_send_options:
|
if 'large_blocks' in features and "-L" in self.zfs_node.supported_send_options:
|
||||||
# cmd.append("-L") # large block support
|
cmd.append("-L") # large block support (only if recordsize>128k which is seldomly used)
|
||||||
|
|
||||||
# if "-e" in self.zfs_node.supported_send_options:
|
if 'embedded_data' in features and "-e" in self.zfs_node.supported_send_options:
|
||||||
# cmd.append("-e") # WRITE_EMBEDDED, more compact stream
|
cmd.append("-e") # WRITE_EMBEDDED, more compact stream
|
||||||
|
|
||||||
# if "-c" in self.zfs_node.supported_send_options:
|
if "-c" in self.zfs_node.supported_send_options:
|
||||||
# cmd.append("-c") # use compressed WRITE records
|
cmd.append("-c") # use compressed WRITE records
|
||||||
|
|
||||||
|
#NOTE: performance is usually worse with this option, according to manual
|
||||||
# if not resume:
|
# if not resume:
|
||||||
# if "-D" in self.zfs_node.supported_send_options:
|
# if "-D" in self.zfs_node.supported_send_options:
|
||||||
# cmd.append("-D") # dedupped stream, sends less duplicate data
|
# cmd.append("-D") # dedupped stream, sends less duplicate data
|
||||||
@ -901,11 +967,11 @@ class ZfsDataset():
|
|||||||
return(self.zfs_node.run(cmd, pipe=True))
|
return(self.zfs_node.run(cmd, pipe=True))
|
||||||
|
|
||||||
|
|
||||||
def recv_pipe(self, pipe, resume=True, filter_properties=[], set_properties=[], ignore_exit_code=False):
|
def recv_pipe(self, pipe, features, filter_properties=[], set_properties=[], ignore_exit_code=False):
|
||||||
"""starts a zfs recv for this snapshot and uses pipe as input
|
"""starts a zfs recv for this snapshot and uses pipe as input
|
||||||
|
|
||||||
note: you can also call both a snapshot and filesystem object.
|
note: you can it both on a snapshot or filesystem object.
|
||||||
the resulting zfs command is the same, only our object cache is invalidated differently.
|
The resulting zfs command is the same, only our object cache is invalidated differently.
|
||||||
"""
|
"""
|
||||||
#### build target command
|
#### build target command
|
||||||
cmd=[]
|
cmd=[]
|
||||||
@ -924,8 +990,9 @@ class ZfsDataset():
|
|||||||
#verbose output
|
#verbose output
|
||||||
cmd.append("-v")
|
cmd.append("-v")
|
||||||
|
|
||||||
if resume:
|
if 'extensible_dataset' in features:
|
||||||
#support resuming
|
#support resuming
|
||||||
|
self.debug("Enabled resume support")
|
||||||
cmd.append("-s")
|
cmd.append("-s")
|
||||||
|
|
||||||
cmd.append(self.filesystem_name)
|
cmd.append(self.filesystem_name)
|
||||||
@ -954,7 +1021,7 @@ class ZfsDataset():
|
|||||||
# cmd.append("|mbuffer -m {}".format(args.buffer))
|
# cmd.append("|mbuffer -m {}".format(args.buffer))
|
||||||
|
|
||||||
|
|
||||||
def transfer_snapshot(self, target_snapshot, prev_snapshot=None, resume=True, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, resume_token=None, raw=False):
|
def transfer_snapshot(self, target_snapshot, features, prev_snapshot=None, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, resume_token=None, raw=False):
|
||||||
"""transfer this snapshot to target_snapshot. specify prev_snapshot for incremental transfer
|
"""transfer this snapshot to target_snapshot. specify prev_snapshot for incremental transfer
|
||||||
|
|
||||||
connects a send_pipe() to recv_pipe()
|
connects a send_pipe() to recv_pipe()
|
||||||
@ -973,8 +1040,8 @@ class ZfsDataset():
|
|||||||
target_snapshot.verbose("receiving incremental".format(self.snapshot_name))
|
target_snapshot.verbose("receiving incremental".format(self.snapshot_name))
|
||||||
|
|
||||||
#do it
|
#do it
|
||||||
pipe=self.send_pipe(resume=resume, show_progress=show_progress, prev_snapshot=prev_snapshot, resume_token=resume_token, raw=raw)
|
pipe=self.send_pipe(features=features, show_progress=show_progress, prev_snapshot=prev_snapshot, resume_token=resume_token, raw=raw)
|
||||||
target_snapshot.recv_pipe(pipe, resume=resume, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
|
target_snapshot.recv_pipe(pipe, features=features, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
|
||||||
|
|
||||||
def abort_resume(self):
|
def abort_resume(self):
|
||||||
"""abort current resume state"""
|
"""abort current resume state"""
|
||||||
@ -1108,7 +1175,7 @@ class ZfsDataset():
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
def sync_snapshots(self, target_dataset, show_progress=False, resume=True, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, source_holds=True, rollback=False, raw=False, other_snapshots=False, no_send=False, destroy_incompatible=False):
|
def sync_snapshots(self, target_dataset, features, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, source_holds=True, rollback=False, raw=False, other_snapshots=False, no_send=False, destroy_incompatible=False):
|
||||||
"""sync this dataset's snapshots to target_dataset, while also thinning out old snapshots along the way."""
|
"""sync this dataset's snapshots to target_dataset, while also thinning out old snapshots along the way."""
|
||||||
|
|
||||||
#determine common and start snapshot
|
#determine common and start snapshot
|
||||||
@ -1206,7 +1273,7 @@ class ZfsDataset():
|
|||||||
#does target actually want it?
|
#does target actually want it?
|
||||||
if target_snapshot not in target_obsoletes:
|
if target_snapshot not in target_obsoletes:
|
||||||
( allowed_filter_properties, allowed_set_properties ) = self.get_allowed_properties(filter_properties, set_properties) #NOTE: should we let transfer_snapshot handle this?
|
( allowed_filter_properties, allowed_set_properties ) = self.get_allowed_properties(filter_properties, set_properties) #NOTE: should we let transfer_snapshot handle this?
|
||||||
source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume=resume, filter_properties=allowed_filter_properties, set_properties=allowed_set_properties, ignore_recv_exit_code=ignore_recv_exit_code, resume_token=resume_token, raw=raw)
|
source_snapshot.transfer_snapshot(target_snapshot, features=features, prev_snapshot=prev_source_snapshot, show_progress=show_progress, filter_properties=allowed_filter_properties, set_properties=allowed_set_properties, ignore_recv_exit_code=ignore_recv_exit_code, resume_token=resume_token, raw=raw)
|
||||||
resume_token=None
|
resume_token=None
|
||||||
|
|
||||||
#hold the new common snapshots and release the previous ones
|
#hold the new common snapshots and release the previous ones
|
||||||
@ -1271,9 +1338,12 @@ class ZfsNode(ExecuteNode):
|
|||||||
|
|
||||||
self.thinner=thinner
|
self.thinner=thinner
|
||||||
|
|
||||||
|
#list of ZfsPools
|
||||||
|
self.__pools={}
|
||||||
|
|
||||||
ExecuteNode.__init__(self, ssh_config=ssh_config, ssh_to=ssh_to, readonly=readonly, debug_output=debug_output)
|
ExecuteNode.__init__(self, ssh_config=ssh_config, ssh_to=ssh_to, readonly=readonly, debug_output=debug_output)
|
||||||
|
|
||||||
|
|
||||||
@cached_property
|
@cached_property
|
||||||
def supported_send_options(self):
|
def supported_send_options(self):
|
||||||
"""list of supported options, for optimizing sends"""
|
"""list of supported options, for optimizing sends"""
|
||||||
@ -1297,6 +1367,13 @@ class ZfsNode(ExecuteNode):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
#TODO: also create a get_zfs_dataset() function that stores all the objects in a dict. This should optimize caching a bit and is more consistent.
|
||||||
|
def get_zfs_pool(self, name):
|
||||||
|
"""get a ZfsPool() object from specified name. stores objects internally to enable caching"""
|
||||||
|
|
||||||
|
return(self.__pools.setdefault(name, ZfsPool(self, name)))
|
||||||
|
|
||||||
|
|
||||||
def reset_progress(self):
|
def reset_progress(self):
|
||||||
"""reset progress output counters"""
|
"""reset progress output counters"""
|
||||||
self._progress_total_bytes=0
|
self._progress_total_bytes=0
|
||||||
@ -1474,7 +1551,7 @@ class ZfsAutobackup:
|
|||||||
#not sure if this ever was useful:
|
#not sure if this ever was useful:
|
||||||
# parser.add_argument('--ignore-new', action='store_true', help='Ignore filesystem if there are already newer snapshots for it on the target (use with caution)')
|
# parser.add_argument('--ignore-new', action='store_true', help='Ignore filesystem if there are already newer snapshots for it on the target (use with caution)')
|
||||||
|
|
||||||
parser.add_argument('--resume', action='store_true', help='Support resuming of interrupted transfers by using the zfs extensible_dataset feature (both zpools should have it enabled) Disadvantage is that you need to use zfs recv -A if another snapshot is created on the target during a receive. Otherwise it will keep failing.')
|
parser.add_argument('--resume', action='store_true', help=argparse.SUPPRESS)
|
||||||
parser.add_argument('--strip-path', default=0, type=int, help='Number of directory to strip from path (use 1 when cloning zones between 2 SmartOS machines)')
|
parser.add_argument('--strip-path', default=0, type=int, help='Number of directory to strip from path (use 1 when cloning zones between 2 SmartOS machines)')
|
||||||
# parser.add_argument('--buffer', default="", help='Use mbuffer with specified size to speedup zfs transfer. (e.g. --buffer 1G) Will also show nice progress output.')
|
# parser.add_argument('--buffer', default="", help='Use mbuffer with specified size to speedup zfs transfer. (e.g. --buffer 1G) Will also show nice progress output.')
|
||||||
|
|
||||||
@ -1518,6 +1595,9 @@ class ZfsAutobackup:
|
|||||||
|
|
||||||
self.log=Log(show_debug=self.args.debug, show_verbose=self.args.verbose)
|
self.log=Log(show_debug=self.args.debug, show_verbose=self.args.verbose)
|
||||||
|
|
||||||
|
if args.resume:
|
||||||
|
self.verbose("NOTE: The --resume option isn't needed anymore (its autodetected now)")
|
||||||
|
|
||||||
|
|
||||||
def verbose(self,txt,titles=[]):
|
def verbose(self,txt,titles=[]):
|
||||||
self.log.verbose(txt)
|
self.log.verbose(txt)
|
||||||
@ -1617,7 +1697,13 @@ class ZfsAutobackup:
|
|||||||
if not self.args.no_send and not target_dataset.parent.exists:
|
if not self.args.no_send and not target_dataset.parent.exists:
|
||||||
target_dataset.parent.create_filesystem(parents=True)
|
target_dataset.parent.create_filesystem(parents=True)
|
||||||
|
|
||||||
source_dataset.sync_snapshots(target_dataset, show_progress=self.args.progress, resume=self.args.resume, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=self.args.ignore_transfer_errors, source_holds= not self.args.no_holds, rollback=self.args.rollback, raw=self.args.raw, other_snapshots=self.args.other_snapshots, no_send=self.args.no_send, destroy_incompatible=self.args.destroy_incompatible)
|
#determine common zpool features
|
||||||
|
source_features=source_node.get_zfs_pool(source_dataset.split_path()[0]).features
|
||||||
|
target_features=target_node.get_zfs_pool(target_dataset.split_path()[0]).features
|
||||||
|
common_features=source_features and target_features
|
||||||
|
# source_dataset.debug("Common features: {}".format(common_features))
|
||||||
|
|
||||||
|
source_dataset.sync_snapshots(target_dataset, show_progress=self.args.progress, features=common_features, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=self.args.ignore_transfer_errors, source_holds= not self.args.no_holds, rollback=self.args.rollback, raw=self.args.raw, other_snapshots=self.args.other_snapshots, no_send=self.args.no_send, destroy_incompatible=self.args.destroy_incompatible)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
fail_count=fail_count+1
|
fail_count=fail_count+1
|
||||||
self.error("DATASET FAILED: "+str(e))
|
self.error("DATASET FAILED: "+str(e))
|
||||||
|
|||||||
@ -1,12 +1,13 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
|
|
||||||
if [ "$USER" != "root" ]; then
|
if [ "$USER" != "root" ]; then
|
||||||
echo "Need root to do proper zfs testing"
|
echo "Need root to do proper zfs testing"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
#reactivate python environment, if any (usefull in Travis)
|
#reactivate python environment, if any (usefull in Travis)
|
||||||
source $VIRTUAL_ENV/bin/activate || true
|
[ "$VIRTUAL_ENV" ] && source $VIRTUAL_ENV/bin/activate
|
||||||
|
|
||||||
# test needs ssh access to localhost for testing
|
# test needs ssh access to localhost for testing
|
||||||
if ! [ -e /root/.ssh/id_rsa ]; then
|
if ! [ -e /root/.ssh/id_rsa ]; then
|
||||||
|
|||||||
Reference in New Issue
Block a user