import json
import os.path
from abc import ABC

import requests
import yaml
from flask import url_for
from jinja2 import Markup, escape
from srht.api import ensure_webhooks, encrypt_request_authorization, get_results
from srht.markdown import markdown, sanitize
from srht.config import get_origin

_gitsrht = get_origin("git.sr.ht", external=True, default=None)
_hgsrht = get_origin("hg.sr.ht", external=True, default=None)
_listsrht = get_origin("lists.sr.ht", external=True, default=None)
_todosrht = get_origin("todo.sr.ht", external=True, default=None)
_buildsrht = get_origin("builds.sr.ht", default=None)

origin = get_origin("hub.sr.ht")

readme_names = ["README.md", "README.markdown", "README"]


def format_readme(content, filename="", link_prefix=None):
    """Render README *content* to markup based on the extension of *filename*.

    Files ending in ``.md`` / ``.markdown`` go through ``markdown()`` with
    *link_prefix*; any other non-empty content is HTML-escaped; empty content
    yields an empty ``Markup``.
    """
    markdown_exts = ['.md', '.markdown']
    _, ext = os.path.splitext(filename)
    if ext in markdown_exts:
        html = markdown(content, link_prefix=link_prefix)
    elif content:
        # NOTE(review): this literal looks like it lost surrounding markup
        # (possibly a preformatted-text wrapper) during extraction; it is
        # reproduced exactly as visible. Confirm against upstream.
        html = f"\n{escape(content)}\n"
    else:
        html = ""
    return Markup(html)


def try_html_readme(session, prefix, user, repo_name):
    """Fetch a repository's README override from ``{prefix}/api/repos/.../readme``.

    Returns the sanitized response body as ``Markup`` on HTTP 200, ``None`` on
    HTTP 404, and raises ``Exception`` with the response text otherwise.
    """
    r = session.get(f"{prefix}/api/repos/{repo_name}/readme",
            headers=encrypt_request_authorization(user))
    if r.status_code == 200:
        return Markup(sanitize(r.text))
    elif r.status_code == 404:
        return None
    else:
        raise Exception(r.text)


def _remove_webhooks(user, url):
    """Best-effort removal of webhooks by ensuring an empty config at *url*.

    Failures are deliberately ignored (nbd, upstream was presumably deleted),
    but only ``Exception`` subclasses are swallowed so that interpreter-level
    signals such as ``KeyboardInterrupt`` still propagate.
    """
    try:
        ensure_webhooks(user, url, {})
    except Exception:
        pass  # nbd, upstream was presumably deleted


class SrhtService(ABC):
    """Base class for API clients; owns a shared ``requests.Session``."""

    def __init__(self):
        self.session = requests.Session()

    def _send(self, send, user, valid, url, payload):
        """Issue a JSON request through *send* and interpret the response.

        *send* is a bound session method (``self.session.post`` or
        ``self.session.put``). On HTTP 400 the reported errors are copied onto
        *valid* (when one is given) and ``None`` is returned; any status other
        than 200/201 raises ``Exception`` with the response text; otherwise
        the decoded JSON body is returned.
        """
        r = send(url,
                headers=encrypt_request_authorization(user),
                json=payload)
        if r.status_code == 400:
            if valid:
                for error in r.json()["errors"]:
                    valid.error(error["reason"], field=error.get("field"))
            return None
        elif r.status_code not in [200, 201]:
            raise Exception(r.text)
        return r.json()

    def post(self, user, valid, url, payload):
        """POST *payload* as JSON to *url* on behalf of *user*.

        Returns the decoded JSON body, or ``None`` on HTTP 400 (after
        recording the errors on *valid* if provided). Raises ``Exception``
        for any other non-200/201 status.
        """
        return self._send(self.session.post, user, valid, url, payload)

    def put(self, user, valid, url, payload):
        """PUT *payload* as JSON to *url* on behalf of *user*.

        Same return and error contract as :meth:`post`.
        """
        return self._send(self.session.put, user, valid, url, payload)


