api/graph: Implement GraphQL-native user webhooks

Implement GraphQL-native user webhooks for tracker creation, updates,
and deletion.
This commit is contained in:
Adnan Maolood 2022-04-04 15:14:48 -04:00 committed by Drew DeVault
parent 65f5f914b9
commit ae7916d7fd
8 changed files with 824 additions and 73 deletions

View File

@ -8,6 +8,7 @@ require (
github.com/Masterminds/squirrel v1.4.0
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/emersion/go-message v0.15.0
github.com/google/uuid v1.0.0
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.8.0

233
api/graph/model/webhooks.go Normal file
View File

@ -0,0 +1,233 @@
package model
import (
"context"
"database/sql"
"fmt"
"strconv"
"time"
"git.sr.ht/~sircmpwn/core-go/database"
"git.sr.ht/~sircmpwn/core-go/model"
sq "github.com/Masterminds/squirrel"
"github.com/lib/pq"
)
type WebhookDelivery struct {
UUID string `json:"uuid"`
Date time.Time `json:"date"`
Event WebhookEvent `json:"event"`
RequestBody string `json:"requestBody"`
ResponseBody *string `json:"responseBody"`
ResponseHeaders *string `json:"responseHeaders"`
ResponseStatus *int `json:"responseStatus"`
ID int
SubscriptionID int
Name string
alias string
fields *database.ModelFields
}
func (whd *WebhookDelivery) WithName(name string) *WebhookDelivery {
whd.Name = name
return whd
}
func (whd *WebhookDelivery) As(alias string) *WebhookDelivery {
whd.alias = alias
return whd
}
func (whd *WebhookDelivery) Alias() string {
return whd.alias
}
func (whd *WebhookDelivery) Table() string {
return "gql_" + whd.Name + "_wh_delivery"
}
func (whd *WebhookDelivery) Fields() *database.ModelFields {
if whd.fields != nil {
return whd.fields
}
whd.fields = &database.ModelFields{
Fields: []*database.FieldMap{
{"uuid", "uuid", &whd.UUID},
{"date", "date", &whd.Date},
{"event", "event", &whd.Event},
{"request_body", "requestBody", &whd.RequestBody},
{"response_body", "responseBody", &whd.ResponseBody},
{"response_headers", "responseHeaders", &whd.ResponseHeaders},
{"response_status", "responseStatus", &whd.ResponseStatus},
// Always fetch:
{"id", "", &whd.ID},
{"subscription_id", "", &whd.SubscriptionID},
},
}
return whd.fields
}
func (whd *WebhookDelivery) QueryWithCursor(ctx context.Context,
runner sq.BaseRunner, q sq.SelectBuilder,
cur *model.Cursor) ([]*WebhookDelivery, *model.Cursor) {
var (
err error
rows *sql.Rows
)
if cur.Next != "" {
next, _ := strconv.ParseInt(cur.Next, 10, 64)
q = q.Where(database.WithAlias(whd.alias, "id")+"<= ?", next)
}
q = q.
OrderBy(database.WithAlias(whd.alias, "id") + " DESC").
Limit(uint64(cur.Count + 1))
if rows, err = q.RunWith(runner).QueryContext(ctx); err != nil {
panic(err)
}
defer rows.Close()
var deliveries []*WebhookDelivery
for rows.Next() {
var delivery WebhookDelivery
if err := rows.Scan(database.Scan(ctx, &delivery)...); err != nil {
panic(err)
}
delivery.Name = whd.Name
deliveries = append(deliveries, &delivery)
}
if len(deliveries) > cur.Count {
cur = &model.Cursor{
Count: cur.Count,
Next: strconv.Itoa(deliveries[len(deliveries)-1].ID),
Search: cur.Search,
}
deliveries = deliveries[:cur.Count]
} else {
cur = nil
}
return deliveries, cur
}
type UserWebhookSubscription struct {
ID int `json:"id"`
Events []WebhookEvent `json:"events"`
Query string `json:"query"`
URL string `json:"url"`
UserID int
AuthMethod string
ClientID *string
TokenHash *string
Expires *time.Time
Grants *string
NodeID *string
alias string
fields *database.ModelFields
}
func (we *WebhookEvent) Scan(src interface{}) error {
bytes, ok := src.([]uint8)
if !ok {
return fmt.Errorf("Unable to scan from %T into WebhookEvent", src)
}
*we = WebhookEvent(string(bytes))
if !we.IsValid() {
return fmt.Errorf("%s is not a valid WebhookEvent", string(bytes))
}
return nil
}
func (UserWebhookSubscription) IsWebhookSubscription() {}
func (sub *UserWebhookSubscription) As(alias string) *UserWebhookSubscription {
sub.alias = alias
return sub
}
func (sub *UserWebhookSubscription) Alias() string {
return sub.alias
}
func (sub *UserWebhookSubscription) Table() string {
return "gql_user_wh_sub"
}
func (sub *UserWebhookSubscription) Fields() *database.ModelFields {
if sub.fields != nil {
return sub.fields
}
sub.fields = &database.ModelFields{
Fields: []*database.FieldMap{
{"events", "events", pq.Array(&sub.Events)},
{"url", "url", &sub.URL},
// Always fetch:
{"id", "", &sub.ID},
{"query", "", &sub.Query},
{"user_id", "", &sub.UserID},
{"auth_method", "", &sub.AuthMethod},
{"token_hash", "", &sub.TokenHash},
{"client_id", "", &sub.ClientID},
{"grants", "", &sub.Grants},
{"expires", "", &sub.Expires},
{"node_id", "", &sub.NodeID},
},
}
return sub.fields
}
func (sub *UserWebhookSubscription) QueryWithCursor(ctx context.Context,
runner sq.BaseRunner, q sq.SelectBuilder,
cur *model.Cursor) ([]WebhookSubscription, *model.Cursor) {
var (
err error
rows *sql.Rows
)
if cur.Next != "" {
next, _ := strconv.ParseInt(cur.Next, 10, 64)
q = q.Where(database.WithAlias(sub.alias, "id")+"<= ?", next)
}
q = q.
OrderBy(database.WithAlias(sub.alias, "id")).
Limit(uint64(cur.Count + 1))
if rows, err = q.RunWith(runner).QueryContext(ctx); err != nil {
panic(err)
}
defer rows.Close()
var (
subs []WebhookSubscription
lastID int
)
for rows.Next() {
var sub UserWebhookSubscription
if err := rows.Scan(database.Scan(ctx, &sub)...); err != nil {
panic(err)
}
subs = append(subs, &sub)
lastID = sub.ID
}
if len(subs) > cur.Count {
cur = &model.Cursor{
Count: cur.Count,
Next: strconv.Itoa(lastID),
Search: cur.Search,
}
subs = subs[:cur.Count]
} else {
cur = nil
}
return subs, cur
}

View File

@ -8,6 +8,12 @@ scalar Upload
"Used to provide a human-friendly description of an access scope"
directive @scopehelp(details: String!) on ENUM_VALUE
"""
This is used to decorate fields which are only accessible with a personal
access token, and are not available to clients using OAuth 2.0 access tokens.
"""
directive @private on FIELD_DEFINITION
enum AccessScope {
PROFILE @scopehelp(details: "profile information")
TRACKERS @scopehelp(details: "trackers")
@ -127,6 +133,86 @@ type Tracker {
export: URL!
}
type OAuthClient {
uuid: String!
}
enum WebhookEvent {
TRACKER_CREATED @access(scope: TRACKERS, kind: RO)
TRACKER_UPDATE @access(scope: TRACKERS, kind: RO)
TRACKER_DELETED @access(scope: TRACKERS, kind: RO)
TICKET_CREATED @access(scope: TICKETS, kind: RO)
}
interface WebhookSubscription {
id: Int!
events: [WebhookEvent!]!
query: String!
url: String!
"""
If this webhook was registered by an authorized OAuth 2.0 client, this
field is non-null.
"""
client: OAuthClient @private
"All deliveries which have been sent to this webhook."
deliveries(cursor: Cursor): WebhookDeliveryCursor!
"Returns a sample payload for this subscription, for testing purposes"
sample(event: WebhookEvent!): String!
}
type UserWebhookSubscription implements WebhookSubscription {
id: Int!
events: [WebhookEvent!]!
query: String!
url: String!
client: OAuthClient @private
deliveries(cursor: Cursor): WebhookDeliveryCursor!
sample(event: WebhookEvent): String!
}
type WebhookDelivery {
uuid: String!
date: Time!
event: WebhookEvent!
subscription: WebhookSubscription!
requestBody: String!
"""
These details are provided only after a response is received from the
remote server. If a response is sent whose Content-Type is not text/*, or
cannot be decoded as UTF-8, the response body will be null. It will be
truncated after 64 KiB.
"""
responseBody: String
responseHeaders: String
responseStatus: Int
}
interface WebhookPayload {
uuid: String!
event: WebhookEvent!
date: Time!
}
type TrackerEvent implements WebhookPayload {
uuid: String!
event: WebhookEvent!
date: Time!
tracker: Tracker!
}
type TicketEvent implements WebhookPayload {
uuid: String!
event: WebhookEvent!
date: Time!
ticket: Ticket!
}
enum TicketStatus {
REPORTED
CONFIRMED
@ -439,6 +525,30 @@ type ActivitySubscriptionCursor {
cursor: Cursor
}
"""
A cursor for enumerating a list of webhook deliveries
If there are additional results available, the cursor object may be passed
back into the same endpoint to retrieve another page. If the cursor is null,
there are no remaining results to return.
"""
type WebhookDeliveryCursor {
results: [WebhookDelivery!]!
cursor: Cursor
}
"""
A cursor for enumerating a list of webhook subscriptions
If there are additional results available, the cursor object may be passed
back into the same endpoint to retrieve another page. If the cursor is null,
there are no remaining results to return.
"""
type WebhookSubscriptionCursor {
results: [WebhookSubscription!]!
cursor: Cursor
}
type Query {
"Returns API version information."
version: Version!
@ -481,6 +591,25 @@ type Query {
"List of subscriptions of the authenticated user."
subscriptions(cursor: Cursor): ActivitySubscriptionCursor @access(scope: SUBSCRIPTIONS, kind: RO)
"""
Returns a list of user webhook subscriptions. For clients
authenticated with a personal access token, this returns all webhooks
configured by all GraphQL clients for your account. For clients
authenticated with an OAuth 2.0 access token, this returns only webhooks
registered for your client.
"""
userWebhooks(cursor: Cursor): WebhookSubscriptionCursor!
"Returns details of a user webhook subscription by its ID."
userWebhook(id: Int!): WebhookSubscription
"""
Returns information about the webhook currently being processed. This is
not valid during normal queries over HTTP, and will return an error if used
outside of a webhook context.
"""
webhook: WebhookPayload!
}
"You may omit any fields to leave them unchanged."
@ -579,6 +708,12 @@ input UpdateStatusInput {
import: ImportInput
}
input UserWebhookInput {
url: String!
events: [WebhookEvent!]!
query: String!
}
type Mutation {
"""
Creates a new bug tracker. If specified, the 'import' field specifies a
@ -695,4 +830,27 @@ type Mutation {
"Removes a list from the list of labels for a ticket"
unlabelTicket(trackerId: Int!, ticketId: Int!,
labelId: Int!): Event @access(scope: TICKETS, kind: RW)
"""
Creates a new user webhook subscription. When an event from the
provided list of events occurs, the 'query' parameter (a GraphQL query)
will be evaluated and the results will be sent to the provided URL as the
body of an HTTP POST request. The list of events must include at least one
event, and no duplicates.
This query is evaluated in the webhook context, such that query { webhook }
may be used to access details of the event which trigged the webhook. The
query may not make any mutations.
"""
createWebhook(config: UserWebhookInput!): WebhookSubscription!
"""
Deletes a user webhook. Any events already queued may still be
delivered after this request completes. Clients authenticated with a
personal access token may delete any webhook registered for their account,
but authorized OAuth 2.0 clients may only delete their own webhooks.
Manually deleting a webhook configured by a third-party client may cause
unexpected behavior with the third-party integration.
"""
deleteWebhook(id: Int!): WebhookSubscription
}

View File

@ -16,7 +16,9 @@ import (
"git.sr.ht/~sircmpwn/core-go/config"
"git.sr.ht/~sircmpwn/core-go/database"
coremodel "git.sr.ht/~sircmpwn/core-go/model"
"git.sr.ht/~sircmpwn/core-go/server"
"git.sr.ht/~sircmpwn/core-go/valid"
corewebhooks "git.sr.ht/~sircmpwn/core-go/webhooks"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/graph/api"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/graph/model"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/loaders"
@ -191,6 +193,7 @@ func (r *mutationResolver) CreateTracker(ctx context.Context, name string, descr
return nil, err
}
webhooks.DeliverLegacyTrackerEvent(ctx, &tracker, "tracker:create")
webhooks.DeliverTrackerEvent(ctx, model.WebhookEventTrackerCreated, &tracker)
return &tracker, nil
}
@ -216,30 +219,32 @@ func (r *mutationResolver) UpdateTracker(ctx context.Context, id int, input map[
return nil, nil
}
tracker, err := loaders.ForContext(ctx).TrackersByID.Load(id)
if err != nil || tracker == nil {
return nil, err
}
if tracker.OwnerID != auth.ForContext(ctx).UserID {
return nil, fmt.Errorf("Access denied")
}
var tracker model.Tracker
if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error {
var err error
if len(input) != 0 {
_, err = query.
Where(database.WithAlias(tracker.Alias(), `id`)+"= ?", tracker.ID).
Set(database.WithAlias(tracker.Alias(), `updated`),
sq.Expr(`now() at time zone 'utc'`)).
RunWith(tx).
ExecContext(ctx)
query = query.
Where(`id = ?`, id).
Where(`owner_id = ?`, auth.ForContext(ctx).UserID).
Set(`updated`, sq.Expr(`now() at time zone 'utc'`)).
Suffix(`RETURNING
id, created, updated, name, description, visibility,
default_access, owner_id`)
row := query.RunWith(tx).QueryRowContext(ctx)
if err := row.Scan(&tracker.ID, &tracker.Created, &tracker.Updated,
&tracker.Name, &tracker.Description, &tracker.Visibility,
&tracker.DefaultAccess, &tracker.OwnerID); err != nil {
if err == sql.ErrNoRows {
return fmt.Errorf("No tracker by ID %d found for this user", id)
}
return err
}
return err
return nil
}); err != nil {
return nil, err
}
webhooks.DeliverLegacyTrackerEvent(ctx, tracker, "tracker:update")
return tracker, nil
webhooks.DeliverLegacyTrackerEvent(ctx, &tracker, "tracker:update")
webhooks.DeliverTrackerEvent(ctx, model.WebhookEventTrackerUpdate, &tracker)
return &tracker, nil
}
func (r *mutationResolver) DeleteTracker(ctx context.Context, id int) (*model.Tracker, error) {
@ -270,6 +275,7 @@ func (r *mutationResolver) DeleteTracker(ctx context.Context, id int) (*model.Tr
}
webhooks.DeliverLegacyTrackerDelete(ctx, tracker.ID, user.UserID)
webhooks.DeliverTrackerEvent(ctx, model.WebhookEventTrackerDeleted, &tracker)
return &tracker, nil
}
@ -312,19 +318,30 @@ func (r *mutationResolver) UpdateUserACL(ctx context.Context, trackerID int, use
func (r *mutationResolver) UpdateTrackerACL(ctx context.Context, trackerID int, input model.ACLInput) (*model.DefaultACL, error) {
bits := aclBits(input)
user := auth.ForContext(ctx)
var tracker model.Tracker // Need to load tracker data for webhook delivery
if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
row := tx.QueryRowContext(ctx, `
UPDATE tracker
SET default_access = $1
WHERE id = $2 AND owner_id = $3;
WHERE id = $2 AND owner_id = $3
RETURNING
id, created, updated, name, description, visibility,
default_access, owner_id;
`, bits, trackerID, user.UserID)
return err
if err := row.Scan(&tracker.ID, &tracker.Created, &tracker.Updated,
&tracker.Name, &tracker.Description, &tracker.Visibility,
&tracker.DefaultAccess, &tracker.OwnerID); err != nil {
return err
}
return nil
}); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
webhooks.DeliverLegacyTrackerEvent(ctx, &tracker, "tracker:update")
webhooks.DeliverTrackerEvent(ctx, model.WebhookEventTrackerUpdate, &tracker)
acl := &model.DefaultACL{}
acl.SetBits(bits)
return acl, nil
@ -803,6 +820,7 @@ func (r *mutationResolver) SubmitTicket(ctx context.Context, trackerID int, inpu
return nil, err
}
webhooks.DeliverLegacyTicketCreate(ctx, tracker, &ticket)
webhooks.DeliverTicketEvent(ctx, model.WebhookEventTicketCreated, &ticket)
return &ticket, nil
}
@ -1539,6 +1557,110 @@ func (r *mutationResolver) UnlabelTicket(ctx context.Context, trackerID int, tic
return &event, nil
}
func (r *mutationResolver) CreateWebhook(ctx context.Context, config model.UserWebhookInput) (model.WebhookSubscription, error) {
schema := server.ForContext(ctx).Schema
if err := corewebhooks.Validate(schema, config.Query); err != nil {
return nil, err
}
user := auth.ForContext(ctx)
ac, err := corewebhooks.NewAuthConfig(ctx)
if err != nil {
return nil, err
}
var sub model.UserWebhookSubscription
if len(config.Events) == 0 {
return nil, fmt.Errorf("Must specify at least one event")
}
events := make([]string, len(config.Events))
for i, ev := range config.Events {
events[i] = ev.String()
// TODO: gqlgen does not support doing anything useful with directives
// on enums at the time of writing, so we have to do a little bit of
// manual fuckery
var access string
switch ev {
case model.WebhookEventTrackerCreated, model.WebhookEventTrackerUpdate,
model.WebhookEventTrackerDeleted:
access = "TRACKERS"
case model.WebhookEventTicketCreated:
access = "TICKETS"
}
if !user.Grants.Has(access, auth.RO) {
return nil, fmt.Errorf("Insufficient access granted for webhook event %s", ev.String())
}
}
u, err := url.Parse(config.URL)
if err != nil {
return nil, err
} else if u.Host == "" {
return nil, fmt.Errorf("Cannot use URL without host")
} else if u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("Cannot use non-HTTP or HTTPS URL")
}
if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error {
row := tx.QueryRowContext(ctx, `
INSERT INTO gql_user_wh_sub (
created, events, url, query,
auth_method,
token_hash, grants, client_id, expires,
node_id,
user_id
) VALUES (
NOW() at time zone 'utc',
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10
) RETURNING id, url, query, events, user_id;`,
pq.Array(events), config.URL, config.Query,
ac.AuthMethod,
ac.TokenHash, ac.Grants, ac.ClientID, ac.Expires, // OAUTH2
ac.NodeID, // INTERNAL
user.UserID)
if err := row.Scan(&sub.ID, &sub.URL,
&sub.Query, pq.Array(&sub.Events), &sub.UserID); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return &sub, nil
}
func (r *mutationResolver) DeleteWebhook(ctx context.Context, id int) (model.WebhookSubscription, error) {
var sub model.UserWebhookSubscription
filter, err := corewebhooks.FilterWebhooks(ctx)
if err != nil {
return nil, err
}
if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error {
row := sq.Delete(`gql_user_wh_sub`).
PlaceholderFormat(sq.Dollar).
Where(sq.And{sq.Expr(`id = ?`, id), filter}).
Suffix(`RETURNING id, url, query, events, user_id`).
RunWith(tx).
QueryRowContext(ctx)
if err := row.Scan(&sub.ID, &sub.URL,
&sub.Query, pq.Array(&sub.Events), &sub.UserID); err != nil {
return err
}
return nil
}); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
return &sub, nil
}
func (r *queryResolver) Version(ctx context.Context) (*model.Version, error) {
return &model.Version{
Major: 0,
@ -1651,6 +1773,79 @@ func (r *queryResolver) Subscriptions(ctx context.Context, cursor *coremodel.Cur
return &model.ActivitySubscriptionCursor{subs, cursor}, nil
}
func (r *queryResolver) UserWebhooks(ctx context.Context, cursor *coremodel.Cursor) (*model.WebhookSubscriptionCursor, error) {
if cursor == nil {
cursor = coremodel.NewCursor(nil)
}
filter, err := corewebhooks.FilterWebhooks(ctx)
if err != nil {
return nil, err
}
var subs []model.WebhookSubscription
if err := database.WithTx(ctx, &sql.TxOptions{
Isolation: 0,
ReadOnly: true,
}, func(tx *sql.Tx) error {
sub := (&model.UserWebhookSubscription{}).As(`sub`)
query := database.
Select(ctx, sub).
From(`gql_user_wh_sub sub`).
Where(filter)
subs, cursor = sub.QueryWithCursor(ctx, tx, query, cursor)
return nil
}); err != nil {
return nil, err
}
return &model.WebhookSubscriptionCursor{subs, cursor}, nil
}
func (r *queryResolver) UserWebhook(ctx context.Context, id int) (model.WebhookSubscription, error) {
var sub model.UserWebhookSubscription
filter, err := corewebhooks.FilterWebhooks(ctx)
if err != nil {
return nil, err
}
if err := database.WithTx(ctx, &sql.TxOptions{
Isolation: 0,
ReadOnly: true,
}, func(tx *sql.Tx) error {
row := database.
Select(ctx, &sub).
From(`gql_user_wh_sub`).
Where(sq.And{sq.Expr(`id = ?`, id), filter}).
RunWith(tx).
QueryRowContext(ctx)
if err := row.Scan(database.Scan(ctx, &sub)...); err != nil {
return err
}
return nil
}); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
return &sub, nil
}
func (r *queryResolver) Webhook(ctx context.Context) (model.WebhookPayload, error) {
raw, err := corewebhooks.Payload(ctx)
if err != nil {
return nil, err
}
payload, ok := raw.(model.WebhookPayload)
if !ok {
panic("Invalid webhook payload context")
}
return payload, nil
}
func (r *statusChangeResolver) Ticket(ctx context.Context, obj *model.StatusChange) (*model.Ticket, error) {
return loaders.ForContext(ctx).TicketsByID.Load(obj.TicketID)
}
@ -1995,6 +2190,78 @@ func (r *userMentionResolver) Mentioned(ctx context.Context, obj *model.UserMent
return loaders.ForContext(ctx).EntitiesByParticipantID.Load(obj.MentionedID)
}
func (r *userWebhookSubscriptionResolver) Client(ctx context.Context, obj *model.UserWebhookSubscription) (*model.OAuthClient, error) {
if obj.ClientID == nil {
return nil, nil
}
return &model.OAuthClient{
UUID: *obj.ClientID,
}, nil
}
func (r *userWebhookSubscriptionResolver) Deliveries(ctx context.Context, obj *model.UserWebhookSubscription, cursor *coremodel.Cursor) (*model.WebhookDeliveryCursor, error) {
if cursor == nil {
cursor = coremodel.NewCursor(nil)
}
var deliveries []*model.WebhookDelivery
if err := database.WithTx(ctx, &sql.TxOptions{
Isolation: 0,
ReadOnly: true,
}, func(tx *sql.Tx) error {
d := (&model.WebhookDelivery{}).
WithName(`profile`).
As(`delivery`)
query := database.
Select(ctx, d).
From(`gql_user_wh_delivery delivery`).
Where(`delivery.subscription_id = ?`, obj.ID)
deliveries, cursor = d.QueryWithCursor(ctx, tx, query, cursor)
return nil
}); err != nil {
return nil, err
}
return &model.WebhookDeliveryCursor{deliveries, cursor}, nil
}
func (r *userWebhookSubscriptionResolver) Sample(ctx context.Context, obj *model.UserWebhookSubscription, event *model.WebhookEvent) (string, error) {
// TODO
panic(fmt.Errorf("not implemented"))
}
func (r *webhookDeliveryResolver) Subscription(ctx context.Context, obj *model.WebhookDelivery) (model.WebhookSubscription, error) {
if obj.Name == "" {
panic("WebhookDelivery without name")
}
// XXX: This could use a loader but it's unlikely to be a bottleneck
var sub model.WebhookSubscription
if err := database.WithTx(ctx, &sql.TxOptions{
Isolation: 0,
ReadOnly: true,
}, func(tx *sql.Tx) error {
// XXX: This needs some work to generalize to other kinds of webhooks
subscription := (&model.UserWebhookSubscription{}).As(`sub`)
// Note: No filter needed because, if we have access to the delivery,
// we also have access to the subscription.
row := database.
Select(ctx, subscription).
From(`gql_user_wh_sub sub`).
Where(`sub.id = ?`, obj.SubscriptionID).
RunWith(tx).
QueryRowContext(ctx)
if err := row.Scan(database.Scan(ctx, subscription)...); err != nil {
return err
}
sub = subscription
return nil
}); err != nil {
return nil, err
}
return sub, nil
}
// Assignment returns api.AssignmentResolver implementation.
func (r *Resolver) Assignment() api.AssignmentResolver { return &assignmentResolver{r} }
@ -2050,6 +2317,14 @@ func (r *Resolver) User() api.UserResolver { return &userResolver{r} }
// UserMention returns api.UserMentionResolver implementation.
func (r *Resolver) UserMention() api.UserMentionResolver { return &userMentionResolver{r} }
// UserWebhookSubscription returns api.UserWebhookSubscriptionResolver implementation.
func (r *Resolver) UserWebhookSubscription() api.UserWebhookSubscriptionResolver {
return &userWebhookSubscriptionResolver{r}
}
// WebhookDelivery returns api.WebhookDeliveryResolver implementation.
func (r *Resolver) WebhookDelivery() api.WebhookDeliveryResolver { return &webhookDeliveryResolver{r} }
type assignmentResolver struct{ *Resolver }
type commentResolver struct{ *Resolver }
type createdResolver struct{ *Resolver }
@ -2067,3 +2342,5 @@ type trackerACLResolver struct{ *Resolver }
type trackerSubscriptionResolver struct{ *Resolver }
type userResolver struct{ *Resolver }
type userMentionResolver struct{ *Resolver }
type userWebhookSubscriptionResolver struct{ *Resolver }
type webhookDeliveryResolver struct{ *Resolver }

View File

@ -5,13 +5,13 @@ import (
"git.sr.ht/~sircmpwn/core-go/config"
"git.sr.ht/~sircmpwn/core-go/server"
"git.sr.ht/~sircmpwn/core-go/webhooks"
"github.com/99designs/gqlgen/graphql"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/graph"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/graph/api"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/graph/model"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/loaders"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/webhooks"
)
func main() {
@ -30,15 +30,17 @@ func main() {
scopes[i] = s.String()
}
webhookQueue := webhooks.NewQueue(schema)
legacyWebhooks := webhooks.NewLegacyQueue()
server.NewServer("todo.sr.ht", appConfig).
WithDefaultMiddleware().
WithMiddleware(
loaders.Middleware,
webhooks.Middleware(webhookQueue),
webhooks.LegacyMiddleware(legacyWebhooks),
).
WithSchema(schema, scopes).
WithQueues(legacyWebhooks.Queue).
WithQueues(webhookQueue.Queue, legacyWebhooks.Queue).
Run()
}

View File

@ -6,7 +6,6 @@ package webhooks
import (
"context"
"encoding/json"
"net/http"
"strings"
"time"
@ -113,28 +112,6 @@ type EventWebhookPayload struct {
FromTicket *TicketWebhookPayload `json:"from_ticket"`
}
func NewLegacyQueue() *webhooks.LegacyQueue {
return webhooks.NewLegacyQueue()
}
var legacyWebhooksCtxKey = &contextKey{"legacy-webhooks"}
type contextKey struct {
name string
}
func LegacyMiddleware(
queue *webhooks.LegacyQueue,
) func(next http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), legacyWebhooksCtxKey, queue)
r = r.WithContext(ctx)
next.ServeHTTP(w, r)
})
}
}
func mkaccess(tracker *model.Tracker) []string {
var items []string
if tracker.DefaultAccess&model.ACCESS_BROWSE != 0 {
@ -189,11 +166,7 @@ func mkparticipant(part model.Entity) *ParticipantWebhookPayload {
func DeliverLegacyTrackerEvent(ctx context.Context,
tracker *model.Tracker, ev string) {
q, ok := ctx.Value(legacyWebhooksCtxKey).(*webhooks.LegacyQueue)
if !ok {
panic("No legacy webhooks worker for this context")
}
q := webhooks.LegacyForContext(ctx)
user := auth.ForContext(ctx)
if user.UserID != tracker.OwnerID {
panic("Submitting webhook for another user's context (why?)")
@ -227,11 +200,7 @@ func DeliverLegacyTrackerEvent(ctx context.Context,
}
func DeliverLegacyTrackerDelete(ctx context.Context, trackerId, userId int) {
q, ok := ctx.Value(legacyWebhooksCtxKey).(*webhooks.LegacyQueue)
if !ok {
panic("No legacy webhooks worker for this context")
}
q := webhooks.LegacyForContext(ctx)
type WebhookPayload struct {
ID int `json:"id"`
}
@ -252,10 +221,7 @@ func DeliverLegacyTrackerDelete(ctx context.Context, trackerId, userId int) {
func DeliverLegacyLabelCreate(ctx context.Context,
tracker *model.Tracker, label *model.Label) {
q, ok := ctx.Value(legacyWebhooksCtxKey).(*webhooks.LegacyQueue)
if !ok {
panic("No legacy webhooks worker for this context")
}
q := webhooks.LegacyForContext(ctx)
payload := LabelWebhookPayload{
Name: label.Name,
@ -288,10 +254,7 @@ func DeliverLegacyLabelCreate(ctx context.Context,
}
func DeliverLegacyLabelDelete(ctx context.Context, trackerID, labelID int) {
q, ok := ctx.Value(legacyWebhooksCtxKey).(*webhooks.LegacyQueue)
if !ok {
panic("No legacy webhooks worker for this context")
}
q := webhooks.LegacyForContext(ctx)
// It occurs to me that this webhook is completely useless given that the
// legacy API doesn't expose label IDs to the user
@ -315,10 +278,7 @@ func DeliverLegacyLabelDelete(ctx context.Context, trackerID, labelID int) {
func DeliverLegacyTicketCreate(ctx context.Context,
tracker *model.Tracker, ticket *model.Ticket) {
q, ok := ctx.Value(legacyWebhooksCtxKey).(*webhooks.LegacyQueue)
if !ok {
panic("No legacy webhooks worker for this context")
}
q := webhooks.LegacyForContext(ctx)
part, err := loaders.ForContext(ctx).EntitiesByParticipantID.Load(ticket.SubmitterID)
if err != nil || part == nil {
@ -444,10 +404,7 @@ func mkResolution(res *int) *string {
func DeliverLegacyEventCreate(ctx context.Context,
tracker *model.Tracker, ticket *model.Ticket, event *model.Event) {
q, ok := ctx.Value(legacyWebhooksCtxKey).(*webhooks.LegacyQueue)
if !ok {
panic("No legacy webhooks worker for this context")
}
q := webhooks.LegacyForContext(ctx)
part, err := loaders.ForContext(ctx).EntitiesByParticipantID.Load(event.ParticipantID)
if err != nil || part == nil {

49
api/webhooks/webhooks.go Normal file
View File

@ -0,0 +1,49 @@
package webhooks
import (
"context"
"time"
"git.sr.ht/~sircmpwn/core-go/auth"
"git.sr.ht/~sircmpwn/core-go/webhooks"
sq "github.com/Masterminds/squirrel"
"github.com/google/uuid"
"git.sr.ht/~sircmpwn/todo.sr.ht/api/graph/model"
)
func deliverUserWebhook(ctx context.Context, event model.WebhookEvent,
payload model.WebhookPayload, payloadUUID uuid.UUID) {
q := webhooks.ForContext(ctx)
userID := auth.ForContext(ctx).UserID
query := sq.
Select().
From("gql_user_wh_sub sub").
Where("sub.user_id = ?", userID)
q.Schedule(ctx, query, "user", event.String(),
payloadUUID, payload)
}
func DeliverTrackerEvent(ctx context.Context,
event model.WebhookEvent, tracker *model.Tracker) {
payloadUUID := uuid.New()
payload := model.TrackerEvent{
UUID: payloadUUID.String(),
Event: event,
Date: time.Now().UTC(),
Tracker: tracker,
}
deliverUserWebhook(ctx, event, &payload, payloadUUID)
}
func DeliverTicketEvent(ctx context.Context,
event model.WebhookEvent, ticket *model.Ticket) {
payloadUUID := uuid.New()
payload := model.TicketEvent{
UUID: payloadUUID.String(),
Event: event,
Date: time.Now().UTC(),
Ticket: ticket,
}
deliverUserWebhook(ctx, event, &payload, payloadUUID)
}

View File

@ -0,0 +1,74 @@
"""Add GraphQL user webhook tables
Revision ID: dbed5c6ea613
Revises: 2e5358e46632
Create Date: 2022-03-31 11:35:52.419873
"""
# revision identifiers, used by Alembic.
revision = 'dbed5c6ea613'
down_revision = '2e5358e46632'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.execute("""
CREATE TYPE webhook_event AS ENUM (
'TRACKER_CREATED',
'TRACKER_UPDATE',
'TRACKER_DELETED',
'TICKET_CREATED'
);
CREATE TYPE auth_method AS ENUM (
'OAUTH_LEGACY',
'OAUTH2',
'COOKIE',
'INTERNAL',
'WEBHOOK'
);
CREATE TABLE gql_user_wh_sub (
id serial PRIMARY KEY,
created timestamp NOT NULL,
events webhook_event[] NOT NULL check (array_length(events, 1) > 0),
url varchar NOT NULL,
query varchar NOT NULL,
auth_method auth_method NOT NULL check (auth_method in ('OAUTH2', 'INTERNAL')),
token_hash varchar(128) check ((auth_method = 'OAUTH2') = (token_hash IS NOT NULL)),
grants varchar,
client_id uuid,
expires timestamp check ((auth_method = 'OAUTH2') = (expires IS NOT NULL)),
node_id varchar check ((auth_method = 'INTERNAL') = (node_id IS NOT NULL)),
user_id integer NOT NULL references "user"(id)
);
CREATE INDEX gql_user_wh_sub_token_hash_idx ON gql_user_wh_sub (token_hash);
CREATE TABLE gql_user_wh_delivery (
id serial PRIMARY KEY,
uuid uuid NOT NULL,
date timestamp NOT NULL,
event webhook_event NOT NULL,
subscription_id integer NOT NULL references gql_user_wh_sub(id) ON DELETE CASCADE,
request_body varchar NOT NULL,
response_body varchar,
response_headers varchar,
response_status integer
);
""")
def downgrade():
op.execute("""
DROP TABLE gql_user_wh_delivery;
DROP INDEX gql_user_wh_sub_token_hash_idx;
DROP TABLE gql_user_wh_sub;
DROP TYPE auth_method;
DROP TYPE webhook_event;
""")