Merge pull request #7011 from concourse/issue/6670
Fix Postgres deadlock when frequently setting pipelines
This commit is contained in:
commit
ab4756b4cc
|
@ -1201,7 +1201,7 @@ func (b *build) SaveOutput(
|
|||
return err
|
||||
}
|
||||
|
||||
resourceConfig, err := resourceConfigDescriptor.findOrCreate(tx, b.lockFactory, b.conn)
|
||||
resourceConfig, err := resourceConfigDescriptor.findOrCreate(tx, b.lockFactory, b.conn, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ func (cache *ResourceCacheDescriptor) findOrCreate(
|
|||
lockFactory lock.LockFactory,
|
||||
conn Conn,
|
||||
) (UsedResourceCache, error) {
|
||||
resourceConfig, err := cache.ResourceConfigDescriptor.findOrCreate(tx, lockFactory, conn)
|
||||
resourceConfig, err := cache.ResourceConfigDescriptor.findOrCreate(tx, lockFactory, conn, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -112,17 +112,7 @@ func (r *resourceConfig) FindOrCreateScope(resource Resource) (ResourceConfigSco
|
|||
return scope, nil
|
||||
}
|
||||
|
||||
func (r *resourceConfig) updateLastReferenced(tx Tx) error {
|
||||
return psql.Update("resource_configs").
|
||||
Set("last_referenced", sq.Expr("now()")).
|
||||
Where(sq.Eq{"id": r.id}).
|
||||
Suffix("RETURNING last_referenced").
|
||||
RunWith(tx).
|
||||
QueryRow().
|
||||
Scan(&r.lastReferenced)
|
||||
}
|
||||
|
||||
func (r *ResourceConfigDescriptor) findOrCreate(tx Tx, lockFactory lock.LockFactory, conn Conn) (*resourceConfig, error) {
|
||||
func (r *ResourceConfigDescriptor) findOrCreate(tx Tx, lockFactory lock.LockFactory, conn Conn, updateLastReferenced bool) (*resourceConfig, error) {
|
||||
rc := &resourceConfig{
|
||||
lockFactory: lockFactory,
|
||||
conn: conn,
|
||||
|
@ -160,7 +150,13 @@ func (r *ResourceConfigDescriptor) findOrCreate(tx Tx, lockFactory lock.LockFact
|
|||
parentID = rc.CreatedByBaseResourceType().ID
|
||||
}
|
||||
|
||||
found, err := r.findWithParentID(tx, rc, parentColumnName, parentID)
|
||||
var found bool
|
||||
var err error
|
||||
if updateLastReferenced {
|
||||
found, err = r.updateLastReferenced(tx, rc, parentColumnName, parentID)
|
||||
} else {
|
||||
found, err = r.findWithParentID(tx, rc, parentColumnName, parentID)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -168,19 +164,24 @@ func (r *ResourceConfigDescriptor) findOrCreate(tx Tx, lockFactory lock.LockFact
|
|||
if !found {
|
||||
hash := mapHash(r.Source)
|
||||
|
||||
valueMap := map[string]interface{}{
|
||||
parentColumnName: parentID,
|
||||
"source_hash": hash,
|
||||
}
|
||||
if updateLastReferenced {
|
||||
valueMap["last_referenced"] = sq.Expr("now()")
|
||||
}
|
||||
var updateLastReferencedStr string
|
||||
if updateLastReferenced {
|
||||
updateLastReferencedStr = `, last_referenced = now()`
|
||||
}
|
||||
err := psql.Insert("resource_configs").
|
||||
Columns(
|
||||
parentColumnName,
|
||||
"source_hash",
|
||||
).
|
||||
Values(
|
||||
parentID,
|
||||
hash,
|
||||
).
|
||||
SetMap(valueMap).
|
||||
Suffix(`
|
||||
ON CONFLICT (`+parentColumnName+`, source_hash) DO UPDATE SET
|
||||
`+parentColumnName+` = ?,
|
||||
source_hash = ?
|
||||
source_hash = ?`+
|
||||
updateLastReferencedStr+`
|
||||
RETURNING id, last_referenced
|
||||
`, parentID, hash).
|
||||
RunWith(tx).
|
||||
|
@ -201,7 +202,29 @@ func (r *ResourceConfigDescriptor) findWithParentID(tx Tx, rc *resourceConfig, p
|
|||
parentColumnName: parentID,
|
||||
"source_hash": mapHash(r.Source),
|
||||
}).
|
||||
Suffix("FOR UPDATE").
|
||||
Suffix("FOR SHARE").
|
||||
RunWith(tx).
|
||||
QueryRow().
|
||||
Scan(&rc.id, &rc.lastReferenced)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (r *ResourceConfigDescriptor) updateLastReferenced(tx Tx, rc *resourceConfig, parentColumnName string, parentID int) (bool, error) {
|
||||
err := psql.Update("resource_configs").
|
||||
Set("last_referenced", sq.Expr("now()")).
|
||||
Where(sq.Eq{
|
||||
parentColumnName: parentID,
|
||||
"source_hash": mapHash(r.Source),
|
||||
}).
|
||||
Suffix("RETURNING id, last_referenced").
|
||||
RunWith(tx).
|
||||
QueryRow().
|
||||
Scan(&rc.id, &rc.lastReferenced)
|
||||
|
@ -269,12 +292,43 @@ func findOrCreateResourceConfigScope(
|
|||
return nil, err
|
||||
}
|
||||
} else if uniqueResource != nil {
|
||||
// This `SELECT ... FOR UPDATE` on the resource is just to avoid a
|
||||
// deadlock, which occurs when concurrently setting a pipeline and
|
||||
// running FindOrCreateScope on the resource that's being updated in
|
||||
// the pipeline. Specifically, it happens with the following "DELETE
|
||||
// FROM resource_config_scopes" query - this deletes the old resource
|
||||
// config scope, which in turn triggers an "ON DELETE SET NULL" in the
|
||||
// resource. However, there's some implicit lock that's acquired when
|
||||
// setting the pipeline on the resource_config_scope, and without this
|
||||
// dummy query, the locks are acquired in a bad order wrt one another:
|
||||
//
|
||||
// DELETE FROM resource_config_scopes:
|
||||
// 1. Lock resource_config_scopes
|
||||
// 2. Lock resource
|
||||
//
|
||||
// INSERT INTO resources (occurs when setting the pipeline):
|
||||
// 1. Lock resource
|
||||
// 2. Lock resource_config_scope
|
||||
//
|
||||
// Thus, forcing the DELETE FROM resource_config_scopes query to
|
||||
// acquire a lock on the affected resource fixes this order (first
|
||||
// resource, then resource_config_scope) to avoid a cycle.
|
||||
_, err := psql.Select("1").
|
||||
From("resources").
|
||||
Where(sq.Eq{
|
||||
"id": uniqueResource.ID(),
|
||||
}).
|
||||
Suffix("FOR UPDATE").
|
||||
RunWith(tx).
|
||||
Exec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// delete outdated scopes for resource
|
||||
_, err := psql.Delete("resource_config_scopes").
|
||||
Where(sq.And{
|
||||
sq.Eq{
|
||||
"resource_id": resource.ID(),
|
||||
},
|
||||
_, err = psql.Delete("resource_config_scopes").
|
||||
Where(sq.Eq{
|
||||
"resource_id": resource.ID(),
|
||||
}).
|
||||
RunWith(tx).
|
||||
Exec()
|
||||
|
|
|
@ -85,12 +85,7 @@ func (f *resourceConfigFactory) FindOrCreateResourceConfig(
|
|||
}
|
||||
defer Rollback(tx)
|
||||
|
||||
resourceConfig, err := resourceConfigDescriptor.findOrCreate(tx, f.lockFactory, f.conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = resourceConfig.updateLastReferenced(tx)
|
||||
resourceConfig, err := resourceConfigDescriptor.findOrCreate(tx, f.lockFactory, f.conn, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package db_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.cloudfoundry.org/lager"
|
||||
|
@ -3446,6 +3448,118 @@ var _ = Describe("Team", func() {
|
|||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
})
|
||||
|
||||
It("does not deadlock when concurrently setting pipelines and running checks/gets", func() {
|
||||
// enable concurrent use of database. this is set to 1 by default to
|
||||
// ensure methods don't require more than one in a single connection,
|
||||
// which can cause deadlocking as the pool is limited.
|
||||
dbConn.SetMaxOpenConns(4)
|
||||
|
||||
config := atc.Config{
|
||||
ResourceTypes: atc.ResourceTypes{
|
||||
{
|
||||
Name: "some-resource-type",
|
||||
Type: dbtest.BaseResourceType,
|
||||
Source: atc.Source{"foo": "bar"},
|
||||
},
|
||||
},
|
||||
Resources: atc.ResourceConfigs{
|
||||
{
|
||||
Name: "some-resource",
|
||||
Type: "some-resource-type",
|
||||
Source: atc.Source{"foo": "baz"},
|
||||
},
|
||||
},
|
||||
}
|
||||
scenario := dbtest.Setup(
|
||||
builder.WithBaseWorker(),
|
||||
builder.WithPipeline(config),
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
loopUntilTimeoutOrPanic := func(name string, run func(i int)) func() {
|
||||
wg.Add(1)
|
||||
return func() {
|
||||
defer wg.Done()
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
errChan <- fmt.Errorf("%s error: %v", name, err)
|
||||
}
|
||||
}()
|
||||
|
||||
i := 0
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
run(i)
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pipeline := scenario.Pipeline
|
||||
go loopUntilTimeoutOrPanic("set pipeline", func(_ int) {
|
||||
var err error
|
||||
pipeline, _, err = scenario.Team.SavePipeline(
|
||||
atc.PipelineRef{Name: pipeline.Name()},
|
||||
config,
|
||||
pipeline.ConfigVersion(),
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
})()
|
||||
|
||||
resource := scenario.Resource("some-resource")
|
||||
go loopUntilTimeoutOrPanic("check resource", func(i int) {
|
||||
scenario.Run(
|
||||
builder.WithResourceVersions(resource.Name(), atc.Version{"v": strconv.Itoa(i / 10)}),
|
||||
)
|
||||
})()
|
||||
|
||||
build, err := defaultJob.CreateBuild("some-user")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
rt, err := scenario.Pipeline.ResourceTypes()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
go loopUntilTimeoutOrPanic("get resource", func(i int) {
|
||||
_, err := resourceCacheFactory.FindOrCreateResourceCache(
|
||||
db.ForBuild(build.ID()),
|
||||
resource.Type(),
|
||||
atc.Version{"v": strconv.Itoa(i / 10)},
|
||||
resource.Source(),
|
||||
atc.Params{},
|
||||
rt.Deserialize(),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
})()
|
||||
|
||||
go loopUntilTimeoutOrPanic("check resource type", func(i int) {
|
||||
scenario.Run(
|
||||
builder.WithResourceTypeVersions("some-resource-type", atc.Version{"foo": strconv.Itoa(i / 10)}),
|
||||
)
|
||||
})()
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(errChan)
|
||||
}()
|
||||
|
||||
Expect(<-errChan).ToNot(HaveOccurred())
|
||||
})
|
||||
})
|
||||
|
||||
Describe("RenamePipeline", func() {
|
||||
|
|
Loading…
Reference in New Issue