wip
This commit is contained in:
@ -444,11 +444,11 @@ class ZfsDataset():
|
|||||||
return(self.from_names(names[1:]))
|
return(self.from_names(names[1:]))
|
||||||
|
|
||||||
|
|
||||||
def send_pipe(self, prev_snapshot=None, resume=True, resume_token=None):
|
def send_pipe(self, prev_snapshot=None, resume=True, resume_token=None, show_progress=False):
|
||||||
"""returns a pipe with zfs send output for this snapshot
|
"""returns a pipe with zfs send output for this snapshot
|
||||||
|
|
||||||
resume: Use resuming (both sides need to support it)
|
resume: Use resuming (both sides need to support it)
|
||||||
resume_token: resume sending from this token.
|
resume_token: resume sending from this token. (in that case we dont need to know snapshot names)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
#### build source command
|
#### build source command
|
||||||
@ -463,8 +463,9 @@ class ZfsDataset():
|
|||||||
if not resume:
|
if not resume:
|
||||||
cmd.append("-D") # dedupped stream, sends less duplicate data
|
cmd.append("-D") # dedupped stream, sends less duplicate data
|
||||||
|
|
||||||
#only verbose in debug mode, lots of output
|
#progress output
|
||||||
cmd.append("-v")
|
if show_progress:
|
||||||
|
cmd.append("-v")
|
||||||
|
|
||||||
#resume a previous send? (dont need more parameters in that case)
|
#resume a previous send? (dont need more parameters in that case)
|
||||||
if resume_token:
|
if resume_token:
|
||||||
@ -487,6 +488,7 @@ class ZfsDataset():
|
|||||||
#NOTE: this doenst start the send yet, it only returns a subprocess.Pipe
|
#NOTE: this doenst start the send yet, it only returns a subprocess.Pipe
|
||||||
return(self.zfs_node.run(cmd, pipe=True))
|
return(self.zfs_node.run(cmd, pipe=True))
|
||||||
|
|
||||||
|
|
||||||
def recv_pipe(self, pipe, resume=True):
|
def recv_pipe(self, pipe, resume=True):
|
||||||
"""starts a zfs recv on this dataset and uses pipe as input"""
|
"""starts a zfs recv on this dataset and uses pipe as input"""
|
||||||
#### build target command
|
#### build target command
|
||||||
@ -517,7 +519,7 @@ class ZfsDataset():
|
|||||||
# cmd.append("|mbuffer -m {}".format(args.buffer))
|
# cmd.append("|mbuffer -m {}".format(args.buffer))
|
||||||
|
|
||||||
|
|
||||||
def transfer_snapshot(self, target_dataset, prev_snapshot=None, resume=True):
|
def transfer_snapshot(self, target_dataset, prev_snapshot=None, resume=True, resume_token=None, show_progress=False):
|
||||||
"""transfer this snapshot to target_dataset. specify prev_snapshot for incremental transfer
|
"""transfer this snapshot to target_dataset. specify prev_snapshot for incremental transfer
|
||||||
|
|
||||||
connects a send_pipe() to recv_pipe()
|
connects a send_pipe() to recv_pipe()
|
||||||
@ -525,40 +527,45 @@ class ZfsDataset():
|
|||||||
|
|
||||||
self.debug("Transfer snapshot")
|
self.debug("Transfer snapshot")
|
||||||
|
|
||||||
if target_dataset.exists:
|
#initial or resume
|
||||||
resume_token=getattr(target_dataset.properties, 'receive_resume_token', None)
|
if not prev_snapshot or resume_token:
|
||||||
else:
|
if resume_token:
|
||||||
resume_token=None
|
target_dataset.verbose("(resumed)") #we dont know which one
|
||||||
|
else:
|
||||||
|
target_dataset.verbose("receiving @{}".format(self.snapshot_name))
|
||||||
|
|
||||||
if resume_token:
|
pipe=self.send_pipe(resume=resume, resume_token=resume_token, show_progress=show_progress)
|
||||||
resumed="[RESUMED]"
|
|
||||||
else:
|
|
||||||
resumed=""
|
|
||||||
|
|
||||||
if not prev_snapshot:
|
|
||||||
#initial
|
|
||||||
target_dataset.verbose("receiving @{} {}".format(self.snapshot_name, resumed))
|
|
||||||
pipe=self.send_pipe(resume=resume, resume_token=resume_token)
|
|
||||||
target_dataset.recv_pipe(pipe)
|
target_dataset.recv_pipe(pipe)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
#incemental
|
#incemental
|
||||||
target_dataset.verbose("receiving @{}...@{} {}".format(prev_snapshot.snapshot_name, self.snapshot_name, resumed))
|
target_dataset.verbose("receiving @{}...@{}".format(prev_snapshot.snapshot_name, self.snapshot_name))
|
||||||
|
|
||||||
|
|
||||||
#update cache
|
if resume_token:
|
||||||
target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name))
|
#we dont know which snapshot it was, so invalidate cache
|
||||||
|
target_dataset.invalidate()
|
||||||
|
else:
|
||||||
|
#update cache
|
||||||
|
target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name))
|
||||||
|
|
||||||
|
|
||||||
def sync_snapshots(self, target_dataset):
|
def sync_snapshots(self, target_dataset, show_progress=False):
|
||||||
"""sync our snapshots to target_dataset"""
|
"""sync our snapshots to target_dataset"""
|
||||||
|
|
||||||
self.debug("Sync snapshots")
|
self.debug("Sync snapshots")
|
||||||
|
|
||||||
# inital transfer
|
# inital transfer
|
||||||
if not target_dataset.exists:
|
if not target_dataset.exists:
|
||||||
|
self.debug("Initial transfer")
|
||||||
self.our_snapshots[0].transfer_snapshot(target_dataset)
|
self.our_snapshots[0].transfer_snapshot(target_dataset)
|
||||||
|
else:
|
||||||
|
#on resuming we dont need to know anything, the token is enough
|
||||||
|
if 'receive_resume_token' in target_dataset.properties:
|
||||||
|
self.debug("Resume transfer")
|
||||||
|
self.transfer_snapshot(target_dataset, resume_token=target_dataset.properties['receive_resume_token'], show_progress=show_progress)
|
||||||
|
|
||||||
|
self.debug("Incremental transfer")
|
||||||
latest_common_snapshot=None
|
latest_common_snapshot=None
|
||||||
for source_snapshot in self.our_snapshots:
|
for source_snapshot in self.our_snapshots:
|
||||||
target_snapshot=target_dataset.find_snapshot(source_snapshot.snapshot_name)
|
target_snapshot=target_dataset.find_snapshot(source_snapshot.snapshot_name)
|
||||||
@ -715,6 +722,7 @@ class ZfsAutobackup:
|
|||||||
parser.add_argument('--test', action='store_true', help='dont change anything, just show what would be done (still does all read-only operations)')
|
parser.add_argument('--test', action='store_true', help='dont change anything, just show what would be done (still does all read-only operations)')
|
||||||
parser.add_argument('--verbose', action='store_true', help='verbose output')
|
parser.add_argument('--verbose', action='store_true', help='verbose output')
|
||||||
parser.add_argument('--debug', action='store_true', help='debug output (shows commands that are executed)')
|
parser.add_argument('--debug', action='store_true', help='debug output (shows commands that are executed)')
|
||||||
|
parser.add_argument('--progress', action='store_true', help='show zfs progress output (to stderr)')
|
||||||
|
|
||||||
#note args is the only global variable we use, since its a global readonly setting anyway
|
#note args is the only global variable we use, since its a global readonly setting anyway
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
@ -767,7 +775,7 @@ class ZfsAutobackup:
|
|||||||
#determine corresponding target_dataset
|
#determine corresponding target_dataset
|
||||||
target_name=self.args.target_path + "/" + source_dataset.lstrip_path(self.args.strip_path)
|
target_name=self.args.target_path + "/" + source_dataset.lstrip_path(self.args.strip_path)
|
||||||
target_dataset=ZfsDataset(target_node, target_name)
|
target_dataset=ZfsDataset(target_node, target_name)
|
||||||
source_dataset.sync_snapshots(target_dataset)
|
source_dataset.sync_snapshots(target_dataset, show_progress=self.args.progress)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
source_dataset.error(str(e))
|
source_dataset.error(str(e))
|
||||||
if self.args.debug:
|
if self.args.debug:
|
||||||
|
|||||||
Reference in New Issue
Block a user