refactorred stdout piping a bit to allow manual piping
This commit is contained in:
		| @ -17,17 +17,23 @@ except ImportError: | ||||
| class CmdItem: | ||||
|     """one command item, to be added to a CmdPipe""" | ||||
|  | ||||
|     def __init__(self, cmd, readonly=False, stderr_handler=None, exit_handler=None, shell=False): | ||||
|     def __init__(self, cmd, readonly=False, stderr_handler=None, exit_handler=None, stdout_handler=None, shell=False): | ||||
|         """create item. caller has to make sure cmd is properly escaped when using shell. | ||||
|  | ||||
|         If stdout_handler is None, it will connect the stdout to the stdin of the next item in the pipe, like | ||||
|         and actual system pipe. (no python overhead) | ||||
|  | ||||
|         :type cmd: list of str | ||||
|         """ | ||||
|  | ||||
|         self.cmd = cmd | ||||
|         self.readonly = readonly | ||||
|         self.stderr_handler = stderr_handler | ||||
|         self.stdout_handler = stdout_handler | ||||
|         self.exit_handler = exit_handler | ||||
|         self.shell = shell | ||||
|         self.process = None | ||||
|         self.next_item = None #next item in pipe, set by CmdPipe | ||||
|  | ||||
|     def __str__(self): | ||||
|         """return copy-pastable version of command.""" | ||||
| @ -90,38 +96,52 @@ class CmdPipe: | ||||
|     def should_execute(self): | ||||
|         return self._should_execute | ||||
|  | ||||
|     def execute(self, stdout_handler): | ||||
|     def execute(self): | ||||
|         """run the pipe. returns True all exit handlers returned true""" | ||||
|  | ||||
|         if not self._should_execute: | ||||
|             return True | ||||
|  | ||||
|         # first process should have actual user input as stdin: | ||||
|         selectors = [] | ||||
|  | ||||
|         # create processes | ||||
|         last_stdout = None | ||||
|         stdin = subprocess.PIPE | ||||
|         next_stdin = subprocess.PIPE # means we write input via python instead of an actual system pipe | ||||
|         first=True | ||||
|         prev_item=None | ||||
|         for item in self.items: | ||||
|  | ||||
|             item.create(stdin) | ||||
|             #creates the actual subprocess via subprocess.popen | ||||
|             item.create(next_stdin) | ||||
|  | ||||
|             #we piped previous process? dont forget to close its stdout | ||||
|             if next_stdin != subprocess.PIPE: | ||||
|                 next_stdin.close() | ||||
|  | ||||
|             selectors.append(item.process.stderr) | ||||
|  | ||||
|             if last_stdout is None: | ||||
|                 # we're the first process in the pipe, do we have some input? | ||||
|             # we're the first process in the pipe | ||||
|             if first: | ||||
|                 if self.inp is not None: | ||||
|                     # TODO: make streaming to support big inputs? | ||||
|                     #write the input we have | ||||
|                     item.process.stdin.write(self.inp.encode('utf-8')) | ||||
|                 item.process.stdin.close() | ||||
|                 first=False | ||||
|  | ||||
|             #manual stdout handling or pipe it to the next process? | ||||
|             if item.stdout_handler is None: | ||||
|                 # no manual stdout handling, pipe it to the next process via sytem pipe | ||||
|                 next_stdin=item.process.stdout | ||||
|             else: | ||||
|                 # last stdout was piped to this stdin already, so close it because we dont need it anymore | ||||
|                 last_stdout.close() | ||||
|                 # manual stdout handling via python | ||||
|                 selectors.append(item.process.stdout) | ||||
|                 # next process will get input from python: | ||||
|                 next_stdin= subprocess.PIPE | ||||
|  | ||||
|             last_stdout = item.process.stdout | ||||
|             stdin = last_stdout | ||||
|             if prev_item is not None: | ||||
|                 prev_item.next=item | ||||
|  | ||||
|         # monitor last stdout as well | ||||
|         selectors.append(last_stdout) | ||||
|             prev_item=item | ||||
|  | ||||
|         while True: | ||||
|             # wait for output on one of the stderrs or last_stdout | ||||
| @ -130,12 +150,6 @@ class CmdPipe: | ||||
|             done_count = 0 | ||||
|  | ||||
|             # read line and call appropriate handlers | ||||
|             if last_stdout in read_ready: | ||||
|                 line = last_stdout.readline().decode('utf-8').rstrip() | ||||
|                 if line != "": | ||||
|                     stdout_handler(line) | ||||
|                 else: | ||||
|                     eof_count = eof_count + 1 | ||||
|  | ||||
|             for item in self.items: | ||||
|                 if item.process.stderr in read_ready: | ||||
| @ -145,6 +159,13 @@ class CmdPipe: | ||||
|                     else: | ||||
|                         eof_count = eof_count + 1 | ||||
|  | ||||
|                 if item.process.stdout in read_ready: | ||||
|                     line = item.process.stdout.readline().decode('utf-8').rstrip() | ||||
|                     if line != "": | ||||
|                         item.stdout_handler(line) | ||||
|                     else: | ||||
|                         eof_count = eof_count + 1 | ||||
|  | ||||
|                 if item.process.poll() is not None: | ||||
|                     done_count = done_count + 1 | ||||
|  | ||||
| @ -153,9 +174,10 @@ class CmdPipe: | ||||
|                 break | ||||
|  | ||||
|         # close filehandles | ||||
|         last_stdout.close() | ||||
|         for item in self.items: | ||||
|             item.process.stderr.close() | ||||
|             item.process.stdout.close() | ||||
|             # item.process.stdin.close() | ||||
|  | ||||
|         # call exit handlers | ||||
|         success = True | ||||
|  | ||||
| @ -144,23 +144,28 @@ class ExecuteNode(LogStub): | ||||
|  | ||||
|             return True | ||||
|  | ||||
|         # add shell command and handlers to pipe | ||||
|         cmd_item=CmdItem(cmd=self._shell_cmd(cmd, cwd), readonly=readonly, stderr_handler=stderr_handler, exit_handler=exit_handler, shell=self.is_local()) | ||||
|         cmd_pipe.add(cmd_item) | ||||
|  | ||||
|         # return pipe instead of executing? | ||||
|         if pipe: | ||||
|             return cmd_pipe | ||||
|  | ||||
|         # stdout parser | ||||
|         output_lines = [] | ||||
|  | ||||
|         def stdout_handler(line): | ||||
|             if tab_split: | ||||
|                 output_lines.append(line.rstrip().split('\t')) | ||||
|             else: | ||||
|                 output_lines.append(line.rstrip()) | ||||
|             self._parse_stdout(line) | ||||
|         if pipe: | ||||
|             # dont specify output handler, so it will get piped to next process | ||||
|             stdout_handler=None | ||||
|         else: | ||||
|             # handle output manually, dont pipe it | ||||
|             def stdout_handler(line): | ||||
|                 if tab_split: | ||||
|                     output_lines.append(line.rstrip().split('\t')) | ||||
|                 else: | ||||
|                     output_lines.append(line.rstrip()) | ||||
|                 self._parse_stdout(line) | ||||
|  | ||||
|         # add shell command and handlers to pipe | ||||
|         cmd_item=CmdItem(cmd=self._shell_cmd(cmd, cwd), readonly=readonly, stderr_handler=stderr_handler, exit_handler=exit_handler, shell=self.is_local(), stdout_handler=stdout_handler) | ||||
|         cmd_pipe.add(cmd_item) | ||||
|  | ||||
|         # return CmdPipe instead of executing? | ||||
|         if pipe: | ||||
|             return cmd_pipe | ||||
|  | ||||
|         if cmd_pipe.should_execute(): | ||||
|             self.debug("CMD    > {}".format(cmd_pipe)) | ||||
| @ -168,7 +173,7 @@ class ExecuteNode(LogStub): | ||||
|             self.debug("CMDSKIP> {}".format(cmd_pipe)) | ||||
|  | ||||
|         # execute and calls handlers in CmdPipe | ||||
|         if not cmd_pipe.execute(stdout_handler=stdout_handler): | ||||
|         if not cmd_pipe.execute(): | ||||
|             raise(ExecuteError("Last command returned error")) | ||||
|  | ||||
|         if return_all: | ||||
|  | ||||
		Reference in New Issue
	
	Block a user