Compare commits

...

67 Commits

Author SHA1 Message Date
244509a006 added console reference 2022-03-08 17:51:23 +01:00
f9d3576752 nicer errors 2022-03-08 17:35:51 +01:00
75161c1bd2 refactorred ZfsCheck.py for better sigpipe handling 2022-03-08 17:22:08 +01:00
5d7d6f6a6c remove random 2022-03-07 23:11:46 +01:00
7c372cf211 test check skipping 2022-03-07 22:59:50 +01:00
8854303b7a test skipping 2022-03-07 21:57:36 +01:00
233745c345 reworking block skipper 2022-03-07 21:08:56 +01:00
b68ca19e5f wip 2022-03-07 19:34:13 +01:00
28ed44b1c8 wip 2022-03-07 19:34:01 +01:00
1cedea5f5f zfscheck wip 2022-02-23 21:31:00 +01:00
d99c202e75 fix 2022-02-23 21:21:07 +01:00
44c6896ddd merged v3.1.2-rc2 2022-02-23 20:43:49 +01:00
e4356cb516 Added -F (--force) to allow 1:1 replication. 2022-02-23 18:36:03 +01:00
cab2f98bb8 Better strip path handling and collision checking. Now also supports stripping so much it ends up on a pool-target.
Fixes #102, #117
2022-02-23 17:47:50 +01:00
8276d07feb fix 2022-02-22 19:52:16 +01:00
82ad7c2480 more tests 2022-02-22 19:25:15 +01:00
f29cf13db3 test compare as well 2022-02-22 18:48:51 +01:00
0c6c75bf58 cleaner progress clearing 2022-02-22 18:41:54 +01:00
f4e81bddb7 progress output 2022-02-22 18:00:06 +01:00
f530cf40f3 fixes. supports stdin 2022-02-22 17:40:38 +01:00
e7e1590919 can also be used on paths and files now 2022-02-22 17:18:15 +01:00
0d882ec031 comparing input now functions 2022-02-22 16:59:08 +01:00
6a58a294a3 now yields errors and mismatches 2022-02-22 14:47:15 +01:00
3f755fcc69 moved tests 2022-02-21 22:38:56 +01:00
d7d76032de more tests 2022-02-21 22:37:13 +01:00
b7e10242b9 itertools is nice :) 2022-02-21 21:39:03 +01:00
bcc7983492 tree compare 2022-02-21 17:51:23 +01:00
490b293ba1 block compare 2022-02-21 14:27:22 +01:00
2d42d1d1a5 forgot a test 2022-02-21 14:02:45 +01:00
a2f85690a3 extract BlockHasher and TreeHasher classes 2022-02-21 13:49:05 +01:00
a807ec320e zfs-check broken pipe handling tests for volumes 2022-02-21 13:01:45 +01:00
3e6a327647 zfs-check broken pipe handling tests 2022-02-21 12:31:19 +01:00
ed61f03b4b zfs-check fixes and tests 2022-02-21 11:40:40 +01:00
f397e7be59 python2 compat 2022-02-21 11:01:07 +01:00
b60dd4c109 wip (will usse zfs-check to do actual hashing) 2022-02-21 00:46:54 +01:00
10a85ff0b7 fixes 2022-02-21 00:46:36 +01:00
770389156a test basicas of zfscheck 2022-02-21 00:44:38 +01:00
bb9ce25a37 correct brokenpipe handling 2022-02-21 00:02:30 +01:00
2fe008acf5 zfs-check basic version complete 2022-02-20 18:03:17 +01:00
14c45d2b34 zfs check initial version (wip) 2022-02-20 17:39:17 +01:00
a115f0bd17 zfs check initial version (wip) 2022-02-20 17:30:02 +01:00
626c84fe47 test data 2022-02-20 13:04:49 +01:00
4d27b3b6ea incremental block hasher (for zfs-verify) 2022-02-20 12:59:43 +01:00
3ca1bce9b2 extracted clibase class (for zfs-check tool) 2022-02-20 11:32:43 +01:00
f0d00aa4e8 extracted clibase class (for zfs-check tool) 2022-02-20 11:03:57 +01:00
60560b884b cleaned up progress stuff 2022-02-19 18:10:10 +01:00
af9d768410 Merge pull request #118 from xrobau/master
Fix MB/s calculations on multiple transfers
2022-02-19 18:00:02 +01:00
f990c2565a Update README.md 2022-02-19 08:09:16 +01:00
af179fa424 Update README.md 2022-02-19 08:03:05 +01:00
355aa0e84b Create codeql-analysis.yml 2022-02-19 07:45:55 +01:00
494b41f4f1 Fix MB/s calculations on multiple transfers 2022-02-17 16:15:05 +10:00
ef532d3ffb cleanup 2022-02-09 14:25:22 +01:00
7109873884 added pipe=true parameter to script 2022-02-09 14:18:10 +01:00
acb0172ddf more tests 2022-02-09 12:24:24 +01:00
53db61de96 Merge pull request #116 from parke/master
Fix two typos in README.md.
2022-02-05 08:40:55 +01:00
3a947e5fee Fix two typos in README.md. 2022-02-04 22:50:47 -08:00
8233e7b35e script mode testing and fixes 2022-01-29 10:10:18 +01:00
e1fb7a37be script mode testing and fixes 2022-01-28 23:59:50 +01:00
2ffd3baf77 cmdpipe manual piping/parallel executing tested and done 2022-01-27 18:22:20 +01:00
a8b43c286f suppress exclude recieved warning when its already specified. #101 2022-01-27 16:12:17 +01:00
609ad19dd9 refactorred stdout piping a bit to allow manual piping 2022-01-27 13:02:41 +01:00
f2761ecee8 Merge remote-tracking branch 'origin/master' 2022-01-27 11:16:32 +01:00
86706ca24f script mode wip 2022-01-27 11:16:19 +01:00
88d856d813 previous changes and this fix improved caching (less runs in test_scaling.py) 2022-01-27 11:02:11 +01:00
81d0bee7ae comments 2022-01-26 23:59:13 +01:00
fa3f44a045 replaced tar verification with much better find/md5sum. 2022-01-24 23:25:55 +01:00
02dca218b8 ExecuteNode.py now supports running from a certain directory 2022-01-24 23:08:09 +01:00
34 changed files with 1968 additions and 446 deletions

70
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@ -0,0 +1,70 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ master ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
schedule:
- cron: '26 23 * * 3'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'python' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://git.io/codeql-language-support
steps:
- name: Checkout repository
uses: actions/checkout@v2
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1

View File

@ -2,6 +2,7 @@
# ZFS autobackup
[![Tests](https://github.com/psy0rz/zfs_autobackup/workflows/Regression%20tests/badge.svg)](https://github.com/psy0rz/zfs_autobackup/actions?query=workflow%3A%22Regression+tests%22) [![Coverage Status](https://coveralls.io/repos/github/psy0rz/zfs_autobackup/badge.svg)](https://coveralls.io/github/psy0rz/zfs_autobackup) [![Python Package](https://github.com/psy0rz/zfs_autobackup/workflows/Upload%20Python%20Package/badge.svg)](https://pypi.org/project/zfs-autobackup/)
[![CodeQL](https://github.com/psy0rz/zfs_autobackup/actions/workflows/codeql-analysis.yml/badge.svg)](https://github.com/psy0rz/zfs_autobackup/actions/workflows/codeql-analysis.yml)
## Introduction
@ -13,9 +14,9 @@ You can select what to backup by setting a custom `ZFS property`. This makes it
Other settings are just specified on the commandline: Simply setup and test your zfs-autobackup command and fix all the issues you might encounter. When you're done you can just copy/paste your command to a cron or script.
Since its using ZFS commands, you can see what it's actually doing by specifying `--debug`. This also helps a lot if you run into some strange problem or error. You can just copy-paste the command that fails and play around with it on the commandline. (something I missed in other tools)
Since it's using ZFS commands, you can see what it's actually doing by specifying `--debug`. This also helps a lot if you run into some strange problem or error. You can just copy-paste the command that fails and play around with it on the commandline. (something I missed in other tools)
An important feature thats missing from other tools is a reliable `--test` option: This allows you to see what zfs-autobackup will do and tune your parameters. It will do everything, except make changes to your system.
An important feature that's missing from other tools is a reliable `--test` option: This allows you to see what zfs-autobackup will do and tune your parameters. It will do everything, except make changes to your system.
## Features
@ -41,7 +42,7 @@ An important feature thats missing from other tools is a reliable `--test` optio
* Uses progressive thinning for older snapshots.
* Uses zfs-holds on important snapshots to prevent accidental deletion.
* Automatic resuming of failed transfers.
* Easy migration from existing zfs backups.
* Easy migration from other zfs backup systems to zfs-autobackup.
* Gracefully handles datasets that no longer exist on source.
* Complete and clean logging.
* Easy installation:

View File

@ -20,6 +20,7 @@ setuptools.setup(
[
'zfs-autobackup = zfs_autobackup.ZfsAutobackup:cli',
'zfs-autoverify = zfs_autobackup.ZfsAutoverify:cli',
'zfs-check = zfs_autobackup.ZfsCheck:cli',
]
},
packages=setuptools.find_packages(),

View File

@ -12,6 +12,8 @@ import time
from pprint import *
from zfs_autobackup.ZfsAutobackup import *
from zfs_autobackup.ZfsAutoverify import *
from zfs_autobackup.ZfsCheck import *
from zfs_autobackup.util import *
from mock import *
import contextlib
import sys

0
tests/data/empty Normal file
View File

1
tests/data/partial Normal file
View File

@ -0,0 +1 @@
xC<78><43>ʟ<EFBFBD>ZG<5A><47>М<EFBFBD><D09C><EFBFBD>?<3F><><1D>ZG<>#<0F><>,<>ƻ<>Q=<3D>><3E>ك1<D983>NU<4E><15>u<>{Zj;<3B>`<60><19><19><>Dv<44><76>Q<EFBFBD>j<EFBFBD>voQFN<46><4E><EFBFBD><EFBFBD><EFBFBD>;3Sa<53>R<EFBFBD>^2Z<32><5A>

BIN
tests/data/whole Normal file

Binary file not shown.

BIN
tests/data/whole2 Normal file

Binary file not shown.

BIN
tests/data/whole_whole2 Normal file

Binary file not shown.

Binary file not shown.

View File

@ -18,6 +18,7 @@ if ! [ -e /root/.ssh/id_rsa ]; then
ssh -oStrictHostKeyChecking=no localhost true || exit 1
fi
umount /tmp/ZfsCheck*
coverage run --branch --source zfs_autobackup -m unittest discover -vvvvf $SCRIPTDIR $@ 2>&1
EXIT=$?

157
tests/test_blockhasher.py Normal file
View File

@ -0,0 +1,157 @@
from basetest import *
from zfs_autobackup.BlockHasher import BlockHasher
# make VERY sure this works correctly under all circumstances.
# sha1 sums of files, (bs=4096)
# da39a3ee5e6b4b0d3255bfef95601890afd80709 empty
# 642027d63bb0afd7e0ba197f2c66ad03e3d70de1 partial
# 3c0bf91170d873b8e327d3bafb6bc074580d11b7 whole
# 2e863f1fcccd6642e4e28453eba10d2d3f74d798 whole2
# 959e6b58078f0cfd2fb3d37e978fda51820473ff whole_whole2
# 309ffffba2e1977d12f3b7469971f30d28b94bd8 whole_whole2_partial
class TestBlockHasher(unittest2.TestCase):
def setUp(self):
pass
def test_empty(self):
block_hasher = BlockHasher(count=1)
self.assertEqual(
list(block_hasher.generate("tests/data/empty")),
[]
)
def test_partial(self):
block_hasher = BlockHasher(count=1)
self.assertEqual(
list(block_hasher.generate("tests/data/partial")),
[(0, "642027d63bb0afd7e0ba197f2c66ad03e3d70de1")]
)
def test_whole(self):
block_hasher = BlockHasher(count=1)
self.assertEqual(
list(block_hasher.generate("tests/data/whole")),
[(0, "3c0bf91170d873b8e327d3bafb6bc074580d11b7")]
)
def test_whole2(self):
block_hasher = BlockHasher(count=1)
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2")),
[
(0, "3c0bf91170d873b8e327d3bafb6bc074580d11b7"),
(1, "2e863f1fcccd6642e4e28453eba10d2d3f74d798")
]
)
def test_wwp(self):
block_hasher = BlockHasher(count=1)
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2_partial")),
[
(0, "3c0bf91170d873b8e327d3bafb6bc074580d11b7"), # whole
(1, "2e863f1fcccd6642e4e28453eba10d2d3f74d798"), # whole2
(2, "642027d63bb0afd7e0ba197f2c66ad03e3d70de1") # partial
]
)
def test_wwp_count2(self):
block_hasher = BlockHasher(count=2)
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2_partial")),
[
(0, "959e6b58078f0cfd2fb3d37e978fda51820473ff"), # whole_whole2
(1, "642027d63bb0afd7e0ba197f2c66ad03e3d70de1") # partial
]
)
def test_big(self):
block_hasher = BlockHasher(count=10)
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2_partial")),
[
(0, "309ffffba2e1977d12f3b7469971f30d28b94bd8"), # whole_whole2_partial
])
def test_blockhash_compare(self):
#no errors
block_hasher = BlockHasher(count=1)
generator = block_hasher.generate("tests/data/whole_whole2_partial")
self.assertEqual([], list(block_hasher.compare("tests/data/whole_whole2_partial", generator)))
#compare file is smaller (EOF errors)
block_hasher = BlockHasher(count=1)
generator = block_hasher.generate("tests/data/whole_whole2_partial")
self.assertEqual(
[(1, '2e863f1fcccd6642e4e28453eba10d2d3f74d798', 'EOF'),
(2, '642027d63bb0afd7e0ba197f2c66ad03e3d70de1', 'EOF')],
list(block_hasher.compare("tests/data/whole", generator)))
#no errors, huge chunks
block_hasher = BlockHasher(count=10)
generator = block_hasher.generate("tests/data/whole_whole2_partial")
self.assertEqual([], list(block_hasher.compare("tests/data/whole_whole2_partial", generator)))
# different order to make sure seek functions are ok
block_hasher = BlockHasher(count=1)
checksums = list(block_hasher.generate("tests/data/whole_whole2_partial"))
checksums.reverse()
self.assertEqual([], list(block_hasher.compare("tests/data/whole_whole2_partial", checksums)))
def test_skip1(self):
block_hasher = BlockHasher(count=1, skip=1)
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2_partial")),
[
(0, "3c0bf91170d873b8e327d3bafb6bc074580d11b7"), # whole
# (1, "2e863f1fcccd6642e4e28453eba10d2d3f74d798"), # whole2
(2, "642027d63bb0afd7e0ba197f2c66ad03e3d70de1") # partial
]
)
#should continue the pattern on the next file:
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2_partial")),
[
# (0, "3c0bf91170d873b8e327d3bafb6bc074580d11b7"), # whole
(1, "2e863f1fcccd6642e4e28453eba10d2d3f74d798"), # whole2
# (2, "642027d63bb0afd7e0ba197f2c66ad03e3d70de1") # partial
]
)
def test_skip6(self):
block_hasher = BlockHasher(count=1, skip=6)
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2_partial")),
[
(0, "3c0bf91170d873b8e327d3bafb6bc074580d11b7"), # whole
# (1, "2e863f1fcccd6642e4e28453eba10d2d3f74d798"), # whole2
# (2, "642027d63bb0afd7e0ba197f2c66ad03e3d70de1") # partial
]
)
#all blocks of next file are skipped
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2_partial")),
[
# (0, "3c0bf91170d873b8e327d3bafb6bc074580d11b7"), # whole
# (1, "2e863f1fcccd6642e4e28453eba10d2d3f74d798"), # whole2
# (2, "642027d63bb0afd7e0ba197f2c66ad03e3d70de1") # partial
]
)
#first block of this one is the 6th to be skipped:
self.assertEqual(
list(block_hasher.generate("tests/data/whole_whole2_partial")),
[
# (0, "3c0bf91170d873b8e327d3bafb6bc074580d11b7"), # whole
(1, "2e863f1fcccd6642e4e28453eba10d2d3f74d798"), # whole2
# (2, "642027d63bb0afd7e0ba197f2c66ad03e3d70de1") # partial
]
)
#NOTE: compare doesnt use skip. thats the job of its input generator

