download_repo_index_v2() for verified downloading of index-v2
This commit is contained in:
parent
a557764b4d
commit
f3e49f4bcb
|
@ -37,10 +37,12 @@ verify_apk_signature # NOQA: B101
|
|||
generate_keystore # NOQA: B101
|
||||
from fdroidserver.index import (download_repo_index,
|
||||
download_repo_index_v1,
|
||||
download_repo_index_v2,
|
||||
get_mirror_service_urls,
|
||||
make as make_index) # NOQA: E402
|
||||
download_repo_index # NOQA: B101
|
||||
download_repo_index_v1 # NOQA: B101
|
||||
download_repo_index_v2 # NOQA: B101
|
||||
get_mirror_service_urls # NOQA: B101
|
||||
make_index # NOQA: B101
|
||||
from fdroidserver.update import (process_apk,
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import collections
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
@ -1526,15 +1527,78 @@ def download_repo_index_v1(url_str, etag=None, verify_fingerprint=True, timeout=
|
|||
with tempfile.NamedTemporaryFile() as fp:
|
||||
fp.write(download)
|
||||
fp.flush()
|
||||
index, public_key, public_key_fingerprint = get_index_from_jar(fp.name, fingerprint)
|
||||
index, public_key, public_key_fingerprint = get_index_from_jar(
|
||||
fp.name, fingerprint, allow_deprecated=True
|
||||
)
|
||||
index["repo"]["pubkey"] = hexlify(public_key).decode()
|
||||
index["repo"]["fingerprint"] = public_key_fingerprint
|
||||
index["apps"] = [metadata.App(app) for app in index["apps"]]
|
||||
return index, new_etag
|
||||
|
||||
|
||||
def get_index_from_jar(jarfile, fingerprint=None):
|
||||
"""Return the data, public key, and fingerprint from index-v1.jar.
|
||||
def download_repo_index_v2(url_str, etag=None, verify_fingerprint=True, timeout=600):
|
||||
"""Download and verifies index v2 file, then returns its data.
|
||||
|
||||
Downloads the repository index from the given :param url_str and
|
||||
verifies the repository's fingerprint if :param verify_fingerprint
|
||||
is not False. In order to verify the data, the fingerprint must
|
||||
be provided as part of the URL.
|
||||
|
||||
Raises
|
||||
------
|
||||
VerificationException() if the repository could not be verified
|
||||
|
||||
Returns
|
||||
-------
|
||||
A tuple consisting of:
|
||||
- The index in JSON v2 format or None if the index did not change
|
||||
- The new eTag as returned by the HTTP request
|
||||
|
||||
"""
|
||||
url = urllib.parse.urlsplit(url_str)
|
||||
|
||||
fingerprint = None
|
||||
if verify_fingerprint:
|
||||
query = urllib.parse.parse_qs(url.query)
|
||||
if 'fingerprint' not in query:
|
||||
raise VerificationException(_("No fingerprint in URL."))
|
||||
fingerprint = query['fingerprint'][0]
|
||||
|
||||
if url.path.endswith('/entry.jar') or url.path.endswith('/index-v2.json'):
|
||||
path = url.path.rsplit('/', 1)[0]
|
||||
else:
|
||||
path = url.path.rstrip('/')
|
||||
|
||||
url = urllib.parse.SplitResult(url.scheme, url.netloc, path + '/entry.jar', '', '')
|
||||
download, new_etag = net.http_get(url.geturl(), etag, timeout)
|
||||
|
||||
if download is None:
|
||||
return None, new_etag
|
||||
|
||||
# jarsigner is used to verify the JAR, it requires a file for input
|
||||
with tempfile.TemporaryDirectory() as dirname:
|
||||
with (Path(dirname) / 'entry.jar').open('wb') as fp:
|
||||
fp.write(download)
|
||||
fp.flush()
|
||||
entry, public_key, fingerprint = get_index_from_jar(fp.name, fingerprint)
|
||||
|
||||
name = entry['index']['name']
|
||||
sha256 = entry['index']['sha256']
|
||||
url = urllib.parse.SplitResult(url.scheme, url.netloc, path + name, '', '')
|
||||
index, _ignored = net.http_get(url.geturl(), None, timeout)
|
||||
if sha256 != hashlib.sha256(index).hexdigest():
|
||||
raise VerificationException(
|
||||
_("SHA-256 of {url} does not match entry!").format(url=url)
|
||||
)
|
||||
return json.loads(index), new_etag
|
||||
|
||||
|
||||
def get_index_from_jar(jarfile, fingerprint=None, allow_deprecated=False):
|
||||
"""Return the data, public key and fingerprint from an index JAR with one JSON file.
|
||||
|
||||
The F-Droid index files always contain a single data file and a
|
||||
JAR Signature. Since index-v1, the data file is always JSON.
|
||||
That single data file is named the same as the JAR file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
@ -1547,14 +1611,25 @@ def get_index_from_jar(jarfile, fingerprint=None):
|
|||
|
||||
"""
|
||||
logging.debug(_('Verifying index signature:'))
|
||||
common.verify_deprecated_jar_signature(jarfile)
|
||||
|
||||
if allow_deprecated:
|
||||
common.verify_deprecated_jar_signature(jarfile)
|
||||
else:
|
||||
common.verify_jar_signature(jarfile)
|
||||
|
||||
with zipfile.ZipFile(jarfile) as jar:
|
||||
public_key, public_key_fingerprint = get_public_key_from_jar(jar)
|
||||
if fingerprint is not None:
|
||||
fingerprint = re.sub(r'[^0-9A-F]', r'', fingerprint.upper())
|
||||
if fingerprint != public_key_fingerprint:
|
||||
raise VerificationException(_("The repository's fingerprint does not match."))
|
||||
data = json.loads(jar.read('index-v1.json').decode())
|
||||
raise VerificationException(
|
||||
_("The repository's fingerprint does not match.")
|
||||
)
|
||||
for f in jar.namelist():
|
||||
if not f.startswith('META-INF/'):
|
||||
jsonfile = f
|
||||
break
|
||||
data = json.loads(jar.read(jsonfile))
|
||||
return data, public_key, public_key_fingerprint
|
||||
|
||||
|
||||
|
|
|
@ -67,6 +67,24 @@ class ApiTest(unittest.TestCase):
|
|||
)
|
||||
self.assertEqual(index_url, etag_set_to_url)
|
||||
|
||||
@mock.patch('fdroidserver.net.http_get')
|
||||
def test_download_repo_index_v2_url_parsing(self, mock_http_get):
|
||||
"""Test whether it is trying to download the right file
|
||||
|
||||
This passes the URL back via the etag return value just as a
|
||||
hack to check which URL was actually attempted.
|
||||
|
||||
"""
|
||||
mock_http_get.side_effect = lambda url, etag, timeout: (None, url)
|
||||
repo_url = 'https://example.org/fdroid/repo'
|
||||
entry_url = 'https://example.org/fdroid/repo/entry.jar'
|
||||
index_url = 'https://example.org/fdroid/repo/index-v2.json'
|
||||
for url in (repo_url, entry_url, index_url):
|
||||
_ignored, etag_set_to_url = fdroidserver.download_repo_index_v2(
|
||||
url, verify_fingerprint=False
|
||||
)
|
||||
self.assertEqual(entry_url, etag_set_to_url)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
newSuite = unittest.TestSuite()
|
||||
|
|
|
@ -113,7 +113,7 @@ class IndexTest(unittest.TestCase):
|
|||
self._sign_test_index_v1_jar()
|
||||
pubkey, fingerprint = fdroidserver.index.extract_pubkey()
|
||||
data, public_key, public_key_fingerprint = fdroidserver.index.get_index_from_jar(
|
||||
'repo/index-v1.jar', fingerprint
|
||||
'repo/index-v1.jar', fingerprint, allow_deprecated=True
|
||||
)
|
||||
self.assertIsNotNone(data)
|
||||
self.assertIsNotNone(public_key)
|
||||
|
@ -123,13 +123,15 @@ class IndexTest(unittest.TestCase):
|
|||
pubkey, fingerprint = fdroidserver.index.extract_pubkey()
|
||||
fingerprint = fingerprint[:-1] + 'G'
|
||||
with self.assertRaises(fdroidserver.exception.VerificationException):
|
||||
fdroidserver.index.get_index_from_jar('repo/index-v1.jar', fingerprint)
|
||||
fdroidserver.index.get_index_from_jar(
|
||||
'repo/index-v1.jar', fingerprint, allow_deprecated=True
|
||||
)
|
||||
|
||||
def test_get_index_from_jar_with_chars_to_be_stripped(self):
|
||||
self._sign_test_index_v1_jar()
|
||||
fingerprint = 'NOOOO F4 9A F3 F1 1E FD DF 20 DF FD 70 F5 E3 11 7B 99 76 67 41 67 AD CA 28 0E 6B 19 32 A0 60 1B 26 F6'
|
||||
data, public_key, public_key_fingerprint = fdroidserver.index.get_index_from_jar(
|
||||
'repo/index-v1.jar', fingerprint
|
||||
'repo/index-v1.jar', fingerprint, allow_deprecated=True
|
||||
)
|
||||
|
||||
@patch('requests.head')
|
||||
|
@ -169,14 +171,106 @@ class IndexTest(unittest.TestCase):
|
|||
|
||||
@patch('fdroidserver.net.http_get')
|
||||
def test_download_repo_index_url_parsing(self, mock_http_get):
|
||||
"""Test whether it is trying to download the right file
|
||||
|
||||
This passes the URL back via the etag return value just as a
|
||||
hack to check which URL was actually attempted.
|
||||
|
||||
"""
|
||||
mock_http_get.side_effect = lambda url, etag, timeout: (None, url)
|
||||
repo_url = 'https://example.org/fdroid/repo'
|
||||
index_url = 'https://example.org/fdroid/repo/index-v1.jar'
|
||||
fingerprint_url = 'https://example.org/fdroid/repo?fingerprint=' + GP_FINGERPRINT
|
||||
slash_url = 'https://example.org/fdroid/repo//?fingerprint=' + GP_FINGERPRINT
|
||||
for url in (repo_url, index_url, fingerprint_url, slash_url):
|
||||
_ignored, returned_url = fdroidserver.index.download_repo_index(url, verify_fingerprint=False)
|
||||
self.assertEqual(index_url, returned_url)
|
||||
_ignored, etag_set_to_url = fdroidserver.index.download_repo_index(url, verify_fingerprint=False)
|
||||
self.assertEqual(index_url, etag_set_to_url)
|
||||
|
||||
@patch('fdroidserver.net.http_get')
|
||||
def test_download_repo_index_v2_url_parsing(self, mock_http_get):
|
||||
"""Test whether it is trying to download the right file
|
||||
|
||||
This passes the URL back via the etag return value just as a
|
||||
hack to check which URL was actually attempted.
|
||||
|
||||
"""
|
||||
mock_http_get.side_effect = lambda url, etag, timeout: (None, url)
|
||||
repo_url = 'https://example.org/fdroid/repo'
|
||||
entry_url = 'https://example.org/fdroid/repo/entry.jar'
|
||||
index_url = 'https://example.org/fdroid/repo/index-v2.json'
|
||||
fingerprint_url = 'https://example.org/fdroid/repo?fingerprint=' + GP_FINGERPRINT
|
||||
slash_url = 'https://example.org/fdroid/repo//?fingerprint=' + GP_FINGERPRINT
|
||||
for url in (repo_url, entry_url, index_url, fingerprint_url, slash_url):
|
||||
_ignored, etag_set_to_url = fdroidserver.index.download_repo_index_v2(
|
||||
url, verify_fingerprint=False
|
||||
)
|
||||
self.assertEqual(entry_url, etag_set_to_url)
|
||||
|
||||
@patch('fdroidserver.net.http_get')
|
||||
def test_download_repo_index_v2(self, mock_http_get):
|
||||
def http_get_def(url, etag, timeout): # pylint: disable=unused-argument
|
||||
f = os.path.basename(url)
|
||||
with open(os.path.join(self.testdir, 'repo', f), 'rb') as fp:
|
||||
return (fp.read(), 'fakeetag')
|
||||
mock_http_get.side_effect = http_get_def
|
||||
os.chdir(self.testdir)
|
||||
fdroidserver.signindex.config['keystore'] = os.path.join(self.basedir, 'keystore.jks')
|
||||
os.mkdir('repo')
|
||||
shutil.copy(os.path.join(self.basedir, 'repo', 'entry.json'), 'repo')
|
||||
shutil.copy(os.path.join(self.basedir, 'repo', 'index-v2.json'), 'repo')
|
||||
fdroidserver.signindex.sign_index('repo', 'entry.json')
|
||||
repo_url = 'https://example.org/fdroid/repo'
|
||||
entry_url = 'https://example.org/fdroid/repo/entry.jar'
|
||||
index_url = 'https://example.org/fdroid/repo/index-v2.json'
|
||||
fingerprint_url = 'https://example.org/fdroid/repo?fingerprint=' + GP_FINGERPRINT
|
||||
slash_url = 'https://example.org/fdroid/repo//?fingerprint=' + GP_FINGERPRINT
|
||||
for url in (repo_url, entry_url, index_url, fingerprint_url, slash_url):
|
||||
data, _ignored = fdroidserver.index.download_repo_index_v2(
|
||||
url, verify_fingerprint=False
|
||||
)
|
||||
self.assertEqual(['repo', 'packages'], list(data.keys()))
|
||||
|
||||
@patch('fdroidserver.net.http_get')
|
||||
def test_download_repo_index_v2_bad_fingerprint(self, mock_http_get):
|
||||
def http_get_def(url, etag, timeout): # pylint: disable=unused-argument
|
||||
f = os.path.basename(url)
|
||||
with open(os.path.join(self.testdir, 'repo', f), 'rb') as fp:
|
||||
return (fp.read(), 'fakeetag')
|
||||
mock_http_get.side_effect = http_get_def
|
||||
os.chdir(self.testdir)
|
||||
fdroidserver.signindex.config['keystore'] = os.path.join(self.basedir, 'keystore.jks')
|
||||
os.mkdir('repo')
|
||||
shutil.copy(os.path.join(self.basedir, 'repo', 'entry.json'), 'repo')
|
||||
shutil.copy(os.path.join(self.basedir, 'repo', 'index-v2.json'), 'repo')
|
||||
fdroidserver.signindex.sign_index('repo', 'entry.json')
|
||||
bad_fp = '0123456789001234567890012345678900123456789001234567890012345678'
|
||||
bad_fp_url = 'https://example.org/fdroid/repo?fingerprint=' + bad_fp
|
||||
with self.assertRaises(fdroidserver.exception.VerificationException):
|
||||
data, _ignored = fdroidserver.index.download_repo_index_v2(bad_fp_url)
|
||||
|
||||
@patch('fdroidserver.net.http_get')
|
||||
def test_download_repo_index_v2_entry_verify(self, mock_http_get):
|
||||
def http_get_def(url, etag, timeout): # pylint: disable=unused-argument
|
||||
return (b'not the entry.jar file contents', 'fakeetag')
|
||||
mock_http_get.side_effect = http_get_def
|
||||
url = 'https://example.org/fdroid/repo?fingerprint=' + GP_FINGERPRINT
|
||||
with self.assertRaises(fdroidserver.exception.VerificationException):
|
||||
data, _ignored = fdroidserver.index.download_repo_index_v2(url)
|
||||
|
||||
@patch('fdroidserver.net.http_get')
|
||||
def test_download_repo_index_v2_index_verify(self, mock_http_get):
|
||||
def http_get_def(url, etag, timeout): # pylint: disable=unused-argument
|
||||
return (b'not the index-v2.json file contents', 'fakeetag')
|
||||
mock_http_get.side_effect = http_get_def
|
||||
os.chdir(self.testdir)
|
||||
fdroidserver.signindex.config['keystore'] = os.path.join(self.basedir, 'keystore.jks')
|
||||
os.mkdir('repo')
|
||||
shutil.copy(os.path.join(self.basedir, 'repo', 'entry.json'), 'repo')
|
||||
shutil.copy(os.path.join(self.basedir, 'repo', 'index-v2.json'), 'repo')
|
||||
fdroidserver.signindex.sign_index('repo', 'entry.json')
|
||||
url = 'https://example.org/fdroid/repo?fingerprint=' + GP_FINGERPRINT
|
||||
with self.assertRaises(fdroidserver.exception.VerificationException):
|
||||
data, _ignored = fdroidserver.index.download_repo_index_v2(url)
|
||||
|
||||
def test_v1_sort_packages(self):
|
||||
|
||||
|
|
Loading…
Reference in New Issue