Expand context for work queues

This commit is contained in:
Drew DeVault 2020-10-09 10:53:27 -04:00
parent 5b2fce2447
commit 14f4b92139
6 changed files with 29 additions and 10 deletions

View File

@ -35,6 +35,14 @@ func ForContext(ctx context.Context) (*sql.Conn, error) {
return raw.Conn(ctx)
}
func DBForContext(ctx context.Context) *sql.DB {
raw, ok := ctx.Value(dbCtxKey).(*sql.DB)
if !ok {
panic(errors.New("Invalid database context"))
}
return raw
}
func WithTx(ctx context.Context, opts *sql.TxOptions, fn func(tx *sql.Tx) error) error {
conn, err := ForContext(ctx)
if err != nil {

View File

@ -9,8 +9,6 @@ import (
"git.sr.ht/~sircmpwn/dowork"
gomail "gopkg.in/mail.v2"
"git.sr.ht/~sircmpwn/core-go/config"
)
var emailCtxKey = &contextKey{"email"}
@ -22,10 +20,9 @@ type contextKey struct {
// Returns a task which will send this email for the work queue. If the caller
// does not need to customize the task parameters, the Enqueue function may be
// more desirable.
func NewTask(ctx context.Context, m *gomail.Message) *work.Task {
conf := config.ForContext(ctx)
func NewTask(m *gomail.Message) *work.Task {
return work.NewTask(func(ctx context.Context) error {
return Send(config.Context(ctx, conf), m)
return Send(ctx, m)
}).Retries(10).After(func(ctx context.Context, task *work.Task) {
if task.Result() == nil {
log.Printf("MAIL TO %s: '%s' sent after %d attempts",
@ -43,7 +40,7 @@ func NewTask(ctx context.Context, m *gomail.Message) *work.Task {
// Enqueues an email for sending with the default parameters.
func Enqueue(ctx context.Context, m *gomail.Message) {
ForContext(ctx).Enqueue(NewTask(ctx, m))
ForContext(ctx).Enqueue(NewTask(m))
}
// Creates a new email processing queue.

1
go.mod
View File

@ -12,6 +12,7 @@ require (
github.com/fernet/fernet-go v0.0.0-20191111064656-eff2850e6001
github.com/go-chi/chi v4.1.2+incompatible
github.com/go-redis/redis/v8 v8.2.3
github.com/kavu/go_reuseport v1.5.0
github.com/lib/pq v1.8.0
github.com/martinlindhe/base36 v1.1.0
github.com/prometheus/client_golang v1.7.1

2
go.sum
View File

@ -174,6 +174,8 @@ github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kavu/go_reuseport v1.5.0 h1:UNuiY2OblcqAtVDE8Gsg1kZz8zbBWg907sP1ceBV+bk=
github.com/kavu/go_reuseport v1.5.0/go.mod h1:CG8Ee7ceMFSMnx/xr25Vm0qXaj2Z4i5PWoUx+JZ5/CU=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=

View File

@ -17,14 +17,16 @@ type contextKey struct {
func Middleware(client *goRedis.Client) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), redisCtxKey, client)
r = r.WithContext(ctx)
r = r.WithContext(Context(r.Context(), client))
next.ServeHTTP(w, r)
})
}
}
func Context(ctx context.Context, client *goRedis.Client) context.Context {
return context.WithValue(ctx, redisCtxKey, client)
}
func ForContext(ctx context.Context) *goRedis.Client {
raw, ok := ctx.Value(redisCtxKey).(*goRedis.Client)
if !ok {

View File

@ -47,6 +47,8 @@ var (
type Server struct {
conf ini.File
db *sql.DB
redis *goRedis.Client
router chi.Router
schema graphql.ExecutableSchema
service string
@ -133,6 +135,7 @@ func (server *Server) WithDefaultMiddleware() *Server {
if err != nil {
log.Fatalf("Failed to open a database connection: %v", err)
}
server.db = db
rcs, ok := server.conf.Get("sr.ht", "redis-host")
if !ok {
@ -143,6 +146,7 @@ func (server *Server) WithDefaultMiddleware() *Server {
log.Fatalf("Invalid sr.ht::redis-host in config.ini: %e", err)
}
rc := goRedis.NewClient(ropts)
server.redis = rc
apiconf := fmt.Sprintf("%s::api", server.service)
@ -185,9 +189,14 @@ func (server *Server) WithMiddleware(
// Add dowork task queues for this server to manage
func (server *Server) WithQueues(queues ...*work.Queue) *Server {
ctx := context.Background()
ctx = config.Context(ctx, server.conf)
ctx = database.Context(ctx, server.db)
ctx = redis.Context(ctx, server.redis)
server.queues = append(server.queues, queues...)
for _, queue := range queues {
queue.Start(context.Background())
queue.Start(ctx)
}
return server
}