rewriting output handling
This commit is contained in:
127
zfs_autobackup
127
zfs_autobackup
@ -11,6 +11,8 @@ import pprint
|
|||||||
import time
|
import time
|
||||||
import argparse
|
import argparse
|
||||||
from pprint import pprint as p
|
from pprint import pprint as p
|
||||||
|
import select
|
||||||
|
|
||||||
|
|
||||||
import imp
|
import imp
|
||||||
try:
|
try:
|
||||||
@ -60,7 +62,7 @@ class Log:
|
|||||||
def debug(self, txt):
|
def debug(self, txt):
|
||||||
if self.show_debug:
|
if self.show_debug:
|
||||||
if use_color:
|
if use_color:
|
||||||
print(colorama.Fore.BLUE+ "# "+txt+colorama.Style.RESET_ALL)
|
print(colorama.Fore.GREEN+ "# "+txt+colorama.Style.RESET_ALL)
|
||||||
else:
|
else:
|
||||||
print("# "+txt)
|
print("# "+txt)
|
||||||
|
|
||||||
@ -118,10 +120,12 @@ class ExecuteNode:
|
|||||||
self.readonly=readonly
|
self.readonly=readonly
|
||||||
|
|
||||||
|
|
||||||
def run(self, cmd, input=None, tab_split=False, valid_exitcodes=[ 0 ], readonly=False, hide_errors=False):
|
def run(self, cmd, input=None, tab_split=False, valid_exitcodes=[ 0 ], readonly=False, hide_errors=False, pipe=False):
|
||||||
"""run a command on the node
|
"""run a command on the node
|
||||||
|
|
||||||
readonly: make this True if the command doesnt make any changes and is safe to execute in testmode
|
readonly: make this True if the command doesnt make any changes and is safe to execute in testmode
|
||||||
|
pipe: Instead of executing, return a pipe-handle to be used to input to another run() command. (just like a | in linux)
|
||||||
|
input: Can be None, a string or a pipe-handle you got from another run()
|
||||||
"""
|
"""
|
||||||
|
|
||||||
encoded_cmd=[]
|
encoded_cmd=[]
|
||||||
@ -142,38 +146,106 @@ class ExecuteNode:
|
|||||||
|
|
||||||
#debug and test stuff
|
#debug and test stuff
|
||||||
debug_txt=" ".join(encoded_cmd)
|
debug_txt=" ".join(encoded_cmd)
|
||||||
|
if pipe:
|
||||||
|
debug_txt=debug_txt+" |"
|
||||||
|
|
||||||
if self.readonly and not readonly:
|
if self.readonly and not readonly:
|
||||||
self.debug("[NOT RUNNING] "+debug_txt)
|
self.debug("SKIP > "+debug_txt)
|
||||||
else:
|
else:
|
||||||
self.debug("[RUN] "+ debug_txt)
|
self.debug("RUN > "+ debug_txt)
|
||||||
|
|
||||||
if input:
|
#determine stdin
|
||||||
self.debug("INPUT:\n"+input.rstrip())
|
if input==None:
|
||||||
stdin=subprocess.PIPE
|
|
||||||
else:
|
|
||||||
stdin=None
|
stdin=None
|
||||||
|
elif isinstance(input,str):
|
||||||
|
self.debug("INPUT > \n"+input.rstrip())
|
||||||
|
stdin=subprocess.PIPE
|
||||||
|
elif isinstance(input, subprocess.Popen):
|
||||||
|
self.debug("Piping input")
|
||||||
|
stdin=input.stdout
|
||||||
|
else:
|
||||||
|
abort("Incompatible input")
|
||||||
|
|
||||||
if self.readonly and not readonly:
|
if self.readonly and not readonly:
|
||||||
|
#todo: what happens if input is piped?
|
||||||
return
|
return
|
||||||
|
|
||||||
#execute and parse/return results
|
#execute and parse/return results
|
||||||
p=subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE)
|
p=subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE)
|
||||||
(output, errors)=p.communicate(input=input)
|
|
||||||
|
|
||||||
if not hide_errors:
|
#Note: make streaming?
|
||||||
for line in errors.splitlines():
|
if isinstance(input,str):
|
||||||
self.error(line)
|
p.stdin.write(input)
|
||||||
|
|
||||||
|
if pipe:
|
||||||
|
return(p)
|
||||||
|
|
||||||
|
if isinstance(input, subprocess.Popen):
|
||||||
|
selectors=[p.stdout, p.stderr, input.stderr ]
|
||||||
|
else:
|
||||||
|
selectors=[p.stdout, p.stderr ]
|
||||||
|
|
||||||
|
|
||||||
|
output_lines=[]
|
||||||
|
while True:
|
||||||
|
(read_ready, write_ready, ex_ready)=select.select(selectors, [], [])
|
||||||
|
eof_count=0
|
||||||
|
if p.stdout in read_ready:
|
||||||
|
line=p.stdout.readline()
|
||||||
|
if line!="":
|
||||||
|
output_lines.append(line.rstrip())
|
||||||
|
self.debug("STDOUT > "+line.rstrip())
|
||||||
|
else:
|
||||||
|
eof_count=eof_count+1
|
||||||
|
if p.stderr in read_ready:
|
||||||
|
line=p.stderr.readline()
|
||||||
|
if line!="" and not hide_errors:
|
||||||
|
self.error("STDERR > "+line.rstrip())
|
||||||
|
else:
|
||||||
|
eof_count=eof_count+1
|
||||||
|
if isinstance(input, subprocess.Popen) and (input.strerr in read_ready):
|
||||||
|
line=input.stderr.readline()
|
||||||
|
if line!="" and not hide_errors:
|
||||||
|
self.error("STDERR|> "+line.rstrip())
|
||||||
|
else:
|
||||||
|
eof_count=eof_count+1
|
||||||
|
|
||||||
|
|
||||||
|
#stop if both processes are done and all filehandles are eof:
|
||||||
|
if p.poll()!=None and ((not isinstance(input, subprocess.Popen)) or input.poll()!=None) and eof_count==len(selectors):
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
# if isinstance(input, subprocess.Popen):
|
||||||
|
# (output, errors)=p.communicate()
|
||||||
|
# else:
|
||||||
|
# (output, errors)=p.communicate(input=input)
|
||||||
|
|
||||||
|
|
||||||
|
#handle piped process error output and exit codes
|
||||||
|
if isinstance(input, subprocess.Popen):
|
||||||
|
# pipe_outputs=input.communicate()
|
||||||
|
# if not hide_errors:
|
||||||
|
# for line in pipe_outputs[1].splitlines():
|
||||||
|
# self.error("Pipe-error: "+line)
|
||||||
|
|
||||||
|
if input.returncode not in valid_exitcodes:
|
||||||
|
raise(subprocess.CalledProcessError(input.returncode, "(pipe)"))
|
||||||
|
|
||||||
|
#handle error output and exit codes
|
||||||
|
# if not hide_errors:
|
||||||
|
# for line in errors.splitlines():
|
||||||
|
# self.error(line)
|
||||||
|
|
||||||
if p.returncode not in valid_exitcodes:
|
if p.returncode not in valid_exitcodes:
|
||||||
raise(subprocess.CalledProcessError(p.returncode, encoded_cmd))
|
raise(subprocess.CalledProcessError(p.returncode, encoded_cmd))
|
||||||
|
|
||||||
lines=output.splitlines()
|
# lines=output.splitlines()
|
||||||
if not tab_split:
|
if not tab_split:
|
||||||
return(lines)
|
return(output_lines)
|
||||||
else:
|
else:
|
||||||
ret=[]
|
ret=[]
|
||||||
for line in lines:
|
for line in output_lines:
|
||||||
ret.append(line.split("\t"))
|
ret.append(line.split("\t"))
|
||||||
return(ret)
|
return(ret)
|
||||||
|
|
||||||
@ -356,25 +428,6 @@ class ZfsDataset():
|
|||||||
|
|
||||||
return(self.from_names(names[1:]))
|
return(self.from_names(names[1:]))
|
||||||
|
|
||||||
# def transfer_snapshots(self, source_dataset, source_start_snapshot, source_sends):
|
|
||||||
# """transfer bunch snapshots to this target"""
|
|
||||||
#
|
|
||||||
# receive_resume_token=getattr(source_dataset.properties, 'receive_resume_token', None)
|
|
||||||
# last_snapshot=source_start_snapshot
|
|
||||||
#
|
|
||||||
# for snapshot in source_sends:
|
|
||||||
# if receive_resume_token:
|
|
||||||
# resumed="[RESUMED]"
|
|
||||||
# else:
|
|
||||||
# resumed=""
|
|
||||||
#
|
|
||||||
# if (last_snapshot):
|
|
||||||
# source_dataset.verbose("incremental @{}...@{} {}".format(last_snapshot.snapshot_name, snapshot.snapshot_name, resumed))
|
|
||||||
# else:
|
|
||||||
# source_dataset.verbose("initial @{} {}".format(snapshot.snapshot_name, resumed))
|
|
||||||
#
|
|
||||||
# last_snapshot=snapshot
|
|
||||||
# receive_resume_token=None
|
|
||||||
|
|
||||||
def transfer_snapshot(self, target_dataset, prev_snapshot=None):
|
def transfer_snapshot(self, target_dataset, prev_snapshot=None):
|
||||||
"""transfer this snapshot to target_dataset. specify prev_snapshot for incremental transfer"""
|
"""transfer this snapshot to target_dataset. specify prev_snapshot for incremental transfer"""
|
||||||
@ -396,6 +449,9 @@ class ZfsDataset():
|
|||||||
else:
|
else:
|
||||||
target_dataset.verbose("receiving @{} {}".format(self.snapshot_name, resumed))
|
target_dataset.verbose("receiving @{} {}".format(self.snapshot_name, resumed))
|
||||||
|
|
||||||
|
pipe=self.zfs_node.run(["lsX"], pipe=True, readonly=True)
|
||||||
|
target_dataset.zfs_node.run(["cat"], input=pipe, readonly=True)
|
||||||
|
|
||||||
#update cache
|
#update cache
|
||||||
target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name))
|
target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name))
|
||||||
|
|
||||||
@ -591,6 +647,7 @@ class ZfsAutobackup:
|
|||||||
|
|
||||||
description="[Target]"
|
description="[Target]"
|
||||||
target_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_target, readonly=self.args.test, description=description)
|
target_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_target, readonly=self.args.test, description=description)
|
||||||
|
# target_node.run(["/root/outputtest"], readonly=True)
|
||||||
|
|
||||||
self.set_title("Selecting")
|
self.set_title("Selecting")
|
||||||
source_datasets=source_node.selected_datasets
|
source_datasets=source_node.selected_datasets
|
||||||
@ -617,7 +674,7 @@ class ZfsAutobackup:
|
|||||||
target_dataset=ZfsDataset(target_node, target_name)
|
target_dataset=ZfsDataset(target_node, target_name)
|
||||||
source_dataset.sync_snapshots(target_dataset)
|
source_dataset.sync_snapshots(target_dataset)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.error(str(e))
|
source_dataset.error(str(e))
|
||||||
if self.args.debug:
|
if self.args.debug:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user