View File

@ -9,8 +9,8 @@ class TestCmdPipe(unittest2.TestCase):
p=CmdPipe(readonly=False, inp=None)
err=[]
out=[]
p.add(CmdItem(["ls", "-d", "/", "/", "/nonexistent"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2)))
executed=p.execute(stdout_handler=lambda line: out.append(line))
p.add(CmdItem(["ls", "-d", "/", "/", "/nonexistent"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2), stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err, ["ls: cannot access '/nonexistent': No such file or directory"])
self.assertEqual(out, ["/","/"])
@ -21,8 +21,8 @@ class TestCmdPipe(unittest2.TestCase):
p=CmdPipe(readonly=False, inp="test")
err=[]
out=[]
p.add(CmdItem(["cat"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0)))
executed=p.execute(stdout_handler=lambda line: out.append(line))
p.add(CmdItem(["cat"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0), stdout_handler=lambda line: out.append(line) ))
executed=p.execute()
self.assertEqual(err, [])
self.assertEqual(out, ["test"])
@ -37,8 +37,8 @@ class TestCmdPipe(unittest2.TestCase):
out=[]
p.add(CmdItem(["echo", "test"], stderr_handler=lambda line: err1.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0)))
p.add(CmdItem(["tr", "e", "E"], stderr_handler=lambda line: err2.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0)))
p.add(CmdItem(["tr", "t", "T"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0)))
executed=p.execute(stdout_handler=lambda line: out.append(line))
p.add(CmdItem(["tr", "t", "T"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0), stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, [])
self.assertEqual(err2, [])
@ -58,8 +58,8 @@ class TestCmdPipe(unittest2.TestCase):
out=[]
p.add(CmdItem(["ls", "/nonexistent1"], stderr_handler=lambda line: err1.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2)))
p.add(CmdItem(["ls", "/nonexistent2"], stderr_handler=lambda line: err2.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2)))
p.add(CmdItem(["ls", "/nonexistent3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2)))
executed=p.execute(stdout_handler=lambda line: out.append(line))
p.add(CmdItem(["ls", "/nonexistent3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2), stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, ["ls: cannot access '/nonexistent1': No such file or directory"])
self.assertEqual(err2, ["ls: cannot access '/nonexistent2': No such file or directory"])
@ -76,8 +76,8 @@ class TestCmdPipe(unittest2.TestCase):
out=[]
p.add(CmdItem(["bash", "-c", "exit 1"], stderr_handler=lambda line: err1.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,1)))
p.add(CmdItem(["bash", "-c", "exit 2"], stderr_handler=lambda line: err2.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2)))
p.add(CmdItem(["bash", "-c", "exit 3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,3)))
executed=p.execute(stdout_handler=lambda line: out.append(line))
p.add(CmdItem(["bash", "-c", "exit 3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,3), stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, [])
self.assertEqual(err2, [])
@ -97,8 +97,8 @@ class TestCmdPipe(unittest2.TestCase):
return True
p.add(CmdItem(["echo", "test1"], stderr_handler=lambda line: err1.append(line), exit_handler=true_exit, readonly=True))
p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), exit_handler=true_exit, readonly=True))
executed=p.execute(stdout_handler=lambda line: out.append(line))
p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), exit_handler=true_exit, readonly=True, stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, [])
self.assertEqual(err2, [])
@ -113,11 +113,63 @@ class TestCmdPipe(unittest2.TestCase):
err2=[]
out=[]
p.add(CmdItem(["echo", "test1"], stderr_handler=lambda line: err1.append(line), readonly=False))
p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True))
executed=p.execute(stdout_handler=lambda line: out.append(line))
p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True, stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, [])
self.assertEqual(err2, [])
self.assertEqual(out, [])
self.assertTrue(executed)
def test_no_handlers(self):
with self.assertRaises(Exception):
p=CmdPipe()
p.add(CmdItem([ "echo" ]))
p.execute()
#NOTE: this will give some resource warnings
def test_manual_pipes(self):
# manual piping means: a command in the pipe has a stdout_handler, which is responsible for sending the data into the next item of the pipe.
result=[]
def stdout_handler(line):
item2.process.stdin.write(line.encode('utf8'))
# item2.process.stdin.close()
item1=CmdItem(["echo", "test"], stdout_handler=stdout_handler)
item2=CmdItem(["tr", "e", "E"], stdout_handler=lambda line: result.append(line))
p=CmdPipe()
p.add(item1)
p.add(item2)
p.execute()
self.assertEqual(result, ["tEst"])
def test_multiprocess(self):
#dont do any piping at all, just run multiple processes and handle outputs
result1=[]
result2=[]
result3=[]
item1=CmdItem(["echo", "test1"], stdout_handler=lambda line: result1.append(line))
item2=CmdItem(["echo", "test2"], stdout_handler=lambda line: result2.append(line))
item3=CmdItem(["echo", "test3"], stdout_handler=lambda line: result3.append(line))
p=CmdPipe()
p.add(item1)
p.add(item2)
p.add(item3)
p.execute()
self.assertEqual(result1, ["test1"])
self.assertEqual(result2, ["test2"])
self.assertEqual(result3, ["test3"])

View File

@ -144,5 +144,70 @@ class TestExecuteNode(unittest2.TestCase):
self.pipe(nodea, nodeb)
def test_cwd(self):
nodea=ExecuteNode(ssh_to="localhost", debug_output=True)
nodeb=ExecuteNode(debug_output=True)
#change to a directory with a space and execute a system pipe, check if all piped commands are executed in correct directory.
shelltest("mkdir '/tmp/space test' 2>/dev/null; true")
self.assertEqual(nodea.run(cmd=["pwd", ExecuteNode.PIPE, "cat"], cwd="/tmp/space test"), ["/tmp/space test"])
self.assertEqual(nodea.run(cmd=["cat", ExecuteNode.PIPE, "pwd"], cwd="/tmp/space test"), ["/tmp/space test"])
self.assertEqual(nodeb.run(cmd=["pwd", ExecuteNode.PIPE, "cat"], cwd="/tmp/space test"), ["/tmp/space test"])
self.assertEqual(nodeb.run(cmd=["cat", ExecuteNode.PIPE, "pwd"], cwd="/tmp/space test"), ["/tmp/space test"])
def test_script_handlers(self):
def test(node):
results = []
node.script(lines=["echo line1", "echo line2 1>&2", "exit 123"],
stdout_handler=lambda line: results.append(line),
stderr_handler=lambda line: results.append(line),
exit_handler=lambda exit_code: results.append(exit_code),
valid_exitcodes=[123]
)
self.assertEqual(results, ["line1", "line2", 123 ])
with self.subTest("remote"):
test(ExecuteNode(ssh_to="localhost", debug_output=True))
#
with self.subTest("local"):
test(ExecuteNode(debug_output=True))
def test_script_defaults(self):
result=[]
nodea=ExecuteNode(debug_output=True)
nodea.script(lines=["echo test"], stdout_handler=lambda line: result.append(line))
self.assertEqual(result, ["test"])
def test_script_pipe(self):
result=[]
nodea=ExecuteNode()
cmd_pipe=nodea.script(lines=["echo test"], pipe=True)
nodea.script(lines=["tr e E"], inp=cmd_pipe,stdout_handler=lambda line: result.append(line))
self.assertEqual(result, ["tEst"])
def test_mixed(self):
#should be able to mix run() and script()
node=ExecuteNode()
result=[]
pipe=node.run(["echo", "test"], pipe=True)
node.script(["tr e E"], inp=pipe, stdout_handler=lambda line: result.append(line))
self.assertEqual(result, ["tEst"])
if __name__ == '__main__':
unittest.main()

View File

@ -78,7 +78,8 @@ class TestZfsScaling(unittest2.TestCase):
#this triggers if you make a change with an impact of more than O(snapshot_count/2)
expected_runs=743
expected_runs=636
print("EXPECTED RUNS: {}".format(expected_runs))
print("ACTUAL RUNS: {}".format(run_counter))
self.assertLess(abs(run_counter-expected_runs), dataset_count/2)
@ -92,6 +93,7 @@ class TestZfsScaling(unittest2.TestCase):
#this triggers if you make a change with a performance impact of more than O(snapshot_count/2)
expected_runs=947
expected_runs=842
print("EXPECTED RUNS: {}".format(expected_runs))
print("ACTUAL RUNS: {}".format(run_counter))
self.assertLess(abs(run_counter-expected_runs), dataset_count/2)

84
tests/test_treehasher.py Normal file
View File

@ -0,0 +1,84 @@
from basetest import *
from zfs_autobackup.BlockHasher import BlockHasher
# sha1 sums of files, (bs=4096)
# da39a3ee5e6b4b0d3255bfef95601890afd80709 empty
# 642027d63bb0afd7e0ba197f2c66ad03e3d70de1 partial
# 3c0bf91170d873b8e327d3bafb6bc074580d11b7 whole
# 2e863f1fcccd6642e4e28453eba10d2d3f74d798 whole2
# 959e6b58078f0cfd2fb3d37e978fda51820473ff whole_whole2
# 309ffffba2e1977d12f3b7469971f30d28b94bd8 whole_whole2_partial
class TestTreeHasher(unittest2.TestCase):
def test_treehasher(self):
shelltest("rm -rf /tmp/treehashertest; mkdir /tmp/treehashertest")
shelltest("cp tests/data/whole /tmp/treehashertest")
shelltest("mkdir /tmp/treehashertest/emptydir")
shelltest("mkdir /tmp/treehashertest/dir")
shelltest("cp tests/data/whole_whole2_partial /tmp/treehashertest/dir")
# it should ignore these:
shelltest("ln -s / /tmp/treehashertest/symlink")
shelltest("mknod /tmp/treehashertest/c c 1 1")
shelltest("mknod /tmp/treehashertest/b b 1 1")
shelltest("mkfifo /tmp/treehashertest/f")
block_hasher = BlockHasher(count=1, skip=0)
tree_hasher = TreeHasher(block_hasher)
with self.subTest("Test output, count 1, skip 0"):
self.assertEqual(list(tree_hasher.generate("/tmp/treehashertest")), [
('whole', 0, '3c0bf91170d873b8e327d3bafb6bc074580d11b7'),
('dir/whole_whole2_partial', 0, '3c0bf91170d873b8e327d3bafb6bc074580d11b7'),
('dir/whole_whole2_partial', 1, '2e863f1fcccd6642e4e28453eba10d2d3f74d798'),
('dir/whole_whole2_partial', 2, '642027d63bb0afd7e0ba197f2c66ad03e3d70de1')
])
block_hasher = BlockHasher(count=1, skip=1)
tree_hasher = TreeHasher(block_hasher)
with self.subTest("Test output, count 1, skip 1"):
self.assertEqual(list(tree_hasher.generate("/tmp/treehashertest")), [
('whole', 0, '3c0bf91170d873b8e327d3bafb6bc074580d11b7'),
# ('dir/whole_whole2_partial', 0, '3c0bf91170d873b8e327d3bafb6bc074580d11b7'),
('dir/whole_whole2_partial', 1, '2e863f1fcccd6642e4e28453eba10d2d3f74d798'),
# ('dir/whole_whole2_partial', 2, '642027d63bb0afd7e0ba197f2c66ad03e3d70de1')
])
block_hasher = BlockHasher(count=2)
tree_hasher = TreeHasher(block_hasher)
with self.subTest("Test output, count 2, skip 0"):
self.assertEqual(list(tree_hasher.generate("/tmp/treehashertest")), [
('whole', 0, '3c0bf91170d873b8e327d3bafb6bc074580d11b7'),
('dir/whole_whole2_partial', 0, '959e6b58078f0cfd2fb3d37e978fda51820473ff'),
('dir/whole_whole2_partial', 1, '642027d63bb0afd7e0ba197f2c66ad03e3d70de1')
])
with self.subTest("Test compare"):
generator = tree_hasher.generate("/tmp/treehashertest")
errors = list(tree_hasher.compare("/tmp/treehashertest", generator))
self.assertEqual(errors, [])
with self.subTest("Test mismatch"):
generator = list(tree_hasher.generate("/tmp/treehashertest"))
shelltest("cp tests/data/whole2 /tmp/treehashertest/whole")
self.assertEqual(list(tree_hasher.compare("/tmp/treehashertest", generator)),
[('whole',
0,
'3c0bf91170d873b8e327d3bafb6bc074580d11b7',
'2e863f1fcccd6642e4e28453eba10d2d3f74d798')])
with self.subTest("Test missing file compare"):
generator = list(tree_hasher.generate("/tmp/treehashertest"))
shelltest("rm /tmp/treehashertest/whole")
self.assertEqual(list(tree_hasher.compare("/tmp/treehashertest", generator)),
[('whole', '-', '-', "ERROR: [Errno 2] No such file or directory: '/tmp/treehashertest/whole'")])

View File

@ -16,10 +16,12 @@ from basetest import *
# - test all directions (local, remote/local, local/remote, remote/remote)
#
class TestZfsEncryption(unittest2.TestCase):
class TestZfsVerify(unittest2.TestCase):
def setUp(self):
self.skipTest("WIP")
prepare_zpools()
#create actual test files and data
@ -86,14 +88,15 @@ class TestZfsEncryption(unittest2.TestCase):
runchecked("rsync, local", "test test_target1 --verbose --exclude-received --fs-compare=rsync")
runchecked("tar, remote source and remote target",
"test test_target1 --ssh-source=localhost --ssh-target=localhost --verbose --exclude-received --fs-compare=tar")
"test test_target1 --ssh-source=localhost --ssh-target=localhost --verbose --exclude-received --fs-compare=find")
runchecked("tar, remote source",
"test test_target1 --ssh-source=localhost --verbose --exclude-received --fs-compare=tar")
"test test_target1 --ssh-source=localhost --verbose --exclude-received --fs-compare=find")
runchecked("tar, remote target",
"test test_target1 --ssh-target=localhost --verbose --exclude-received --fs-compare=tar")
runchecked("tar, local", "test test_target1 --verbose --exclude-received --fs-compare=tar")
"test test_target1 --ssh-target=localhost --verbose --exclude-received --fs-compare=find")
runchecked("tar, local", "test test_target1 --verbose --exclude-received --fs-compare=find")
with self.subTest("no common snapshot"):
#destroy common snapshot, now 3 should fail
shelltest("zfs destroy test_source1/fs1/ok_zvol@test-20101111000000")
self.assertEqual(3, ZfsAutoverify("test test_target1 --verbose --exclude-received".split(" ")).run())

