separated piping

This commit is contained in:
Edwin Eefting
2021-04-05 22:18:14 +02:00
parent 966df73d2f
commit 4910b1dfb5
5 changed files with 163 additions and 66 deletions

View File

@ -73,39 +73,39 @@ class TestExecuteNode(unittest2.TestCase):
def pipe(self, nodea, nodeb): def pipe(self, nodea, nodeb):
with self.subTest("pipe data"): with self.subTest("pipe data"):
output=nodea.run(["dd", "if=/dev/zero", "count=1000"], pipe=True) output=nodea.get_pipe(["dd", "if=/dev/zero", "count=1000"])
self.assertEqual(nodeb.run(["md5sum"], inp=output), ["816df6f64deba63b029ca19d880ee10a -"]) self.assertEqual(nodeb.run(["md5sum"], inp=output), ["816df6f64deba63b029ca19d880ee10a -"])
with self.subTest("exit code both ends of pipe ok"): with self.subTest("exit code both ends of pipe ok"):
output=nodea.run(["true"], pipe=True) output=nodea.get_pipe(["true"])
nodeb.run(["true"], inp=output) nodeb.run(["true"], inp=output)
with self.subTest("error on pipe input side"): with self.subTest("error on pipe input side"):
with self.assertRaises(subprocess.CalledProcessError): with self.assertRaises(subprocess.CalledProcessError):
output=nodea.run(["false"], pipe=True) output=nodea.get_pipe(["false"])
nodeb.run(["true"], inp=output) nodeb.run(["true"], inp=output)
with self.subTest("error on pipe output side "): with self.subTest("error on pipe output side "):
with self.assertRaises(subprocess.CalledProcessError): with self.assertRaises(subprocess.CalledProcessError):
output=nodea.run(["true"], pipe=True) output=nodea.get_pipe(["true"])
nodeb.run(["false"], inp=output) nodeb.run(["false"], inp=output)
with self.subTest("error on both sides of pipe"): with self.subTest("error on both sides of pipe"):
with self.assertRaises(subprocess.CalledProcessError): with self.assertRaises(subprocess.CalledProcessError):
output=nodea.run(["false"], pipe=True) output=nodea.get_pipe(["false"])
nodeb.run(["false"], inp=output) nodeb.run(["false"], inp=output)
with self.subTest("check stderr on pipe output side"): with self.subTest("check stderr on pipe output side"):
output=nodea.run(["true"], pipe=True) output=nodea.get_pipe(["true"])
(stdout, stderr)=nodeb.run(["ls", "nonexistingfile"], inp=output, return_stderr=True, valid_exitcodes=[0,2]) (stdout, stderr)=nodeb.run(["ls", "nonexistingfile"], inp=output, return_stderr=True, valid_exitcodes=[0,2])
self.assertEqual(stdout,[]) self.assertEqual(stdout,[])
self.assertRegex(stderr[0], "nonexistingfile" ) self.assertRegex(stderr[0], "nonexistingfile" )
with self.subTest("check stderr on pipe input side (should be only printed)"): with self.subTest("check stderr on pipe input side (should be only printed)"):
output=nodea.run(["ls", "nonexistingfile"], pipe=True) output=nodea.get_pipe(["ls", "nonexistingfile"])
(stdout, stderr)=nodeb.run(["true"], inp=output, return_stderr=True, valid_exitcodes=[0,2]) (stdout, stderr)=nodeb.run(["true"], inp=output, return_stderr=True, valid_exitcodes=[0,2])
self.assertEqual(stdout,[]) self.assertEqual(stdout,[])
self.assertEqual(stderr,[] ) self.assertEqual(stderr,[])

View File

@ -38,7 +38,7 @@ class TestZfsScaling(unittest2.TestCase):
#this triggers if you make a change with an impact of more than O(snapshot_count/2) #this triggers if you make a change with an impact of more than O(snapshot_count/2)
expected_runs=343 expected_runs=240
print("ACTUAL RUNS: {}".format(run_counter)) print("ACTUAL RUNS: {}".format(run_counter))
self.assertLess(abs(run_counter-expected_runs), snapshot_count/2) self.assertLess(abs(run_counter-expected_runs), snapshot_count/2)
@ -77,7 +77,7 @@ class TestZfsScaling(unittest2.TestCase):
#this triggers if you make a change with an impact of more than O(snapshot_count/2) #this triggers if you make a change with an impact of more than O(snapshot_count/2)
expected_runs=743 expected_runs=640
print("ACTUAL RUNS: {}".format(run_counter)) print("ACTUAL RUNS: {}".format(run_counter))
self.assertLess(abs(run_counter-expected_runs), dataset_count/2) self.assertLess(abs(run_counter-expected_runs), dataset_count/2)
@ -90,6 +90,6 @@ class TestZfsScaling(unittest2.TestCase):
#this triggers if you make a change with a performance impact of more than O(snapshot_count/2) #this triggers if you make a change with a performance impact of more than O(snapshot_count/2)
expected_runs=947 expected_runs=844
print("ACTUAL RUNS: {}".format(run_counter)) print("ACTUAL RUNS: {}".format(run_counter))
self.assertLess(abs(run_counter-expected_runs), dataset_count/2) self.assertLess(abs(run_counter-expected_runs), dataset_count/2)