class GitService(SrhtService):
    """Client for the git service at ``_gitsrht``."""

    def _graphql(self, user, query, variables):
        """POST a GraphQL *query* with *variables* to ``{_gitsrht}/query``.

        Returns whatever :meth:`post` returns, which is ``None`` on HTTP 400
        because no validator is passed.
        """
        return self.post(user, None, f"{_gitsrht}/query", {
            "query": query,
            "variables": variables,
        })

    def get_repos(self, user):
        """Return the results of listing ``/api/repos`` for *user*."""
        return get_results(f"{_gitsrht}/api/repos", user)

    def get_repo(self, user, repo_name):
        """Fetch one repository by name; raise ``Exception`` unless HTTP 200."""
        r = self.session.get(f"{_gitsrht}/api/repos/{repo_name}",
                headers=encrypt_request_authorization(user))
        if r.status_code != 200:
            raise Exception(r.text)
        return r.json()

    def get_readme(self, user, repo_id, repo_url):
        """Return the rendered README for repository *repo_id*, or ``None``.

        Preference order: the repository's HTML readme (sanitized), then
        ``README.md`` / ``README.markdown`` rendered through ``markdown()``
        with link prefixes derived from *repo_url*, then an escaped plain
        ``README``. Raises ``Exception`` when no repository data is returned.
        """
        readme_query = """
        query Readme($repoId: Int!) {
            repository(id: $repoId) {
                html: readme
                md: path(path: "README.md") { ...textData }
                markdown: path(path: "README.markdown") { ...textData }
                plaintext: path(path: "README") { ...textData }
            }
        }

        fragment textData on TreeEntry {
            object {
                ... on TextBlob {
                    text
                }
            }
        }
        """
        r = self._graphql(user, readme_query, {
            "repoId": repo_id,
        })
        # post() yields None on HTTP 400; report that the same way as a
        # missing repository instead of failing on a None subscript.
        if not r or not r["data"]["repository"]:
            raise Exception(f"git.sr.ht returned no repository for ID {repo_id}: " +
                    json.dumps(r, indent=1))
        repo = r["data"]["repository"]

        content = repo["html"]
        if content:
            return Markup(sanitize(content))

        content = repo["md"] or repo["markdown"]
        if content:
            blob_prefix = repo_url + "/blob/HEAD/"
            rendered_prefix = repo_url + "/tree/HEAD/"
            html = markdown(content["object"]["text"],
                    link_prefix=[rendered_prefix, blob_prefix])
            return Markup(html)

        content = repo["plaintext"]
        if content:
            content = content["object"]["text"]
            # NOTE(review): this literal looks like it lost surrounding
            # markup during extraction; reproduced exactly as visible.
            return Markup(f"\n{escape(content)}\n")

        return None

    def get_manifests(self, user, repo_id):
        """Return build manifests for repository *repo_id*.

        When a ``.builds`` path exists, returns a dict mapping each entry name
        to its text (entries with an empty ``object`` are skipped). Otherwise,
        when ``.build.yml`` exists, returns ``{".build.yml": text}``. Returns
        ``None`` when neither is present. Raises ``Exception`` when no
        repository data is returned.
        """
        manifests_query = """
        query Manifests($repoId: Int!) {
            repository(id: $repoId) {
                multiple: path(path:".builds") {
                    object {
                        ... on Tree {
                            entries {
                                results {
                                    name
                                    object {
                                        ... on TextBlob {
                                            text
                                        }
                                    }
                                }
                            }
                        }
                    }
                },
                single: path(path:".build.yml") {
                    object {
                        ... on TextBlob {
                            text
                        }
                    }
                }
            }
        }
        """
        r = self._graphql(user, manifests_query, {
            "repoId": repo_id,
        })
        # post() yields None on HTTP 400; treat it like a missing repository.
        if not r or not r["data"]["repository"]:
            raise Exception(f"git.sr.ht did not find repo ID {repo_id} (requesting on behalf of {user.username})\n" +
                    json.dumps(r, indent=1))
        repo = r["data"]["repository"]

        if repo["multiple"]:
            return {
                ent["name"]: ent["object"]["text"]
                for ent in repo["multiple"]["object"]["entries"]["results"]
                if ent["object"]
            }
        if repo["single"]:
            return {".build.yml": repo["single"]["object"]["text"]}
        return None

    def log(self, user, repo, old, new):
        """Return log entries starting at *new*, stopping before commit *old*.

        Each entry is a dict carrying ``id``, ``message`` and ``author.name``
        as requested by the query.
        """
        query = """
        query Log($owner: String!, $repo: String!, $from: String!) {
            repositoryByOwner(owner: $owner, repo: $repo) {
                log(from: $from) {
                    results {
                        id
                        message
                        author {
                            name
                        }
                    }
                }
            }
        }
        """
        r = self._graphql(user, query, {
            "owner": repo.owner.canonical_name,
            "repo": repo.name,
            "from": new,
        })
        commits = []
        for c in r["data"]["repositoryByOwner"]["log"]["results"]:
            if c["id"] == old:
                break
            commits.append(c)
        return commits

    def create_repo(self, user, valid, visibility):
        """Create a repository from the ``name``/``description`` in *valid*.

        Returns the created repository dict with ``visibility`` lower-cased,
        or ``None`` when validation fails or the mutation reports errors (in
        which case each error message is recorded on *valid*).
        """
        query = """
        mutation CreateRepo(
            $name: String!,
            $description: String,
            $visibility: Visibility!)
        {
            createRepository(name: $name,
                    description: $description,
                    visibility: $visibility) {
                id,
                name,
                description,
                visibility
            }
        }
        """
        name = valid.require("name")
        description = valid.require("description")
        if not valid.ok:
            return None
        r = self._graphql(user, query, {
            "name": name,
            "visibility": visibility.value.upper(),
            "description": description,
        })
        if not r["data"] or not r["data"]["createRepository"]:
            for error in r["errors"]:
                valid.error(error["message"])
            return None
        repo = r["data"]["createRepository"]
        repo["visibility"] = repo["visibility"].lower()
        return repo

    def delete_repo(self, user, repo_id):
        """Issue the ``deleteRepository`` mutation for *repo_id*."""
        query = """
        mutation DeleteRepo($id: Int!) {
            deleteRepository(id: $id) { id }
        }
        """
        self._graphql(user, query, {
            "id": repo_id,
        })

    def ensure_user_webhooks(self, user):
        """Ensure the user-level repo update/delete webhook for *user*."""
        config = {
            origin + url_for("webhooks.git_user", user_id=user.id):
                ["repo:update", "repo:delete"],
        }
        ensure_webhooks(user, f"{_gitsrht}/api/user/webhooks", config)

    def unensure_user_webhooks(self, user):
        """Best-effort removal of the user-level webhooks for *user*."""
        _remove_webhooks(user, f"{_gitsrht}/api/user/webhooks")

    def ensure_repo_webhooks(self, repo):
        """Ensure the ``repo:post-update`` webhook for *repo*."""
        config = {
            origin + url_for("webhooks.git_repo", repo_id=repo.id):
                ["repo:post-update"],
        }
        owner = repo.owner
        url = f"{_gitsrht}/api/{owner.canonical_name}/repos/{repo.name}/webhooks"
        ensure_webhooks(owner, url, config)

    def unensure_repo_webhooks(self, repo):
        """Best-effort removal of the webhooks for *repo*."""
        owner = repo.owner
        url = f"{_gitsrht}/api/{owner.canonical_name}/repos/{repo.name}/webhooks"
        _remove_webhooks(owner, url)


