Edwin Eefting
2019-10-23 23:10:43 +02:00
parent 66727c55b0
commit 5f5e2a8433


@@ -456,12 +456,13 @@ class ZfsDataset():
    """

    def __init__(self, zfs_node, name, force_exists=None):
        """name: full path of the zfs dataset
        force_exists: specify if you already know whether the dataset exists, for performance reasons (otherwise it will have to check with zfs list when needed)
        """
        self.zfs_node=zfs_node
        self.name=name #full name
        self.force_exists=force_exists

    def __repr__(self):
        return("{}: {}".format(self.zfs_node, self.name))
@@ -486,6 +487,7 @@ class ZfsDataset():
        """clear cache"""
        #TODO: nicer?
        self._cached_properties={}
        self.force_exists=None

    def lstrip_path(self,count):
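
The invalidate() change matters for the new cache: force_exists is tri-state (None means "unknown, ask zfs"; True/False mean the caller already knows), so clearing the property cache must also drop a stale True/False back to None. A minimal standalone sketch of that pattern, with hypothetical names:

    class ExistenceCache:
        """tri-state existence cache: None = unknown, True/False = known"""
        def __init__(self, force_exists=None):
            self.force_exists=force_exists

        def invalidate(self):
            #back to "unknown": the next exists() call must really check again
            self.force_exists=None

        def exists(self, real_check):
            if self.force_exists!=None:
                return(self.force_exists)  #skip the expensive zfs list round-trip
            return(real_check())           #e.g. a `zfs list <name>` call

    cache=ExistenceCache(force_exists=True)  #caller already knows it exists
    print(cache.exists(lambda: False))       #-> True, no real check needed
    cache.invalidate()
    print(cache.exists(lambda: False))       #-> False, real check ran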
@@ -531,13 +533,45 @@ class ZfsDataset():
        return(ZfsDataset(self.zfs_node, self.rstrip_path(1)))

    def find_our_prev_snapshot(self, snapshot):
        """find our previous snapshot in this dataset. None if it doesn't exist"""
        if self.is_snapshot():
            raise(Exception("Please call this on a dataset."))

        try:
            index=self.our_snapshots.index(snapshot)
            if index>0:
                return(self.our_snapshots[index-1])
            else:
                return(None)
        except ValueError:
            return(None)

    def find_our_next_snapshot(self, snapshot):
        """find our next snapshot in this dataset. None if it doesn't exist"""
        if self.is_snapshot():
            raise(Exception("Please call this on a dataset."))

        try:
            index=self.our_snapshots.index(snapshot)
            if index>=0 and index<len(self.our_snapshots)-1:
                return(self.our_snapshots[index+1])
            else:
                return(None)
        except ValueError:
            return(None)

    @cached_property
    def exists(self):
        """check if dataset exists.
        Set force_exists to force a specific cached value if you already know it. Useful for performance reasons"""
        if self.force_exists!=None:
            return(self.force_exists)

        self.debug("Checking if filesystem exists")
        return(self.zfs_node.run(tab_split=True, cmd=["zfs", "list", self.name], readonly=True, valid_exitcodes=[0,1], hide_errors=True) and True)
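
find_our_prev_snapshot()/find_our_next_snapshot() are plain index-based neighbour lookups over the ordered our_snapshots list. A quick illustration of the semantics, with strings standing in for the snapshot objects (hypothetical data):

    snapshots=["test-1", "test-2", "test-3"]  #stand-in for our_snapshots

    def next_snapshot(snapshots, snapshot):
        #same idea as find_our_next_snapshot: look up the neighbour by index
        try:
            index=snapshots.index(snapshot)
            if index<len(snapshots)-1:
                return(snapshots[index+1])
            return(None)
        except ValueError:
            return(None)

    print(next_snapshot(snapshots, "test-2"))  #-> test-3
    print(next_snapshot(snapshots, "test-3"))  #-> None (no newer snapshot)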
@@ -560,6 +594,7 @@ class ZfsDataset():
        self.debug("Destroying")
        self.zfs_node.run(["zfs", "destroy", self.name])
        self.invalidate()
        self.force_exists=False

    @cached_property
@@ -730,7 +765,7 @@ class ZfsDataset():
    def recv_pipe(self, pipe, resume=True):
        """starts a zfs recv for this snapshot and uses pipe as input"""

        #### build target command
        cmd=[]
@@ -751,21 +786,25 @@ class ZfsDataset():
                #support resuming
                cmd.append("-s")

        cmd.append(self.filesystem_name)

        self.zfs_node.run(cmd, input=pipe)

        #invalidate cache, but we at least know we exist now
        self.invalidate()
        self.force_exists=True

        # if args.buffer and args.ssh_target!="local":
        #     cmd.append("|mbuffer -m {}".format(args.buffer))

    def transfer_snapshot(self, target_snapshot, prev_snapshot=None, resume=True, resume_token=None, show_progress=False):
        """transfer this snapshot to target_snapshot. specify prev_snapshot for incremental transfer

        connects a send_pipe() to recv_pipe()
        """
        self.debug("Transfer snapshot to {}".format(target_snapshot.filesystem_name))

        if resume_token:
            target_snapshot.verbose("resuming")
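
transfer_snapshot() wires a zfs send on the source to a zfs recv on the target through a pipe. Roughly what the send_pipe()/recv_pipe() pair boils down to, shown here with plain subprocess and hypothetical dataset names (the real code goes through ZfsNode.run, which also handles ssh and buffering):

    import subprocess

    #equivalent shell pipeline: zfs send pool/data@snap | zfs recv -s backup/data
    send=subprocess.Popen(["zfs", "send", "pool/data@snap"], stdout=subprocess.PIPE)
    recv=subprocess.Popen(["zfs", "recv", "-s", "backup/data"], stdin=send.stdout)  #-s keeps aborted receives resumable
    send.stdout.close()  #so zfs send gets SIGPIPE if recv dies early
    recv.communicate()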
@@ -779,83 +818,79 @@ class ZfsDataset():
        #do it
        pipe=self.send_pipe(resume=resume, show_progress=show_progress, resume_token=resume_token, prev_snapshot=prev_snapshot)
        target_snapshot.recv_pipe(pipe)

    def thin(self, keeps=[]):
        """determines list of snapshots that should be kept or deleted based on the thinning schedule. cull the herd!
        keeps: list of snapshots to always keep (usually the last)
        """
        return(self.zfs_node.thinner.thin(self.our_snapshots, keep_objects=keeps))

    def find_common_snapshot(self, target_dataset):
        """find latest common snapshot between us and target
        returns None if it's an initial transfer
        """
        if not target_dataset.our_snapshots:
            #we have nothing yet
            return(None)
        else:
            snapshot=self.find_snapshot(target_dataset.our_snapshots[-1].snapshot_name)
            if not snapshot:
                raise(Exception("Can't find latest target snapshot on source"))
            return(snapshot)

    def sync_snapshots(self, target_dataset, show_progress=False):
        """sync our snapshots to target_dataset"""

        #did a previous transfer leave a resume token on the target?
        resume_token=None
        if 'receive_resume_token' in target_dataset.properties:
            resume_token=target_dataset.properties['receive_resume_token']

        #determine start snapshot (the first snapshot after the common snapshot)
        self.verbose("Determining start snapshot")
        common_snapshot=self.find_common_snapshot(target_dataset)
        if not common_snapshot:
            #start from the beginning
            start_snapshot=self.our_snapshots[0]
        else:
            start_snapshot=self.find_our_next_snapshot(common_snapshot)

        #pre-create virtual target snapshots (force_exists=False) so the thinner can "plan ahead" what the target eventually wants
        self.verbose("Creating virtual target snapshots")
        source_snapshot=start_snapshot
        while source_snapshot:
            virtual_snapshot=ZfsDataset(target_dataset.zfs_node, target_dataset.filesystem_name+"@"+source_snapshot.snapshot_name, force_exists=False)
            target_dataset.snapshots.append(virtual_snapshot)
            source_snapshot=self.find_our_next_snapshot(source_snapshot)

        #now let the thinner decide what we want on both sides
        self.verbose("Create thinning list")
        (source_keeps, source_obsoletes)=self.thin(keeps=[self.our_snapshots[-1]])
        (target_keeps, target_obsoletes)=target_dataset.thin(keeps=[target_dataset.our_snapshots[-1]])

        #stuff that is before the common snapshot can be deleted right away
        if common_snapshot:
            for source_snapshot in self.our_snapshots:
                if source_snapshot.snapshot_name==common_snapshot.snapshot_name:
                    break
                if source_snapshot in source_obsoletes:
                    source_snapshot.destroy()

            for target_snapshot in target_dataset.our_snapshots:
                if target_snapshot.snapshot_name==common_snapshot.snapshot_name:
                    break
                if target_snapshot in target_obsoletes:
                    target_snapshot.destroy()

        #now send the rest from the source
        prev_source_snapshot=common_snapshot
        source_snapshot=start_snapshot
        while source_snapshot:
            #does the target still want it after thinning?
            target_snapshot=target_dataset.find_snapshot(source_snapshot.snapshot_name)
            if target_snapshot:
                source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume_token=resume_token)
                resume_token=None
                prev_source_snapshot=source_snapshot
            source_snapshot=self.find_our_next_snapshot(source_snapshot)