View File

@ -48,7 +48,7 @@ class ExecuteNode(LogStub):
def _encode_cmd(self, cmd): def _encode_cmd(self, cmd):
"""returns cmd in encoded and escaped form that can be used with popen.""" """returns cmd in encoded and escaped form that can be used with popen."""
encoded_cmd=[] encoded_cmd = []
# make sure the command gets all the data in utf8 format: # make sure the command gets all the data in utf8 format:
# (this is necessary if LC_ALL=en_US.utf8 is not set in the environment) # (this is necessary if LC_ALL=en_US.utf8 is not set in the environment)
@ -75,27 +75,17 @@ class ExecuteNode(LogStub):
return encoded_cmd return encoded_cmd
def run(self, cmd, inp=None, tab_split=False, valid_exitcodes=None, readonly=False, hide_errors=False, pipe=False, def is_local(self):
return_stderr=False): return self.ssh_to is None
"""run a command on the node.
:param cmd: the actual command, should be a list, where the first item is the command def get_pipe(self, cmd, inp=None, readonly=False):
and the rest are parameters. """return a pipehandle to a process.
:param inp: Can be None, a string or a pipe-handle you got from another run()
:param tab_split: split tabbed files in output into a list
:param valid_exitcodes: list of valid exit codes for this command (checks exit code of both sides of a pipe)
Use [] to accept all exit codes.
:param readonly: make this True if the command doesn't make any changes and is safe to execute in testmode
:param hide_errors: don't show stderr output as error, instead show it as debugging output (use to hide expected errors)
:param pipe: Instead of executing, return a pipe-handle to be used to
input to another run() command. (just like a | in linux)
:param return_stderr: return both stdout and stderr as a tuple. (normally only returns stdout)
The caller should pass this handle to run() via inp= at some point to actually execute it.
Returns None if we're in test-mode. (but still prints important debug output)
""" """
if valid_exitcodes is None:
valid_exitcodes = [0]
encoded_cmd = self._encode_cmd(cmd) encoded_cmd = self._encode_cmd(cmd)
# debug and test stuff # debug and test stuff
@ -103,35 +93,29 @@ class ExecuteNode(LogStub):
for c in encoded_cmd: for c in encoded_cmd:
debug_txt = debug_txt + " " + c.decode() debug_txt = debug_txt + " " + c.decode()
if pipe:
debug_txt = debug_txt + " |"
if self.readonly and not readonly:
self.debug("SKIP > " + debug_txt)
else:
if pipe:
self.debug("PIPE > " + debug_txt)
else:
self.debug("RUN > " + debug_txt)
# determine stdin # determine stdin
if inp is None: if inp is None:
# NOTE: Not None, otherwise it reads stdin from terminal! # NOTE: Not None, otherwise it reads stdin from terminal!
stdin = subprocess.PIPE stdin = subprocess.PIPE
elif isinstance(inp, str) or type(inp) == 'unicode': elif isinstance(inp, str) or type(inp) == 'unicode':
self.debug("INPUT > \n" + inp.rstrip()) self.debug("STDIN: \n" + inp.rstrip())
stdin = subprocess.PIPE stdin = subprocess.PIPE
elif isinstance(inp, subprocess.Popen): elif isinstance(inp, subprocess.Popen):
self.debug("Piping input") self.debug("PIPE to:")
stdin = inp.stdout stdin = inp.stdout
else: else:
raise (Exception("Program error: Incompatible input")) raise (Exception("Program error: Incompatible input"))
if self.readonly and not readonly: if self.readonly and not readonly:
# todo: what happens if input is piped? self.debug("CMDSKIP> " + debug_txt)
return return None
else:
self.debug("CMD > " + debug_txt)
# execute and parse/return results if self.readonly and not readonly:
return None
# create pipe
p = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE) p = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE)
# Note: make streaming? # Note: make streaming?
@ -141,9 +125,80 @@ class ExecuteNode(LogStub):
if p.stdin: if p.stdin:
p.stdin.close() p.stdin.close()
# return pipe return p
if pipe:
return p def run(self, cmd, inp=None, tab_split=False, valid_exitcodes=None, readonly=False, hide_errors=False,
return_stderr=False):
"""run a command on the node , checks output and parses/handle output and returns it
:param cmd: the actual command, should be a list, where the first item is the command
and the rest are parameters.
:param inp: Can be None, a string or a pipe-handle you got from get_pipe()
:param tab_split: split tabbed files in output into a list
:param valid_exitcodes: list of valid exit codes for this command (checks exit code of both sides of a pipe)
Use [] to accept all exit codes.
:param readonly: make this True if the command doesn't make any changes and is safe to execute in testmode
:param hide_errors: don't show stderr output as error, instead show it as debugging output (use to hide expected errors)
:param return_stderr: return both stdout and stderr as a tuple. (normally only returns stdout)
"""
if valid_exitcodes is None:
valid_exitcodes = [0]
# encoded_cmd = self._encode_cmd(cmd)
#
# # debug and test stuff
# debug_txt = ""
# for c in encoded_cmd:
# debug_txt = debug_txt + " " + c.decode()
#
# if pipe:
# debug_txt = debug_txt + " |"
#
# if self.readonly and not readonly:
# self.debug("SKIP > " + debug_txt)
# else:
# if pipe:
# self.debug("PIPE > " + debug_txt)
# else:
# self.debug("RUN > " + debug_txt)
#
# # determine stdin
# if inp is None:
# # NOTE: Not None, otherwise it reads stdin from terminal!
# stdin = subprocess.PIPE
# elif isinstance(inp, str) or type(inp) == 'unicode':
# self.debug("INPUT > \n" + inp.rstrip())
# stdin = subprocess.PIPE
# elif isinstance(inp, subprocess.Popen):
# self.debug("Piping input")
# stdin = inp.stdout
# else:
# raise (Exception("Program error: Incompatible input"))
# if self.readonly and not readonly:
# # todo: what happens if input is piped?
# return
#
# # execute and parse/return results
# p = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE)
# # Note: make streaming?
# if isinstance(inp, str) or type(inp) == 'unicode':
# p.stdin.write(inp.encode('utf-8'))
#
# if p.stdin:
# p.stdin.close()
#
# # return pipe
# if pipe:
# return p
p = self.get_pipe(cmd, inp=inp, readonly=readonly)
if p is None:
return None
# handle all outputs # handle all outputs
if isinstance(inp, subprocess.Popen): if isinstance(inp, subprocess.Popen):
@ -206,9 +261,24 @@ class ExecuteNode(LogStub):
raise (subprocess.CalledProcessError(inp.returncode, "(pipe)")) raise (subprocess.CalledProcessError(inp.returncode, "(pipe)"))
if valid_exitcodes and p.returncode not in valid_exitcodes: if valid_exitcodes and p.returncode not in valid_exitcodes:
raise (subprocess.CalledProcessError(p.returncode, encoded_cmd)) raise (subprocess.CalledProcessError(p.returncode, self._encode_cmd(cmd)))
if return_stderr: if return_stderr:
return output_lines, error_lines return output_lines, error_lines
else: else:
return output_lines return output_lines
# def run_pipe(self, cmds, *args, **kwargs):
# """run a array of commands piped together. """
#
# #i
# if self.zfs_node.is_local():
# output_pipe = self.zfs_node.run(cmd, pipe=True)
# for pipe_cmd in output_pipes:
# output_pipe=self.zfs_node.run(pipe_cmd, inp=output_pipe, pipe=True, )
# return output_pipe
# #remote, so add with actual | and let remote shell handle it
# else:
# for pipe_cmd in output_pipes:
# cmd.append("|")
# cmd.extend(pipe_cmd)

