api/graph: Implement GraphQL-native user webhooks

Implement GraphQL-native user webhooks for paste CRUD operations.
This commit is contained in:
Adnan Maolood 2022-06-14 21:27:04 -04:00 committed by Drew DeVault
parent c80a1a3e12
commit f9571a8868
8 changed files with 819 additions and 9 deletions

View File

@ -7,6 +7,7 @@ require (
github.com/99designs/gqlgen v0.17.2
github.com/Masterminds/squirrel v1.5.0
github.com/go-chi/chi v4.1.2+incompatible
github.com/google/uuid v1.0.0
github.com/lib/pq v1.10.3
github.com/vektah/dataloaden v0.2.1-0.20190515034641-a19b9a6e7c9e
github.com/vektah/gqlparser/v2 v2.4.1

View File

@ -217,6 +217,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=

233
api/graph/model/webhooks.go Normal file
View File

@ -0,0 +1,233 @@
package model
import (
"context"
"database/sql"
"fmt"
"strconv"
"time"
"git.sr.ht/~sircmpwn/core-go/database"
"git.sr.ht/~sircmpwn/core-go/model"
sq "github.com/Masterminds/squirrel"
"github.com/lib/pq"
)
type WebhookDelivery struct {
UUID string `json:"uuid"`
Date time.Time `json:"date"`
Event WebhookEvent `json:"event"`
RequestBody string `json:"requestBody"`
ResponseBody *string `json:"responseBody"`
ResponseHeaders *string `json:"responseHeaders"`
ResponseStatus *int `json:"responseStatus"`
ID int
SubscriptionID int
Name string
alias string
fields *database.ModelFields
}
func (whd *WebhookDelivery) WithName(name string) *WebhookDelivery {
whd.Name = name
return whd
}
func (whd *WebhookDelivery) As(alias string) *WebhookDelivery {
whd.alias = alias
return whd
}
func (whd *WebhookDelivery) Alias() string {
return whd.alias
}
func (whd *WebhookDelivery) Table() string {
return "gql_" + whd.Name + "_wh_delivery"
}
func (whd *WebhookDelivery) Fields() *database.ModelFields {
if whd.fields != nil {
return whd.fields
}
whd.fields = &database.ModelFields{
Fields: []*database.FieldMap{
{"uuid", "uuid", &whd.UUID},
{"date", "date", &whd.Date},
{"event", "event", &whd.Event},
{"request_body", "requestBody", &whd.RequestBody},
{"response_body", "responseBody", &whd.ResponseBody},
{"response_headers", "responseHeaders", &whd.ResponseHeaders},
{"response_status", "responseStatus", &whd.ResponseStatus},
// Always fetch:
{"id", "", &whd.ID},
{"subscription_id", "", &whd.SubscriptionID},
},
}
return whd.fields
}
func (whd *WebhookDelivery) QueryWithCursor(ctx context.Context,
runner sq.BaseRunner, q sq.SelectBuilder,
cur *model.Cursor) ([]*WebhookDelivery, *model.Cursor) {
var (
err error
rows *sql.Rows
)
if cur.Next != "" {
next, _ := strconv.ParseInt(cur.Next, 10, 64)
q = q.Where(database.WithAlias(whd.alias, "id")+"<= ?", next)
}
q = q.
OrderBy(database.WithAlias(whd.alias, "id") + " DESC").
Limit(uint64(cur.Count + 1))
if rows, err = q.RunWith(runner).QueryContext(ctx); err != nil {
panic(err)
}
defer rows.Close()
var deliveries []*WebhookDelivery
for rows.Next() {
var delivery WebhookDelivery
if err := rows.Scan(database.Scan(ctx, &delivery)...); err != nil {
panic(err)
}
delivery.Name = whd.Name
deliveries = append(deliveries, &delivery)
}
if len(deliveries) > cur.Count {
cur = &model.Cursor{
Count: cur.Count,
Next: strconv.Itoa(deliveries[len(deliveries)-1].ID),
Search: cur.Search,
}
deliveries = deliveries[:cur.Count]
} else {
cur = nil
}
return deliveries, cur
}
type UserWebhookSubscription struct {
ID int `json:"id"`
Events []WebhookEvent `json:"events"`
Query string `json:"query"`
URL string `json:"url"`
UserID int
AuthMethod string
ClientID *string
TokenHash *string
Expires *time.Time
Grants *string
NodeID *string
alias string
fields *database.ModelFields
}
func (we *WebhookEvent) Scan(src interface{}) error {
bytes, ok := src.([]uint8)
if !ok {
return fmt.Errorf("Unable to scan from %T into WebhookEvent", src)
}
*we = WebhookEvent(string(bytes))
if !we.IsValid() {
return fmt.Errorf("%s is not a valid WebhookEvent", string(bytes))
}
return nil
}
func (UserWebhookSubscription) IsWebhookSubscription() {}
func (sub *UserWebhookSubscription) As(alias string) *UserWebhookSubscription {
sub.alias = alias
return sub
}
func (sub *UserWebhookSubscription) Alias() string {
return sub.alias
}
func (sub *UserWebhookSubscription) Table() string {
return "gql_user_wh_sub"
}
func (sub *UserWebhookSubscription) Fields() *database.ModelFields {
if sub.fields != nil {
return sub.fields
}
sub.fields = &database.ModelFields{
Fields: []*database.FieldMap{
{"events", "events", pq.Array(&sub.Events)},
{"url", "url", &sub.URL},
// Always fetch:
{"id", "", &sub.ID},
{"query", "", &sub.Query},
{"user_id", "", &sub.UserID},
{"auth_method", "", &sub.AuthMethod},
{"token_hash", "", &sub.TokenHash},
{"client_id", "", &sub.ClientID},
{"grants", "", &sub.Grants},
{"expires", "", &sub.Expires},
{"node_id", "", &sub.NodeID},
},
}
return sub.fields
}
func (sub *UserWebhookSubscription) QueryWithCursor(ctx context.Context,
runner sq.BaseRunner, q sq.SelectBuilder,
cur *model.Cursor) ([]WebhookSubscription, *model.Cursor) {
var (
err error
rows *sql.Rows
)
if cur.Next != "" {
next, _ := strconv.ParseInt(cur.Next, 10, 64)
q = q.Where(database.WithAlias(sub.alias, "id")+"<= ?", next)
}
q = q.
OrderBy(database.WithAlias(sub.alias, "id")).
Limit(uint64(cur.Count + 1))
if rows, err = q.RunWith(runner).QueryContext(ctx); err != nil {
panic(err)
}
defer rows.Close()
var (
subs []WebhookSubscription
lastID int
)
for rows.Next() {
var sub UserWebhookSubscription
if err := rows.Scan(database.Scan(ctx, &sub)...); err != nil {
panic(err)
}
subs = append(subs, &sub)
lastID = sub.ID
}
if len(subs) > cur.Count {
cur = &model.Cursor{
Count: cur.Count,
Next: strconv.Itoa(lastID),
Search: cur.Search,
}
subs = subs[:cur.Count]
} else {
cur = nil
}
return subs, cur
}

View File

@ -18,12 +18,6 @@ access token, and are not available to clients using OAuth 2.0 access tokens.
"""
directive @private on FIELD_DEFINITION
"""
This used to decorate fields which are for internal use, and are not
available to normal API users.
"""
directive @internal on FIELD_DEFINITION
"Used to provide a human-friendly description of an access scope."
directive @scopehelp(details: String!) on ENUM_VALUE
@ -116,6 +110,101 @@ type PasteCursor {
cursor: Cursor
}
type OAuthClient {
uuid: String!
}
enum WebhookEvent {
PASTE_CREATED @access(scope: PASTES, kind: RO)
PASTE_UPDATED @access(scope: PASTES, kind: RO)
PASTE_DELETED @access(scope: PASTES, kind: RO)
}
interface WebhookSubscription {
id: Int!
events: [WebhookEvent!]!
query: String!
url: String!
"""
If this webhook was registered by an authorized OAuth 2.0 client, this
field is non-null.
"""
client: OAuthClient @private
"All deliveries which have been sent to this webhook."
deliveries(cursor: Cursor): WebhookDeliveryCursor!
"Returns a sample payload for this subscription, for testing purposes"
sample(event: WebhookEvent!): String!
}
type UserWebhookSubscription implements WebhookSubscription {
id: Int!
events: [WebhookEvent!]!
query: String!
url: String!
client: OAuthClient @private
deliveries(cursor: Cursor): WebhookDeliveryCursor!
sample(event: WebhookEvent!): String!
}
type WebhookDelivery {
uuid: String!
date: Time!
event: WebhookEvent!
subscription: WebhookSubscription!
requestBody: String!
"""
These details are provided only after a response is received from the
remote server. If a response is sent whose Content-Type is not text/*, or
cannot be decoded as UTF-8, the response body will be null. It will be
truncated after 64 KiB.
"""
responseBody: String
responseHeaders: String
responseStatus: Int
}
interface WebhookPayload {
uuid: String!
event: WebhookEvent!
date: Time!
}
type PasteEvent implements WebhookPayload {
uuid: String!
event: WebhookEvent!
date: Time!
paste: Paste!
}
"""
A cursor for enumerating a list of webhook deliveries
If there are additional results available, the cursor object may be passed
back into the same endpoint to retrieve another page. If the cursor is null,
there are no remaining results to return.
"""
type WebhookDeliveryCursor {
results: [WebhookDelivery!]!
cursor: Cursor
}
"""
A cursor for enumerating a list of webhook subscriptions
If there are additional results available, the cursor object may be passed
back into the same endpoint to retrieve another page. If the cursor is null,
there are no remaining results to return.
"""
type WebhookSubscriptionCursor {
results: [WebhookSubscription!]!
cursor: Cursor
}
type Query {
"Returns API version information."
version: Version!
@ -131,6 +220,31 @@ type Query {
"Returns a paste by its ID."
paste(id: String!): Paste @access(scope: PASTES, kind: RO)
"""
Returns a list of user webhook subscriptions. For clients
authenticated with a personal access token, this returns all webhooks
configured by all GraphQL clients for your account. For clients
authenticated with an OAuth 2.0 access token, this returns only webhooks
registered for your client.
"""
userWebhooks(cursor: Cursor): WebhookSubscriptionCursor!
"Returns details of a user webhook subscription by its ID."
userWebhook(id: Int!): WebhookSubscription
"""
Returns information about the webhook currently being processed. This is
not valid during normal queries over HTTP, and will return an error if used
outside of a webhook context.
"""
webhook: WebhookPayload!
}
input UserWebhookInput {
url: String!
events: [WebhookEvent!]!
query: String!
}
type Mutation {
@ -151,4 +265,27 @@ type Mutation {
"Deletes a paste by its ID."
delete(id: String!): Paste @access(scope: PASTES, kind: RW)
"""
Creates a new user webhook subscription. When an event from the
provided list of events occurs, the 'query' parameter (a GraphQL query)
will be evaluated and the results will be sent to the provided URL as the
body of an HTTP POST request. The list of events must include at least one
event, and no duplicates.
This query is evaluated in the webhook context, such that query { webhook }
may be used to access details of the event which trigged the webhook. The
query may not make any mutations.
"""
createUserWebhook(config: UserWebhookInput!): WebhookSubscription!
"""
Deletes a user webhook. Any events already queued may still be
delivered after this request completes. Clients authenticated with a
personal access token may delete any webhook registered for their account,
but authorized OAuth 2.0 clients may only delete their own webhooks.
Manually deleting a webhook configured by a third-party client may cause
unexpected behavior with the third-party integration.
"""
deleteUserWebhook(id: Int!): WebhookSubscription!
}

View File

@ -20,12 +20,17 @@ import (
"git.sr.ht/~sircmpwn/core-go/config"
"git.sr.ht/~sircmpwn/core-go/database"
coremodel "git.sr.ht/~sircmpwn/core-go/model"
"git.sr.ht/~sircmpwn/core-go/server"
"git.sr.ht/~sircmpwn/core-go/valid"
corewebhooks "git.sr.ht/~sircmpwn/core-go/webhooks"
"git.sr.ht/~sircmpwn/paste.sr.ht/api/graph/api"
"git.sr.ht/~sircmpwn/paste.sr.ht/api/graph/model"
"git.sr.ht/~sircmpwn/paste.sr.ht/api/loaders"
"git.sr.ht/~sircmpwn/paste.sr.ht/api/webhooks"
"github.com/99designs/gqlgen/graphql"
sq "github.com/Masterminds/squirrel"
"github.com/google/uuid"
"github.com/lib/pq"
)
func (r *fileResolver) Hash(ctx context.Context, obj *model.File) (string, error) {
@ -177,6 +182,7 @@ func (r *mutationResolver) Create(ctx context.Context, files []*graphql.Upload,
return nil, nil
}
webhooks.DeliverUserPasteEvent(ctx, model.WebhookEventPasteCreated, &paste)
return &paste, nil
}
@ -201,6 +207,7 @@ func (r *mutationResolver) Update(ctx context.Context, id string, visibility mod
return nil, err
}
webhooks.DeliverUserPasteEvent(ctx, model.WebhookEventPasteUpdated, &paste)
return &paste, nil
}
@ -215,8 +222,13 @@ func (r *mutationResolver) Delete(ctx context.Context, id string) (*model.Paste,
RETURNING
id, sha, created, user_id, visibility;`,
id, auth.ForContext(ctx).UserID)
return row.Scan(&paste.PKID, &paste.ID,
err := row.Scan(&paste.PKID, &paste.ID,
&paste.Created, &paste.UserID, &paste.RawVisibility)
if err != nil {
return err
}
webhooks.DeliverUserPasteEvent(ctx, model.WebhookEventPasteDeleted, &paste)
return nil
}); err != nil {
if err == sql.ErrNoRows {
return nil, nil
@ -227,6 +239,110 @@ func (r *mutationResolver) Delete(ctx context.Context, id string) (*model.Paste,
return &paste, nil
}
func (r *mutationResolver) CreateUserWebhook(ctx context.Context, config model.UserWebhookInput) (model.WebhookSubscription, error) {
schema := server.ForContext(ctx).Schema
if err := corewebhooks.Validate(schema, config.Query); err != nil {
return nil, err
}
user := auth.ForContext(ctx)
ac, err := corewebhooks.NewAuthConfig(ctx)
if err != nil {
return nil, err
}
var sub model.UserWebhookSubscription
if len(config.Events) == 0 {
return nil, fmt.Errorf("Must specify at least one event")
}
events := make([]string, len(config.Events))
for i, ev := range config.Events {
events[i] = ev.String()
// TODO: gqlgen does not support doing anything useful with directives
// on enums at the time of writing, so we have to do a little bit of
// manual fuckery
var access string
switch ev {
case model.WebhookEventPasteCreated, model.WebhookEventPasteUpdated,
model.WebhookEventPasteDeleted:
access = "PASTES"
default:
return nil, fmt.Errorf("Unsupported event %s", ev.String())
}
if !user.Grants.Has(access, auth.RO) {
return nil, fmt.Errorf("Insufficient access granted for webhook event %s", ev.String())
}
}
u, err := url.Parse(config.URL)
if err != nil {
return nil, err
} else if u.Host == "" {
return nil, fmt.Errorf("Cannot use URL without host")
} else if u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("Cannot use non-HTTP or HTTPS URL")
}
if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error {
row := tx.QueryRowContext(ctx, `
INSERT INTO gql_user_wh_sub (
created, events, url, query,
auth_method,
token_hash, grants, client_id, expires,
node_id,
user_id
) VALUES (
NOW() at time zone 'utc',
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10
) RETURNING id, url, query, events, user_id;`,
pq.Array(events), config.URL, config.Query,
ac.AuthMethod,
ac.TokenHash, ac.Grants, ac.ClientID, ac.Expires, // OAUTH2
ac.NodeID, // INTERNAL
user.UserID)
if err := row.Scan(&sub.ID, &sub.URL,
&sub.Query, pq.Array(&sub.Events), &sub.UserID); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return &sub, nil
}
func (r *mutationResolver) DeleteUserWebhook(ctx context.Context, id int) (model.WebhookSubscription, error) {
var sub model.UserWebhookSubscription
filter, err := corewebhooks.FilterWebhooks(ctx)
if err != nil {
return nil, err
}
if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error {
row := sq.Delete(`gql_user_wh_sub`).
PlaceholderFormat(sq.Dollar).
Where(sq.And{sq.Expr(`id = ?`, id), filter}).
Suffix(`RETURNING id, url, query, events, user_id`).
RunWith(tx).
QueryRowContext(ctx)
if err := row.Scan(&sub.ID, &sub.URL,
&sub.Query, pq.Array(&sub.Events), &sub.UserID); err != nil {
return err
}
return nil
}); err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("No user webhook by ID %d found for this user", id)
}
return nil, err
}
return &sub, nil
}
func (r *pasteResolver) Files(ctx context.Context, obj *model.Paste) ([]*model.File, error) {
var files []*model.File
@ -321,6 +437,79 @@ func (r *queryResolver) Paste(ctx context.Context, id string) (*model.Paste, err
return loaders.ForContext(ctx).PastesBySHA.Load(id)
}
func (r *queryResolver) UserWebhooks(ctx context.Context, cursor *coremodel.Cursor) (*model.WebhookSubscriptionCursor, error) {
if cursor == nil {
cursor = coremodel.NewCursor(nil)
}
filter, err := corewebhooks.FilterWebhooks(ctx)
if err != nil {
return nil, err
}
var subs []model.WebhookSubscription
if err := database.WithTx(ctx, &sql.TxOptions{
Isolation: 0,
ReadOnly: true,
}, func(tx *sql.Tx) error {
sub := (&model.UserWebhookSubscription{}).As(`sub`)
query := database.
Select(ctx, sub).
From(`gql_user_wh_sub sub`).
Where(filter)
subs, cursor = sub.QueryWithCursor(ctx, tx, query, cursor)
return nil
}); err != nil {
return nil, err
}
return &model.WebhookSubscriptionCursor{subs, cursor}, nil
}
func (r *queryResolver) UserWebhook(ctx context.Context, id int) (model.WebhookSubscription, error) {
var sub model.UserWebhookSubscription
filter, err := corewebhooks.FilterWebhooks(ctx)
if err != nil {
return nil, err
}
if err := database.WithTx(ctx, &sql.TxOptions{
Isolation: 0,
ReadOnly: true,
}, func(tx *sql.Tx) error {
row := database.
Select(ctx, &sub).
From(`gql_user_wh_sub`).
Where(sq.And{sq.Expr(`id = ?`, id), filter}).
RunWith(tx).
QueryRowContext(ctx)
if err := row.Scan(database.Scan(ctx, &sub)...); err != nil {
return err
}
return nil
}); err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("No user webhook by ID %d found for this user", id)
}
return nil, err
}
return &sub, nil
}
func (r *queryResolver) Webhook(ctx context.Context) (model.WebhookPayload, error) {
raw, err := corewebhooks.Payload(ctx)
if err != nil {
return nil, err
}
payload, ok := raw.(model.WebhookPayload)
if !ok {
panic("Invalid webhook payload context")
}
return payload, nil
}
func (r *userResolver) Pastes(ctx context.Context, obj *model.User, cursor *coremodel.Cursor) (*model.PasteCursor, error) {
if cursor == nil {
cursor = coremodel.NewCursor(nil)
@ -352,6 +541,130 @@ func (r *userResolver) Pastes(ctx context.Context, obj *model.User, cursor *core
return &model.PasteCursor{pastes, cursor}, nil
}
func (r *userWebhookSubscriptionResolver) Client(ctx context.Context, obj *model.UserWebhookSubscription) (*model.OAuthClient, error) {
if obj.ClientID == nil {
return nil, nil
}
return &model.OAuthClient{
UUID: *obj.ClientID,
}, nil
}
func (r *userWebhookSubscriptionResolver) Deliveries(ctx context.Context, obj *model.UserWebhookSubscription, cursor *coremodel.Cursor) (*model.WebhookDeliveryCursor, error) {
if cursor == nil {
cursor = coremodel.NewCursor(nil)
}
var deliveries []*model.WebhookDelivery
if err := database.WithTx(ctx, &sql.TxOptions{
Isolation: 0,
ReadOnly: true,
}, func(tx *sql.Tx) error {
d := (&model.WebhookDelivery{}).
WithName(`user`).
As(`delivery`)
query := database.
Select(ctx, d).
From(`gql_user_wh_delivery delivery`).
Where(`delivery.subscription_id = ?`, obj.ID)
deliveries, cursor = d.QueryWithCursor(ctx, tx, query, cursor)
return nil
}); err != nil {
return nil, err
}
return &model.WebhookDeliveryCursor{deliveries, cursor}, nil
}
func (r *userWebhookSubscriptionResolver) Sample(ctx context.Context, obj *model.UserWebhookSubscription, event model.WebhookEvent) (string, error) {
payloadUUID := uuid.New()
webhook := corewebhooks.WebhookContext{
User: auth.ForContext(ctx),
PayloadUUID: payloadUUID,
Name: "user",
Event: event.String(),
Subscription: &corewebhooks.WebhookSubscription{
ID: obj.ID,
URL: obj.URL,
Query: obj.Query,
AuthMethod: obj.AuthMethod,
TokenHash: obj.TokenHash,
Grants: obj.Grants,
ClientID: obj.ClientID,
Expires: obj.Expires,
NodeID: obj.NodeID,
},
}
auth := auth.ForContext(ctx)
switch event {
case model.WebhookEventPasteCreated, model.WebhookEventPasteUpdated,
model.WebhookEventPasteDeleted:
webhook.Payload = &model.PasteEvent{
UUID: payloadUUID.String(),
Event: event,
Date: time.Now().UTC(),
Paste: &model.Paste{
ID: "943a702d06f34599aee1f8da8ef9f7296031d699",
Created: time.Now().UTC(),
PKID: -1,
UserID: auth.UserID,
RawVisibility: "public",
},
}
default:
return "", fmt.Errorf("Unsupported event %s", event.String())
}
subctx := corewebhooks.Context(ctx, webhook.Payload)
bytes, err := webhook.Exec(subctx, server.ForContext(ctx).Schema)
if err != nil {
return "", err
}
return string(bytes), nil
}
func (r *webhookDeliveryResolver) Subscription(ctx context.Context, obj *model.WebhookDelivery) (model.WebhookSubscription, error) {
if obj.Name == "" {
panic("WebhookDelivery without name")
}
// XXX: This could use a loader but it's unlikely to be a bottleneck
var sub model.WebhookSubscription
if err := database.WithTx(ctx, &sql.TxOptions{
Isolation: 0,
ReadOnly: true,
}, func(tx *sql.Tx) error {
// XXX: This needs some work to generalize to other kinds of webhooks
var subscription interface {
model.WebhookSubscription
database.Model
} = nil
switch obj.Name {
case "user":
subscription = (&model.UserWebhookSubscription{}).As(`sub`)
default:
panic(fmt.Errorf("unknown webhook name %q", obj.Name))
}
// Note: No filter needed because, if we have access to the delivery,
// we also have access to the subscription.
row := database.
Select(ctx, subscription).
From(`gql_`+obj.Name+`_wh_sub sub`).
Where(`sub.id = ?`, obj.SubscriptionID).
RunWith(tx).
QueryRowContext(ctx)
if err := row.Scan(database.Scan(ctx, subscription)...); err != nil {
return err
}
sub = subscription
return nil
}); err != nil {
return nil, err
}
return sub, nil
}
// File returns api.FileResolver implementation.
func (r *Resolver) File() api.FileResolver { return &fileResolver{r} }
@ -367,8 +680,18 @@ func (r *Resolver) Query() api.QueryResolver { return &queryResolver{r} }
// User returns api.UserResolver implementation.
func (r *Resolver) User() api.UserResolver { return &userResolver{r} }
// UserWebhookSubscription returns api.UserWebhookSubscriptionResolver implementation.
func (r *Resolver) UserWebhookSubscription() api.UserWebhookSubscriptionResolver {
return &userWebhookSubscriptionResolver{r}
}
// WebhookDelivery returns api.WebhookDeliveryResolver implementation.
func (r *Resolver) WebhookDelivery() api.WebhookDeliveryResolver { return &webhookDeliveryResolver{r} }
type fileResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }
type pasteResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }
type userResolver struct{ *Resolver }
type userWebhookSubscriptionResolver struct{ *Resolver }
type webhookDeliveryResolver struct{ *Resolver }

View File

@ -8,6 +8,7 @@ import (
"git.sr.ht/~sircmpwn/core-go/config"
"git.sr.ht/~sircmpwn/core-go/database"
"git.sr.ht/~sircmpwn/core-go/server"
"git.sr.ht/~sircmpwn/core-go/webhooks"
"github.com/99designs/gqlgen/graphql"
"github.com/go-chi/chi"
@ -21,6 +22,7 @@ func main() {
appConfig := config.LoadConfig(":5111")
gqlConfig := api.Config{Resolvers: &graph.Resolver{}}
gqlConfig.Directives.Private = server.Private
gqlConfig.Directives.Access = func(ctx context.Context, obj interface{},
next graphql.Resolver, scope model.AccessScope,
kind model.AccessKind) (interface{}, error) {
@ -33,10 +35,13 @@ func main() {
scopes[i] = s.String()
}
webhookQueue := webhooks.NewQueue(schema)
gsrv := server.NewServer("paste.sr.ht", appConfig).
WithDefaultMiddleware().
WithMiddleware(loaders.Middleware).
WithSchema(schema, scopes)
WithMiddleware(loaders.Middleware, webhooks.Middleware(webhookQueue)).
WithSchema(schema, scopes).
WithQueues(webhookQueue.Queue)
// Bulk transfer endpoints
gsrv.Router().Get("/query/blob/{id}", func(w http.ResponseWriter, r *http.Request) {

37
api/webhooks/webhooks.go Normal file
View File

@ -0,0 +1,37 @@
package webhooks
import (
"context"
"time"
"git.sr.ht/~sircmpwn/core-go/auth"
"git.sr.ht/~sircmpwn/core-go/webhooks"
sq "github.com/Masterminds/squirrel"
"github.com/google/uuid"
"git.sr.ht/~sircmpwn/paste.sr.ht/api/graph/model"
)
func deliverUserWebhook(ctx context.Context, event model.WebhookEvent,
payload model.WebhookPayload, payloadUUID uuid.UUID) {
q := webhooks.ForContext(ctx)
userID := auth.ForContext(ctx).UserID
query := sq.
Select().
From("gql_user_wh_sub sub").
Where("sub.user_id = ?", userID)
q.Schedule(ctx, query, "user", event.String(),
payloadUUID, payload)
}
func DeliverUserPasteEvent(ctx context.Context,
event model.WebhookEvent, paste *model.Paste) {
payloadUUID := uuid.New()
payload := model.PasteEvent{
UUID: payloadUUID.String(),
Event: event,
Date: time.Now().UTC(),
Paste: paste,
}
deliverUserWebhook(ctx, event, &payload, payloadUUID)
}

View File

@ -0,0 +1,73 @@
"""Add GraphQL user webhook tables
Revision ID: fbec2ccf782e
Revises: 8565f92ed478
Create Date: 2022-06-14 07:10:09.752674
"""
# revision identifiers, used by Alembic.
revision = 'fbec2ccf782e'
down_revision = '8565f92ed478'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.execute("""
CREATE TYPE webhook_event AS ENUM (
'PASTE_CREATED',
'PASTE_UPDATED',
'PASTE_DELETED'
);
CREATE TYPE auth_method AS ENUM (
'OAUTH_LEGACY',
'OAUTH2',
'COOKIE',
'INTERNAL',
'WEBHOOK'
);
CREATE TABLE gql_user_wh_sub (
id serial PRIMARY KEY,
created timestamp NOT NULL,
events webhook_event[] NOT NULL check (array_length(events, 1) > 0),
url varchar NOT NULL,
query varchar NOT NULL,
auth_method auth_method NOT NULL check (auth_method in ('OAUTH2', 'INTERNAL')),
token_hash varchar(128) check ((auth_method = 'OAUTH2') = (token_hash IS NOT NULL)),
grants varchar,
client_id uuid,
expires timestamp check ((auth_method = 'OAUTH2') = (expires IS NOT NULL)),
node_id varchar check ((auth_method = 'INTERNAL') = (node_id IS NOT NULL)),
user_id integer NOT NULL references "user"(id)
);
CREATE INDEX gql_user_wh_sub_token_hash_idx ON gql_user_wh_sub (token_hash);
CREATE TABLE gql_user_wh_delivery (
id serial PRIMARY KEY,
uuid uuid NOT NULL,
date timestamp NOT NULL,
event webhook_event NOT NULL,
subscription_id integer NOT NULL references gql_user_wh_sub(id) ON DELETE CASCADE,
request_body varchar NOT NULL,
response_body varchar,
response_headers varchar,
response_status integer
);
""")
def downgrade():
op.execute("""
DROP TABLE gql_user_wh_delivery;
DROP INDEX gql_user_wh_sub_token_hash_idx;
DROP TABLE gql_user_wh_sub;
DROP TYPE auth_method;
DROP TYPE webhook_event;
""")