class HgService(SrhtService):
    """Client for the hg service at ``_hgsrht``."""

    def get_repos(self, user):
        """Return the results of listing ``/api/repos`` for *user*."""
        return get_results(f"{_hgsrht}/api/repos", user)

    def get_repo(self, user, repo_name):
        """Fetch one repository by name; raise ``Exception`` unless HTTP 200."""
        r = self.session.get(f"{_hgsrht}/api/repos/{repo_name}",
                headers=encrypt_request_authorization(user))
        if r.status_code != 200:
            raise Exception(r.text)
        return r.json()

    def get_readme(self, user, repo_name, repo_url):
        """Return the rendered README for *repo_name*.

        Uses the HTML override from ``try_html_readme`` when one exists;
        otherwise tries each name in ``readme_names`` in order and formats the
        first one found. Returns empty markup when none exists. Raises
        ``Exception`` on any status other than 200/404.
        """
        # TODO: Cache?
        override = try_html_readme(self.session, _hgsrht, user, repo_name)
        if override is not None:
            return override
        blob_prefix = repo_url + "/raw/"
        rendered_prefix = repo_url + "/browse/"
        for readme_name in readme_names:
            r = self.session.get(f"{_hgsrht}/api/repos/{repo_name}/raw/{readme_name}",
                    headers=encrypt_request_authorization(user))
            if r.status_code == 404:
                continue
            elif r.status_code != 200:
                raise Exception(r.text)
            return format_readme(r.text, readme_name,
                    link_prefix=[rendered_prefix, blob_prefix])
        return format_readme("")

    def create_repo(self, user, valid, visibility):
        """Create a repository from the ``name``/``description`` in *valid*.

        Returns ``None`` when validation fails; otherwise the result of the
        POST to ``/api/repos``.
        """
        name = valid.require("name")
        description = valid.require("description")
        if not valid.ok:
            return None
        return self.post(user, valid, f"{_hgsrht}/api/repos", {
            "name": name,
            "description": description,
            "visibility": visibility.value,
        })

    def delete_repo(self, user, repo_name):
        """Delete *repo_name*; raise ``Exception`` unless HTTP 204 or 404."""
        r = self.session.delete(f"{_hgsrht}/api/repos/{repo_name}",
                headers=encrypt_request_authorization(user))
        if r.status_code != 204 and r.status_code != 404:
            raise Exception(r.text)

    def ensure_user_webhooks(self, user):
        """Ensure the user-level repo update/delete webhook for *user*."""
        config = {
            origin + url_for("webhooks.hg_user", user_id=user.id):
                ["repo:update", "repo:delete"],
        }
        ensure_webhooks(user, f"{_hgsrht}/api/user/webhooks", config)

    def unensure_user_webhooks(self, user):
        """Best-effort removal of the user-level webhooks for *user*."""
        _remove_webhooks(user, f"{_hgsrht}/api/user/webhooks")