View File

@ -104,11 +104,11 @@ class ZfsAutobackup:
help='show zfs progress output. Enabled automaticly on ttys. (use --no-progress to disable)') help='show zfs progress output. Enabled automaticly on ttys. (use --no-progress to disable)')
parser.add_argument('--no-progress', action='store_true', help=argparse.SUPPRESS) # needed to workaround a zfs recv -v bug parser.add_argument('--no-progress', action='store_true', help=argparse.SUPPRESS) # needed to workaround a zfs recv -v bug
parser.add_argument('--output-pipe', metavar="COMMAND", default=[], action='append', parser.add_argument('--send-pipe', metavar="COMMAND", default=[], action='append',
help='add zfs send output pipe command') help='pipe zfs send output through COMMAND')
parser.add_argument('--input-pipe', metavar="COMMAND", default=[], action='append', parser.add_argument('--recv-pipe', metavar="COMMAND", default=[], action='append',
help='add zfs recv input pipe command') help='pipe zfs recv input through COMMAND')
# note args is the only global variable we use, since its a global readonly setting anyway # note args is the only global variable we use, since its a global readonly setting anyway
args = parser.parse_args(argv) args = parser.parse_args(argv)
@ -222,7 +222,11 @@ class ZfsAutobackup:
# NOTE: this method also uses self.args. args that need extra processing are passed as function parameters: # NOTE: this method also uses self.args. args that need extra processing are passed as function parameters:
def sync_datasets(self, source_node, source_datasets, target_node): def sync_datasets(self, source_node, source_datasets, target_node):
"""Sync datasets, or thin-only on both sides""" """Sync datasets, or thin-only on both sides
:type target_node: ZfsNode
:type source_datasets: list of ZfsDataset
:type source_node: ZfsNode
"""
fail_count = 0 fail_count = 0
target_datasets = [] target_datasets = []
@ -255,7 +259,7 @@ class ZfsAutobackup:
raw=self.args.raw, also_other_snapshots=self.args.other_snapshots, raw=self.args.raw, also_other_snapshots=self.args.other_snapshots,
no_send=self.args.no_send, no_send=self.args.no_send,
destroy_incompatible=self.args.destroy_incompatible, destroy_incompatible=self.args.destroy_incompatible,
no_thinning=self.args.no_thinning) no_thinning=self.args.no_thinning, output_pipes=self.args.send_pipe, input_pipes=self.args.recv_pipe)
except Exception as e: except Exception as e:
fail_count = fail_count + 1 fail_count = fail_count + 1
source_dataset.error("FAILED: " + str(e)) source_dataset.error("FAILED: " + str(e))
@ -383,6 +387,7 @@ class ZfsAutobackup:
source_datasets=source_datasets, source_datasets=source_datasets,
target_node=target_node) target_node=target_node)
#no target specified, run in snapshot-only mode
else: else:
if not self.args.no_thinning: if not self.args.no_thinning:
self.thin_source(source_datasets) self.thin_source(source_datasets)
@ -390,7 +395,7 @@ class ZfsAutobackup:
if not fail_count: if not fail_count:
if self.args.test: if self.args.test:
self.set_title("All tests successfull.") self.set_title("All tests successful.")
else: else:
self.set_title("All operations completed successfully") self.set_title("All operations completed successfully")
if not self.args.target_path: if not self.args.target_path:

