350 lines
13 KiB
Python
Executable File
350 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from srht.config import cfg, get_origin
|
|
from srht.database import db, DbSession
|
|
from srht.graphql import exec_gql
|
|
# Create the database session from the connection string configured for
# todo.sr.ht. Note that this rebinds the `db` name imported from srht.database.
db = DbSession(cfg("todo.sr.ht", "connection-string"))
# NOTE(review): imported between creating the session and db.init(),
# presumably so the model classes are registered before init - confirm.
import todosrht.types
db.init()
|
|
|
|
from aiosmtpd.lmtp import SMTP, LMTP
|
|
from email.utils import parseaddr
|
|
from grp import getgrnam
|
|
from todosrht.access import get_tracker, get_ticket
|
|
from todosrht.types import TicketAccess, TicketResolution, Tracker, Ticket, User
|
|
from todosrht.types import Label, TicketLabel, TicketSubscription, Event, EventType, ParticipantType
|
|
from todosrht.tickets import add_comment, get_participant_for_email
|
|
from srht.validation import Validation
|
|
import asyncio
|
|
import email
|
|
import email.policy
|
|
import os
|
|
import shlex
|
|
import signal
|
|
import sys
|
|
|
|
# Dedicated event loop for the daemon: the mail servers and the SIGINT handler
# are registered on it, and it is run at the bottom of the script.
loop = asyncio.new_event_loop()
|
|
|
|
class MailHandler:
    """Handler whose ``handle_*`` coroutines process incoming tracker mail.

    An instance is handed to the LMTP/SMTP protocol objects. Each incoming
    message is routed, based on its first recipient address, to one of:

    * ticket creation (mail addressed to a tracker),
    * comment submission, optionally with a trailing ``!command`` line
      (mail addressed to a ticket),
    * subscribe / unsubscribe handling.

    Every handler returns the SMTP reply line as a string.
    """

    # Inclusive bounds on the comment text, matching the rejection message.
    _COMMENT_MIN_LENGTH = 3
    _COMMENT_MAX_LENGTH = 16384

    @staticmethod
    def _parse_sub_action(word):
        """Map "subscribe" to True and "unsubscribe" to False, else None."""
        return {"subscribe": True, "unsubscribe": False}.get(word)

    @staticmethod
    def _parse_local_part(local):
        """Split an address local part into its routing components.

        Address formats are:
          Tracker (opening a new ticket):
            ~username/tracker@todo.sr.ht
          or (for MTAs that cannot cope with the form above):
            u.username.tracker@todo.sr.ht
          Ticket (participating in discussion):
            ~username/tracker/1234@todo.sr.ht
            u.username.tracker.1234@todo.sr.ht
          Tracker (un)subscribe:
            ~username/tracker/subscribe@todo.sr.ht
            ~username/tracker/unsubscribe@todo.sr.ht
            u.username.tracker.subscribe@todo.sr.ht
            u.username.tracker.unsubscribe@todo.sr.ht
          Ticket (un)subscribe:
            ~username/tracker/1234/subscribe@todo.sr.ht
            ~username/tracker/1234/unsubscribe@todo.sr.ht
            u.username.tracker.1234.subscribe@todo.sr.ht
            u.username.tracker.1234.unsubscribe@todo.sr.ht

        Returns:
            ``(owner, tracker_name, ticket_id, sub_action)`` where ``owner``
            carries its leading "~", ``ticket_id`` is an int or None and
            ``sub_action`` is True (subscribe), False (unsubscribe) or None;
            or None when the local part matches none of the formats.
        """
        if local.startswith("~"):
            # TODO: user groups
            prefix = None
            parts = local.split("/")
        else:
            # Dotted form: the first component is a type prefix.
            prefix, *parts = local.split(".")

        ticket_id = None
        sub_action = None
        if len(parts) == 2:
            owner, tracker_name = parts
        elif len(parts) == 3:
            # The third component is either a ticket number or a
            # (un)subscribe keyword for the tracker itself.
            owner, tracker_name, last = parts
            try:
                ticket_id = int(last)
            except ValueError:
                sub_action = MailHandler._parse_sub_action(last)
                if sub_action is None:
                    return None
        elif len(parts) == 4:
            owner, tracker_name, raw_id, action = parts
            try:
                ticket_id = int(raw_id)
            except ValueError:
                return None
            sub_action = MailHandler._parse_sub_action(action)
            if sub_action is None:
                return None
        else:
            return None

        if prefix is not None:
            if prefix != "u":
                # TODO: user groups
                return None
            owner = "~" + owner
        return owner, tracker_name, ticket_id, sub_action

    @staticmethod
    def _strip_command_line(body, last_line):
        """Return ``body`` without its trailing command line.

        ``last_line`` must be the final line of ``body``. Trailing whitespace
        on either is ignored so that it cannot cause real comment text to be
        cut off along with the command.
        """
        stripped = body.rstrip()
        command = last_line.rstrip()
        return stripped[:len(stripped) - len(command)].rstrip()

    @classmethod
    def _comment_length_ok(cls, text):
        """Tell whether ``text`` is within the permitted comment length."""
        return cls._COMMENT_MIN_LENGTH <= len(text) <= cls._COMMENT_MAX_LENGTH

    def lookup_destination(self, address, sender):
        """Resolve a recipient address to the tracker or ticket it names.

        Args:
            address: Full recipient address; see ``_parse_local_part`` for
                the accepted local-part formats.
            sender: Participant sending the mail; ``sender.user`` is passed
                to the access lookups.

        Returns:
            ``(dest, sub_action, access)``. ``dest`` is whatever
            ``get_tracker`` / ``get_ticket`` returned (the caller treats None
            as "not found"), ``sub_action`` is True/False for
            subscribe/unsubscribe requests and None otherwise. All three are
            None when the address cannot be parsed.
        """
        local, at, _domain = address.rpartition("@")
        if not at:
            # No "@" at all: treat the whole string as the local part.
            local = address

        parsed = self._parse_local_part(local)
        if parsed is None:
            return None, None, None
        owner, tracker_name, ticket_id, sub_action = parsed

        # TODO: ACLs for email participants
        tracker, access = get_tracker(owner, tracker_name, user=sender.user)
        # Do not look a ticket up on a tracker that was not found; the caller
        # reports a None destination as "does not exist".
        if tracker is None or not ticket_id:
            return tracker, sub_action, access
        ticket, access = get_ticket(tracker, ticket_id, user=sender.user)
        return ticket, sub_action, access

    async def handle_RCPT(self, server, session,
            envelope, address, rcpt_options):
        """Accept every recipient and record it on the envelope."""
        print("RCPT {}".format(address))
        envelope.rcpt_tos.append(address)
        return "250 OK"

    async def handle_tracker_message(self, tracker, sender, access, mail, body):
        """Create a ticket on ``tracker`` from an email.

        The subject, plain-text body, sender id and Message-ID are submitted
        through the ``submitTicketEmail`` GraphQL mutation, executed as the
        tracker owner. Returns the SMTP reply line.
        """
        if TicketAccess.submit not in access:
            print("Rejected, insufficient permissions")
            return "550 You do not have permission to post on this tracker."

        # Need to commit in case a new participant was created
        db.session.commit()

        valid = Validation({})

        gql_input = {
            "subject": mail["Subject"],
            "body": body,
            "senderId": sender.id,
            "messageId": mail["Message-ID"],
        }

        resp = exec_gql("todo.sr.ht", """
            mutation SubmitTicketEmail($trackerId: Int!, $input: SubmitTicketEmailInput!) {
                submitTicketEmail(trackerId: $trackerId, input: $input) {
                    id
                }
            }
        """, user=tracker.owner, valid=valid, trackerId=tracker.id, input=gql_input)

        if not valid.ok:
            print("Rejecting email due to validation errors")
            return "550 " + ", ".join([e.message for e in valid.errors])

        ticket, _ = get_ticket(tracker, resp["submitTicketEmail"]["id"], user=sender.user)

        print(f"Created ticket {ticket.ref()}")
        return "250 Message accepted for delivery"

    async def handle_ticket_message(self, ticket, sender, access, mail, body):
        """Add an emailed comment, and optionally a command, to ``ticket``.

        If the last line of ``body`` starts with a recognised ``!command``
        it is removed from the comment text and translated into the ``cmd``
        fields of the ``submitCommentEmail`` mutation. Using a command
        raises the required access from ``comment`` to ``triage``.
        ``!assign``, ``!label`` and ``!unlabel`` are only recognised for
        senders whose participant type is ``user``. Returns the SMTP reply
        line.
        """
        required_access = TicketAccess.comment
        lines = body.splitlines()
        last_line = lines[-1] if lines else ""

        # Need to commit in case a new participant was created
        db.session.commit()

        valid = Validation({})

        gql_input = {
            "text": body,
            "senderId": sender.id,
        }

        cmds = ["!resolve", "!resolved", "!reopen"]
        if sender.participant_type == ParticipantType.user:
            # TODO: This should be possible via ACLs later
            cmds += ["!assign", "!label", "!unlabel"]
        if last_line.startswith(tuple(cmds)):
            try:
                cmd = shlex.split(last_line)
            except ValueError:
                # shlex raises on e.g. an unterminated quote; the line comes
                # from untrusted mail, so reject it instead of crashing.
                print("Rejected, malformed command line")
                return "550 Unable to parse the command on the last line."
            gql_input["text"] = self._strip_command_line(body, last_line)
            required_access = TicketAccess.triage
            if cmd[0] in ["!resolve", "!resolved"] and len(cmd) == 2:
                gql_input["cmd"] = "RESOLVE"
                gql_input["resolution"] = cmd[1].upper()
            elif cmd[0] == "!reopen":
                gql_input["cmd"] = "REOPEN"
            elif cmd[0] == "!label" or cmd[0] == "!unlabel":
                if cmd[0] == "!label":
                    gql_input["cmd"] = "LABEL"
                else:
                    gql_input["cmd"] = "UNLABEL"
                labels = Label.query.filter(
                    Label.name.in_(cmd[1:]),
                    Label.tracker_id == ticket.tracker_id).all()
                # Every requested label name must exist on this tracker.
                if len(labels) != len(cmd) - 1:
                    return ("550 The label you requested does not exist on " +
                        "this tracker.")
                if TicketAccess.triage not in access:
                    print(f"Rejected, {sender.name} has insufficient " +
                        f"permissions (have {access}, want triage)")
                    return "550 You do not have permission to triage on this tracker."
                gql_input["labelIds"] = [label.id for label in labels]
            # TODO: Remaining commands

        if required_access not in access:
            print(f"Rejected, {sender.name} has insufficient " +
                f"permissions (have {access}, want {required_access})")
            return "550 You do not have permission to post on this tracker."

        # Validate the text that will actually be posted. An empty text is
        # allowed so that a mail consisting only of a command still works.
        text = gql_input["text"]
        if text and not self._comment_length_ok(text):
            print("Rejected, invalid comment length")
            return "550 Comment must be between 3 and 16384 characters."

        resp = exec_gql("todo.sr.ht", """
            mutation SubmitCommentEmail($trackerId: Int!, $ticketId: Int!, $input: SubmitCommentEmailInput!) {
                submitCommentEmail(trackerId: $trackerId, ticketId: $ticketId, input: $input) {
                    id
                }
            }
        """, user=ticket.tracker.owner, valid=valid, trackerId=ticket.tracker_id, ticketId=ticket.scoped_id, input=gql_input)

        if not valid.ok:
            print("Rejecting email due to validation errors")
            return "550 " + ", ".join([e.message for e in valid.errors])

        print(f"Added comment to {ticket.ref()}")
        return "250 Message accepted for delivery"

    async def handle_DATA(self, server, session, envelope):
        """Process a message, rolling the DB session back on any failure."""
        try:
            return await self._handle_DATA(server, session, envelope)
        except BaseException:
            # Roll back whatever the failed handler left pending, then let
            # the error propagate unchanged.
            db.session.rollback()
            raise

    async def _handle_DATA(self, server, session, envelope):
        """Parse the message and dispatch it to the matching handler.

        Only the first envelope recipient is considered. The first non-empty
        text/plain part becomes the body; a message containing any text/html
        part, or no usable text/plain part, is rejected. Subscription
        requests are handled before the body is inspected.
        """
        address = envelope.rcpt_tos[0]

        mail = email.message_from_bytes(envelope.content,
                policy=email.policy.SMTP)
        name, sender_addr = parseaddr(mail["From"])
        sender = get_participant_for_email(sender_addr, name)

        dest, sub_action, access = self.lookup_destination(address, sender)
        if dest is None:
            print("Rejected, destination not found")
            return "550 The tracker or ticket you requested does not exist."

        if sub_action is not None:
            return await self.handle_un_subscription(dest, sender, sub_action)

        body = None
        for part in mail.walk():
            if part.is_multipart():
                continue
            content_type = part.get_content_type()
            # "utf-8" is the fallback when the part declares no charset.
            [charset] = part.get_charsets("utf-8")
            if content_type == 'text/plain' and not body:
                body = part.get_payload(decode=True).decode(charset)
            if content_type == 'text/html':
                print("Rejected, HTML email")
                return "550 HTML emails are not permitted on SourceHut"
        if not body:
            print("Rejected, requires plaintext part")
            return ("550 At least one text/plain part is required " +
                "to use this service.")

        if isinstance(dest, Tracker):
            return await self.handle_tracker_message(
                    dest, sender, access, mail, body)
        elif isinstance(dest, Ticket):
            return await self.handle_ticket_message(
                    dest, sender, access, mail, body)
        else:
            # Unreachable unless lookup_destination returns a new type;
            # raised explicitly so the check survives `python -O`.
            raise AssertionError("unexpected destination type")

    async def handle_un_subscription(self, dest, participant, do_subscribe):
        """Subscribe ``participant`` to ``dest`` or remove the subscription.

        ``dest`` is a Tracker or a Ticket. The call is idempotent: asking
        for the state that already holds changes nothing and still returns
        a 250 reply.
        """
        if isinstance(dest, Tracker):
            tracker_id = dest.id
            ticket_id = None
        elif isinstance(dest, Ticket):
            tracker_id = None
            ticket_id = dest.id
        else:
            # Raised explicitly so the check survives `python -O`.
            raise AssertionError("unexpected destination type")

        sub = (TicketSubscription.query
            .filter(TicketSubscription.tracker_id == tracker_id)
            .filter(TicketSubscription.ticket_id == ticket_id)
            .filter(TicketSubscription.participant_id == participant.id)
        ).one_or_none()

        if sub:
            if do_subscribe:
                return "250 Already subscribed"
            db.session.delete(sub)
        else:
            if not do_subscribe:
                return "250 Not subscribed"
            sub = TicketSubscription()
            sub.tracker_id = tracker_id
            sub.ticket_id = ticket_id
            sub.participant_id = participant.id
            db.session.add(sub)

        db.session.commit()
        if do_subscribe:
            print(f"Subscribed to {dest.ref()}")
            return "250 Subscribed"
        else:
            print(f"Unsubscribed from {dest.ref()}")
            return "250 Unsubscribed"
