cmdpipe manual piping/parallel executing tested and done

This commit is contained in:
Edwin Eefting
2022-01-27 18:22:20 +01:00
parent a8b43c286f
commit 2ffd3baf77
3 changed files with 129 additions and 52 deletions

View File

@ -121,3 +121,55 @@ class TestCmdPipe(unittest2.TestCase):
self.assertEqual(out, [])
self.assertTrue(executed)
def test_no_handlers(self):
with self.assertRaises(Exception):
p=CmdPipe()
p.add(CmdItem([ "echo" ]))
p.execute()
#NOTE: this will give some resource warnings
def test_manual_pipes(self):
# manual piping means: a command in the pipe has a stdout_handler, which is responsible for sending the data into the next item of the pipe.
result=[]
def stdout_handler(line):
item2.process.stdin.write(line.encode('utf8'))
# item2.process.stdin.close()
item1=CmdItem(["echo", "test"], stdout_handler=stdout_handler)
item2=CmdItem(["tr", "e", "E"], stdout_handler=lambda line: result.append(line))
p=CmdPipe()
p.add(item1)
p.add(item2)
p.execute()
self.assertEqual(result, ["tEst"])
def test_multiprocess(self):
#dont do any piping at all, just run multiple processes and handle outputs
result1=[]
result2=[]
result3=[]
item1=CmdItem(["echo", "test1"], stdout_handler=lambda line: result1.append(line))
item2=CmdItem(["echo", "test2"], stdout_handler=lambda line: result2.append(line))
item3=CmdItem(["echo", "test3"], stdout_handler=lambda line: result3.append(line))
p=CmdPipe()
p.add(item1)
p.add(item2)
p.add(item3)
p.execute()
self.assertEqual(result1, ["test1"])
self.assertEqual(result2, ["test2"])
self.assertEqual(result3, ["test3"])

View File

@ -164,8 +164,8 @@ class TestExecuteNode(unittest2.TestCase):
#
# nodea=ExecuteNode(debug_output=True, ssh_to="localhost")
#
# cmd_pipe=nodea.script(lines=["echo line1", "echo line 2"])
# cmd_pipe.execute(stdout_handler)
# cmd_pipe=nodea.script(lines=["echo line1", "echo line 2"], stdout_handler=stdout_handler)
# cmd_pipe.execute()

View File

@ -1,8 +1,16 @@
# This is the low level process executing stuff.
# It makes piping multiple process easier.
# It makes piping and parallel process handling more easy.
# You can specify a handler for each line of stderr output for each item in the pipe.
# Every item also has its own exitcode handler.
# There is one stdout handler for the last item in the pipe. This is also called for each line.
# Normally you add a stdout_handler to the last item in the pipe.
# However: You can also add stdout_handler to other items in a pipe. This will turn that item in to a manual pipe: your
# handler is responsible for sending data into the next item of the pipe. (avaiable in item.next)
# You can also use manual pipe mode to just execute multiple command in parallel and handle their output parallel,
# without doing any actual pipe stuff. (because you dont HAVE to send data into the next item.)
import subprocess
import os
@ -33,7 +41,7 @@ class CmdItem:
self.exit_handler = exit_handler
self.shell = shell
self.process = None
self.next_item = None #next item in pipe, set by CmdPipe
self.next = None #next item in pipe, set by CmdPipe
def __str__(self):
"""return copy-pastable version of command."""
@ -97,55 +105,38 @@ class CmdPipe:
return self._should_execute
def execute(self):
"""run the pipe. returns True all exit handlers returned true"""
"""run the pipe. returns True all exit handlers returned true. (otherwise it will be False/None depending on exit handlers returncode) """
if not self._should_execute:
return True
selectors = []
selectors = self.__create()
# create processes
last_stdout = None
next_stdin = subprocess.PIPE # means we write input via python instead of an actual system pipe
first=True
prev_item=None
if not selectors:
raise (Exception("Cant use cmdpipe without any output handlers."))
self.__process_outputs(selectors)
# close filehandles
for item in self.items:
item.process.stderr.close()
item.process.stdout.close()
#creates the actual subprocess via subprocess.popen
item.create(next_stdin)
# call exit handlers
success = True
for item in self.items:
if item.exit_handler is not None:
success=item.exit_handler(item.process.returncode) and success
#we piped previous process? dont forget to close its stdout
if next_stdin != subprocess.PIPE:
next_stdin.close()
return success
selectors.append(item.process.stderr)
# we're the first process in the pipe
if first:
if self.inp is not None:
#write the input we have
item.process.stdin.write(self.inp.encode('utf-8'))
item.process.stdin.close()
first=False
#manual stdout handling or pipe it to the next process?
if item.stdout_handler is None:
# no manual stdout handling, pipe it to the next process via sytem pipe
next_stdin=item.process.stdout
else:
# manual stdout handling via python
selectors.append(item.process.stdout)
# next process will get input from python:
next_stdin= subprocess.PIPE
if prev_item is not None:
prev_item.next=item
prev_item=item
def __process_outputs(self, selectors):
"""watch all output selectors and call handlers"""
while True:
# wait for output on one of the stderrs or last_stdout
(read_ready, write_ready, ex_ready) = select.select(selectors, [], [])
eof_count = 0
done_count = 0
@ -165,6 +156,8 @@ class CmdPipe:
item.stdout_handler(line)
else:
eof_count = eof_count + 1
if item.next:
item.next.process.stdin.close()
if item.process.poll() is not None:
done_count = done_count + 1
@ -173,16 +166,48 @@ class CmdPipe:
if eof_count == len(selectors) and done_count == len(self.items):
break
# close filehandles
for item in self.items:
item.process.stderr.close()
item.process.stdout.close()
# item.process.stdin.close()
# call exit handlers
success = True
for item in self.items:
if item.exit_handler is not None:
success=item.exit_handler(item.process.returncode) and success
return success
def __create(self):
"""create actual processes, do piping and return selectors."""
selectors = []
next_stdin = subprocess.PIPE # means we write input via python instead of an actual system pipe
first = True
prev_item = None
for item in self.items:
# creates the actual subprocess via subprocess.popen
item.create(next_stdin)
# we piped previous process? dont forget to close its stdout
if next_stdin != subprocess.PIPE:
next_stdin.close()
if item.stderr_handler:
selectors.append(item.process.stderr)
# we're the first process in the pipe
if first:
if self.inp is not None:
# write the input we have
item.process.stdin.write(self.inp.encode('utf-8'))
item.process.stdin.close()
first = False
# manual stdout handling or pipe it to the next process?
if item.stdout_handler is None:
# no manual stdout handling, pipe it to the next process via sytem pipe
next_stdin = item.process.stdout
else:
# manual stdout handling via python
selectors.append(item.process.stdout)
# next process will get input from python:
next_stdin = subprocess.PIPE
if prev_item is not None:
prev_item.next = item
prev_item = item
return selectors