134 lines
4.3 KiB
Python
134 lines
4.3 KiB
Python
"""
|
|
This module is responsible for cryptographic authorization and encryption for
|
|
communication between sr.ht services and each other, as well as the outside
|
|
world.
|
|
"""
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
|
from cryptography.fernet import Fernet, InvalidToken
|
|
from datetime import timedelta
|
|
from flask import abort, Response, request, current_app
|
|
from srht.config import cfg
|
|
import base64
|
|
import binascii
|
|
import json
|
|
import os
|
|
|
|
private_key = cfg("webhooks", "private-key")
|
|
private_key = Ed25519PrivateKey.from_private_bytes(
|
|
base64.b64decode(private_key))
|
|
public_key = private_key.public_key()
|
|
fernet = Fernet(cfg("sr.ht", "network-key"))
|
|
|
|
try:
|
|
from srht.redis import redis
|
|
except:
|
|
print("Warning: unable to initialize redis, nonce reuse will be possible")
|
|
redis = type("Redis", tuple(), {
|
|
"get": lambda *args, **kwargs: None,
|
|
"setex": lambda *args, **kwargs: None,
|
|
})
|
|
|
|
def verify_request_signature(request):
|
|
"""
|
|
Verifies an HTTP request has a valid signature from the sr.ht webhook key.
|
|
Returns the decoded, authenticated payload.
|
|
"""
|
|
def fail():
|
|
abort(Response(
|
|
status=403,
|
|
mimetype="application/json",
|
|
response=json.dumps({
|
|
"errors": [
|
|
{ "reason": "Request payload signature " +
|
|
"verification failed." },
|
|
]
|
|
})))
|
|
payload = request.data
|
|
signature = request.headers.get("X-Payload-Signature")
|
|
nonce = request.headers.get("X-Payload-Nonce")
|
|
nonce_key = f"sr.ht.signature-nonce.{nonce}"
|
|
if redis.get(nonce_key):
|
|
fail()
|
|
redis.setex(nonce_key, timedelta(days=90), "1")
|
|
try:
|
|
signature = base64.b64decode(signature)
|
|
nonce = nonce.encode()
|
|
public_key.verify(signature, payload + nonce)
|
|
return payload
|
|
except Exception as ex:
|
|
print("Request signature payload verification failure")
|
|
print(ex)
|
|
fail()
|
|
|
|
def verify_payload(payload, signature, nonce):
|
|
try:
|
|
payload = payload.encode()
|
|
signature = base64.b64decode(signature)
|
|
nonce = nonce.encode()
|
|
public_key.verify(signature, payload + nonce)
|
|
return payload
|
|
except Exception as ex:
|
|
return None
|
|
|
|
def sign_payload(payload):
|
|
"""
|
|
Returns the signature headers for a payload signed with the sr.ht webhook
|
|
key.
|
|
"""
|
|
nonce = binascii.hexlify(os.urandom(8))
|
|
signature = private_key.sign(payload.encode() + nonce)
|
|
signature = base64.b64encode(signature).decode()
|
|
return {
|
|
"X-Payload-Signature": signature,
|
|
"X-Payload-Nonce": nonce.decode(),
|
|
}
|
|
|
|
def verify_encrypted_authorization(auth):
|
|
"""
|
|
Verifies an X-Srht-Authorization header and returns the authenticated
|
|
side-channel payload, which includes the authorized user and OAuth client
|
|
ID. This is used for internal HTTP requests between sr.ht services.
|
|
"""
|
|
try:
|
|
auth = fernet.decrypt(auth.encode(), ttl=30)
|
|
except InvalidToken:
|
|
abort(Response(
|
|
status=403,
|
|
mimetype="application/json",
|
|
response=json.dumps({
|
|
"reason": "Internal request authorization failed.",
|
|
}),
|
|
))
|
|
return json.loads(auth)
|
|
|
|
internal_anon = 1337
|
|
"""
|
|
Set user=internal_anon for encrypt_request_authorization in order to use
|
|
"anonymous" internal auth. This is only useful in specific circumstances and
|
|
likely to break things in other circumstances. See comments in
|
|
core-go/auth/middleware.go for details.
|
|
"""
|
|
|
|
def encrypt_request_authorization(user=None, client_id=None):
|
|
"""
|
|
Returns request headers which can be used to authenticate an HTTP request
|
|
to other sr.ht services. This is used for internal HTTP requests between
|
|
sr.ht services.
|
|
"""
|
|
if not user and not client_id:
|
|
from srht.oauth import current_user
|
|
user = current_user
|
|
if user is internal_anon:
|
|
user = None
|
|
auth = {
|
|
"name": user.username if user else None,
|
|
"client_id": "core.sr.ht",
|
|
"node_id": "core.sr.ht legacy",
|
|
"oauth_client_id": client_id if client_id else None,
|
|
}
|
|
auth = fernet.encrypt(json.dumps(auth).encode())
|
|
return {
|
|
"Authorization": f"Internal {auth.decode()}",
|
|
}
|