|
|
|
|
async def create_server():
    """Start listening for mail on the socket named in the configuration.

    The ``sock`` option of the ``[todo.sr.ht::mail]`` section selects the
    transport:

    * a value containing "/" is a filesystem path: an LMTP server is bound to
      that Unix socket, the socket is made mode 0775 and, if ``sock-group``
      is configured, its group is changed to that group;
    * anything else is read as ``host:port`` and an SMTP server is bound to
      that TCP endpoint.
    """
    handler = MailHandler()
    sock = cfg("todo.sr.ht::mail", "sock")
    if "/" in sock:
        await loop.create_unix_server(
                lambda: LMTP(handler, enable_SMTPUTF8=True),
                path=sock)
        os.chmod(sock, 0o775)
        sock_group = cfg("todo.sr.ht::mail", "sock-group", default=None)
        if sock_group is not None:
            sock_gid = getgrnam(sock_group).gr_gid
            os.chown(sock, os.getuid(), sock_gid)
    else:
        # Split on the last colon only, so that a host which itself contains
        # colons (e.g. an IPv6 literal such as "::1") is still accepted.
        host, port = sock.rsplit(":", 1)
        await loop.create_server(
                lambda: SMTP(handler, enable_SMTPUTF8=True),
                host=host, port=int(port))
|
|
|
|
def sigint_handler():
    """Announce the shutdown and end the process with exit status 0."""
    print("Exiting due to SIGINT")
    # Same effect as sys.exit(0): raises SystemExit carrying status 0.
    raise SystemExit(0)
|
|
|
|
# Stop the daemon cleanly on Ctrl-C / SIGINT.
loop.add_signal_handler(signal.SIGINT, sigint_handler)

print("Starting incoming mail daemon")
try:
    # Bind the listening socket first, then serve until interrupted.
    loop.run_until_complete(create_server())
    loop.run_forever()
finally:
    # sigint_handler leaves via sys.exit(), i.e. a SystemExit raised from
    # inside the running loop; without the finally the close below would
    # never be reached.
    loop.close()
|