View File

@ -393,6 +393,7 @@ class ZfsDataset:
ZfsDataset ) ZfsDataset )
Args: Args:
:rtype: ZfsDataset
:type snapshot: str or ZfsDataset :type snapshot: str or ZfsDataset
""" """
@ -492,13 +493,14 @@ class ZfsDataset:
return self.from_names(names[1:]) return self.from_names(names[1:])
def send_pipe(self, features, prev_snapshot=None, resume_token=None, show_progress=False, raw=False): def send_pipe(self, features, prev_snapshot, resume_token, show_progress, raw, output_pipes):
"""returns a pipe with zfs send output for this snapshot """returns a pipe with zfs send output for this snapshot
resume_token: resume sending from this token. (in that case we don't resume_token: resume sending from this token. (in that case we don't
need to know snapshot names) need to know snapshot names)
Args: Args:
:type output_pipes: list of str
:type features: list of str :type features: list of str
:type prev_snapshot: ZfsDataset :type prev_snapshot: ZfsDataset
:type resume_token: str :type resume_token: str
@ -549,8 +551,22 @@ class ZfsDataset:
cmd.append(self.name) cmd.append(self.name)
# NOTE: this doesn't start the send yet, it only returns a subprocess.Pipe # #add custom output pipes?
return self.zfs_node.run(cmd, pipe=True) # if output_pipes:
# #local so do our own piping
# if self.zfs_node.is_local():
# output_pipe = self.zfs_node.run(cmd)
# for pipe_cmd in output_pipes:
# output_pipe=self.zfs_node.run(pipe_cmd, inp=output_pipe, )
# return output_pipe
# #remote, so add with actual | and let remote shell handle it
# else:
# for pipe_cmd in output_pipes:
# cmd.append("|")
# cmd.extend(pipe_cmd)
return self.zfs_node.get_pipe(cmd)
def recv_pipe(self, pipe, features, filter_properties=None, set_properties=None, ignore_exit_code=False): def recv_pipe(self, pipe, features, filter_properties=None, set_properties=None, ignore_exit_code=False):
"""starts a zfs recv for this snapshot and uses pipe as input """starts a zfs recv for this snapshot and uses pipe as input
@ -618,15 +634,17 @@ class ZfsDataset:
self.error("error during transfer") self.error("error during transfer")
raise (Exception("Target doesn't exist after transfer, something went wrong.")) raise (Exception("Target doesn't exist after transfer, something went wrong."))
def transfer_snapshot(self, target_snapshot, features, prev_snapshot=None, show_progress=False, def transfer_snapshot(self, target_snapshot, features, prev_snapshot, show_progress,
filter_properties=None, set_properties=None, ignore_recv_exit_code=False, resume_token=None, filter_properties, set_properties, ignore_recv_exit_code, resume_token,
raw=False): raw, output_pipes, input_pipes):
"""transfer this snapshot to target_snapshot. specify prev_snapshot for """transfer this snapshot to target_snapshot. specify prev_snapshot for
incremental transfer incremental transfer
connects a send_pipe() to recv_pipe() connects a send_pipe() to recv_pipe()
Args: Args:
:type output_pipes: list of str
:type input_pipes: list of str
:type target_snapshot: ZfsDataset :type target_snapshot: ZfsDataset
:type features: list of str :type features: list of str
:type prev_snapshot: ZfsDataset :type prev_snapshot: ZfsDataset
@ -657,7 +675,7 @@ class ZfsDataset:
# do it # do it
pipe = self.send_pipe(features=features, show_progress=show_progress, prev_snapshot=prev_snapshot, pipe = self.send_pipe(features=features, show_progress=show_progress, prev_snapshot=prev_snapshot,
resume_token=resume_token, raw=raw) resume_token=resume_token, raw=raw, output_pipes=output_pipes)
target_snapshot.recv_pipe(pipe, features=features, filter_properties=filter_properties, target_snapshot.recv_pipe(pipe, features=features, filter_properties=filter_properties,
set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code) set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
@ -898,6 +916,7 @@ class ZfsDataset:
"""plan where to start syncing and what to sync and what to keep """plan where to start syncing and what to sync and what to keep
Args: Args:
:rtype: ( ZfsDataset, ZfsDataset, list of ZfsDataset, list of ZfsDataset, list of ZfsDataset, list of ZfsDataset )
:type target_dataset: ZfsDataset :type target_dataset: ZfsDataset
:type also_other_snapshots: bool :type also_other_snapshots: bool
""" """
@ -945,11 +964,13 @@ class ZfsDataset:
def sync_snapshots(self, target_dataset, features, show_progress, filter_properties, set_properties, def sync_snapshots(self, target_dataset, features, show_progress, filter_properties, set_properties,
ignore_recv_exit_code, holds, rollback, raw, also_other_snapshots, ignore_recv_exit_code, holds, rollback, raw, also_other_snapshots,
no_send, destroy_incompatible, no_thinning): no_send, destroy_incompatible, no_thinning, output_pipes, input_pipes):
"""sync this dataset's snapshots to target_dataset, while also thinning """sync this dataset's snapshots to target_dataset, while also thinning
out old snapshots along the way. out old snapshots along the way.
Args: Args:
:type output_pipes: list of str
:type input_pipes: list of str
:type target_dataset: ZfsDataset :type target_dataset: ZfsDataset
:type features: list of str :type features: list of str
:type show_progress: bool :type show_progress: bool
@ -1000,13 +1021,14 @@ class ZfsDataset:
if target_snapshot not in target_obsoletes: if target_snapshot not in target_obsoletes:
# NOTE: should we let transfer_snapshot handle this? # NOTE: should we let transfer_snapshot handle this?
(allowed_filter_properties, allowed_set_properties) = self.get_allowed_properties(filter_properties, (allowed_filter_properties, allowed_set_properties) = self.get_allowed_properties(filter_properties,
set_properties) set_properties)
source_snapshot.transfer_snapshot(target_snapshot, features=features, source_snapshot.transfer_snapshot(target_snapshot, features=features,
prev_snapshot=prev_source_snapshot, show_progress=show_progress, prev_snapshot=prev_source_snapshot, show_progress=show_progress,
filter_properties=allowed_filter_properties, filter_properties=allowed_filter_properties,
set_properties=allowed_set_properties, set_properties=allowed_set_properties,
ignore_recv_exit_code=ignore_recv_exit_code, ignore_recv_exit_code=ignore_recv_exit_code,
resume_token=resume_token, raw=raw) resume_token=resume_token, raw=raw, output_pipes=output_pipes, input_pipes=input_pipes)
resume_token = None resume_token = None
# hold the new common snapshots and release the previous ones # hold the new common snapshots and release the previous ones