core.sr.ht/srht/api.py

93 lines
3.3 KiB
Python

import requests
from flask import current_app, request
from srht.crypto import encrypt_request_authorization
from werkzeug.local import LocalProxy
_default = 1
def paginated_response(id_col, query,
order_by=_default, serialize=_default, per_page=50, **kwargs):
"""
Returns a standard paginated response for a given SQLAlchemy query result.
The id_col should be the column to paginate by (generally the primary key),
which is compared <= the ?start=<n> argument.
order_by will ordered by the ID column, descending, by default. Otherwise
provide a SQLAlchemy expression, e.g. SomeType.some_col.desc().
serialize is a function which will serialize each record into a dict. The
default is lambda r: r.to_dict().
per_page is the number of results per page to return. Default is 50.
"""
total = query.count()
start = request.args.get('start') or -1
if isinstance(start, str):
start = int(start)
if start != -1:
query = query.filter(id_col <= start)
if order_by is _default:
order_by = id_col.desc()
if order_by is not None:
if isinstance(order_by, tuple):
query = query.order_by(*order_by)
else:
query = query.order_by(order_by)
records = query.limit(per_page + 1).all()
if len(records) != per_page + 1:
next_id = None
else:
next_id = str(records[-1].id)
records = records[:per_page]
if serialize is _default:
serialize = lambda r: r.to_dict(**kwargs)
return {
"next": next_id,
"results": [serialize(record) for record in records],
"total": total,
"results_per_page": per_page,
}
def get_authorization(user):
return encrypt_request_authorization(user)
def get_results(url, user):
response = {"next": -1}
while response.get("next") is not None:
rurl = f"{url}?start={response['next']}"
r = requests.get(rurl, headers=get_authorization(user))
if r.status_code != 200:
raise Exception(r.text)
response = r.json()
yield from response["results"]
def ensure_webhooks(user, baseurl, webhooks):
"""
Ensures that the specified webhooks are rigged up. Webhooks should be a
dict whose key is the webhook URL and whose values are the list of events
to send to that URL, or None to unconfigure this webhook.
"""
for webhook in get_results(baseurl, user):
url = webhook["url"]
if url not in webhooks:
continue
if webhook["events"] == webhooks[url]:
del webhooks[url]
continue # This webhook already configured
# This webhook is set up incorrectly, delete it
webhook_url = f"{baseurl}/{webhook['id']}"
r = requests.delete(webhook_url,
headers=get_authorization(user))
if r.status_code != 204:
raise Exception(f"Failed to remove invalid webhook: {r.text}")
if webhooks[url] is None:
del webhooks[url]
for url, events in webhooks.items():
if not events:
continue
r = requests.post(baseurl,
headers=get_authorization(user),
json={"events": events, "url": url})
if r.status_code != 201:
raise Exception(f"Failed to create webhook: {r.text}")