View File

@ -420,33 +420,13 @@ test_target1/fs2/sub
test_target1/fs2/sub@test-20101111000000
""")
# def test_strippath_toomuch(self):
# with patch('time.strftime', return_value="test-20101111000000"):
# self.assertFalse(
# ZfsAutobackup("test test_target1 --verbose --strip-path=2 --no-progress".split(" ")).run())
#
# r = shelltest("zfs list -H -o name -r -t all " + TEST_POOLS)
# self.assertMultiLineEqual(r, """
# test_source1
# test_source1/fs1
# test_source1/fs1@test-20101111000000
# test_source1/fs1/sub
# test_source1/fs1/sub@test-20101111000000
# test_source2
# test_source2/fs2
# test_source2/fs2/sub
# test_source2/fs2/sub@test-20101111000000
# test_source2/fs3
# test_source2/fs3/sub
# test_target1
# test_target1/fs1
# test_target1/fs1@test-20101111000000
# test_target1/fs1/sub
# test_target1/fs1/sub@test-20101111000000
# test_target1/fs2
# test_target1/fs2/sub
# test_target1/fs2/sub@test-20101111000000
# """)
def test_strippath_collision(self):
with self.assertRaisesRegexp(Exception,"collision"):
ZfsAutobackup("test test_target1 --verbose --strip-path=2 --no-progress --debug".split(" ")).run()
def test_strippath_toomuch(self):
with self.assertRaisesRegexp(Exception,"too much"):
ZfsAutobackup("test test_target1 --verbose --strip-path=3 --no-progress --debug".split(" ")).run()
def test_clearrefres(self):

View File

@ -79,3 +79,19 @@ test_target1/b/test_target1/a/test_source1/fs1/sub@test-20101111000000
self.assertFalse(
ZfsAutobackup("test test_target1 --no-progress --verbose --debug --zfs-compressed".split(" ")).run())
def test_force(self):
"""test 1:1 replication"""
shelltest("zfs set autobackup:test=true test_source1")
with patch('time.strftime', return_value="test-20101111000000"):
self.assertFalse(
ZfsAutobackup("test test_target1 --no-progress --verbose --debug --force --strip-path=1".split(" ")).run())
r=shelltest("zfs list -H -o name -r -t snapshot test_target1")
self.assertMultiLineEqual(r,"""
test_target1@test-20101111000000
test_target1/fs1@test-20101111000000
test_target1/fs1/sub@test-20101111000000
test_target1/fs2/sub@test-20101111000000
""")

216
tests/test_zfscheck.py Normal file
View File

