lists.sr.ht/listssrht/blueprints/user.py

189 lines
7.0 KiB
Python

from email.mime.text import MIMEText
from email.utils import parseaddr, formatdate, make_msgid
from flask import current_app, Blueprint, render_template, request, redirect, url_for, abort
from flask import session
from srht.config import cfg, cfgi
from srht.database import db
from srht.oauth import UserType, current_user, loginrequired
from srht.flask import paginate_query
from srht.graphql import exec_gql
from srht.search import search_by
from srht.validation import Validation
from sqlalchemy import or_
from listssrht.types import List, ListAccess, User, Email, Subscription, Mirror, Visibility
from listssrht.webhooks import UserWebhook
import re
import smtplib
user = Blueprint("user", __name__)
smtp_host = cfg("mail", "smtp-host", default=None)
smtp_port = cfgi("mail", "smtp-port", default=None)
smtp_user = cfg("mail", "smtp-user", default=None)
smtp_password = cfg("mail", "smtp-password", default=None)
@user.route("/")
def index():
if not current_user:
return render_template("index.html")
recent = (Email.query
.join(List)
.join(Subscription)
.filter(Email.list_id == List.id)
.filter(Subscription.list_id == List.id)
.filter(Subscription.user_id == current_user.id)
.order_by(Email.created.desc())).limit(10).all()
subs = [sub.list for sub in (Subscription.query
.join(List)
.filter(Subscription.user_id == current_user.id)
.order_by(List.updated.desc())).limit(10).all()]
notice = session.pop("notice", None)
return render_template("dashboard.html", recent=recent,
subs=subs, notice=notice)
@user.route("/~<username>")
def user_profile(username):
user = User.query.filter(User.username == username).first()
if not user:
abort(404)
recent = Email.query.filter(Email.sender_id == user.id)
lists = List.query.filter(List.owner_id == user.id)
if not current_user or current_user.id != user.id:
lists = lists.filter(List.visibility == Visibility.PUBLIC)
recent = recent.join(List).filter(List.visibility == Visibility.PUBLIC)
recent = recent.order_by(Email.created.desc()).limit(10).all()
lists = lists.order_by(List.updated.desc()).limit(10).all()
return render_template("user.html",
user=user, recent=recent, lists=lists, parseaddr=parseaddr)
@user.route("/lists/~<username>")
def lists_for_user(username):
user = User.query.filter(User.username == username).first()
if not user:
abort(404)
lists = List.query.filter(List.owner_id == user.id)
if not current_user or current_user.id != user.id:
lists = lists.filter(List.visibility == Visibility.PUBLIC)
lists = lists.order_by(List.updated.desc())
terms = request.args.get('search')
if terms:
lists = search_by(lists, terms, [List.name, List.description], {})
lists, pagination = paginate_query(lists)
return render_template("user-lists.html",
user=user, lists=lists, search=terms, **pagination)
@user.route("/lists/create")
@loginrequired
def create_list_GET():
if (cfg("lists.sr.ht", "allow-new-lists", default="yes") != "yes"
and current_user.user_type != UserType.admin):
abort(401)
return render_template("create.html")
@user.route("/lists/create", methods=["POST"])
def create_list_POST():
if (cfg("lists.sr.ht", "allow-new-lists", default="yes") != "yes"
and current_user.user_type != UserType.admin):
abort(401)
valid = Validation(request)
name = valid.require("name", friendly_name="Name")
description = valid.optional("description")
visibility = valid.require("visibility")
if not valid.ok:
return render_template("create.html", **valid.kwargs)
resp = exec_gql(current_app.site, """
mutation CreateMailingList($name: String!, $description: String, $visibility: Visibility!) {
createMailingList(name: $name, description: $description, visibility: $visibility) {
name
owner {
canonicalName
}
}
}
""", valid=valid, name=name, description=description, visibility=visibility)
if not valid.ok:
return render_template("create.html", **valid.kwargs)
resp = resp["createMailingList"]
return redirect(url_for("archives.archive",
owner_name=resp["owner"]["canonicalName"],
list_name=resp["name"]))
@user.route("/lists/create-mirror")
@loginrequired
def create_mirror_GET():
return render_template("create-mirror.html")
def mirror_subscribe(ml, mirror):
posting_domain = cfg("lists.sr.ht", "posting-domain")
list_name = "u.{}.{}".format(ml.owner.username, ml.name)
smtp = smtplib.SMTP(smtp_host, smtp_port)
smtp.ehlo()
smtp.starttls()
smtp.login(smtp_user, smtp_password)
mail = MIMEText(f"Subscription request for {posting_domain} on behalf of "
f"{ml.owner.canonical_name}\n\n"
"If this email is unexpected, feel free to ignore it, or send "
"questions to:\n\n"
f"{cfg('sr.ht', 'owner-name')} <{cfg('sr.ht', 'owner-email')}>")
mail["X-Mirroring-To"] = posting_domain
mail["Subject"] = "subscribe"
mail["To"] = mirror.list_subscribe
mail["From"] = f"{posting_domain} mirror <{list_name}@{posting_domain}>"
mail["Date"] = formatdate()
mail["Message-ID"] = make_msgid()
smtp.sendmail(smtp_user, [mirror.list_subscribe], mail.as_string(
unixfrom=True, maxheaderlen=998))
smtp.quit()
@user.route("/lists/create-mirror", methods=["POST"])
@loginrequired
def create_mirror_POST():
valid = Validation(request)
ml = List(current_user, valid)
address = valid.require("address", friendly_name="Subscription address")
valid.expect(not address or "@" in address,
"A valid email address is required", field="address")
weird_ok = valid.optional("weird-email-okay")
valid.expect(not address or weird_ok == "yes" or "subscribe" in address,
"This address does not look like a subscription address. Double "
"check it and click 'Create' if you're certain.", field="address")
if not valid.ok:
return render_template("create-mirror.html", **valid.kwargs)
posting_domain = cfg("lists.sr.ht", "posting-domain")
user, domain = address.split("@")
valid.expect(domain != posting_domain,
"You can't mirror a list from {{cfg('sr.ht', 'site-name')}}!",
field="address")
if not valid.ok:
return render_template("create-mirror.html", **valid.kwargs)
mirror = Mirror()
mirror.list_subscribe = address
db.session.add(mirror)
db.session.flush()
ml.mirror_id = mirror.id
mirror_subscribe(ml, mirror)
UserWebhook.deliver(UserWebhook.Events.list_create,
ml.to_dict(), UserWebhook.Subscription.user_id == ml.owner_id)
db.session.commit()
return redirect(url_for("archives.archive",
owner_name=current_user.canonical_name,
list_name=ml.name))