mirror of
https://github.com/roddhjav/pass-import.git
synced 2026-03-01 18:23:34 +01:00
273 lines
9.0 KiB
Python
273 lines
9.0 KiB
Python
# -*- encoding: utf-8 -*-
|
|
# pass-import - test suite common resources
|
|
# Copyright (C) 2017-2024 Alexandre PUJOL <alexandre@pujol.io>.
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
"""pass-import test suite common resources.
|
|
It provides:
|
|
- tests.tmp Path to the test temporary directory.
|
|
- tests.tests Root path for tests
|
|
- tests.assets Root path of tests assets.
|
|
- tests.db Root path of db where the files to import live.
|
|
- tests.formats Root path with basic malformed formats.
|
|
- tests.managers Interface to manage the managers classes.
|
|
- tests.conf Dictionary with managers tests settings.
|
|
- tests.Test() Base test class.
|
|
- tests.yaml_load() Open and load a yaml reference resource.
|
|
- tests.cls() Load a password manager object.
|
|
- tests.reference() Set the expected reference data for a given manager.
|
|
- tests.clear() Clear data from key not in keep.
|
|
- tests.captured() Context manager to capture stdout and stderr.
|
|
- tests.mocked() Mock cloud password managers API response.
|
|
- tests.skipIfNo() Skip a password manager test if it is disabled.
|
|
- tests.skipIfNoInstalled() Skip a test if a program is not installed.
|
|
- tests.skipIfNoModule() Skip a test if an optional module is not installed.
|
|
- tests.mock_hibp() Mock HIBP API response.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import unittest
|
|
from io import StringIO
|
|
from contextlib import contextmanager
|
|
import yaml
|
|
|
|
import pass_import
|
|
import pass_import.__main__
|
|
|
|
|
|
# Scratch directory; each test creates its own sub-directory below it.
tmp = '/tmp/tests/pass-import/'  # nosec
# Absolute path of the ``tests`` directory, resolved from the current
# working directory.
tests = os.path.abspath('tests')
# Asset roots. The empty last component makes ``os.path.join`` append a
# trailing separator, so callers can extend these paths with ``+``.
assets = os.path.join(tests, 'assets', '')
formats = os.path.join(assets, 'format', '')
db = os.path.join(assets, 'db', '')
# Registry used to look up password manager classes by name.
managers = pass_import.Managers()
# Per-manager test settings loaded from ``tests.yml``.
with open(os.path.join(tests, 'tests.yml')) as cfile:
    conf = yaml.safe_load(cfile)
|
|
|
|
|
|
def _id(obj):
|
|
return obj
|
|
|
|
|
|
def skipIfNo(name: str):
    """Return a decorator skipping a password manager test if disabled.

    Only ``bitwarden``, ``lastpass`` and ``onepassword`` are gated: their
    tests run only when both the ``T_<NAME>`` and ``TESTS_<NAME>_PASS``
    environment variables are present. Any other name gets the identity
    decorator.

    :param str name: Name of the password manager.
    :return: ``unittest.skip(...)`` when disabled, ``_id`` otherwise.
    """
    if name in {'bitwarden', 'lastpass', 'onepassword'}:
        upper = name.upper()
        required = (f'T_{upper}', f'TESTS_{upper}_PASS')
        if any(variable not in os.environ for variable in required):
            return unittest.skip(f"Skipping: {name} tests disabled.")
    return _id
|
|
|
|
|
|
def skipIfNoInstalled(name: str):
    """Return a decorator skipping a test if a program is not installed.

    :param str name: Executable looked up on ``PATH`` with ``shutil.which``.
    :return: ``unittest.skip(...)`` when the program is missing, ``_id``
        (the identity decorator) otherwise.
    """
    if shutil.which(name) is None:
        # The reason used to read "not installed disabled.", a leftover
        # from the message used for disabled managers.
        return unittest.skip(f"Skipping: {name} not installed.")
    return _id
|
|
|
|
|
|
def skipIfNoModule(name: str):
    """Return a decorator skipping a test if an optional module is missing.

    The module is imported to probe for it; an ``ImportError`` means the
    test must be skipped.

    :param str name: Name of the module to import.
    :return: ``unittest.skip(...)`` when the import fails, ``_id`` otherwise.
    """
    try:
        __import__(name)
    except ImportError:
        decorator = unittest.skip(f"Skipping: module {name} not installed.")
    else:
        decorator = _id
    return decorator
|
|
|
|
|
|
def mocked(manager, cmd):
    """Mock cloud password managers API response.

    :param str manager: Name of the password manager.
    :param str cmd: Name of the recorded response file for that manager.
    :return: Content of ``<db>/<manager>/<cmd>`` for ``bitwarden``,
        ``lastpass`` and ``onepassword``; an empty string for any other
        manager.
    """
    if manager in {'bitwarden', 'lastpass', 'onepassword'}:
        with open(os.path.join(db, manager, cmd)) as handle:
            return handle.read()
    return ''
|
|
|
|
|
|
@contextmanager
def captured():
    """Context manager to capture stdout and stderr.

    Both streams are replaced by fresh ``StringIO`` buffers for the
    duration of the ``with`` block and restored afterwards, even when the
    block raises.

    :return: Yields the ``(stdout, stderr)`` buffers.
    """
    saved = (sys.stdout, sys.stderr)
    buffers = (StringIO(), StringIO())
    try:
        sys.stdout, sys.stderr = buffers
        yield sys.stdout, sys.stderr
    finally:
        sys.stdout, sys.stderr = saved
|
|
|
|
|
|
def yaml_load(ref_path):
    """Open and load a yaml reference resource.

    :param str ref_path: Path relative to the ``references`` directory of
        the test assets.
    :return: The document parsed by ``yaml.safe_load``.
    """
    location = os.path.join(assets, 'references', ref_path)
    with open(location, 'r') as stream:
        return yaml.safe_load(stream)
|
|
|
|
|
|
def cls(name, prefix=None, **args):
    """Load a password manager object.

    :param str name: Manager name, used to look up both its class and its
        test settings.
    :param str prefix: Path given to the manager; when empty or ``None``
        the ``path`` entry of the manager's settings, below ``db``, is used.
    :param args: Extra settings; they override the default ``extra=True``.
    :return: The instantiated manager.
    """
    settings = {'extra': True, **args}
    location = prefix or os.path.join(db, conf[name]['path'])
    return managers.get(name)(location, settings=settings)
|
|
|
|
|
|
def reference(name=None):
    """Set the expected reference data for a given manager.

    Some password managers do not store a lot of data (no group...).
    Therefore, we need to remove these entries from the reference data
    when testing these managers.

    :param str name: Optional manager name. When given, the keys listed
        under ``without`` in its settings are dropped from every entry and
        its ``root`` setting, if any, is prepended to every entry's group.
    :return: The entries loaded from ``references/main.yml``, adjusted for
        the manager.
    """
    # Go through the shared loader rather than rebuilding the path by hand:
    # ``assets`` already ends with a separator, so concatenating
    # '/references/main.yml' produced a doubled one.
    ref = yaml_load('main.yml')
    if name:
        settings = conf[name]
        if 'without' in settings:
            for key in settings['without']:
                for entry in ref:
                    entry.pop(key, None)
        if 'root' in settings:
            for entry in ref:
                entry['group'] = settings['root'] + entry['group']
    return ref
|
|
|
|
|
|
def clear(data, keep=None):
    """Clear data from key not in keep.

    Every entry is modified in place: keys that are not listed in ``keep``
    and keys holding a falsy value are removed.

    :param list data: Entries (dictionaries) to clean.
    :param list keep: Keys to preserve; when empty or ``None`` the default
        set of title, password, login, url, comments and group is used.
    """
    if not keep:
        keep = ['title', 'password', 'login', 'url', 'comments', 'group']
    for record in data:
        # Collect first, delete afterwards: a dict cannot change size while
        # it is being iterated.
        unwanted = [field for field, value in record.items()
                    if field not in keep or not value]
        for field in unwanted:
            del record[field]
|
|
|
|
|
|
def mock_hibp(*args, **kwargs):
    """Mock HIBP API response.

    All arguments are accepted and ignored.

    :return: An object with a ``text`` attribute holding ``HEX:COUNT``
        lines joined by CRLF, and a ``raise_for_status()`` method that does
        nothing.
    """
    lines = (
        "D5EE0CB1A41071812CCED2F1930E6E1A5D2:2",
        "2DC183F740EE76F27B78EB39C8AD972A757:52579",
        "CF164D7A51A1FD864B1BF9E1CE8A3EC171B:4",
        "D0B910E7A3028703C0B30039795E908CEB2:7",
        "AD6438836DBE526AA231ABDE2D0EEF74D42:3",
        "EBAB0A7CE978E0194608B572E4F9404AA21:3",
        "17727EAB0E800E62A776C76381DEFBC4145:120",
        "5370372AC65308F03F6ED75EC6068C8E1BE:1386",
        "1E4C9B93F3F0682250B6CF8331B7EE68FD8:3730471",
        "437FAA5A7FCE15D1DDCB9EAEAEA377667B8:123422",
        "944C22589AC652B0F47918D58CA0CDCCB63:411",
    )

    class MockResponse:
        """Canned response exposing only what the caller reads."""

        def __init__(self):
            self.text = "\r\n".join(lines)

        def raise_for_status(self):
            """Do nothing: the mocked request never fails."""

    return MockResponse()
|
|
|
|
|
|
class Test(unittest.TestCase):
    """Common resources for all tests.

    :param str key: Optional key for a password manager.
    :param str token: Optional token for a password manager.
    :param str login: Login for a password manager.
    :param str prefix: Path to a password repository.
    :param str masterpassword: Master password used for a password manager.
    :param list gpgids: Test GPGIDs.

    """

    key = ''
    token = ''  # nosec
    login = ''
    prefix = ''
    masterpassword = 'correct horse battery staple'
    gpgids = ['D4C78DB7920E1E27F5416B81CC9DB947CF90C77B', '']

    def __init__(self, methodName='runTest'):  # noqa
        super().__init__(methodName)

        # GPG keyring & pass settings: drop what could leak in from the
        # caller's environment and point GnuPG at the test keyring.
        for variable in ('GPG_AGENT_INFO', 'PASSWORD_STORE_SIGNING_KEY'):
            os.environ.pop(variable, None)
        os.environ['GNUPGHOME'] = os.path.join(os.getcwd(), assets + 'gnupg')

    # Main related method

    def main(self, cmd, code=None, msg=''):
        """Call to the main function.

        :param list cmd: Command line arguments, without the program name.
        :param int code: Expected exit code; ``None`` when the call is not
            expected to raise ``SystemExit``.
        :param str msg: Text expected in stdout (exit code 0) or in stderr
            (any other exit code); only checked when not empty.
        """
        sys.argv = ['main', *cmd]

        if code is None:
            pass_import.__main__.main()
            return

        if msg == '':
            with self.assertRaises(SystemExit) as caught:
                pass_import.__main__.main()
            self.assertEqual(caught.exception.code, code)
            return

        with captured() as (out, err):
            with self.assertRaises(SystemExit) as caught:
                pass_import.__main__.main()
            stream = out if code == 0 else err
            message = stream.getvalue().strip()
            self.assertIn(msg, message)
            self.assertEqual(caught.exception.code, code)

    # Export related method

    def _tmpdir(self, path=''):
        """Create a temporary test directory named after the testname.

        ``self.prefix`` is set to that directory, or to ``path`` inside it
        when ``path`` is given.
        """
        root = os.path.join(tmp, self._testMethodName)
        self.prefix = root

        # Re-initialize the test directory
        if os.path.isdir(root):
            shutil.rmtree(root, ignore_errors=True)
        os.makedirs(root, exist_ok=True)

        if path != '':
            self.prefix = os.path.join(root, path)

    # Import related methods

    def assertImport(self, data, refdata, keep=None):  # noqa
        """Compare imported data with the reference data.

        ``data`` is cleaned in place with ``clear`` first; it must then
        have as many entries as ``refdata`` and each one must be in it.
        """
        clear(data, keep)
        self.assertEqual(len(data), len(refdata))
        for imported in data:
            self.assertIn(imported, refdata)

    # Special exporter access methods

    def _credentials(self, manager=''):
        """Set credentials for cloud based password managers.

        For ``bitwarden``, ``lastpass`` and ``onepassword`` the values are
        read from the ``TESTS_<NAME>_*`` environment variables, defaulting
        to an empty string. Other managers leave the attributes untouched.
        """
        if manager not in {'bitwarden', 'lastpass', 'onepassword'}:
            return
        upper = manager.upper()
        sources = (
            ('masterpassword', 'PASS'),
            ('login', 'LOGIN'),
            ('key', 'SECRETKEY'),
            ('token', 'TOKEN'),
        )
        for attribute, suffix in sources:
            value = os.environ.get(f'TESTS_{upper}_{suffix}', '')
            setattr(self, attribute, value)

    def _init_pass(self):
        """Initialize a new password store repository."""
        gpg_id = os.path.join(self.prefix, '.gpg-id')
        with open(gpg_id, 'w') as handle:
            handle.write('\n'.join(self.gpgids))

    def _init_keepass(self):
        """Initialize a new keepass repository."""
        template = assets + 'export/keepass.kdbx'
        shutil.copyfile(template, self.prefix)
|