convert fdroid scanner --exodus to SignatureDataController

This commit is contained in:
Michael Pöhn 2022-09-28 17:35:31 +02:00
parent d5ef1b2e95
commit c10633eac5
5 changed files with 142 additions and 116 deletions

View File

@ -76,7 +76,7 @@ from fdroidserver.exception import FDroidException, VCSException, NoSubmodulesEx
BuildException, VerificationException, MetaDataException
from .asynchronousfilereader import AsynchronousFileReader
from . import apksigcopier
from . import apksigcopier, common
# The path to this fdroidserver distribution
@ -321,6 +321,22 @@ def fill_config_defaults(thisconfig):
break
def get_config(options=None):
"""
helper function for getting access to commons.config while safely
initializing if it wasn't initialized yet.
"""
global config
if config is not None:
return config
config = {}
common.fill_config_defaults(config)
common.read_config(options)
return config
def regsub_file(pattern, repl, path):
with open(path, 'rb') as f:
text = f.read()

View File

@ -20,7 +20,6 @@ import os
import re
import sys
import json
import yaml
import imghdr
import shutil
import logging
@ -42,7 +41,6 @@ from . import metadata
from .exception import BuildException, VCSException, ConfigurationException
from . import scanner
config = None
options = None
DEFAULT_JSON_PER_BUILD = {'errors': [], 'warnings': [], 'infos': []} # type: ignore
@ -136,7 +134,7 @@ def _exodus_compile_signatures(signatures):
def _datetime_now():
"""
simple warpper for datetime.now to allow mocking it for testing
simple wrapper for datetime.now to allow mocking it for testing
"""
return datetime.now().astimezone()
@ -145,11 +143,12 @@ def _scanner_cachedir():
"""
get `Path` to local cache dir
"""
if not common.config:
cfg = common.get_config()
if not cfg:
raise ConfigurationException('config not initialized')
if "cachedir_scanner" not in common.config:
if "cachedir_scanner" not in cfg:
raise ConfigurationException("could not load 'cachedir_scanner' from config")
cachedir = Path(common.config["cachedir_scanner"])
cachedir = Path(cfg["cachedir_scanner"])
cachedir.mkdir(exist_ok=True, parents=True)
return cachedir
@ -170,7 +169,7 @@ class SignatureDataController:
def __init__(self, name, filename):
self.name = name
self.filename = filename
self.cache_outdated_interval = timedelta(days=7)
self.cache_outdated_interval = None
self.data = {}
def check_data_version(self):
@ -198,8 +197,27 @@ class SignatureDataController:
raise SignatureDataMalformedException() from e
except TypeError as e:
raise SignatureDataMalformedException() from e
if (timestamp + self.cache_outdated_interval) < scanner._datetime_now():
raise SignatureDataOutdatedException()
if self.cache_outdated_interval:
if (timestamp + self.cache_outdated_interval) < scanner._datetime_now():
raise SignatureDataOutdatedException()
def fetch(self):
try:
self.load_from_cache()
self.verify_data()
self.check_last_updated()
except (
SignatureDataMalformedException,
SignatureDataVersionMismatchException,
SignatureDataOutdatedException
):
try:
self.fetch_signatures_from_web()
except AttributeError:
# just load from defaults if fetch_signatures_from_web is not
# implemented
self.load_from_defaults()
self.write_to_cache()
def load(self):
try:
@ -237,62 +255,70 @@ class SignatureDataController:
for k in [x for x in self.data.keys() if x not in valid_keys]:
del self.data[k]
# def scan
class ExodusSignatureDataController(SignatureDataController):
def __init__(self):
super().__init__('Exodus signatures', 'exodus.yml')
self.cache_outdated_interval = timedelta(days=1) # refresh exodus cache after one day
def fetch_signatures_from_web():
pass
# TODO
# exodus_url = "https://reports.exodus-privacy.eu.org/api/trackers"
# sigs = {
# "signatures": [],
# "timestamp": scanner._datetime_now().isoformat(),
# "version": SCANNER_CACHE_VERSION,
# }
def fetch_signatures_from_web(self):
exodus_url = "https://reports.exodus-privacy.eu.org/api/trackers"
self.data = {
"signatures": {},
"timestamp": scanner._datetime_now().isoformat(),
"version": SCANNER_CACHE_VERSION,
}
# with urllib.request.urlopen(exodus_url) as f:
# data = json.load(f)
# for tracker in data["trackers"].values():
# sigs["signatures"].append({
# "name": tracker["name"],
# "binary_signature": tracker["code_signature"],
# "network_signature": tracker["network_signature"],
# "types": ["tracker", "non-free"] # right now we assume all trackers in exodus are non-free
# })
with urllib.request.urlopen(exodus_url) as f:
d = json.load(f)
for tracker in d["trackers"].values():
if tracker.get('code_signature'):
self.data["signatures"][tracker["name"]] = {
"name": tracker["name"],
"warn_code_signatures": [tracker["code_signature"]],
# exodus also provides network signatures, unused atm.
# "network_signatures": [tracker["network_signature"]],
"AntiFeatures": ["Tracking"],
"license": "NonFree" # We assume all trackers in exodus
# are non-free, alought free
# trackers like piwik, acra,
# etc. might be listed by exodus
# too.
}
class ScannerSignatureDataController(SignatureDataController):
def __init__(self):
super().__init__('Scanner signatures', 'scanner.json')
def fetch_signatures_from_web(self):
url = "https://uniqx.gitlab.io/fdroid-scanner-signatures/sigs.json"
with urllib.request.urlopen(url) as f:
data = yaml.safe_load(f)
# TODO: validate parsed data
# TODO: error message 'please update fdroidserver/report' when fetching failed due to changes in the data strucutre
self.data = data
class ScannerTool():
def __init__(self):
self.sdcs = [ScannerSignatureDataController()]
for sdc in self.sdcs:
sdc.load()
self.load()
self.compile_regexes()
def load(self):
for sdc in self.sdcs:
sdc.load()
def compile_regexes(self):
self.err_regex = {'code_signatures': {}, 'gradle_signatures': {}}
self.regexs = {
'err_code_signatures': {},
'err_gradle_signatures': {},
'warn_code_signatures': {},
'warn_gradle_signatures': {},
}
for sdc in self.sdcs:
for signame, sigdef in sdc.data.get('signatures', {}).items():
for sig in sigdef.get('code_signatures', []):
self.err_regex['code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
self.regexs['err_code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
for sig in sigdef.get('gradle_signatures', []):
self.err_regex['gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
self.regexs['err_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
for sig in sigdef.get('warn_code_signatures', []):
self.regexs['warn_code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
for sig in sigdef.get('warn_gradle_signatures', []):
self.regexs['warn_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
def clear_cache(self):
# delete cache folder and all its contents
@ -300,6 +326,10 @@ class ScannerTool():
# re-initialize, this will re-populate the cache from default values
self.__init__()
def add(self, new_controller: SignatureDataController):
self.sdcs.append(new_controller)
self.compile_regexes()
# TODO: change this from singleton instance to dependency injection
# use `_get_tool()` instead of accessing this directly
@ -342,36 +372,20 @@ def scan_binary(apkfile, extract_signatures=None):
"""Scan output of dexdump for known non-free classes."""
logging.info(_('Scanning APK with dexdump for known non-free classes.'))
result = get_embedded_classes(apkfile)
problems = 0
problems, warnings = 0, 0
for classname in result:
for suspect, regexp in _get_tool().err_regex['code_signatures'].items():
for suspect, regexp in _get_tool().regexs['warn_code_signatures'].items():
if regexp.match(classname):
logging.debug("Found class '%s'" % classname)
logging.debug("Warning: found class '%s'" % classname)
warnings += 1
for suspect, regexp in _get_tool().regexs['err_code_signatures'].items():
if regexp.match(classname):
logging.debug("Problem: found class '%s'" % classname)
problems += 1
if extract_signatures:
def _detect_tracker(sig, tracker, class_list):
for clazz in class_list:
if sig.search(clazz):
logging.debug("Found tracker, class {} matching {}".format(clazz, tracker.code_signature))
return tracker
return None
results = []
args = [(extract_signatures[1][index], tracker, result)
for (index, tracker) in enumerate(extract_signatures[0]) if
len(tracker.code_signature) > 3]
for res in itertools.starmap(_detect_tracker, args):
if res:
results.append(res)
trackers = [t for t in results if t is not None]
problems += len(trackers)
if warnings:
logging.warning(_("Found {count} warnings in {filename}").format(count=warnings, filename=apkfile))
if problems:
logging.critical("Found problems in %s" % apkfile)
logging.critical(_("Found {count} problems in {filename}").format(count=problems, filename=apkfile))
return problems
@ -396,7 +410,7 @@ def scan_source(build_dir, build=metadata.Build()):
return any(al in s for al in allowlisted)
def suspects_found(s):
for n, r in _get_tool().err_regex['gradle_signatures'].items():
for n, r in _get_tool().regexs['err_gradle_signatures'].items():
if r.match(s) and not is_allowlisted(s):
yield n
@ -669,7 +683,7 @@ def scan_source(build_dir, build=metadata.Build()):
def main():
global config, options, json_per_build
global options, json_per_build
# Parse command line...
parser = ArgumentParser(
@ -699,24 +713,25 @@ def main():
else:
logging.getLogger().setLevel(logging.ERROR)
config = common.read_config(options)
# initialize/load configuration values
common.get_config(options)
if options.clear_cache:
scanner._get_tool().clear_cache()
if options.exodus:
c = ExodusSignatureDataController()
c.fetch()
scanner._get_tool().add(c)
probcount = 0
exodus = []
if options.exodus:
exodus = load_exodus_trackers_signatures()
appids = []
for apk in options.appid:
if os.path.isfile(apk):
count = scanner.scan_binary(apk, exodus)
count = scanner.scan_binary(apk)
if count > 0:
logging.warning(
_('Scanner found {count} problems in {apk}:').format(
_('Scanner found {count} problems in {apk}').format(
count=count, apk=apk
)
)

View File

@ -402,7 +402,7 @@ class BuildTest(unittest.TestCase):
os.chdir(testdir)
os.mkdir("build")
config = dict()
config = fdroidserver.common.get_config()
config['sdk_path'] = os.getenv('ANDROID_HOME')
config['ndk_paths'] = {'r10d': os.getenv('ANDROID_NDK_HOME')}
fdroidserver.common.config = config

View File

@ -31,7 +31,7 @@ import fdroidserver.build
import fdroidserver.common
import fdroidserver.metadata
import fdroidserver.scanner
from testcommon import TmpCwd
from testcommon import TmpCwd, mock_open_to_str
class ScannerTest(unittest.TestCase):
@ -449,17 +449,18 @@ class Test_scan_binary(unittest.TestCase):
fdroidserver.common.options = mock.Mock()
fdroidserver.scanner._SCANNER_TOOL = mock.Mock()
fdroidserver.scanner._SCANNER_TOOL.err_regex = {}
fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = {
fdroidserver.scanner._SCANNER_TOOL.regexs = {}
fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = {
"java/lang/Object": re.compile(r'.*java/lang/Object', re.IGNORECASE | re.UNICODE)
}
fdroidserver.scanner._SCANNER_TOOL.regexs['warn_code_signatures'] = {}
def test_code_signature_match(self):
apkfile = os.path.join(self.basedir, 'no_targetsdk_minsdk1_unsigned.apk')
self.assertEqual(
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'].values(), apkfile),
)
@unittest.skipIf(
@ -470,7 +471,7 @@ class Test_scan_binary(unittest.TestCase):
)
def test_bottom_level_embedded_apk_code_signature(self):
apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk')
fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = {
fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = {
"org/bitbucket/tickytacky/mirrormirror/MainActivity": re.compile(
r'.*org/bitbucket/tickytacky/mirrormirror/MainActivity', re.IGNORECASE | re.UNICODE
)
@ -479,12 +480,12 @@ class Test_scan_binary(unittest.TestCase):
self.assertEqual(
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'].values(), apkfile),
)
def test_top_level_signature_embedded_apk_present(self):
apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk')
fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = {
fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = {
"org/fdroid/ci/BuildConfig": re.compile(
r'.*org/fdroid/ci/BuildConfig', re.IGNORECASE | re.UNICODE
)
@ -492,7 +493,7 @@ class Test_scan_binary(unittest.TestCase):
self.assertEqual(
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'].values(), apkfile),
)
# TODO: re-enable once allow-listing migrated to more complex regexes
@ -671,6 +672,23 @@ class Test_SignatureDataController(unittest.TestCase):
with self.assertRaises(fdroidserver.scanner.SignatureDataVersionMismatchException):
sdc.check_data_version()
def test_write_to_cache(self):
open_func = mock.mock_open()
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc.data = {"mocked": "data"}
with mock.patch("builtins.open", open_func), mock.patch(
"fdroidserver.scanner._scanner_cachedir",
return_value=pathlib.Path('.'),
):
sdc.write_to_cache()
open_func.assert_called_with(pathlib.Path('fff.yml'), 'w', encoding="utf-8")
self.assertEqual(
mock_open_to_str(open_func),
"""{\n "mocked": "data"\n}"""
)
class Test_ScannerSignatureDataController_fetch_signatures_from_web(unittest.TestCase):
def setUp(self):
@ -693,34 +711,6 @@ class Test_ScannerSignatureDataController_fetch_signatures_from_web(unittest.Tes
- ads
''')))
def test_fetch_signatures_from_web(self):
sdc = fdroidserver.scanner.ScannerSignatureDataController()
with unittest.mock.patch('urllib.request.urlopen', self.uo_func):
sdc.fetch_signatures_from_web()
self.assertEqual(sdc.data.get('version'), 999)
self.assertEqual(sdc.data.get('timestamp'), "1999-12-31T23:59:59.999999+00:00")
self.assertListEqual(
sdc.data.get('signatures'),
[
{
'binary_signature': 'com/google/firebase',
'name': 'Google Firebase',
'types': ['tracker', 'non-free'],
},
{
'gradle_signature': 'com/google/android/gms',
'name': 'Google Mobile Services',
'types': ['non-free'],
},
{
'network_signature': 'doubleclick\\.net',
'name': 'Another thing to test.',
'types': ['ads'],
},
]
)
self.assertEqual(len(sdc.data), 3)
class Test_main(unittest.TestCase):
def setUp(self):
@ -768,7 +758,7 @@ class Test_main(unittest.TestCase):
self.exit_func.assert_not_called()
self.read_app_args_func.assert_not_called()
self.scan_binary_func.assert_called_once_with('local.application.apk', [])
self.scan_binary_func.assert_called_once_with('local.application.apk')
if __name__ == "__main__":

View File

@ -50,6 +50,11 @@ class TmpPyPath():
def mock_open_to_str(mock):
"""
helper function for accessing all data written into a
unittest.mock.mock_open() instance as a string.
"""
return "".join([
x.args[0] for x in mock.mock_calls if str(x).startswith("call().write(")
])