@ -0,0 +1,216 @@
from basetest import *
from zfs_autobackup.BlockHasher import BlockHasher
class TestZfsCheck(unittest2.TestCase):
def setUp(self):
pass
def test_volume(self):
prepare_zpools()
shelltest("zfs create -V200M test_source1/vol")
shelltest("zfs snapshot test_source1/vol@test")
with self.subTest("Generate"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertFalse(ZfsCheck("test_source1/vol@test".split(" "),print_arguments=False).run())
print(buf.getvalue())
self.assertEqual("""0 2c2ceccb5ec5574f791d45b63c940cff20550f9a
1 2c2ceccb5ec5574f791d45b63c940cff20550f9a
""", buf.getvalue())
#store on disk for next step, add one error.
with open("/tmp/testhashes", "w") as fh:
fh.write(buf.getvalue()+"1\t2c2ceccb5ec5574f791d45b63c940cff20550f9X")
with self.subTest("Compare"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertEqual(1, ZfsCheck("test_source1/vol@test --check=/tmp/testhashes".split(" "),print_arguments=False).run())
print(buf.getvalue())
self.assertEqual("Chunk 1 failed: 2c2ceccb5ec5574f791d45b63c940cff20550f9X 2c2ceccb5ec5574f791d45b63c940cff20550f9a\n", buf.getvalue())
def test_filesystem(self):
prepare_zpools()
shelltest("cp tests/data/whole /test_source1/testfile")
shelltest("mkdir /test_source1/emptydir")
shelltest("mkdir /test_source1/dir")
shelltest("cp tests/data/whole2 /test_source1/dir/testfile")
#it should ignore these:
shelltest("ln -s / /test_source1/symlink")
shelltest("mknod /test_source1/c c 1 1")
shelltest("mknod /test_source1/b b 1 1")
shelltest("mkfifo /test_source1/f")
shelltest("zfs snapshot test_source1@test")
with self.subTest("Generate"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertFalse(ZfsCheck("test_source1@test".split(" "), print_arguments=False).run())
print(buf.getvalue())
self.assertEqual("""testfile 0 3c0bf91170d873b8e327d3bafb6bc074580d11b7
dir/testfile 0 2e863f1fcccd6642e4e28453eba10d2d3f74d798
""", buf.getvalue())
#store on disk for next step, add error
with open("/tmp/testhashes", "w") as fh:
fh.write(buf.getvalue()+"dir/testfile 0 2e863f1fcccd6642e4e28453eba10d2d3f74d79X")
with self.subTest("Compare"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertEqual(1, ZfsCheck("test_source1@test --check=/tmp/testhashes".split(" "),print_arguments=False).run())
print(buf.getvalue())
self.assertEqual("dir/testfile: Chunk 0 failed: 2e863f1fcccd6642e4e28453eba10d2d3f74d79X 2e863f1fcccd6642e4e28453eba10d2d3f74d798\n", buf.getvalue())
def test_file(self):
with self.subTest("Generate"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertFalse(ZfsCheck("tests/data/whole".split(" "), print_arguments=False).run())
print(buf.getvalue())
self.assertEqual("""0 3c0bf91170d873b8e327d3bafb6bc074580d11b7
""", buf.getvalue())
# store on disk for next step, add error
with open("/tmp/testhashes", "w") as fh:
fh.write(buf.getvalue()+"0 3c0bf91170d873b8e327d3bafb6bc074580d11bX")
with self.subTest("Compare"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertEqual(1,ZfsCheck("tests/data/whole --check=/tmp/testhashes".split(" "), print_arguments=False).run())
print(buf.getvalue())
self.assertEqual("Chunk 0 failed: 3c0bf91170d873b8e327d3bafb6bc074580d11bX 3c0bf91170d873b8e327d3bafb6bc074580d11b7\n", buf.getvalue())
def test_tree(self):
shelltest("rm -rf /tmp/testtree; mkdir /tmp/testtree")
shelltest("cp tests/data/whole /tmp/testtree")
shelltest("cp tests/data/whole_whole2 /tmp/testtree")
shelltest("cp tests/data/whole2 /tmp/testtree")
shelltest("cp tests/data/partial /tmp/testtree")
shelltest("cp tests/data/whole_whole2_partial /tmp/testtree")
####################################
with self.subTest("Generate, skip 1"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertFalse(ZfsCheck("/tmp/testtree --skip=1".split(" "), print_arguments=False).run())
#since order varies, just check count (there is one empty line for some reason, only when testing like this)
print(buf.getvalue().split("\n"))
self.assertEqual(len(buf.getvalue().split("\n")),4)
######################################
with self.subTest("Compare, all incorrect, skip 1"):
# store on disk for next step, add error
with open("/tmp/testhashes", "w") as fh:
fh.write("""
partial 0 642027d63bb0afd7e0ba197f2c66ad03e3d70deX
whole 0 3c0bf91170d873b8e327d3bafb6bc074580d11bX
whole2 0 2e863f1fcccd6642e4e28453eba10d2d3f74d79X
whole_whole2 0 959e6b58078f0cfd2fb3d37e978fda51820473fX
whole_whole2_partial 0 309ffffba2e1977d12f3b7469971f30d28b94bdX
""")
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertEqual(ZfsCheck("/tmp/testtree --check=/tmp/testhashes --skip=1".split(" "), print_arguments=False).run(), 3)
print(buf.getvalue())
self.assertMultiLineEqual("""partial: Chunk 0 failed: 642027d63bb0afd7e0ba197f2c66ad03e3d70deX 642027d63bb0afd7e0ba197f2c66ad03e3d70de1
whole2: Chunk 0 failed: 2e863f1fcccd6642e4e28453eba10d2d3f74d79X 2e863f1fcccd6642e4e28453eba10d2d3f74d798
whole_whole2_partial: Chunk 0 failed: 309ffffba2e1977d12f3b7469971f30d28b94bdX 309ffffba2e1977d12f3b7469971f30d28b94bd8
""",buf.getvalue())
####################################
with self.subTest("Generate"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertFalse(ZfsCheck("/tmp/testtree".split(" "), print_arguments=False).run())
#file order on disk can vary, so sort it..
sorted=buf.getvalue().split("\n")
sorted.sort()
sorted="\n".join(sorted)+"\n"
print(sorted)
self.assertEqual("""
partial 0 642027d63bb0afd7e0ba197f2c66ad03e3d70de1
whole 0 3c0bf91170d873b8e327d3bafb6bc074580d11b7
whole2 0 2e863f1fcccd6642e4e28453eba10d2d3f74d798
whole_whole2 0 959e6b58078f0cfd2fb3d37e978fda51820473ff
whole_whole2_partial 0 309ffffba2e1977d12f3b7469971f30d28b94bd8
""", sorted)
# store on disk for next step, add error
with open("/tmp/testhashes", "w") as fh:
fh.write(buf.getvalue() + "whole_whole2_partial 0 309ffffba2e1977d12f3b7469971f30d28b94bdX")
####################################
with self.subTest("Compare"):
with OutputIO() as buf:
with redirect_stdout(buf):
self.assertEqual(1, ZfsCheck("/tmp/testtree --check=/tmp/testhashes".split(" "),
print_arguments=False).run())
print(buf.getvalue())
self.assertEqual(
"whole_whole2_partial: Chunk 0 failed: 309ffffba2e1977d12f3b7469971f30d28b94bdX 309ffffba2e1977d12f3b7469971f30d28b94bd8\n",
buf.getvalue())
def test_brokenpipe_cleanup_filesystem(self):
"""test if stuff is cleaned up correctly, in debugging mode , when a pipe breaks. """
prepare_zpools()
shelltest("cp tests/data/whole /test_source1/testfile")
shelltest("zfs snapshot test_source1@test")
#breaks pipe when grep exists:
#important to use --debug, since that generates extra output which would be problematic if we didnt do correct SIGPIPE handling
shelltest("python -m zfs_autobackup.ZfsCheck test_source1@test --debug | grep -m1 'Hashing tree'")
# time.sleep(5)
#should NOT be mounted anymore if cleanup went ok:
self.assertNotRegex(shelltest("mount"), "test_source1@test")
def test_brokenpipe_cleanup_volume(self):
prepare_zpools()
shelltest("zfs create -V200M test_source1/vol")
shelltest("zfs snapshot test_source1/vol@test")
#breaks pipe when grep exists:
#important to use --debug, since that generates extra output which would be problematic if we didnt do correct SIGPIPE handling
shelltest("python -m zfs_autobackup.ZfsCheck test_source1/vol@test --debug | grep -m1 'Hashing file'")
# time.sleep(1)
r = shelltest("zfs list -H -o name -r -t all " + TEST_POOLS)
self.assertMultiLineEqual("""
test_source1
test_source1/fs1
test_source1/fs1/sub
test_source1/vol
test_source1/vol@test
test_source2
test_source2/fs2
test_source2/fs2/sub
test_source2/fs3
test_source2/fs3/sub
test_target1
""",r )

View File

@ -0,0 +1,127 @@
import hashlib
import os
class BlockHasher():
"""This class was created to checksum huge files and blockdevices (TB's)
Instead of one sha1sum of the whole file, it generates sha1susms of chunks of the file.
The chunksize is count*bs (bs is the read blocksize from disk)
Its also possible to only read a certain percentage of blocks to just check a sample.
Input and output generators are in the format ( chunk_nr, hexdigest )
NOTE: skipping is only used on the generator side. The compare side just compares what it gets from the input generator.
"""
def __init__(self, count=10000, bs=4096, hash_class=hashlib.sha1, skip=0):
self.count = count
self.bs = bs
self.chunk_size=bs*count
self.hash_class = hash_class
# self.coverage=coverage
self.skip=skip
self._skip_count=0
self.stats_total_bytes=0
def _seek_next_chunk(self, fh, fsize):
"""seek fh to next chunk and update skip counter.
returns chunk_nr
return false it should skip the rest of the file
"""
#ignore rempty files
if fsize==0:
return False
# need to skip chunks?
if self._skip_count > 0:
chunks_left = ((fsize - fh.tell()) // self.chunk_size) + 1
# not enough chunks left in this file?
if self._skip_count >= chunks_left:
# skip rest of this file
self._skip_count = self._skip_count - chunks_left
return False
else:
# seek to next chunk, reset skip count
fh.seek(self.chunk_size * self._skip_count, os.SEEK_CUR)
self._skip_count = self.skip
return fh.tell()//self.chunk_size
else:
# should read this chunk, reset skip count
self._skip_count = self.skip
return fh.tell() // self.chunk_size
def generate(self, fname):
"""Generates checksums
yields(chunk_nr, hexdigest)
yields nothing for empty files.
"""
with open(fname, "rb") as fh:
fh.seek(0, os.SEEK_END)
fsize=fh.tell()
fh.seek(0)
while fh.tell()<fsize:
chunk_nr=self._seek_next_chunk(fh, fsize)
if chunk_nr is False:
return
#read chunk
hash = self.hash_class()
block_nr = 0
while block_nr != self.count:
block=fh.read(self.bs)
if block==b"":
break
hash.update(block)
block_nr = block_nr + 1
yield (chunk_nr, hash.hexdigest())
def compare(self, fname, generator):
"""reads from generator and compares blocks
Yields mismatches in the form: ( chunk_nr, hexdigest, actual_hexdigest)
Yields errors in the form: ( chunk_nr, hexdigest, "message" )
"""
try:
checked = 0
with open(fname, "rb") as f:
for (chunk_nr, hexdigest) in generator:
try:
checked = checked + 1
hash = self.hash_class()
f.seek(int(chunk_nr) * self.bs * self.count)
block_nr = 0
for block in iter(lambda: f.read(self.bs), b""):
hash.update(block)
block_nr = block_nr + 1
if block_nr == self.count:
break
if block_nr == 0:
yield (chunk_nr, hexdigest, 'EOF')
elif (hash.hexdigest() != hexdigest):
yield (chunk_nr, hexdigest, hash.hexdigest())
except Exception as e:
yield ( chunk_nr , hexdigest, 'ERROR: '+str(e))
except Exception as e:
yield ( '-', '-', 'ERROR: '+ str(e))

109
zfs_autobackup/CliBase.py Normal file
View File

@ -0,0 +1,109 @@
import argparse
import os.path
import sys
from .LogConsole import LogConsole
class CliBase(object):
"""Base class for all cli programs
Overridden in subclasses that add stuff for the specific programs."""
# also used by setup.py
VERSION = "3.2-alpha2"
HEADER = "{} v{} - (c)2022 E.H.Eefting (edwin@datux.nl)".format(os.path.basename(sys.argv[0]), VERSION)
def __init__(self, argv, print_arguments=True):
self.parser=self.get_parser()
self.args = self.parse_args(argv)
# helps with investigating failed regression tests:
if print_arguments:
print("ARGUMENTS: " + " ".join(argv))
def parse_args(self, argv):
"""parses the arguments and does additional checks, might print warnings or notes
Overridden in subclasses with extra checks.
"""
args = self.parser.parse_args(argv)
if args.help:
self.parser.print_help()
sys.exit(255)
if args.version:
print(self.HEADER)
sys.exit(255)
# auto enable progress?
if sys.stderr.isatty() and not args.no_progress:
args.progress = True
if args.debug_output:
args.debug = True
if args.test:
args.verbose = True
if args.debug:
args.verbose = True
self.log = LogConsole(show_debug=args.debug, show_verbose=args.verbose, color=sys.stdout.isatty())
self.verbose(self.HEADER)
self.verbose("")
return args
def get_parser(self):
"""build up the argument parser
Overridden in subclasses that add extra arguments
"""
parser = argparse.ArgumentParser(description=self.HEADER, add_help=False,
epilog='Full manual at: https://github.com/psy0rz/zfs_autobackup')
# Basic options
group=parser.add_argument_group("Common options")
group.add_argument('--help', '-h', action='store_true', help='show help')
group.add_argument('--test', '--dry-run', '-n', action='store_true',
help='Dry run, dont change anything, just show what would be done (still does all read-only '
'operations)')
group.add_argument('--verbose', '-v', action='store_true', help='verbose output')
group.add_argument('--debug', '-d', action='store_true',
help='Show zfs commands that are executed, stops after an exception.')
group.add_argument('--debug-output', action='store_true',
help='Show zfs commands and their output/exit codes. (noisy)')
group.add_argument('--progress', action='store_true',
help='show zfs progress output. Enabled automaticly on ttys. (use --no-progress to disable)')
group.add_argument('--no-progress', action='store_true',
help=argparse.SUPPRESS) # needed to workaround a zfs recv -v bug
group.add_argument('--version', action='store_true',
help='Show version.')
return parser
def verbose(self, txt):
self.log.verbose(txt)
def warning(self, txt):
self.log.warning(txt)
def error(self, txt):
self.log.error(txt)
def debug(self, txt):
self.log.debug(txt)
def progress(self, txt):
self.log.progress(txt)
def clear_progress(self):
self.log.clear_progress()
def set_title(self, title):
self.log.verbose("")
self.log.verbose("#### " + title)

View File

@ -1,3 +1,17 @@
# This is the low level process executing stuff.
# It makes piping and parallel process handling more easy.
# You can specify a handler for each line of stderr output for each item in the pipe.
# Every item also has its own exitcode handler.
# Normally you add a stdout_handler to the last item in the pipe.
# However: You can also add stdout_handler to other items in a pipe. This will turn that item in to a manual pipe: your
# handler is responsible for sending data into the next item of the pipe. (avaiable in item.next)
# You can also use manual pipe mode to just execute multiple command in parallel and handle their output parallel,
# without doing any actual pipe stuff. (because you dont HAVE to send data into the next item.)
import subprocess
import os
import select
@ -11,17 +25,23 @@ except ImportError:
class CmdItem:
"""one command item, to be added to a CmdPipe"""
def __init__(self, cmd, readonly=False, stderr_handler=None, exit_handler=None, shell=False):
def __init__(self, cmd, readonly=False, stderr_handler=None, exit_handler=None, stdout_handler=None, shell=False):
"""create item. caller has to make sure cmd is properly escaped when using shell.
If stdout_handler is None, it will connect the stdout to the stdin of the next item in the pipe, like
and actual system pipe. (no python overhead)
:type cmd: list of str
"""
self.cmd = cmd
self.readonly = readonly
self.stderr_handler = stderr_handler
self.stdout_handler = stdout_handler
self.exit_handler = exit_handler
self.shell = shell
self.process = None
self.next = None #next item in pipe, set by CmdPipe
def __str__(self):
"""return copy-pastable version of command."""
@ -84,72 +104,23 @@ class CmdPipe:
def should_execute(self):
return self._should_execute
def execute(self, stdout_handler):
"""run the pipe. returns True all exit handlers returned true"""
def execute(self):
"""run the pipe. returns True all exit handlers returned true. (otherwise it will be False/None depending on exit handlers returncode) """
if not self._should_execute:
return True
# first process should have actual user input as stdin:
selectors = []
selectors = self.__create()
# create processes
last_stdout = None
stdin = subprocess.PIPE
for item in self.items:
if not selectors:
raise (Exception("Cant use cmdpipe without any output handlers."))
item.create(stdin)
selectors.append(item.process.stderr)
if last_stdout is None:
# we're the first process in the pipe, do we have some input?
if self.inp is not None:
# TODO: make streaming to support big inputs?
item.process.stdin.write(self.inp.encode('utf-8'))
item.process.stdin.close()
else:
# last stdout was piped to this stdin already, so close it because we dont need it anymore
last_stdout.close()
last_stdout = item.process.stdout
stdin = last_stdout
# monitor last stdout as well
selectors.append(last_stdout)
while True:
# wait for output on one of the stderrs or last_stdout
(read_ready, write_ready, ex_ready) = select.select(selectors, [], [])
eof_count = 0
done_count = 0
# read line and call appropriate handlers
if last_stdout in read_ready:
line = last_stdout.readline().decode('utf-8').rstrip()
if line != "":
stdout_handler(line)
else:
eof_count = eof_count + 1
for item in self.items:
if item.process.stderr in read_ready:
line = item.process.stderr.readline().decode('utf-8').rstrip()
if line != "":
item.stderr_handler(line)
else:
eof_count = eof_count + 1
if item.process.poll() is not None:
done_count = done_count + 1
# all filehandles are eof and all processes are done (poll() is not None)
if eof_count == len(selectors) and done_count == len(self.items):
break
self.__process_outputs(selectors)
# close filehandles
last_stdout.close()
for item in self.items:
item.process.stderr.close()
item.process.stdout.close()
# call exit handlers
success = True
@ -158,3 +129,86 @@ class CmdPipe:
success=item.exit_handler(item.process.returncode) and success
return success
def __process_outputs(self, selectors):
"""watch all output selectors and call handlers"""
while True:
# wait for output on one of the stderrs or last_stdout
(read_ready, write_ready, ex_ready) = select.select(selectors, [], [])
eof_count = 0
done_count = 0
# read line and call appropriate handlers
for item in self.items:
if item.process.stdout in read_ready:
line = item.process.stdout.readline().decode('utf-8').rstrip()
if line != "":
item.stdout_handler(line)
else:
eof_count = eof_count + 1
if item.next:
item.next.process.stdin.close()
if item.process.stderr in read_ready:
line = item.process.stderr.readline().decode('utf-8').rstrip()
if line != "":
item.stderr_handler(line)
else:
eof_count = eof_count + 1
if item.process.poll() is not None:
done_count = done_count + 1
# all filehandles are eof and all processes are done (poll() is not None)
if eof_count == len(selectors) and done_count == len(self.items):
break
def __create(self):
"""create actual processes, do piping and return selectors."""
selectors = []
next_stdin = subprocess.PIPE # means we write input via python instead of an actual system pipe
first = True
prev_item = None
for item in self.items:
# creates the actual subprocess via subprocess.popen
item.create(next_stdin)
# we piped previous process? dont forget to close its stdout
if next_stdin != subprocess.PIPE:
next_stdin.close()
if item.stderr_handler:
selectors.append(item.process.stderr)
# we're the first process in the pipe
if first:
if self.inp is not None:
# write the input we have
item.process.stdin.write(self.inp.encode('utf-8'))
item.process.stdin.close()
first = False
# manual stdout handling or pipe it to the next process?
if item.stdout_handler is None:
# no manual stdout handling, pipe it to the next process via sytem pipe
next_stdin = item.process.stdout
else:
# manual stdout handling via python
selectors.append(item.process.stdout)
# next process will get input from python:
next_stdin = subprocess.PIPE
if prev_item is not None:
prev_item.next = item
prev_item = item
return selectors

View File

@ -54,15 +54,16 @@ class ExecuteNode(LogStub):
if cmd==self.PIPE:
return('|')
else:
return(cmd_quote(cmd))
return cmd_quote(cmd)
def _shell_cmd(self, cmd):
def _shell_cmd(self, cmd, cwd):
"""prefix specified ssh shell to command and escape shell characters"""
ret=[]
#add remote shell
if not self.is_local():
#note: dont escape this part (executed directly without shell)
ret=["ssh"]
if self.ssh_config is not None:
@ -70,7 +71,17 @@ class ExecuteNode(LogStub):
ret.append(self.ssh_to)
ret.append(" ".join(map(self._quote, cmd)))
#note: DO escape from here, executed in either local or remote shell.
shell_str=""
#add cwd change?
if cwd is not None:
shell_str=shell_str + "cd " + self._quote(cwd) + "; "
shell_str=shell_str + " ".join(map(self._quote, cmd))
ret.append(shell_str)
return ret
@ -78,24 +89,26 @@ class ExecuteNode(LogStub):
return self.ssh_to is None
def run(self, cmd, inp=None, tab_split=False, valid_exitcodes=None, readonly=False, hide_errors=False,
return_stderr=False, pipe=False, return_all=False):
return_stderr=False, pipe=False, return_all=False, cwd=None):
"""run a command on the node , checks output and parses/handle output and returns it
Takes care of proper quoting/escaping/ssh and logging of stdout/err/exit codes.
Either uses a local shell (sh -c) or remote shell (ssh) to execute the command.
Therefore the command can have stuff like actual pipes in it, if you dont want to use pipe=True to pipe stuff.
:param cmd: the actual command, should be a list, where the first item is the command
and the rest are parameters. use ExecuteNode.PIPE to add an unescaped |
(if you want to use system piping instead of python piping)
:param pipe: return CmdPipe instead of executing it.
:param pipe: return CmdPipe instead of executing it. (pipe this into another run() command via inp=...)
:param inp: Can be None, a string or a CmdPipe that was previously returned.
:param tab_split: split tabbed files in output into a list
:param valid_exitcodes: list of valid exit codes for this command (checks exit code of both sides of a pipe)
Use [] to accept all exit codes. Default [0]
:param valid_exitcodes: list of valid exit codes for this command. Use [] to accept all exit codes. Default [0]
:param readonly: make this True if the command doesn't make any changes and is safe to execute in testmode
:param hide_errors: don't show stderr output as error, instead show it as debugging output (use to hide expected errors)
:param return_stderr: return both stdout and stderr as a tuple. (normally only returns stdout)
:param return_all: return both stdout and stderr and exit_code as a tuple. (normally only returns stdout)
:param cwd: Change current working directory before executing command.
"""
@ -131,17 +144,14 @@ class ExecuteNode(LogStub):
return True
# add shell command and handlers to pipe
cmd_item=CmdItem(cmd=self._shell_cmd(cmd), readonly=readonly, stderr_handler=stderr_handler, exit_handler=exit_handler, shell=self.is_local())
cmd_pipe.add(cmd_item)
# return pipe instead of executing?
if pipe:
return cmd_pipe
# stdout parser
output_lines = []
if pipe:
# dont specify output handler, so it will get piped to next process
stdout_handler=None
else:
# handle output manually, dont pipe it
def stdout_handler(line):
if tab_split:
output_lines.append(line.rstrip().split('\t'))
@ -149,13 +159,21 @@ class ExecuteNode(LogStub):
output_lines.append(line.rstrip())
self._parse_stdout(line)
# add shell command and handlers to pipe
cmd_item=CmdItem(cmd=self._shell_cmd(cmd, cwd), readonly=readonly, stderr_handler=stderr_handler, exit_handler=exit_handler, shell=self.is_local(), stdout_handler=stdout_handler)
cmd_pipe.add(cmd_item)
# return CmdPipe instead of executing?
if pipe:
return cmd_pipe
if cmd_pipe.should_execute():
self.debug("CMD > {}".format(cmd_pipe))
else:
self.debug("CMDSKIP> {}".format(cmd_pipe))
# execute and calls handlers in CmdPipe
if not cmd_pipe.execute(stdout_handler=stdout_handler):
if not cmd_pipe.execute():
raise(ExecuteError("Last command returned error"))
if return_all:
@ -164,3 +182,90 @@ class ExecuteNode(LogStub):
return output_lines, error_lines
else:
return output_lines
def script(self, lines, inp=None, stdout_handler=None, stderr_handler=None, exit_handler=None, valid_exitcodes=None, readonly=False, hide_errors=False, pipe=False):
"""Run a multiline script on the node.
This is much more low level than run() and allows for finer grained control.
Either uses a local shell (sh -c) or remote shell (ssh) to execute the command.
You need to do your own escaping/quoting.
It will do logging of stderr and exit codes, but you should
specify your stdout handler when calling CmdPipe.execute.
Also specify the optional stderr/exit code handlers if you need them.
Handlers are called for each line.
It wont collect lines internally like run() does, so streams of data can be of unlimited size.
:param lines: list of lines of the actual script.
:param inp: Can be None, a string or a CmdPipe that was previously returned.
:param readonly: make this True if the command doesn't make any changes and is safe to execute in testmode
:param valid_exitcodes: list of valid exit codes for this command. Use [] to accept all exit codes. Default [0]
:param hide_errors: don't show stderr output as error, instead show it as debugging output (use to hide expected errors)
:param pipe: return CmdPipe instead of executing it. (pipe this into another run() command via inp=...)
"""
# create new pipe?
if not isinstance(inp, CmdPipe):
cmd_pipe = CmdPipe(self.readonly, inp)
else:
# add stuff to existing pipe
cmd_pipe = inp
internal_stdout_handler=None
if stdout_handler is not None:
if self.debug_output:
def internal_stdout_handler(line):
self.debug("STDOUT > " + line.rstrip())
stdout_handler(line)
else:
internal_stdout_handler=stdout_handler
def internal_stderr_handler(line):
self._parse_stderr(line, hide_errors)
if stderr_handler is not None:
stderr_handler(line)
# exit code hanlder
if valid_exitcodes is None:
valid_exitcodes = [0]
def internal_exit_handler(exit_code):
if self.debug_output:
self.debug("EXIT > {}".format(exit_code))
if exit_handler is not None:
exit_handler(exit_code)
if (valid_exitcodes != []) and (exit_code not in valid_exitcodes):
self.error("Script returned exit code {} (valid codes: {})".format(exit_code, valid_exitcodes))
return False
return True
#build command
cmd=[]
#add remote shell
if not self.is_local():
#note: dont escape this part (executed directly without shell)
cmd.append("ssh")
if self.ssh_config is not None:
cmd.append(["-F", self.ssh_config])
cmd.append(self.ssh_to)
# convert to script
cmd.append("\n".join(lines))
# add shell command and handlers to pipe
cmd_item=CmdItem(cmd=cmd, readonly=readonly, stderr_handler=internal_stderr_handler, exit_handler=internal_exit_handler, stdout_handler=internal_stdout_handler, shell=self.is_local())
cmd_pipe.add(cmd_item)
self.debug("SCRIPT > {}".format(cmd_pipe))
if pipe:
return cmd_pipe
else:
return cmd_pipe.execute()

View File

@ -10,6 +10,7 @@ class LogConsole:
self.last_log = ""
self.show_debug = show_debug
self.show_verbose = show_verbose
self._progress_uncleared=False
if color:
# try to use color, failback if colorama not available
@ -25,6 +26,7 @@ class LogConsole:
self.colorama=False
def error(self, txt):
self.clear_progress()
if self.colorama:
print(colorama.Fore.RED + colorama.Style.BRIGHT + "! " + txt + colorama.Style.RESET_ALL, file=sys.stderr)
else:
@ -32,6 +34,7 @@ class LogConsole:
sys.stderr.flush()
def warning(self, txt):
self.clear_progress()
if self.colorama:
print(colorama.Fore.YELLOW + colorama.Style.BRIGHT + " NOTE: " + txt + colorama.Style.RESET_ALL)
else:
@ -40,6 +43,7 @@ class LogConsole:
def verbose(self, txt):
if self.show_verbose:
self.clear_progress()
if self.colorama:
print(colorama.Style.NORMAL + " " + txt + colorama.Style.RESET_ALL)
else:
@ -48,6 +52,7 @@ class LogConsole:
def debug(self, txt):
if self.show_debug:
self.clear_progress()
if self.colorama:
print(colorama.Fore.GREEN + "# " + txt + colorama.Style.RESET_ALL)
else:
@ -57,10 +62,13 @@ class LogConsole:
def progress(self, txt):
"""print progress output to stderr (stays on same line)"""
self.clear_progress()
self._progress_uncleared=True
print(">>> {}\r".format(txt), end='', file=sys.stderr)
sys.stderr.flush()
def clear_progress(self):
if self._progress_uncleared:
import colorama
print(colorama.ansi.clear_line(), end='', file=sys.stderr)
sys.stderr.flush()
# sys.stderr.flush()
self._progress_uncleared=False

View File

@ -0,0 +1,60 @@
import itertools
import os
class TreeHasher():
"""uses BlockHasher recursively on a directory tree
Input and output generators are in the format: ( relative-filepath, chunk_nr, hexdigest)
"""
def __init__(self, block_hasher):
"""
:type block_hasher: BlockHasher
"""
self.block_hasher=block_hasher
def generate(self, start_path):
"""Use BlockHasher on every file in a tree, yielding the results
note that it only checks the contents of actual files. It ignores metadata like permissions and mtimes.
It also ignores empty directories, symlinks and special files.
"""
def walkerror(e):
raise e
for (dirpath, dirnames, filenames) in os.walk(start_path, onerror=walkerror):
for f in filenames:
file_path=os.path.join(dirpath, f)
if (not os.path.islink(file_path)) and os.path.isfile(file_path):
for (chunk_nr, hash) in self.block_hasher.generate(file_path):
yield ( os.path.relpath(file_path,start_path), chunk_nr, hash )
def compare(self, start_path, generator):
"""reads from generator and compares blocks
yields mismatches in the form: ( relative_filename, chunk_nr, compare_hexdigest, actual_hexdigest )
yields errors in the form: ( relative_filename, chunk_nr, compare_hexdigest, "message" )
"""
count=0
def filter_file_name( file_name, chunk_nr, hexdigest):
return ( chunk_nr, hexdigest )
for file_name, group_generator in itertools.groupby(generator, lambda x: x[0]):
count=count+1
block_generator=itertools.starmap(filter_file_name, group_generator)
for ( chunk_nr, compare_hexdigest, actual_hexdigest) in self.block_hasher.compare(os.path.join(start_path,file_name), block_generator):
yield ( file_name, chunk_nr, compare_hexdigest, actual_hexdigest )

View File

@ -1,16 +1,11 @@
import argparse
import os.path
import sys
from .LogConsole import LogConsole
from .CliBase import CliBase
class ZfsAuto(object):
"""Common Base class, this class is always used subclassed. Look at ZfsAutobackup and ZfsAutoverify ."""
# also used by setup.py
VERSION = "3.2-alpha1"
HEADER = "{} v{} - (c)2021 E.H.Eefting (edwin@datux.nl)".format(os.path.basename(sys.argv[0]), VERSION)
class ZfsAuto(CliBase):
"""Common Base class for ZfsAutobackup and ZfsAutoverify ."""
def __init__(self, argv, print_arguments=True):
@ -19,46 +14,15 @@ class ZfsAuto(object):
self.property_name = None
self.exclude_paths = None
# helps with investigating failed regression tests:
if print_arguments:
print("ARGUMENTS: " + " ".join(argv))
self.args = self.parse_args(argv)
super(ZfsAuto, self).__init__(argv, print_arguments)
def parse_args(self, argv):
"""parse common arguments, setup logging, check and adjust parameters"""
parser=self.get_parser()
args = parser.parse_args(argv)
if args.help:
parser.print_help()
sys.exit(255)
if args.version:
print(self.HEADER)
sys.exit(255)
# auto enable progress?
if sys.stderr.isatty() and not args.no_progress:
args.progress = True
if args.debug_output:
args.debug = True
if args.test:
args.verbose = True
if args.debug:
args.verbose = True
self.log = LogConsole(show_debug=args.debug, show_verbose=args.verbose, color=sys.stdout.isatty())
self.verbose(self.HEADER)
self.verbose("")
args = super(ZfsAuto, self).parse_args(argv)
if args.backup_name == None:
parser.print_usage()
self.parser.print_usage()
self.log.error("Please specify BACKUP-NAME")
sys.exit(255)
@ -82,7 +46,8 @@ class ZfsAuto(object):
self.verbose("NOTE: Source and target are on the same host, excluding target-path from selection.")
self.exclude_paths.append(args.target_path)
else:
self.verbose("NOTE: Source and target are on the same host, excluding received datasets from selection.")
if not args.exclude_received:
self.verbose("NOTE: Source and target are on the same host, adding --exclude-received to commandline.")
args.exclude_received = True
if args.test:
@ -101,8 +66,7 @@ class ZfsAuto(object):
def get_parser(self):
parser = argparse.ArgumentParser(description=self.HEADER, add_help=False,
epilog='Full manual at: https://github.com/psy0rz/zfs_autobackup')
parser = super(ZfsAuto, self).get_parser()
#positional arguments
parser.add_argument('backup_name', metavar='BACKUP-NAME', default=None, nargs='?',
@ -111,32 +75,13 @@ class ZfsAuto(object):
parser.add_argument('target_path', metavar='TARGET-PATH', default=None, nargs='?',
help='Target ZFS filesystem (optional)')
# Basic options
group=parser.add_argument_group("Basic options")
group.add_argument('--help', '-h', action='store_true', help='show help')
group.add_argument('--test', '--dry-run', '-n', action='store_true',
help='Dry run, dont change anything, just show what would be done (still does all read-only '
'operations)')
group.add_argument('--verbose', '-v', action='store_true', help='verbose output')
group.add_argument('--debug', '-d', action='store_true',
help='Show zfs commands that are executed, stops after an exception.')
group.add_argument('--debug-output', action='store_true',
help='Show zfs commands and their output/exit codes. (noisy)')
group.add_argument('--progress', action='store_true',
help='show zfs progress output. Enabled automaticly on ttys. (use --no-progress to disable)')
group.add_argument('--no-progress', action='store_true',
help=argparse.SUPPRESS) # needed to workaround a zfs recv -v bug
group.add_argument('--version', action='store_true',
help='Show version.')
group.add_argument('--strip-path', metavar='N', default=0, type=int,
help='Number of directories to strip from target path (use 1 when cloning zones between 2 '
'SmartOS machines)')
# SSH options
group=parser.add_argument_group("SSH options")
group.add_argument('--ssh-config', metavar='CONFIG-FILE', default=None, help='Custom ssh client config')
group.add_argument('--ssh-source', metavar='USER@HOST', default=None,
help='Source host to get backup from.')
help='Source host to pull backup from.')
group.add_argument('--ssh-target', metavar='USER@HOST', default=None,
help='Target host to push backup to.')
@ -147,6 +92,9 @@ class ZfsAuto(object):
help='ZFS Snapshot string format. Default: %(default)s')
group.add_argument('--hold-format', metavar='FORMAT', default="zfs_autobackup:{}",
help='ZFS hold string format. Default: %(default)s')
group.add_argument('--strip-path', metavar='N', default=0, type=int,
help='Number of directories to strip from target path (use 1 when cloning zones between 2 '
'SmartOS machines)')
group=parser.add_argument_group("Selection options")
group.add_argument('--ignore-replicated', action='store_true', help=argparse.SUPPRESS)
@ -158,28 +106,6 @@ class ZfsAuto(object):
return parser
def verbose(self, txt):
self.log.verbose(txt)
def warning(self, txt):
self.log.warning(txt)
def error(self, txt):
self.log.error(txt)
def debug(self, txt):
self.log.debug(txt)
def progress(self, txt):
self.log.progress(txt)
def clear_progress(self):
self.log.clear_progress()
def set_title(self, title):
self.log.verbose("")
self.log.verbose("#### " + title)
def print_error_sources(self):
self.error(
"No source filesystems selected, please do a 'zfs set autobackup:{0}=true' on the source datasets "

View File

@ -1,6 +1,9 @@
import time
import argparse
from signal import signal, SIGPIPE
from .util import output_redir, sigpipe_handler
from .ZfsAuto import ZfsAuto
from . import compressors
@ -9,7 +12,7 @@ from .Thinner import Thinner
from .ZfsDataset import ZfsDataset
from .ZfsNode import ZfsNode
from .ThinnerRule import ThinnerRule
import os.path
class ZfsAutobackup(ZfsAuto):
"""The main zfs-autobackup class. Start here, at run() :)"""
@ -88,6 +91,8 @@ class ZfsAutobackup(ZfsAuto):
group.add_argument('--rollback', action='store_true',
help='Rollback changes to the latest target snapshot before starting. (normally you can '
'prevent changes by setting the readonly property on the target_path to on)')
group.add_argument('--force', '-F', action='store_true',
help='Use zfs -F option to force overwrite/rollback. (Usefull with --strip-path=1, but use with care)')
group.add_argument('--destroy-incompatible', action='store_true',
help='Destroy incompatible snapshots on target. Use with care! (implies --rollback)')
group.add_argument('--ignore-transfer-errors', action='store_true',
@ -154,8 +159,8 @@ class ZfsAutobackup(ZfsAuto):
except Exception as e:
dataset.error("Error during thinning of missing datasets ({})".format(str(e)))
if self.args.progress:
self.clear_progress()
# if self.args.progress:
# self.clear_progress()
# NOTE: this method also uses self.args. args that need extra processing are passed as function parameters:
def destroy_missing_targets(self, target_dataset, used_target_datasets):
@ -214,13 +219,13 @@ class ZfsAutobackup(ZfsAuto):
dataset.destroy(fail_exception=True)
except Exception as e:
if self.args.progress:
self.clear_progress()
# if self.args.progress:
# self.clear_progress()
dataset.error("Error during --destroy-missing: {}".format(str(e)))
if self.args.progress:
self.clear_progress()
# if self.args.progress:
# self.clear_progress()
def get_send_pipes(self, logger):
"""determine the zfs send pipe"""
@ -278,6 +283,29 @@ class ZfsAutobackup(ZfsAuto):
return ret
def make_target_name(self, source_dataset):
"""make target_name from a source_dataset"""
stripped=source_dataset.lstrip_path(self.args.strip_path)
if stripped!="":
return self.args.target_path + "/" + stripped
else:
return self.args.target_path
def check_target_names(self, source_node, source_datasets, target_node):
"""check all target names for collesions etc due to strip-options"""
self.debug("Checking target names:")
target_datasets={}
for source_dataset in source_datasets:
target_name = self.make_target_name(source_dataset)
source_dataset.debug("-> {}".format(target_name))
if target_name in target_datasets:
raise Exception("Target collision: Target path {} encountered twice, due to: {} and {}".format(target_name, source_dataset, target_datasets[target_name]))
target_datasets[target_name]=source_dataset
# NOTE: this method also uses self.args. args that need extra processing are passed as function parameters:
def sync_datasets(self, source_node, source_datasets, target_node):
"""Sync datasets, or thin-only on both sides
@ -308,6 +336,7 @@ class ZfsAutobackup(ZfsAuto):
# ensure parents exists
# TODO: this isnt perfect yet, in some cases it can create parents when it shouldn't.
if not self.args.no_send \
and target_dataset.parent \
and target_dataset.parent not in target_datasets \
and not target_dataset.parent.exists:
target_dataset.parent.create_filesystem(parents=True)
@ -328,10 +357,10 @@ class ZfsAutobackup(ZfsAuto):
destroy_incompatible=self.args.destroy_incompatible,
send_pipes=send_pipes, recv_pipes=recv_pipes,
decrypt=self.args.decrypt, encrypt=self.args.encrypt,
zfs_compressed=self.args.zfs_compressed)
zfs_compressed=self.args.zfs_compressed, force=self.args.force)
except Exception as e:
if self.args.progress:
self.clear_progress()
# if self.args.progress:
# self.clear_progress()
fail_count = fail_count + 1
source_dataset.error("FAILED: " + str(e))
@ -339,8 +368,8 @@ class ZfsAutobackup(ZfsAuto):
self.verbose("Debug mode, aborting on first error")
raise
if self.args.progress:
self.clear_progress()
# if self.args.progress:
# self.clear_progress()
target_path_dataset = target_node.get_dataset(self.args.target_path)
if not self.args.no_thinning:
@ -445,6 +474,9 @@ class ZfsAutobackup(ZfsAuto):
raise (Exception(
"Target path '{}' does not exist. Please create this dataset first.".format(target_dataset)))
# check for collisions due to strip-path
self.check_target_names(source_node, source_datasets, target_node)
# do the actual sync
# NOTE: even with no_send, no_thinning and no_snapshot it does a usefull thing because it checks if the common snapshots and shows incompatible snapshots
fail_count = self.sync_datasets(
@ -474,6 +506,7 @@ class ZfsAutobackup(ZfsAuto):
self.verbose("")
self.warning("TEST MODE - DID NOT MAKE ANY CHANGES!")
self.clear_progress()
return fail_count
except Exception as e:
@ -489,6 +522,8 @@ class ZfsAutobackup(ZfsAuto):
def cli():
import sys
signal(SIGPIPE, sigpipe_handler)
sys.exit(ZfsAutobackup(sys.argv[1:], False).run())

View File

@ -1,84 +1,70 @@
import os
import time
# from util import activate_volume_snapshot, create_mountpoints, cleanup_mountpoint
from signal import signal, SIGPIPE
from .util import output_redir, sigpipe_handler
from .ExecuteNode import ExecuteNode
from .ZfsAuto import ZfsAuto
from .ZfsDataset import ZfsDataset
from .ZfsNode import ZfsNode
import sys
import platform
def tmp_name(suffix=""):
"""create temporary name unique to this process and node"""
#we could use uuids but those are ugly and confusing
name="zfstmp_{}_{}".format(platform.node(), os.getpid())
name=name+suffix
return name
def hash_tree_tar(node, path):
"""calculate md5sum of a directory tree, using tar"""
node.debug("Hashing filesystem {} ".format(path))
cmd=[ "tar", "-cf", "-", "-C", path, ".",
ExecuteNode.PIPE, "md5sum"]
stdout = node.run(cmd)
if node.readonly:
hashed=None
else:
hashed = stdout[0].split(" ")[0]
node.debug("Hash of {} filesytem is {}".format(path, hashed))
return hashed
def compare_trees_tar(source_node, source_path, target_node, target_path):
"""compare two trees using tar. compatible and simple"""
source_hash= hash_tree_tar(source_node, source_path)
target_hash= hash_tree_tar(target_node, target_path)
if source_hash != target_hash:
raise Exception("md5hash difference: {} != {}".format(source_hash, target_hash))
def compare_trees_rsync(source_node, source_path, target_node, target_path):
"""use rsync to compare two trees.
Advantage is that we can see which individual files differ.
But requires rsync and cant do remote to remote."""
cmd = ["rsync", "-rcn", "--info=COPY,DEL,MISC,NAME,SYMSAFE", "--msgs2stderr", "--delete" ]
#local
if source_node.ssh_to is None and target_node.ssh_to is None:
cmd.append("{}/".format(source_path))
cmd.append("{}/".format(target_path))
source_node.debug("Running rsync locally, on source.")
stdout, stderr = source_node.run(cmd, return_stderr=True)
#source is local
elif source_node.ssh_to is None and target_node.ssh_to is not None:
cmd.append("{}/".format(source_path))
cmd.append("{}:{}/".format(target_node.ssh_to, target_path))
source_node.debug("Running rsync locally, on source.")
stdout, stderr = source_node.run(cmd, return_stderr=True)
#target is local
elif source_node.ssh_to is not None and target_node.ssh_to is None:
cmd.append("{}:{}/".format(source_node.ssh_to, source_path))
cmd.append("{}/".format(target_path))
source_node.debug("Running rsync locally, on target.")
stdout, stderr=target_node.run(cmd, return_stderr=True)
else:
raise Exception("Source and target cant both be remote when verifying. (rsync limitation)")
if stderr:
raise Exception("Dataset verify failed, see above list for differences")
# # try to be as unix compatible as possible, while still having decent performance
# def compare_trees_find(source_node, source_path, target_node, target_path):
# # find /tmp/zfstmp_pve1_1993135target/ -xdev -type f -print0 | xargs -0 md5sum | md5sum -c
#
# #verify tree has atleast one file
#
# stdout=source_node.run(["find", ".", "-type", "f",
# ExecuteNode.PIPE, "head", "-n1",
# ], cwd=source_path)
#
# if not stdout:
# source_node.debug("No files, skipping check")
# else:
# pipe=source_node.run(["find", ".", "-type", "f", "-print0",
# ExecuteNode.PIPE, "xargs", "-0", "md5sum"
# ], pipe=True, cwd=source_path)
# stdout=target_node.run([ "md5sum", "-c", "--quiet"], inp=pipe, cwd=target_path, valid_exitcodes=[0,1])
#
# if len(stdout):
# for line in stdout:
# target_node.error("md5sum: "+line)
#
# raise(Exception("Some files have checksum errors"))
#
#
# def compare_trees_rsync(source_node, source_path, target_node, target_path):
# """use rsync to compare two trees.
# Advantage is that we can see which individual files differ.
# But requires rsync and cant do remote to remote."""
#
# cmd = ["rsync", "-rcnq", "--info=COPY,DEL,MISC,NAME,SYMSAFE", "--msgs2stderr", "--delete" ]
#
# #local
# if source_node.ssh_to is None and target_node.ssh_to is None:
# cmd.append("{}/".format(source_path))
# cmd.append("{}/".format(target_path))
# source_node.debug("Running rsync locally, on source.")
# stdout, stderr = source_node.run(cmd, return_stderr=True)
#
# #source is local
# elif source_node.ssh_to is None and target_node.ssh_to is not None:
# cmd.append("{}/".format(source_path))
# cmd.append("{}:{}/".format(target_node.ssh_to, target_path))
# source_node.debug("Running rsync locally, on source.")
# stdout, stderr = source_node.run(cmd, return_stderr=True)
#
# #target is local
# elif source_node.ssh_to is not None and target_node.ssh_to is None:
# cmd.append("{}:{}/".format(source_node.ssh_to, source_path))
# cmd.append("{}/".format(target_path))
# source_node.debug("Running rsync locally, on target.")
# stdout, stderr=target_node.run(cmd, return_stderr=True)
#
# else:
# raise Exception("Source and target cant both be remote when verifying. (rsync limitation)")
#
# if stderr:
# raise Exception("Dataset verify failed, see above list for differences")
def verify_filesystem(source_snapshot, source_mnt, target_snapshot, target_mnt, method):
@ -92,8 +78,10 @@ def verify_filesystem(source_snapshot, source_mnt, target_snapshot, target_mnt,
if method=='rsync':
compare_trees_rsync(source_snapshot.zfs_node, source_mnt, target_snapshot.zfs_node, target_mnt)
elif method == 'tar':
compare_trees_tar(source_snapshot.zfs_node, source_mnt, target_snapshot.zfs_node, target_mnt)
# elif method == 'tar':
# compare_trees_tar(source_snapshot.zfs_node, source_mnt, target_snapshot.zfs_node, target_mnt)
elif method == 'find':
compare_trees_find(source_snapshot.zfs_node, source_mnt, target_snapshot.zfs_node, target_mnt)
else:
raise(Exception("program errror, unknown method"))
@ -102,101 +90,35 @@ def verify_filesystem(source_snapshot, source_mnt, target_snapshot, target_mnt,
target_snapshot.unmount()
def hash_dev(node, dev):
"""calculate md5sum of a device on a node"""
node.debug("Hashing volume {} ".format(dev))
cmd = [ "md5sum", dev ]
stdout = node.run(cmd)
if node.readonly:
hashed=None
else:
hashed = stdout[0].split(" ")[0]
node.debug("Hash of volume {} is {}".format(dev, hashed))
return hashed
# def activate_volume_snapshot(dataset, snapshot):
# """enables snapdev, waits and tries to findout /dev path to the volume, in a compatible way. (linux/freebsd/smartos)"""
# def hash_dev(node, dev):
# """calculate md5sum of a device on a node"""
#
# dataset.set("snapdev", "visible")
# node.debug("Hashing volume {} ".format(dev))
#
# #NOTE: add smartos location to this list as well
# locations=[
# "/dev/zvol/" + snapshot.name
# ]
# cmd = [ "md5sum", dev ]
#
# dataset.debug("Waiting for /dev entry to appear...")
# time.sleep(0.1)
# stdout = node.run(cmd)
#
# start_time=time.time()
# while time.time()-start_time<10:
# for location in locations:
# stdout, stderr, exit_code=dataset.zfs_node.run(["test", "-e", location], return_all=True, valid_exitcodes=[0,1])
# if node.readonly:
# hashed=None
# else:
# hashed = stdout[0].split(" ")[0]
#
# #fake it in testmode
# if dataset.zfs_node.readonly:
# return location
# node.debug("Hash of volume {} is {}".format(dev, hashed))
#
# if exit_code==0:
# return location
# time.sleep(1)
#
# raise(Exception("Timeout while waiting for {} entry to appear.".format(locations)))
#
# def deacitvate_volume_snapshot(dataset):
# dataset.inherit("snapdev")
# return hashed
#NOTE: https://www.google.com/search?q=Mount+Path+Limit+freebsd
#Freebsd has limitations regarding path length, so we cant use the above method.
#Instead we create a temporary clone
def get_tmp_clone_name(snapshot):
pool=snapshot.zfs_node.get_pool(snapshot)
return pool.name+"/"+tmp_name()
def activate_volume_snapshot(snapshot):
"""clone volume, waits and tries to findout /dev path to the volume, in a compatible way. (linux/freebsd/smartos)"""
clone_name=get_tmp_clone_name(snapshot)
clone=snapshot.clone(clone_name)
#NOTE: add smartos location to this list as well
locations=[
"/dev/zvol/" + clone_name
]
clone.debug("Waiting for /dev entry to appear...")
time.sleep(0.1)
start_time=time.time()
while time.time()-start_time<10:
for location in locations:
stdout, stderr, exit_code=clone.zfs_node.run(["test", "-e", location], return_all=True, valid_exitcodes=[0,1])
#fake it in testmode
if clone.zfs_node.readonly:
return location
if exit_code==0:
return location
time.sleep(1)
raise(Exception("Timeout while waiting for {} entry to appear.".format(locations)))
def deacitvate_volume_snapshot(snapshot):
clone_name=get_tmp_clone_name(snapshot)
clone=snapshot.zfs_node.get_dataset(clone_name)
clone.destroy()
# def deacitvate_volume_snapshot(snapshot):
# clone_name=get_tmp_clone_name(snapshot)
# clone=snapshot.zfs_node.get_dataset(clone_name)
# clone.destroy(deferred=True, verbose=False)
def verify_volume(source_dataset, source_snapshot, target_dataset, target_snapshot):
"""compare the contents of two zfs volume snapshots"""
try:
# try:
source_dev= activate_volume_snapshot(source_snapshot)
target_dev= activate_volume_snapshot(target_snapshot)
@ -206,28 +128,16 @@ def verify_volume(source_dataset, source_snapshot, target_dataset, target_snapsh
if source_hash!=target_hash:
raise Exception("md5hash difference: {} != {}".format(source_hash, target_hash))
finally:
deacitvate_volume_snapshot(source_snapshot)
deacitvate_volume_snapshot(target_snapshot)
def create_mountpoints(source_node, target_node):
# prepare mount points
source_node.debug("Create temporary mount point")
source_mnt = "/tmp/"+tmp_name("source")
source_node.run(["mkdir", source_mnt])
target_node.debug("Create temporary mount point")
target_mnt = "/tmp/"+tmp_name("target")
target_node.run(["mkdir", target_mnt])
return source_mnt, target_mnt
# finally:
# deacitvate_volume_snapshot(source_snapshot)
# deacitvate_volume_snapshot(target_snapshot)
def cleanup_mountpoint(node, mnt):
node.debug("Cleaning up temporary mount point")
node.run([ "rmdir", mnt ], hide_errors=True, valid_exitcodes=[] )
# class ZfsAutoChecksumVolume(ZfsAuto):
# def __init__(self, argv, print_arguments=True):
#
# # NOTE: common options and parameters are in ZfsAuto
# super(ZfsAutoverify, self).__init__(argv, print_arguments)
class ZfsAutoverify(ZfsAuto):
"""The zfs-autoverify class, default agruments and stuff come from ZfsAuto"""
@ -254,8 +164,8 @@ class ZfsAutoverify(ZfsAuto):
parser=super(ZfsAutoverify, self).get_parser()
group=parser.add_argument_group("Verify options")
group.add_argument('--fs-compare', metavar='METHOD', default="tar", choices=["tar", "rsync"],
help='Compare method to use for filesystems. (tar, rsync) Default: %(default)s ')
group.add_argument('--fs-compare', metavar='METHOD', default="find", choices=["find", "rsync"],
help='Compare method to use for filesystems. (find, rsync) Default: %(default)s ')
return parser
@ -293,8 +203,8 @@ class ZfsAutoverify(ZfsAuto):
except Exception as e:
if self.args.progress:
self.clear_progress()
# if self.args.progress:
# self.clear_progress()
fail_count = fail_count + 1
target_dataset.error("FAILED: " + str(e))
@ -302,8 +212,8 @@ class ZfsAutoverify(ZfsAuto):
self.verbose("Debug mode, aborting on first error")
raise
if self.args.progress:
self.clear_progress()
# if self.args.progress:
# self.clear_progress()
return fail_count
@ -395,6 +305,8 @@ class ZfsAutoverify(ZfsAuto):
def cli():
import sys
signal(SIGPIPE, sigpipe_handler)
sys.exit(ZfsAutoverify(sys.argv[1:], False).run())

310
zfs_autobackup/ZfsCheck.py Normal file
View File

@ -0,0 +1,310 @@
from __future__ import print_function
import time
from signal import signal, SIGPIPE
from . import util
from .TreeHasher import TreeHasher
from .BlockHasher import BlockHasher
from .ZfsNode import ZfsNode
from .util import *
from .CliBase import CliBase
class ZfsCheck(CliBase):
def __init__(self, argv, print_arguments=True):
# NOTE: common options argument parsing are in CliBase
super(ZfsCheck, self).__init__(argv, print_arguments)
self.node = ZfsNode(self.log, readonly=self.args.test, debug_output=self.args.debug_output)
self.block_hasher = BlockHasher(count=self.args.count, bs=self.args.block_size, skip=self.args.skip)
def get_parser(self):
parser = super(ZfsCheck, self).get_parser()
# positional arguments
parser.add_argument('target', metavar='TARGET', default=None, nargs='?', help='Target to checkum. (can be blockdevice, directory or ZFS snapshot)')
group = parser.add_argument_group('Checker options')
group.add_argument('--block-size', metavar="BYTES", default=4096, help="Read block-size, default %(default)s",
type=int)
group.add_argument('--count', metavar="COUNT", default=int((100 * (1024 ** 2)) / 4096),
help="Hash chunks of COUNT blocks. Default %(default)s . (CHUNK size is BYTES * COUNT) ", type=int) # 100MiB
group.add_argument('--check', '-c', metavar="FILE", default=None, const=True, nargs='?',
help="Read hashes from STDIN (or FILE) and compare them")
group.add_argument('--skip', '-s', metavar="NUMBER", default=0, type=int,
help="Skip this number of chunks after every hash. %(default)s")
return parser
def parse_args(self, argv):
args = super(ZfsCheck, self).parse_args(argv)
if args.test:
self.warning("TEST MODE - WILL ONLY DO READ-ONLY STUFF")
if args.target is None:
self.error("Please specify TARGET")
sys.exit(1)
self.verbose("Target : {}".format(args.target))
self.verbose("Block size : {} bytes".format(args.block_size))
self.verbose("Block count : {}".format(args.count))
self.verbose("Effective chunk size : {} bytes".format(args.count*args.block_size))
self.verbose("Skip chunk count : {} (checks {:.2f}% of data)".format(args.skip, 100/(1+args.skip)))
self.verbose("")
return args
def prepare_zfs_filesystem(self, snapshot):
mnt = "/tmp/" + tmp_name()
self.debug("Create temporary mount point {}".format(mnt))
self.node.run(["mkdir", mnt])
snapshot.mount(mnt)
return mnt
def cleanup_zfs_filesystem(self, snapshot):
mnt = "/tmp/" + tmp_name()
snapshot.unmount()
self.debug("Cleaning up temporary mount point")
self.node.run(["rmdir", mnt], hide_errors=True, valid_exitcodes=[])
# NOTE: https://www.google.com/search?q=Mount+Path+Limit+freebsd
# Freebsd has limitations regarding path length, so we have to clone it so the part stays sort
def prepare_zfs_volume(self, snapshot):
"""clone volume, waits and tries to findout /dev path to the volume, in a compatible way. (linux/freebsd/smartos)"""
clone_name = get_tmp_clone_name(snapshot)
clone = snapshot.clone(clone_name)
# TODO: add smartos location to this list as well
locations = [
"/dev/zvol/" + clone_name
]
clone.debug("Waiting for /dev entry to appear in: {}".format(locations))
time.sleep(0.1)
start_time = time.time()
while time.time() - start_time < 10:
for location in locations:
if os.path.exists(location):
return location
# fake it in testmode
if self.args.test:
return location
time.sleep(1)
raise (Exception("Timeout while waiting for /dev entry to appear. (looking in: {})".format(locations)))
def cleanup_zfs_volume(self, snapshot):
"""destroys temporary volume snapshot"""
clone_name = get_tmp_clone_name(snapshot)
clone = snapshot.zfs_node.get_dataset(clone_name)
clone.destroy(deferred=True, verbose=False)
def generate_tree_hashes(self, prepared_target):
tree_hasher = TreeHasher(self.block_hasher)
self.debug("Hashing tree: {}".format(prepared_target))
for i in tree_hasher.generate(prepared_target):
yield i
def generate_tree_compare(self, prepared_target, input_generator=None):
tree_hasher = TreeHasher(self.block_hasher)
self.debug("Comparing tree: {}".format(prepared_target))
for i in tree_hasher.compare(prepared_target, input_generator):
yield i
def generate_file_hashes(self, prepared_target):
self.debug("Hashing file: {}".format(prepared_target))
for i in self.block_hasher.generate(prepared_target):
yield i
def generate_file_compare(self, prepared_target, input_generator=None):
self.debug("Comparing file: {}".format(prepared_target))
for i in self.block_hasher.compare(prepared_target, input_generator):
yield i
def generate_input(self):
"""parse input lines and yield items to use in compare functions"""
if self.args.check is True:
input_fh=sys.stdin
else:
input_fh=open(self.args.check, 'r')
last_progress_time = time.time()
progress_checked = 0
progress_skipped = 0
line=input_fh.readline()
skip=0
while line:
i=line.rstrip().split("\t")
#ignores lines without tabs
if (len(i)>1):
if skip==0:
progress_checked=progress_checked+1
yield i
skip=self.args.skip
else:
skip=skip-1
progress_skipped=progress_skipped+1
if self.args.progress and time.time() - last_progress_time > 1:
last_progress_time = time.time()
self.progress("Checked {} hashes (skipped {})".format(progress_checked, progress_skipped))
line=input_fh.readline()
self.verbose("Checked {} hashes (skipped {})".format(progress_checked, progress_skipped))
def print_hashes(self, hash_generator):
"""prints hashes that are yielded by the specified hash_generator"""
last_progress_time = time.time()
progress_count = 0
for i in hash_generator:
if len(i) == 3:
print("{}\t{}\t{}".format(*i))
else:
print("{}\t{}".format(*i))
progress_count = progress_count + 1
if self.args.progress and time.time() - last_progress_time > 1:
last_progress_time = time.time()
self.progress("Generated {} hashes.".format(progress_count))
sys.stdout.flush()
self.verbose("Generated {} hashes.".format(progress_count))
self.clear_progress()
return 0
def print_errors(self, compare_generator):
"""prints errors that are yielded by the specified compare_generator"""
errors = 0
for i in compare_generator:
errors = errors + 1
if len(i) == 4:
(file_name, chunk_nr, compare_hexdigest, actual_hexdigest) = i
print("{}: Chunk {} failed: {} {}".format(file_name, chunk_nr, compare_hexdigest, actual_hexdigest))
else:
(chunk_nr, compare_hexdigest, actual_hexdigest) = i
print("Chunk {} failed: {} {}".format(chunk_nr, compare_hexdigest, actual_hexdigest))
sys.stdout.flush()
self.verbose("Total errors: {}".format(errors))
self.clear_progress()
return errors
def prepare_target(self):
if "@" in self.args.target:
# zfs snapshot
snapshot=self.node.get_dataset(self.args.target)
if not snapshot.exists:
raise Exception("ZFS snapshot {} does not exist!".format(snapshot))
dataset_type = snapshot.parent.properties['type']
if dataset_type == 'volume':
return self.prepare_zfs_volume(snapshot)
elif dataset_type == 'filesystem':
return self.prepare_zfs_filesystem(snapshot)
else:
raise Exception("Unknown dataset type")
return self.args.target
def cleanup_target(self):
if "@" in self.args.target:
# zfs snapshot
snapshot=self.node.get_dataset(self.args.target)
if not snapshot.exists:
return
dataset_type = snapshot.parent.properties['type']
if dataset_type == 'volume':
self.cleanup_zfs_volume(snapshot)
elif dataset_type == 'filesystem':
self.cleanup_zfs_filesystem(snapshot)
def run(self):
compare_generator=None
hash_generator=None
try:
prepared_target=self.prepare_target()
is_dir=os.path.isdir(prepared_target)
#run as compare
if self.args.check is not None:
input_generator=self.generate_input()
if is_dir:
compare_generator = self.generate_tree_compare(prepared_target, input_generator)
else:
compare_generator=self.generate_file_compare(prepared_target, input_generator)
errors=self.print_errors(compare_generator)
#run as generator
else:
if is_dir:
hash_generator = self.generate_tree_hashes(prepared_target)
else:
hash_generator=self.generate_file_hashes(prepared_target)
errors=self.print_hashes(hash_generator)
except Exception as e:
self.error("Exception: " + str(e))
if self.args.debug:
raise
return 255
except KeyboardInterrupt:
self.error("Aborted")
return 255
finally:
#important to call check_output so that cleanup still functions in case of a broken pipe:
# util.check_output()
#close generators, to make sure files are not in use anymore when cleaning up
if hash_generator is not None:
hash_generator.close()
if compare_generator is not None:
compare_generator.close()
self.cleanup_target()
return errors
def cli():
import sys
signal(SIGPIPE, sigpipe_handler)
sys.exit(ZfsCheck(sys.argv[1:], False).run())
if __name__ == "__main__":
cli()

View File

@ -79,7 +79,11 @@ class ZfsDataset:
Args:
:type count: int
"""
return "/".join(self.split_path()[count:])
components=self.split_path()
if count>len(components):
raise Exception("Trying to strip too much from path ({} items from {})".format(count, self.name))
return "/".join(components[count:])
def rstrip_path(self, count):
"""return name with last count components stripped
@ -184,11 +188,17 @@ class ZfsDataset:
parent according to path
we cache this so everything in the parent that is cached also stays.
returns None if there is no parent.
"""
if self.is_snapshot:
return self.zfs_node.get_dataset(self.filesystem_name)
else:
return self.zfs_node.get_dataset(self.rstrip_path(1))
stripped=self.rstrip_path(1)
if stripped:
return self.zfs_node.get_dataset(stripped)
else:
return None
# NOTE: unused for now
# def find_prev_snapshot(self, snapshot, also_other_snapshots=False):
@ -260,7 +270,7 @@ class ZfsDataset:
self.force_exists = True
def destroy(self, fail_exception=False):
def destroy(self, fail_exception=False, deferred=False, verbose=True):
"""destroy the dataset. by default failures are not an exception, so we
can continue making backups
@ -268,13 +278,20 @@ class ZfsDataset:
:type fail_exception: bool
"""
if verbose:
self.verbose("Destroying")
else:
self.debug("Destroying")
if self.is_snapshot:
self.release()
try:
if deferred and self.is_snapshot:
self.zfs_node.run(["zfs", "destroy", "-d", self.name])
else:
self.zfs_node.run(["zfs", "destroy", self.name])
self.invalidate()
self.force_exists = False
return True
@ -577,7 +594,7 @@ class ZfsDataset:
return output_pipe
def recv_pipe(self, pipe, features, recv_pipes, filter_properties=None, set_properties=None, ignore_exit_code=False):
def recv_pipe(self, pipe, features, recv_pipes, filter_properties=None, set_properties=None, ignore_exit_code=False, force=False):
"""starts a zfs recv for this snapshot and uses pipe as input
note: you can it both on a snapshot or filesystem object. The
@ -618,6 +635,9 @@ class ZfsDataset:
# verbose output
cmd.append("-v")
if force:
cmd.append("-F")
if 'extensible_dataset' in features and "-s" in self.zfs_node.supported_recv_options:
# support resuming
self.debug("Enabled resume support")
@ -630,7 +650,7 @@ class ZfsDataset:
else:
valid_exitcodes = [0]
self.zfs_node.reset_progress()
# self.zfs_node.reset_progress()
self.zfs_node.run(cmd, inp=pipe, valid_exitcodes=valid_exitcodes)
# invalidate cache, but we at least know we exist now
@ -648,7 +668,7 @@ class ZfsDataset:
def transfer_snapshot(self, target_snapshot, features, prev_snapshot, show_progress,
filter_properties, set_properties, ignore_recv_exit_code, resume_token,
raw, send_properties, write_embedded, send_pipes, recv_pipes, zfs_compressed):
raw, send_properties, write_embedded, send_pipes, recv_pipes, zfs_compressed, force):
"""transfer this snapshot to target_snapshot. specify prev_snapshot for
incremental transfer
@ -689,7 +709,7 @@ class ZfsDataset:
pipe = self.send_pipe(features=features, show_progress=show_progress, prev_snapshot=prev_snapshot,
resume_token=resume_token, raw=raw, send_properties=send_properties, write_embedded=write_embedded, send_pipes=send_pipes, zfs_compressed=zfs_compressed)
target_snapshot.recv_pipe(pipe, features=features, filter_properties=filter_properties,
set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code, recv_pipes=recv_pipes)
set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code, recv_pipes=recv_pipes, force=force)
def abort_resume(self):
"""abort current resume state"""
@ -980,7 +1000,7 @@ class ZfsDataset:
def sync_snapshots(self, target_dataset, features, show_progress, filter_properties, set_properties,
ignore_recv_exit_code, holds, rollback, decrypt, encrypt, also_other_snapshots,
no_send, destroy_incompatible, send_pipes, recv_pipes, zfs_compressed):
no_send, destroy_incompatible, send_pipes, recv_pipes, zfs_compressed, force):
"""sync this dataset's snapshots to target_dataset, while also thinning
out old snapshots along the way.
@ -1001,6 +1021,8 @@ class ZfsDataset:
:type destroy_incompatible: bool
"""
self.verbose("sending to {}".format(target_dataset))
(common_snapshot, start_snapshot, source_obsoletes, target_obsoletes, target_keeps,
incompatible_target_snapshots) = \
self._plan_sync(target_dataset=target_dataset, also_other_snapshots=also_other_snapshots)
@ -1063,7 +1085,9 @@ class ZfsDataset:
filter_properties=active_filter_properties,
set_properties=active_set_properties,
ignore_recv_exit_code=ignore_recv_exit_code,
resume_token=resume_token, write_embedded=write_embedded, raw=raw, send_properties=send_properties, send_pipes=send_pipes, recv_pipes=recv_pipes, zfs_compressed=zfs_compressed)
resume_token=resume_token, write_embedded=write_embedded, raw=raw,
send_properties=send_properties, send_pipes=send_pipes,
recv_pipes=recv_pipes, zfs_compressed=zfs_compressed, force=force)
resume_token = None
@ -1113,9 +1137,10 @@ class ZfsDataset:
self.debug("Unmounting")
cmd = [
"umount", self.name
"umount", "-l", self.name
]
self.zfs_node.run(cmd=cmd, valid_exitcodes=[0])
def clone(self, name):

View File

@ -17,7 +17,7 @@ from .ExecuteNode import ExecuteError
class ZfsNode(ExecuteNode):
"""a node that contains zfs datasets. implements global (systemwide/pool wide) zfs commands"""
def __init__(self, snapshot_time_format, hold_name, logger, ssh_config=None, ssh_to=None, readonly=False,
def __init__(self, logger, snapshot_time_format="", hold_name="", ssh_config=None, ssh_to=None, readonly=False,
description="",
debug_output=False, thinner=None):
@ -32,9 +32,9 @@ class ZfsNode(ExecuteNode):
self.verbose("Using custom SSH config: {}".format(ssh_config))
if ssh_to:
self.verbose("Datasets on: {}".format(ssh_to))
else:
self.verbose("Datasets are local")
self.verbose("SSH to: {}".format(ssh_to))
# else:
# self.verbose("Datasets are local")
if thinner is not None:
rules = thinner.human_rules()
@ -107,12 +107,12 @@ class ZfsNode(ExecuteNode):
def get_dataset(self, name, force_exists=None):
"""get a ZfsDataset() object from name. stores objects internally to enable caching"""
return self.__datasets.setdefault(name, ZfsDataset(self, name))
return self.__datasets.setdefault(name, ZfsDataset(self, name, force_exists))
def reset_progress(self):
"""reset progress output counters"""
self._progress_total_bytes = 0
self._progress_start_time = time.time()
# def reset_progress(self):
# """reset progress output counters"""
# self._progress_total_bytes = 0
# self._progress_start_time = time.time()
def parse_zfs_progress(self, line, hide_errors, prefix):
"""try to parse progress output of zfs recv -Pv, and don't show it as error to the user """
@ -132,9 +132,15 @@ class ZfsNode(ExecuteNode):
# actual useful info
if len(progress_fields) >= 3:
if progress_fields[0] == 'full' or progress_fields[0] == 'size':
# Reset the total bytes and start the timer again (otherwise the MB/s
# counter gets confused)
self._progress_total_bytes = int(progress_fields[2])
self._progress_start_time = time.time()
elif progress_fields[0] == 'incremental':
# Reset the total bytes and start the timer again (otherwise the MB/s
# counter gets confused)
self._progress_total_bytes = int(progress_fields[3])
self._progress_start_time = time.time()
elif progress_fields[1].isnumeric():
bytes_ = int(progress_fields[1])
if self._progress_total_bytes:

129
zfs_autobackup/test.py Normal file
View File

@ -0,0 +1,129 @@
import os.path
import os
import subprocess
import sys
import time
from signal import signal, SIGPIPE
import util
signal(SIGPIPE, util.sigpipe_handler)
try:
print ("voor eerste")
raise Exception("eerstre")
except Exception as e:
print ("voor tweede")
raise Exception("tweede")
finally:
print ("JO")
def generator():
try:
util.deb('in generator')
print ("TRIGGER SIGPIPE")
sys.stdout.flush()
util.deb('after trigger')
# if False:
yield ("bla")
# yield ("bla")
except GeneratorExit as e:
util.deb('GENEXIT '+str(e))
raise
except Exception as e:
util.deb('EXCEPT '+str(e))
finally:
util.deb('FINALLY')
print("nog iets")
sys.stdout.flush()
util.deb('after print in finally WOOP!')
util.deb('START')
g=generator()
util.deb('after generator')
for bla in g:
# print ("heb wat ontvangen")
util.deb('ontvangen van gen')
break
# raise Exception("moi")
pass
raise Exception("moi")
util.deb('after for')
while True:
pass
#
# with open('test.py', 'rb') as fh:
#
# # fsize = fh.seek(10000, os.SEEK_END)
# # print(fsize)
#
# start=time.time()
# for i in range(0,1000000):
# # fh.seek(0, 0)
# fsize=fh.seek(0, os.SEEK_END)
# # fsize=fh.tell()
# # os.path.getsize('test.py')
# print(time.time()-start)
#
#
# print(fh.tell())
#
# sys.exit(0)
#
#
#
# checked=1
# skipped=1
# coverage=0.1
#
# max_skip=0
#
#
# skipinarow=0
# while True:
# total=checked+skipped
#
# skip=coverage<random()
# if skip:
# skipped = skipped + 1
# print("S {:.2f}%".format(checked * 100 / total))
#
# skipinarow = skipinarow+1
# if skipinarow>max_skip:
# max_skip=skipinarow
# else:
# skipinarow=0
# checked=checked+1
# print("C {:.2f}%".format(checked * 100 / total))
#
# print(max_skip)
#
# skip=0
# while True:
#
# total=checked+skipped
# if skip>0:
# skip=skip-1
# skipped = skipped + 1
# print("S {:.2f}%".format(checked * 100 / total))
# else:
# checked=checked+1
# print("C {:.2f}%".format(checked * 100 / total))
#
# #calc new skip
# skip=skip+((1/coverage)-1)*(random()*2)
# # print(skip)
# if skip> max_skip:
# max_skip=skip
#
# print(max_skip)

65
zfs_autobackup/util.py Normal file
View File

@ -0,0 +1,65 @@
# root@psyt14s:/home/psy/zfs_autobackup# ls -lh /home/psy/Downloads/carimage.zip
# -rw-rw-r-- 1 psy psy 990M Nov 26 2020 /home/psy/Downloads/carimage.zip
# root@psyt14s:/home/psy/zfs_autobackup# time sha1sum /home/psy/Downloads/carimage.zip
# a682e1a36e16fe0d0c2f011104f4a99004f19105 /home/psy/Downloads/carimage.zip
#
# real 0m2.558s
# user 0m2.105s
# sys 0m0.448s
# root@psyt14s:/home/psy/zfs_autobackup# time python3 -m zfs_autobackup.ZfsCheck
#
# real 0m1.459s
# user 0m0.993s
# sys 0m0.462s
# NOTE: surprisingly sha1 in via python3 is faster than the native sha1sum utility, even in the way we use below!
import os
import platform
import sys
def tmp_name(suffix=""):
"""create temporary name unique to this process and node. always retruns the same result during the same execution"""
#we could use uuids but those are ugly and confusing
name="{}-{}-{}".format(
os.path.basename(sys.argv[0]).replace(" ","_"),
platform.node(),
os.getpid())
name=name+suffix
return name
def get_tmp_clone_name(snapshot):
pool=snapshot.zfs_node.get_pool(snapshot)
return pool.name+"/"+tmp_name()
def output_redir():
"""use this after a BrokenPipeError to prevent further exceptions.
Redirects stdout/err to /dev/null
"""
devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull, sys.stdout.fileno())
os.dup2(devnull, sys.stderr.fileno())
def sigpipe_handler(sig, stack):
#redir output so we dont get more SIGPIPES during cleanup. (which my try to write to stdout)
output_redir()
deb('redir')
# def check_output():
# """make sure stdout still functions. if its broken, this will trigger a SIGPIPE which will be handled by the sigpipe_handler."""
# try:
# print(" ")
# sys.stdout.flush()
# except Exception as e:
# pass
# def deb(txt):
# with open('/tmp/debug.log', 'a') as fh:
# fh.write("DEB: "+txt+"\n")