class ListService(SrhtService):
    """Client for the mailing list service at ``_listsrht``."""

    def get_lists(self, user):
        """Return the results of listing ``/api/lists`` for *user*."""
        return get_results(f"{_listsrht}/api/lists", user)

    def get_list(self, user, list_name):
        """Fetch one list by name; raise ``Exception`` unless HTTP 200."""
        r = self.session.get(f"{_listsrht}/api/lists/{list_name}",
                headers=encrypt_request_authorization(user))
        if r.status_code != 200:
            raise Exception(r.json())
        return r.json()

    def ensure_mailing_list_webhooks(self, mailing_list):
        """Ensure the list/post/patchset webhooks for *mailing_list*."""
        config = {
            origin + url_for("webhooks.mailing_list", list_id=mailing_list.id):
                ["list:update", "list:delete", "post:received", "patchset:received"],
        }
        owner = mailing_list.owner
        url = f"{_listsrht}/api/user/{owner.canonical_name}/lists/{mailing_list.name}/webhooks"
        ensure_webhooks(owner, url, config)

    def unensure_mailing_list_webhooks(self, mailing_list):
        """Best-effort removal of the webhooks for *mailing_list*."""
        owner = mailing_list.owner
        url = f"{_listsrht}/api/user/{owner.canonical_name}/lists/{mailing_list.name}/webhooks"
        _remove_webhooks(owner, url)

    def create_list(self, user, valid):
        """Create a list from ``name`` (required) and ``description`` (optional).

        Returns ``None`` when validation fails; otherwise the result of the
        POST to ``/api/lists``.
        """
        name = valid.require("name")
        description = valid.optional("description")
        if not valid.ok:
            return None
        return self.post(user, valid, f"{_listsrht}/api/lists", {
            "name": name,
            "description": description,
        })

    def delete_list(self, user, list_name):
        """Delete *list_name*; raise ``Exception`` unless HTTP 204 or 404."""
        r = self.session.delete(f"{_listsrht}/api/lists/{list_name}",
                headers=encrypt_request_authorization(user))
        if r.status_code != 204 and r.status_code != 404:
            raise Exception(r.text)

    def patchset_set_tool(self, user, list_name, patchset_id, key, icon, details):
        """PUT a tool entry (*key*, *icon*, *details*) on a patchset."""
        return self.put(user, None,
            f"{_listsrht}/api/lists/{list_name}/patchsets/{patchset_id}/tools", {
                "key": key,
                "icon": icon,
                "details": details,
            })


