wip
This commit is contained in:
@ -326,20 +326,20 @@ class ExecuteNode:
|
|||||||
else:
|
else:
|
||||||
return(self.ssh_to)
|
return(self.ssh_to)
|
||||||
|
|
||||||
def parse_stdout(self, line):
|
def _parse_stdout(self, line):
|
||||||
"""parse stdout. can be overridden in subclass"""
|
"""parse stdout. can be overridden in subclass"""
|
||||||
if self.debug_output:
|
if self.debug_output:
|
||||||
self.debug("STDOUT > "+line.rstrip())
|
self.debug("STDOUT > "+line.rstrip())
|
||||||
|
|
||||||
|
|
||||||
def parse_stderr(self, line, hide_errors):
|
def _parse_stderr(self, line, hide_errors):
|
||||||
"""parse stderr. can be overridden in subclass"""
|
"""parse stderr. can be overridden in subclass"""
|
||||||
if hide_errors:
|
if hide_errors:
|
||||||
self.debug("STDERR > "+line.rstrip())
|
self.debug("STDERR > "+line.rstrip())
|
||||||
else:
|
else:
|
||||||
self.error("STDERR > "+line.rstrip())
|
self.error("STDERR > "+line.rstrip())
|
||||||
|
|
||||||
def parse_stderr_pipe(self, line, hide_errors):
|
def _parse_stderr_pipe(self, line, hide_errors):
|
||||||
"""parse stderr from pipe input process. can be overridden in subclass"""
|
"""parse stderr from pipe input process. can be overridden in subclass"""
|
||||||
if hide_errors:
|
if hide_errors:
|
||||||
self.debug("STDERR|> "+line.rstrip())
|
self.debug("STDERR|> "+line.rstrip())
|
||||||
@ -425,19 +425,19 @@ class ExecuteNode:
|
|||||||
line=p.stdout.readline()
|
line=p.stdout.readline()
|
||||||
if line!="":
|
if line!="":
|
||||||
output_lines.append(line.rstrip())
|
output_lines.append(line.rstrip())
|
||||||
self.parse_stdout(line)
|
self._parse_stdout(line)
|
||||||
else:
|
else:
|
||||||
eof_count=eof_count+1
|
eof_count=eof_count+1
|
||||||
if p.stderr in read_ready:
|
if p.stderr in read_ready:
|
||||||
line=p.stderr.readline()
|
line=p.stderr.readline()
|
||||||
if line!="":
|
if line!="":
|
||||||
self.parse_stderr(line, hide_errors)
|
self._parse_stderr(line, hide_errors)
|
||||||
else:
|
else:
|
||||||
eof_count=eof_count+1
|
eof_count=eof_count+1
|
||||||
if isinstance(input, subprocess.Popen) and (input.stderr in read_ready):
|
if isinstance(input, subprocess.Popen) and (input.stderr in read_ready):
|
||||||
line=input.stderr.readline()
|
line=input.stderr.readline()
|
||||||
if line!="":
|
if line!="":
|
||||||
self.parse_stderr_pipe(line, hide_errors)
|
self._parse_stderr_pipe(line, hide_errors)
|
||||||
else:
|
else:
|
||||||
eof_count=eof_count+1
|
eof_count=eof_count+1
|
||||||
|
|
||||||
@ -472,6 +472,7 @@ class ExecuteNode:
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class ZfsDataset():
|
class ZfsDataset():
|
||||||
"""a zfs dataset (filesystem/volume/snapshot/clone)
|
"""a zfs dataset (filesystem/volume/snapshot/clone)
|
||||||
Note that a dataset doesnt have to actually exist (yet/anymore)
|
Note that a dataset doesnt have to actually exist (yet/anymore)
|
||||||
@ -777,6 +778,7 @@ class ZfsDataset():
|
|||||||
if show_progress:
|
if show_progress:
|
||||||
cmd.append("-v")
|
cmd.append("-v")
|
||||||
cmd.append("-P")
|
cmd.append("-P")
|
||||||
|
self.zfs_node.reset_progress()
|
||||||
|
|
||||||
|
|
||||||
#resume a previous send? (dont need more parameters in that case)
|
#resume a previous send? (dont need more parameters in that case)
|
||||||
@ -829,6 +831,8 @@ class ZfsDataset():
|
|||||||
|
|
||||||
cmd.append(self.filesystem_name)
|
cmd.append(self.filesystem_name)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
self.zfs_node.run(cmd, input=pipe)
|
self.zfs_node.run(cmd, input=pipe)
|
||||||
|
|
||||||
#invalidate cache, but we at least know we exist now
|
#invalidate cache, but we at least know we exist now
|
||||||
@ -910,6 +914,8 @@ class ZfsDataset():
|
|||||||
def sync_snapshots(self, target_dataset, show_progress=False, resume=True):
|
def sync_snapshots(self, target_dataset, show_progress=False, resume=True):
|
||||||
"""sync our snapshots to target_dataset"""
|
"""sync our snapshots to target_dataset"""
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#resume something first?
|
#resume something first?
|
||||||
if self.resume_transfer(target_dataset, show_progress):
|
if self.resume_transfer(target_dataset, show_progress):
|
||||||
#running in readonly mode and no snapshots yet? assume initial snapshot (otherwise we cant find common snapshot in next step)
|
#running in readonly mode and no snapshots yet? assume initial snapshot (otherwise we cant find common snapshot in next step)
|
||||||
@ -964,14 +970,19 @@ class ZfsDataset():
|
|||||||
#does target actually want it?
|
#does target actually want it?
|
||||||
if target_snapshot in target_keeps:
|
if target_snapshot in target_keeps:
|
||||||
source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume=resume)
|
source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume=resume)
|
||||||
else:
|
|
||||||
source_snapshot.debug("skipped (target doesnt need it)")
|
|
||||||
|
|
||||||
#we may destroy the previous snapshot now, if we dont want it anymore
|
#we may destroy the previous snapshot now, if we dont want it anymore
|
||||||
if prev_source_snapshot and (prev_source_snapshot not in source_keeps):
|
if prev_source_snapshot and (prev_source_snapshot not in source_keeps):
|
||||||
prev_source_snapshot.destroy()
|
prev_source_snapshot.destroy()
|
||||||
|
|
||||||
prev_source_snapshot=source_snapshot
|
prev_source_snapshot=source_snapshot
|
||||||
|
else:
|
||||||
|
source_snapshot.debug("skipped (target doesnt need it)")
|
||||||
|
#destroy it if we also dont want it anymore:
|
||||||
|
if source_snapshot not in source_keeps:
|
||||||
|
prev_source_snapshot.destroy()
|
||||||
|
|
||||||
|
|
||||||
source_snapshot=self.find_our_next_snapshot(source_snapshot)
|
source_snapshot=self.find_our_next_snapshot(source_snapshot)
|
||||||
|
|
||||||
|
|
||||||
@ -998,8 +1009,54 @@ class ZfsNode(ExecuteNode):
|
|||||||
|
|
||||||
self.thinner=thinner
|
self.thinner=thinner
|
||||||
|
|
||||||
|
|
||||||
ExecuteNode.__init__(self, ssh_to=ssh_to, readonly=readonly, debug_output=debug_output)
|
ExecuteNode.__init__(self, ssh_to=ssh_to, readonly=readonly, debug_output=debug_output)
|
||||||
|
|
||||||
|
|
||||||
|
def reset_progress(self):
|
||||||
|
"""reset progress output counters"""
|
||||||
|
self._progress_total_bytes=0
|
||||||
|
self._progress_prev_bytes=0
|
||||||
|
|
||||||
|
def _parse_stderr_pipe(self, line, hide_errors):
|
||||||
|
"""try to parse progress output of a piped zfs recv -Pv """
|
||||||
|
|
||||||
|
|
||||||
|
#is it progress output?
|
||||||
|
progress_fields=line.rstrip().split("\t")
|
||||||
|
|
||||||
|
if (line.find("nvlist version")==0 or
|
||||||
|
line.find("resume token contents")==0 or
|
||||||
|
len(progress_fields)!=1):
|
||||||
|
|
||||||
|
#always output for debugging offcourse
|
||||||
|
self.debug("STDERR|> "+line.rstrip())
|
||||||
|
|
||||||
|
if len(progress_fields)>=3:
|
||||||
|
if progress_fields[0]=='full' or progress_fields[0]=='size':
|
||||||
|
self._progress_total_bytes=int(progress_fields[2])
|
||||||
|
elif progress_fields[0]=='incremental':
|
||||||
|
self._progress_total_bytes=int(progress_fields[3])
|
||||||
|
else:
|
||||||
|
bytes=int(progress_fields[1])
|
||||||
|
|
||||||
|
percentage=int(bytes*100/self._progress_total_bytes)
|
||||||
|
|
||||||
|
# print("{}% \r", end='')
|
||||||
|
print(" {}% \r".format(percentage), end='')
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
# #is it progress output?
|
||||||
|
# if progress_output.find("nv")
|
||||||
|
|
||||||
|
|
||||||
|
#normal output without progress stuff
|
||||||
|
if hide_errors:
|
||||||
|
self.debug("STDERR|> "+line.rstrip())
|
||||||
|
else:
|
||||||
|
self.error("STDERR|> "+line.rstrip())
|
||||||
|
|
||||||
def verbose(self,txt):
|
def verbose(self,txt):
|
||||||
self.zfs_autobackup.verbose("{} {}".format(self.description, txt))
|
self.zfs_autobackup.verbose("{} {}".format(self.description, txt))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user