todo.sr.ht/todosrht-lmtp

362 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
from srht.config import cfg, get_origin
from srht.database import db, DbSession
# Rebind ``db`` to a session built from this service's connection string.
# NOTE(review): the session is created before todosrht.types is imported and
# db.init() only runs afterwards; the ordering looks deliberate -- confirm
# before reordering or regrouping these imports.
db = DbSession(cfg("todo.sr.ht", "connection-string"))
import todosrht.types
db.init()
from aiosmtpd.lmtp import SMTP, LMTP
from email.utils import parseaddr
from grp import getgrnam
from todosrht.access import get_tracker, get_ticket
from todosrht.types import TicketAccess, TicketResolution, Tracker, Ticket, User
from todosrht.types import Label, TicketLabel, TicketSubscription, Event, EventType, ParticipantType
from todosrht.tickets import add_comment, get_participant_for_email, submit_ticket
from todosrht.webhooks import UserWebhook, TrackerWebhook, TicketWebhook
from srht.validation import Validation
import asyncio
import email
import email.policy
import os
import shlex
import signal
import sys
# Event loop used by create_server() to bind the listening socket and by the
# startup code at the bottom of the file to run the daemon.
loop = asyncio.new_event_loop()
class MailHandler:
    """aiosmtpd handler that turns incoming mail into tracker activity.

    Mail addressed to a tracker opens a new ticket, mail addressed to a
    ticket adds a comment (optionally carrying a ``!command`` on its last
    line), and the ``subscribe``/``unsubscribe`` addresses toggle the
    sender's subscription. Every ``handle_*`` method returns the SMTP reply
    string for the message.
    """

    # Trailing address component -> requested subscription state.
    _SUB_ACTIONS = {"subscribe": True, "unsubscribe": False}

    # Inclusive bounds on the length of a non-empty comment.
    _COMMENT_MIN_LENGTH = 3
    _COMMENT_MAX_LENGTH = 16384

    @staticmethod
    def _parse_local_part(local):
        """Split the local part of a recipient address into its components.

        Returns ``(owner, tracker_name, ticket_id, sub_action)``, or ``None``
        when ``local`` is not in one of the formats listed on
        ``lookup_destination``. ``owner`` always carries its leading ``~``;
        ``ticket_id`` is an ``int`` or ``None``; ``sub_action`` is ``True``
        (subscribe), ``False`` (unsubscribe) or ``None`` (no action).
        """
        if local.startswith("~"):
            # TODO: user groups
            parts = local.split("/")
            dotted = False
        else:
            prefix, *parts = local.split(".")
            if prefix != "u":
                # TODO: user groups
                return None
            dotted = True

        if not 2 <= len(parts) <= 4:
            return None
        owner, tracker_name, *rest = parts
        if dotted:
            owner = "~" + owner

        ticket_id = None
        sub_action = None
        if len(rest) == 2:
            # <ticket id>/<action>: both components must be valid.
            try:
                ticket_id = int(rest[0])
            except ValueError:
                return None
            sub_action = MailHandler._SUB_ACTIONS.get(rest[1])
            if sub_action is None:
                return None
        elif len(rest) == 1:
            # Either a ticket id or a tracker-level (un)subscribe.
            try:
                ticket_id = int(rest[0])
            except ValueError:
                sub_action = MailHandler._SUB_ACTIONS.get(rest[0])
                if sub_action is None:
                    return None
        return owner, tracker_name, ticket_id, sub_action

    @staticmethod
    def _valid_comment_length(body):
        """Return True when ``body`` is empty or within the comment bounds.

        An empty body is accepted because a message may consist of nothing
        but a command line, which is stripped before this check.
        """
        if not body:
            return True
        return (MailHandler._COMMENT_MIN_LENGTH <= len(body)
                <= MailHandler._COMMENT_MAX_LENGTH)

    def lookup_destination(self, address, sender):
        """Resolve a recipient address to a tracker or ticket.

        Address formats are:

        Tracker (opening a new ticket):
            ~username/tracker@todo.sr.ht
            u.username.tracker@todo.sr.ht
        Ticket (participating in discussion):
            ~username/tracker/1234@todo.sr.ht
            u.username.tracker.1234@todo.sr.ht
        Tracker (un)subscribe:
            ~username/tracker/subscribe@todo.sr.ht
            ~username/tracker/unsubscribe@todo.sr.ht
            u.username.tracker.subscribe@todo.sr.ht
            u.username.tracker.unsubscribe@todo.sr.ht
        Ticket (un)subscribe:
            ~username/tracker/1234/subscribe@todo.sr.ht
            ~username/tracker/1234/unsubscribe@todo.sr.ht
            u.username.tracker.1234.subscribe@todo.sr.ht
            u.username.tracker.1234.unsubscribe@todo.sr.ht

        The dotted ``u.`` forms are the alternative spelling for MTAs that
        cannot cope with the ``~``/``/`` form.

        Returns ``(destination, sub_action, access)``. ``destination`` is
        whatever ``get_tracker``/``get_ticket`` returned, and is ``None``
        (with the other two also ``None``) when the address cannot be
        parsed. ``sub_action`` is ``True``/``False`` for subscribe and
        unsubscribe addresses and ``None`` otherwise.
        """
        # Drop the domain; an address without "@" is used as-is rather than
        # losing its last character to a failed rfind().
        local, at, _domain = address.rpartition("@")
        if not at:
            local = address

        parsed = self._parse_local_part(local)
        if parsed is None:
            return None, None, None
        owner, tracker_name, ticket_id, sub_action = parsed

        # TODO: ACLs for email participants
        tracker, access = get_tracker(owner, tracker_name, user=sender.user)
        # Compare against None so that ticket id 0 is looked up as a ticket
        # instead of silently falling back to the tracker.
        if ticket_id is None:
            return tracker, sub_action, access
        if tracker is None:
            # No tracker to look the ticket up on; the caller treats a None
            # destination as "not found".
            return None, None, None
        ticket, access = get_ticket(tracker, ticket_id, user=sender.user)
        return ticket, sub_action, access

    async def handle_RCPT(self, server, session,
            envelope, address, rcpt_options):
        """Accept every recipient; the address is validated at DATA time."""
        print("RCPT {}".format(address))
        envelope.rcpt_tos.append(address)
        return "250 OK"

    async def handle_tracker_message(self, tracker, sender, access, mail, body):
        """Open a new ticket on ``tracker`` from an incoming message.

        The subject becomes the ticket title and ``body`` its description.
        Returns a ``550`` reply when the sender lacks submit access or the
        title/description fail validation, ``250`` once the ticket has been
        submitted and the ticket_create webhooks delivered.
        """
        if TicketAccess.submit not in access:
            print("Rejected, insufficient permissions")
            return "550 You do not have permission to post on this tracker."

        valid = Validation({
            "title": mail["Subject"],
            "description": body,
        })
        title = valid.require("title")
        desc = valid.optional("description")
        valid.expect(not title or 3 <= len(title) <= 2048,
                "Title must be between 3 and 2048 characters",
                field="title")
        # "<=" so that the check agrees with the message below.
        valid.expect(not desc or len(desc) <= 16384,
                "Description must be no more than 16384 characters",
                field="description")
        if not valid.ok:
            print("Rejecting email due to validation errors")
            return "550 " + ", ".join([e.message for e in valid.errors])

        ticket = submit_ticket(tracker, sender, title, desc,
                from_email=True, from_email_id=mail["Message-ID"])
        UserWebhook.deliver(UserWebhook.Events.ticket_create,
                ticket.to_dict(),
                UserWebhook.Subscription.user_id == sender.id)
        TrackerWebhook.deliver(TrackerWebhook.Events.ticket_create,
                ticket.to_dict(),
                TrackerWebhook.Subscription.tracker_id == tracker.id)
        print(f"Created ticket {ticket.ref()}")
        return "250 Message accepted for delivery"

    def _apply_label_command(self, ticket, sender, access, cmd):
        """Apply a ``!label``/``!unlabel`` command to ``ticket``.

        ``cmd`` is the tokenised command line; ``cmd[1:]`` are label names.
        Returns a ``550`` reply string when the command is rejected and
        ``None`` once every label has been processed.
        """
        adding = cmd[0] == "!label"
        labels = Label.query.filter(
                Label.name.in_(cmd[1:]),
                Label.tracker_id == ticket.tracker_id).all()
        if len(labels) != len(cmd) - 1:
            return ("550 The label you requested does not exist on " +
                "this tracker.")
        if TicketAccess.triage not in access:
            print(f"Rejected, {sender.name} has insufficient " +
                f"permissions (have {access}, want triage)")
            return "550 You do not have permission to triage on this tracker."

        for label in labels:
            ticket_label = (TicketLabel.query
                .filter(TicketLabel.label_id == label.id)
                .filter(TicketLabel.ticket_id == ticket.id)).first()
            if adding and not ticket_label:
                # TODO: only supported for user participants
                ticket_label = TicketLabel()
                ticket_label.ticket_id = ticket.id
                ticket_label.label_id = label.id
                ticket_label.user_id = sender.id
                db.session.add(ticket_label)
                event_type = EventType.label_added
            elif not adding and ticket_label:
                db.session.delete(ticket_label)
                event_type = EventType.label_removed
            else:
                # Nothing changes (label already present / already absent),
                # so do not record an Event that has no event_type.
                continue

            event = Event()
            event.participant_id = sender.id
            event.ticket_id = ticket.id
            event.label_id = label.id
            event.event_type = event_type
            db.session.add(event)
            db.session.commit()
            TicketWebhook.deliver(TicketWebhook.Events.event_create,
                    event.to_dict(),
                    TicketWebhook.Subscription.ticket_id == ticket.id)
            TrackerWebhook.deliver(TrackerWebhook.Events.event_create,
                    event.to_dict(),
                    TrackerWebhook.Subscription.tracker_id == ticket.tracker_id)
        return None

    async def handle_ticket_message(self, ticket, sender, access, mail, body):
        """Add a comment to ``ticket``, honouring a command on the last line.

        When the last line of ``body`` starts with a recognised command
        (``!resolve``/``!resolved``/``!reopen`` for everyone, plus
        ``!assign``/``!label``/``!unlabel`` for user participants) it is
        removed from the comment text and triage access is required instead
        of comment access. Returns a ``550`` reply on rejection and ``250``
        once the comment has been added and the webhooks delivered.
        """
        required_access = TicketAccess.comment
        resolution = None
        resolve = reopen = False

        cmds = ["!resolve", "!resolved", "!reopen"]
        if sender.participant_type == ParticipantType.user:
            # TODO: This should be possible via ACLs later
            cmds += ["!assign", "!label", "!unlabel"]

        lines = body.splitlines()
        last_line = lines[-1] if lines else ""
        if last_line.startswith(tuple(cmds)):
            try:
                cmd = shlex.split(last_line)
            except ValueError:
                # e.g. unbalanced quotes on the command line
                print("Rejected, malformed command")
                return "550 The command on the last line could not be parsed."
            # Cut the body where the command line starts. Locating it with
            # rfind() works for any line terminator (LF, CRLF, or none),
            # unlike slicing off a fixed len(last_line) + 1 characters.
            body = body[:body.rfind(last_line)].rstrip()
            required_access = TicketAccess.triage

            if cmd[0] in ("!resolve", "!resolved") and len(cmd) == 2:
                try:
                    resolution = TicketResolution[cmd[1].lower()]
                except KeyError:
                    print("Rejected, unknown resolution")
                    return "550 The resolution you requested does not exist."
                resolve = True
            elif cmd[0] == "!reopen":
                reopen = True
            elif cmd[0] in ("!label", "!unlabel"):
                rejection = self._apply_label_command(
                        ticket, sender, access, cmd)
                if rejection is not None:
                    return rejection
            # TODO: Remaining commands

        if required_access not in access:
            print(f"Rejected, {sender.name} has insufficient " +
                f"permissions (have {access}, want {required_access})")
            return "550 You do not have permission to post on this tracker."

        # The previous chained comparison (3 > len > 16384) could never be
        # true, so the limit was never enforced.
        if not self._valid_comment_length(body):
            print("Rejected, invalid comment length")
            return "550 Comment must be between 3 and 16384 characters."

        event = add_comment(sender, ticket, text=body,
                resolution=resolution, resolve=resolve, reopen=reopen,
                from_email=True)
        TicketWebhook.deliver(TicketWebhook.Events.event_create,
                event.to_dict(),
                TicketWebhook.Subscription.ticket_id == ticket.id)
        TrackerWebhook.deliver(TrackerWebhook.Events.event_create,
                event.to_dict(),
                TrackerWebhook.Subscription.tracker_id == ticket.tracker_id)
        print(f"Added comment to {ticket.ref()}")
        return "250 Message accepted for delivery"

    async def handle_DATA(self, server, session, envelope):
        """Process a message, rolling the DB session back on any failure.

        The exception is re-raised after the rollback so that it is still
        reported by the caller.
        """
        try:
            return await self._handle_DATA(server, session, envelope)
        except BaseException:
            db.session.rollback()
            raise

    async def _handle_DATA(self, server, session, envelope):
        """Route a received message to the matching handler.

        Only the first recipient of the envelope is considered. The message
        is rejected with a ``550`` reply when the destination is unknown,
        when it contains a ``text/html`` part, or when it has no non-empty
        ``text/plain`` part.
        """
        address = envelope.rcpt_tos[0]
        mail = email.message_from_bytes(envelope.content,
                policy=email.policy.SMTP)
        name, sender_addr = parseaddr(mail["From"])
        sender = get_participant_for_email(sender_addr, name)

        dest, sub_action, access = self.lookup_destination(address, sender)
        if dest is None:
            print("Rejected, destination not found")
            return "550 The tracker or ticket you requested does not exist."

        # (Un)subscribe addresses ignore the message content entirely.
        if sub_action is not None:
            return await self.handle_un_subscription(dest, sender, sub_action)

        body = None
        for part in mail.walk():
            if part.is_multipart():
                continue
            content_type = part.get_content_type()
            [charset] = part.get_charsets("utf-8")
            # The first non-empty text/plain part becomes the body.
            if content_type == 'text/plain' and not body:
                body = part.get_payload(decode=True).decode(charset)
            if content_type == 'text/html':
                print("Rejected, HTML email")
                return "550 HTML emails are not permitted on SourceHut"
        if not body:
            print("Rejected, requires plaintext part")
            return ("550 At least one text/plain part is required " +
                "to use this service.")

        if isinstance(dest, Tracker):
            return await self.handle_tracker_message(
                    dest, sender, access, mail, body)
        elif isinstance(dest, Ticket):
            return await self.handle_ticket_message(
                    dest, sender, access, mail, body)
        else:
            assert False

    async def handle_un_subscription(self, dest, participant, do_subscribe):
        """Subscribe ``participant`` to ``dest`` or remove the subscription.

        ``dest`` is a Tracker or a Ticket; the subscription row is keyed on
        the tracker id or the ticket id respectively, with the other left
        as ``None``. Returns a ``250`` reply in every case, including when
        the requested state already holds.
        """
        if isinstance(dest, Tracker):
            tracker_id = dest.id
            ticket_id = None
        elif isinstance(dest, Ticket):
            tracker_id = None
            ticket_id = dest.id
        else:
            assert False

        sub = (TicketSubscription.query
            .filter(TicketSubscription.tracker_id == tracker_id)
            .filter(TicketSubscription.ticket_id == ticket_id)
            .filter(TicketSubscription.participant_id == participant.id)
        ).one_or_none()

        if sub:
            if do_subscribe:
                return "250 Already subscribed"
            db.session.delete(sub)
        else:
            if not do_subscribe:
                return "250 Not subscribed"
            sub = TicketSubscription()
            sub.tracker_id = tracker_id
            sub.ticket_id = ticket_id
            sub.participant_id = participant.id
            db.session.add(sub)
        db.session.commit()

        if do_subscribe:
            print(f"Subscribed to {dest.ref()}")
            return "250 Subscribed"
        else:
            print(f"Unsubscribed from {dest.ref()}")
            return "250 Unsubscribed"