class TodoService(SrhtService):
    """Client for the ticket tracking service at ``_todosrht``."""

    def get_trackers(self, user):
        """Return the results of listing ``/api/trackers`` for *user*."""
        return get_results(f"{_todosrht}/api/trackers", user)

    def get_tracker(self, user, tracker_name):
        """Fetch one tracker by name; raise ``Exception`` unless HTTP 200."""
        r = self.session.get(f"{_todosrht}/api/trackers/{tracker_name}",
                headers=encrypt_request_authorization(user))
        if r.status_code != 200:
            raise Exception(r.json())
        return r.json()

    def create_tracker(self, user, valid, visibility):
        """Create a tracker from ``name`` (required) and ``description`` (optional).

        Returns ``None`` when validation fails; otherwise the result of the
        POST to ``/api/trackers``.
        """
        name = valid.require("name")
        description = valid.optional("description")
        if not valid.ok:
            return None
        return self.post(user, valid, f"{_todosrht}/api/trackers", {
            "name": name,
            "description": description,
            "visibility": visibility.value.upper(),
        })

    def delete_tracker(self, user, tracker_name):
        """Delete *tracker_name*; raise ``Exception`` unless HTTP 204 or 404."""
        r = self.session.delete(f"{_todosrht}/api/trackers/{tracker_name}",
                headers=encrypt_request_authorization(user))
        if r.status_code != 204 and r.status_code != 404:
            raise Exception(r.text)

    def ensure_user_webhooks(self, user):
        """Ensure the user-level tracker update/delete webhook for *user*."""
        config = {
            origin + url_for("webhooks.todo_user", user_id=user.id):
                ["tracker:update", "tracker:delete"]
        }
        url = f"{_todosrht}/api/user/webhooks"
        ensure_webhooks(user, url, config)

    def unensure_user_webhooks(self, user):
        """Best-effort removal of the user-level webhooks for *user*."""
        _remove_webhooks(user, f"{_todosrht}/api/user/webhooks")

    def ensure_tracker_webhooks(self, tracker):
        """Ensure the ``ticket:create`` webhook for *tracker*."""
        config = {
            origin + url_for("webhooks.todo_tracker", tracker_id=tracker.id):
                ["ticket:create"]
        }
        owner = tracker.owner
        url = f"{_todosrht}/api/user/{owner.canonical_name}/trackers/{tracker.name}/webhooks"
        ensure_webhooks(owner, url, config)

    def unensure_tracker_webhooks(self, tracker):
        """Best-effort removal of the webhooks for *tracker*."""
        owner = tracker.owner
        url = f"{_todosrht}/api/user/{owner.canonical_name}/trackers/{tracker.name}/webhooks"
        _remove_webhooks(owner, url)

    def ensure_ticket_webhooks(self, tracker, ticket_id):
        """Ensure the ``event:create`` webhook for ticket *ticket_id*."""
        config = {
            origin + url_for("webhooks.todo_ticket", tracker_id=tracker.id):
                ["event:create"]
        }
        owner = tracker.owner
        url = f"{_todosrht}/api/user/{owner.canonical_name}/trackers/{tracker.name}/tickets/{ticket_id}/webhooks"
        ensure_webhooks(owner, url, config)

    def unensure_ticket_webhooks(self, tracker, ticket_id):
        """Best-effort removal of the webhooks for ticket *ticket_id*."""
        owner = tracker.owner
        url = f"{_todosrht}/api/user/{owner.canonical_name}/trackers/{tracker.name}/tickets/{ticket_id}/webhooks"
        _remove_webhooks(owner, url)

    def update_ticket(self, user, owner, tracker, ticket, comment, resolution=None):
        """PUT a comment on a ticket, resolving it when *resolution* is given.

        When *resolution* is not ``None`` the payload also carries that
        resolution and ``status`` set to ``"resolved"``.
        """
        url = f"{_todosrht}/api/user/{owner}/trackers/{tracker}/tickets/{ticket}"
        payload = {"comment": comment}
        if resolution is not None:
            payload["resolution"] = resolution
            payload["status"] = "resolved"
        self.put(user, None, url, payload)


class BuildService(SrhtService):
    """Client for the build service at ``_buildsrht``."""

    def submit_build(self, user, manifest, note, tags, execute=True):
        """POST a job built from *manifest* (dumped as block-style YAML)."""
        return self.post(user, None, f"{_buildsrht}/api/jobs", {
            "manifest": yaml.dump(manifest.to_dict(), default_flow_style=False),
            "tags": tags,
            "note": note,
            "secrets": False,
            "execute": execute,
        })

    def create_group(self, user, job_ids, note, triggers):
        """POST a job group over *job_ids* with *note* and *triggers*."""
        return self.post(user, None, f"{_buildsrht}/api/job-group", {
            "jobs": job_ids,
            "note": note,
            "execute": True,
            "triggers": triggers,
        })


git = GitService()
hg = HgService()
lists = ListService()
todo = TodoService()
builds = BuildService()