add --refresh to scanner

This commit is contained in:
Michael Pöhn 2022-09-29 19:55:27 +02:00
parent 4a38908bd7
commit bfcc30b854
2 changed files with 52 additions and 63 deletions

View File

@ -21,7 +21,6 @@ import re
import sys
import json
import imghdr
import shutil
import logging
import zipfile
import itertools
@ -147,10 +146,12 @@ class SignatureDataVersionMismatchException(Exception):
class SignatureDataController:
def __init__(self, name, filename):
def __init__(self, name, filename, url):
self.name = name
self.filename = filename
self.cache_outdated_interval = None
self.url = url
# by default we assume cache is valid indefinitely
self.cache_outdated_interval = timedelta(days=999999)
self.data = {}
def check_data_version(self):
@ -165,7 +166,7 @@ class SignatureDataController:
is not ok.
:raises SignatureDataMalformedException: when timestamp value is
inaccessible or not parseable
inaccessible or not parse-able
:raises SignatureDataOutdatedException: when timestamp is older then
`self.cache_outdated_interval`
'''
@ -178,9 +179,8 @@ class SignatureDataController:
raise SignatureDataMalformedException() from e
except TypeError as e:
raise SignatureDataMalformedException() from e
if self.cache_outdated_interval:
if (timestamp + self.cache_outdated_interval) < scanner._datetime_now():
raise SignatureDataOutdatedException()
if (timestamp + self.cache_outdated_interval) < scanner._datetime_now():
raise SignatureDataOutdatedException()
def fetch(self):
try:
@ -233,24 +233,31 @@ class SignatureDataController:
self.check_data_version()
valid_keys = ['timestamp', 'version', 'signatures']
for k in [x for x in self.data.keys() if x not in valid_keys]:
del self.data[k]
for k in list(self.data.keys()):
if k not in valid_keys:
del self.data[k]
def fetch_signatures_from_web(self):
logging.debug(_("downloading '{}'").format(self.url))
with urllib.request.urlopen(self.url) as f:
self.data = json.load(f)
class ExodusSignatureDataController(SignatureDataController):
def __init__(self):
super().__init__('Exodus signatures', 'exodus.yml')
super().__init__('Exodus signatures', 'exodus.yml', 'https://reports.exodus-privacy.eu.org/api/trackers')
self.cache_outdated_interval = timedelta(days=1) # refresh exodus cache after one day
def fetch_signatures_from_web(self):
exodus_url = "https://reports.exodus-privacy.eu.org/api/trackers"
logging.debug(_("downloading '{}'").format(self.url))
self.data = {
"signatures": {},
"timestamp": scanner._datetime_now().isoformat(),
"version": SCANNER_CACHE_VERSION,
}
with urllib.request.urlopen(exodus_url) as f:
with urllib.request.urlopen(self.url) as f:
d = json.load(f)
for tracker in d["trackers"].values():
if tracker.get('code_signature'):
@ -261,21 +268,26 @@ class ExodusSignatureDataController(SignatureDataController):
# "network_signatures": [tracker["network_signature"]],
"AntiFeatures": ["Tracking"],
"license": "NonFree" # We assume all trackers in exodus
# are non-free, alought free
# are non-free, although free
# trackers like piwik, acra,
# etc. might be listed by exodus
# too.
}
class SUSSDataController(SignatureDataController):
def __init__(self):
super().__init__('SUSS', 'suss.json')
class ScannerTool():
def __init__(self):
self.sdcs = [SUSSDataController()]
self.sdcs = [
SignatureDataController(
'SUSS',
'suss.json',
'https://fdroid.gitlab.io/fdroid-suss/suss.json'
),
]
# we could add support for loading additional signature source
# definitions from config.yml here
self.load()
self.compile_regexes()
@ -301,11 +313,9 @@ class ScannerTool():
for sig in sigdef.get('warn_gradle_signatures', []):
self.regexs['warn_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
def clear_cache(self):
# delete cache folder and all its contents
shutil.rmtree(scanner._scanner_cachedir(), ignore_errors=True)
# re-initialize, this will re-populate the cache from default values
self.__init__()
def refresh(self):
for sdc in self.sdcs:
sdc.fetch_signatures_from_web()
def add(self, new_controller: SignatureDataController):
self.sdcs.append(new_controller)
@ -660,8 +670,8 @@ def main():
help=_("Force scan of disabled apps and builds."))
parser.add_argument("--json", action="store_true", default=False,
help=_("Output JSON to stdout."))
parser.add_argument("--clear-cache", action="store_true", default=False,
help=_("purge local scanner definitions cache"))
parser.add_argument("--refresh", "-r", action="store_true", default=False,
help=_("fetach the latest version of signatures from the web"))
metadata.add_metadata_arguments(parser)
options = parser.parse_args()
metadata.warnings_action = options.W
@ -676,11 +686,14 @@ def main():
# initialize/load configuration values
common.get_config(options)
if options.clear_cache:
scanner._get_tool().clear_cache()
if options.refresh:
scanner._get_tool().refresh()
if options.exodus:
c = ExodusSignatureDataController()
c.fetch()
if options.refresh:
c.fetch_signatures_from_web()
else:
c.fetch()
scanner._get_tool().add(c)
probcount = 0

View File

@ -4,7 +4,6 @@ import glob
import inspect
import logging
import optparse
import io
import os
import re
import shutil
@ -500,56 +499,56 @@ class Test_scan_binary(unittest.TestCase):
class Test_SignatureDataController(unittest.TestCase):
# __init__
def test_init(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
self.assertEqual(sdc.name, 'nnn')
self.assertEqual(sdc.filename, 'fff.yml')
self.assertEqual(sdc.cache_outdated_interval, None)
self.assertEqual(sdc.cache_outdated_interval, timedelta(999999))
self.assertDictEqual(sdc.data, {})
# check_last_updated
def test_check_last_updated_ok(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
sdc.data['timestamp'] = datetime.now().astimezone().isoformat()
sdc.check_last_updated()
def test_check_last_updated_exception_cache_outdated(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
sdc.cache_outdated_interval = timedelta(days=7)
sdc.data['timestamp'] = (datetime.now().astimezone() - timedelta(days=30)).isoformat()
with self.assertRaises(fdroidserver.scanner.SignatureDataOutdatedException):
sdc.check_last_updated()
def test_check_last_updated_exception_missing_timestamp_value(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
with self.assertRaises(fdroidserver.scanner.SignatureDataMalformedException):
sdc.check_last_updated()
def test_check_last_updated_exception_not_string(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
sdc.data['timestamp'] = 12345
with self.assertRaises(fdroidserver.scanner.SignatureDataMalformedException):
sdc.check_last_updated()
def test_check_last_updated_exception_not_iso_formatted_string(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
sdc.data['timestamp'] = '01/09/2002 10:11'
with self.assertRaises(fdroidserver.scanner.SignatureDataMalformedException):
sdc.check_last_updated()
# check_data_version
def test_check_data_version_ok(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
sdc.data['version'] = fdroidserver.scanner.SCANNER_CACHE_VERSION
sdc.check_data_version()
def test_check_data_version_exception(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
with self.assertRaises(fdroidserver.scanner.SignatureDataVersionMismatchException):
sdc.check_data_version()
def test_write_to_cache(self):
open_func = mock.mock_open()
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml', 'https://example.com/test.json')
sdc.data = {"mocked": "data"}
with mock.patch("builtins.open", open_func), mock.patch(
@ -565,28 +564,6 @@ class Test_SignatureDataController(unittest.TestCase):
)
class Test_SUSSDataController_fetch_signatures_from_web(unittest.TestCase):
def setUp(self):
self.uo_func = mock.Mock(return_value=io.StringIO(textwrap.dedent('''\
version: 999
timestamp: "1999-12-31T23:59:59.999999+00:00"
signatures:
- binary_signature: com/google/firebase
name: Google Firebase
types:
- tracker
- non-free
- gradle_signature: com/google/android/gms
name: Google Mobile Services
types:
- non-free
- network_signature: doubleclick\\.net
name: Another thing to test.
types:
- ads
''')))
class Test_main(unittest.TestCase):
def setUp(self):
self.args = ["com.example.app", "local/additional.apk", "another.apk"]
@ -654,7 +631,6 @@ if __name__ == "__main__":
unittest.makeSuite(ScannerTest),
unittest.makeSuite(Test_scan_binary),
unittest.makeSuite(Test_SignatureDataController),
unittest.makeSuite(Test_SUSSDataController_fetch_signatures_from_web),
unittest.makeSuite(Test_main),
])
unittest.main(failfast=False)