async def create_server():
    """Start listening on the endpoint named by the ``sock`` config option.

    A value containing ``/`` is treated as a unix socket path and served
    with LMTP; the socket is made group-writable (mode 0o775) and, when the
    ``sock-group`` option is set, handed to that group. Any other value is
    read as ``host:port`` and served with SMTP over TCP.
    """
    handler = MailHandler()
    sock = cfg("todo.sr.ht::mail", "sock")
    if "/" in sock:
        await loop.create_unix_server(
                lambda: LMTP(handler, enable_SMTPUTF8=True),
                path=sock)
        os.chmod(sock, 0o775)
        sock_group = cfg("todo.sr.ht::mail", "sock-group", default=None)
        if sock_group is not None:
            sock_gid = getgrnam(sock_group).gr_gid
            os.chown(sock, os.getuid(), sock_gid)
    else:
        # Split on the last colon only, so a host that itself contains
        # colons (an IPv6 literal, optionally in brackets) still parses.
        host, port = sock.rsplit(":", 1)
        host = host.strip("[]")
        await loop.create_server(
                lambda: SMTP(handler, enable_SMTPUTF8=True),
                host=host, port=int(port))
def sigint_handler():
    """Log the shutdown and raise SystemExit with status 0."""
    print("Exiting due to SIGINT")
    raise SystemExit(0)
# Script entry point: install the SIGINT handler, bind the server, then serve
# until the process is told to exit.
loop.add_signal_handler(signal.SIGINT, sigint_handler)
print("Starting incoming mail daemon")
try:
    loop.run_until_complete(create_server())
    loop.run_forever()
finally:
    # sigint_handler() exits by raising SystemExit from inside the loop, so
    # without the finally the close() below would never run.
    loop.close()