Merge pull request #5789 from concourse/issue/5734-track-set-pipelines

atc/exec & atc/db: track set pipelines
This commit is contained in:
Taylor Silva 2020-06-30 15:33:46 -04:00 committed by GitHub
commit a83786990a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 954 additions and 194 deletions

View File

@ -903,6 +903,14 @@ func (cmd *RunCommand) backendComponents(
resourceFetcher,
)
dbBuildFactory := db.NewBuildFactory(dbConn, lockFactory, cmd.GC.OneOffBuildGracePeriod, cmd.GC.FailedGracePeriod)
dbCheckFactory := db.NewCheckFactory(dbConn, lockFactory, secretManager, cmd.varSourcePool, cmd.GlobalResourceCheckTimeout)
dbPipelineFactory := db.NewPipelineFactory(dbConn, lockFactory)
dbJobFactory := db.NewJobFactory(dbConn, lockFactory)
dbCheckableCounter := db.NewCheckableCounter(dbConn)
alg := algorithm.New(db.NewVersionsDB(dbConn, algorithmLimitRows, schedulerCache))
dbWorkerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(dbConn)
dbTaskCacheFactory := db.NewTaskCacheFactory(dbConn)
dbWorkerTaskCacheFactory := db.NewWorkerTaskCacheFactory(dbConn)
@ -960,6 +968,7 @@ func (cmd *RunCommand) backendComponents(
workerClient,
resourceFactory,
teamFactory,
dbBuildFactory,
dbResourceCacheFactory,
dbResourceConfigFactory,
secretManager,
@ -968,14 +977,6 @@ func (cmd *RunCommand) backendComponents(
lockFactory,
)
dbBuildFactory := db.NewBuildFactory(dbConn, lockFactory, cmd.GC.OneOffBuildGracePeriod, cmd.GC.FailedGracePeriod)
dbCheckFactory := db.NewCheckFactory(dbConn, lockFactory, secretManager, cmd.varSourcePool, cmd.GlobalResourceCheckTimeout)
dbPipelineFactory := db.NewPipelineFactory(dbConn, lockFactory)
dbJobFactory := db.NewJobFactory(dbConn, lockFactory)
dbCheckableCounter := db.NewCheckableCounter(dbConn)
alg := algorithm.New(db.NewVersionsDB(dbConn, algorithmLimitRows, schedulerCache))
// In case that a user configures resource-checking-interval, but forgets to
// configure resource-with-webhook-checking-interval, keep both checking-
// intervals consistent. Even if both intervals are configured, there is no
@ -1552,6 +1553,7 @@ func (cmd *RunCommand) constructEngine(
workerClient worker.Client,
resourceFactory resource.ResourceFactory,
teamFactory db.TeamFactory,
buildFactory db.BuildFactory,
resourceCacheFactory db.ResourceCacheFactory,
resourceConfigFactory db.ResourceConfigFactory,
secretManager creds.Secrets,
@ -1565,6 +1567,7 @@ func (cmd *RunCommand) constructEngine(
workerClient,
resourceFactory,
teamFactory,
buildFactory,
resourceCacheFactory,
resourceConfigFactory,
defaultLimits,

View File

@ -22,6 +22,7 @@ import (
const schema = "exec.v2"
var ErrAdoptRerunBuildHasNoInputs = errors.New("inputs not ready for build to rerun")
var ErrSetByNewerBuild = errors.New("pipeline set by a newer build")
type BuildInput struct {
Name string
@ -161,6 +162,14 @@ type Build interface {
SetDrained(bool) error
SpanContext() propagators.Supplier
SavePipeline(
pipelineName string,
teamId int,
config atc.Config,
from ConfigVersion,
initiallyPaused bool,
) (Pipeline, bool, error)
}
type build struct {
@ -1479,6 +1488,54 @@ func (b *build) SpanContext() propagators.Supplier {
return b.spanContext
}
func (b *build) SavePipeline(
pipelineName string,
teamID int,
config atc.Config,
from ConfigVersion,
initiallyPaused bool,
) (Pipeline, bool, error) {
tx, err := b.conn.Begin()
if err != nil {
return nil, false, err
}
defer Rollback(tx)
jobID := newNullInt64(b.jobID)
buildID := newNullInt64(b.id)
pipelineID, isNewPipeline, err := savePipeline(tx, pipelineName, config, from, initiallyPaused, teamID, jobID, buildID)
if err != nil {
return nil, false, err
}
pipeline := newPipeline(b.conn, b.lockFactory)
err = scanPipeline(
pipeline,
pipelinesQuery.
Where(sq.Eq{"p.id": pipelineID}).
RunWith(tx).
QueryRow(),
)
if err != nil {
return nil, false, err
}
err = tx.Commit()
if err != nil {
return nil, false, err
}
return pipeline, isNewPipeline, nil
}
func newNullInt64(i int) sql.NullInt64 {
return sql.NullInt64{
Valid: true,
Int64: int64(i),
}
}
func createBuildEventSeq(tx Tx, buildid int) error {
_, err := tx.Exec(fmt.Sprintf(`
CREATE SEQUENCE %s MINVALUE 0

View File

@ -2575,6 +2575,124 @@ var _ = Describe("Build", func() {
})
})
})
Describe("SavePipeline", func() {
It("saves the parent job and build ids", func() {
By("creating a build")
build, err := defaultJob.CreateBuild()
Expect(err).ToNot(HaveOccurred())
By("saving a pipeline with the build")
pipeline, _, err := build.SavePipeline("other-pipeline", build.TeamID(), atc.Config{
Jobs: atc.JobConfigs{
{
Name: "some-job",
},
},
Resources: atc.ResourceConfigs{
{
Name: "some-resource",
Type: "some-base-resource-type",
Source: atc.Source{
"some": "source",
},
},
},
ResourceTypes: atc.ResourceTypes{
{
Name: "some-type",
Type: "some-base-resource-type",
Source: atc.Source{
"some-type": "source",
},
},
},
}, db.ConfigVersion(0), false)
Expect(err).ToNot(HaveOccurred())
Expect(pipeline.ParentJobID()).To(Equal(build.JobID()))
Expect(pipeline.ParentBuildID()).To(Equal(build.ID()))
})
It("only saves the pipeline if it is the latest build", func() {
By("creating two builds")
buildOne, err := defaultJob.CreateBuild()
Expect(err).ToNot(HaveOccurred())
buildTwo, err := defaultJob.CreateBuild()
Expect(err).ToNot(HaveOccurred())
By("saving a pipeline with the second build")
pipeline, _, err := buildTwo.SavePipeline("other-pipeline", buildTwo.TeamID(), atc.Config{
Jobs: atc.JobConfigs{
{
Name: "some-job",
},
},
Resources: atc.ResourceConfigs{
{
Name: "some-resource",
Type: "some-base-resource-type",
Source: atc.Source{
"some": "source",
},
},
},
ResourceTypes: atc.ResourceTypes{
{
Name: "some-type",
Type: "some-base-resource-type",
Source: atc.Source{
"some-type": "source",
},
},
},
}, db.ConfigVersion(0), false)
Expect(err).ToNot(HaveOccurred())
Expect(pipeline.ParentJobID()).To(Equal(buildTwo.JobID()))
Expect(pipeline.ParentBuildID()).To(Equal(buildTwo.ID()))
By("saving a pipeline with the first build")
pipeline, _, err = buildOne.SavePipeline("other-pipeline", buildOne.TeamID(), atc.Config{
Jobs: atc.JobConfigs{
{
Name: "some-job",
},
},
Resources: atc.ResourceConfigs{
{
Name: "some-resource",
Type: "some-base-resource-type",
Source: atc.Source{
"some": "source",
},
},
},
ResourceTypes: atc.ResourceTypes{
{
Name: "some-type",
Type: "some-base-resource-type",
Source: atc.Source{
"some-type": "source",
},
},
},
}, pipeline.ConfigVersion(), false)
Expect(err).To(Equal(db.ErrSetByNewerBuild))
})
Context("a pipeline is previously saved by team.SavePipeline", func() {
It("the parent job and build ID are updated", func() {
By("creating a build")
build, err := defaultJob.CreateBuild()
Expect(err).ToNot(HaveOccurred())
By("re-saving the default pipeline with the build")
pipeline, _, err := build.SavePipeline("default-pipeline", build.TeamID(), defaultPipelineConfig, db.ConfigVersion(1), false)
Expect(err).ToNot(HaveOccurred())
Expect(pipeline.ParentJobID()).To(Equal(build.JobID()))
Expect(pipeline.ParentBuildID()).To(Equal(build.ID()))
})
})
})
})
func envelope(ev atc.Event) event.Envelope {

View File

@ -59,6 +59,7 @@ var (
otherWorkerPayload atc.Worker
defaultResourceType db.ResourceType
defaultResource db.Resource
defaultPipelineConfig atc.Config
defaultPipeline db.Pipeline
defaultJob db.Job
logger *lagertest.TestLogger
@ -155,7 +156,7 @@ var _ = BeforeEach(func() {
otherWorker, err = workerFactory.SaveWorker(otherWorkerPayload, 0)
Expect(err).NotTo(HaveOccurred())
defaultPipeline, _, err = defaultTeam.SavePipeline("default-pipeline", atc.Config{
defaultPipelineConfig := atc.Config{
Jobs: atc.JobConfigs{
{
Name: "some-job",
@ -179,7 +180,9 @@ var _ = BeforeEach(func() {
},
},
},
}, db.ConfigVersion(0), false)
}
defaultPipeline, _, err = defaultTeam.SavePipeline("default-pipeline", defaultPipelineConfig, db.ConfigVersion(0), false)
Expect(err).NotTo(HaveOccurred())
var found bool

View File

@ -479,6 +479,25 @@ type FakeBuild struct {
saveOutputReturnsOnCall map[int]struct {
result1 error
}
SavePipelineStub func(string, int, atc.Config, db.ConfigVersion, bool) (db.Pipeline, bool, error)
savePipelineMutex sync.RWMutex
savePipelineArgsForCall []struct {
arg1 string
arg2 int
arg3 atc.Config
arg4 db.ConfigVersion
arg5 bool
}
savePipelineReturns struct {
result1 db.Pipeline
result2 bool
result3 error
}
savePipelineReturnsOnCall map[int]struct {
result1 db.Pipeline
result2 bool
result3 error
}
SchemaStub func() string
schemaMutex sync.RWMutex
schemaArgsForCall []struct {
@ -2841,6 +2860,76 @@ func (fake *FakeBuild) SaveOutputReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeBuild) SavePipeline(arg1 string, arg2 int, arg3 atc.Config, arg4 db.ConfigVersion, arg5 bool) (db.Pipeline, bool, error) {
fake.savePipelineMutex.Lock()
ret, specificReturn := fake.savePipelineReturnsOnCall[len(fake.savePipelineArgsForCall)]
fake.savePipelineArgsForCall = append(fake.savePipelineArgsForCall, struct {
arg1 string
arg2 int
arg3 atc.Config
arg4 db.ConfigVersion
arg5 bool
}{arg1, arg2, arg3, arg4, arg5})
fake.recordInvocation("SavePipeline", []interface{}{arg1, arg2, arg3, arg4, arg5})
fake.savePipelineMutex.Unlock()
if fake.SavePipelineStub != nil {
return fake.SavePipelineStub(arg1, arg2, arg3, arg4, arg5)
}
if specificReturn {
return ret.result1, ret.result2, ret.result3
}
fakeReturns := fake.savePipelineReturns
return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3
}
func (fake *FakeBuild) SavePipelineCallCount() int {
fake.savePipelineMutex.RLock()
defer fake.savePipelineMutex.RUnlock()
return len(fake.savePipelineArgsForCall)
}
func (fake *FakeBuild) SavePipelineCalls(stub func(string, int, atc.Config, db.ConfigVersion, bool) (db.Pipeline, bool, error)) {
fake.savePipelineMutex.Lock()
defer fake.savePipelineMutex.Unlock()
fake.SavePipelineStub = stub
}
func (fake *FakeBuild) SavePipelineArgsForCall(i int) (string, int, atc.Config, db.ConfigVersion, bool) {
fake.savePipelineMutex.RLock()
defer fake.savePipelineMutex.RUnlock()
argsForCall := fake.savePipelineArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5
}
func (fake *FakeBuild) SavePipelineReturns(result1 db.Pipeline, result2 bool, result3 error) {
fake.savePipelineMutex.Lock()
defer fake.savePipelineMutex.Unlock()
fake.SavePipelineStub = nil
fake.savePipelineReturns = struct {
result1 db.Pipeline
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeBuild) SavePipelineReturnsOnCall(i int, result1 db.Pipeline, result2 bool, result3 error) {
fake.savePipelineMutex.Lock()
defer fake.savePipelineMutex.Unlock()
fake.SavePipelineStub = nil
if fake.savePipelineReturnsOnCall == nil {
fake.savePipelineReturnsOnCall = make(map[int]struct {
result1 db.Pipeline
result2 bool
result3 error
})
}
fake.savePipelineReturnsOnCall[i] = struct {
result1 db.Pipeline
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeBuild) Schema() string {
fake.schemaMutex.Lock()
ret, specificReturn := fake.schemaReturnsOnCall[len(fake.schemaArgsForCall)]
@ -3421,6 +3510,8 @@ func (fake *FakeBuild) Invocations() map[string][][]interface{} {
defer fake.saveImageResourceVersionMutex.RUnlock()
fake.saveOutputMutex.RLock()
defer fake.saveOutputMutex.RUnlock()
fake.savePipelineMutex.RLock()
defer fake.savePipelineMutex.RUnlock()
fake.schemaMutex.RLock()
defer fake.schemaMutex.RUnlock()
fake.setDrainedMutex.RLock()

View File

@ -295,6 +295,26 @@ type FakePipeline struct {
nameReturnsOnCall map[int]struct {
result1 string
}
ParentBuildIDStub func() int
parentBuildIDMutex sync.RWMutex
parentBuildIDArgsForCall []struct {
}
parentBuildIDReturns struct {
result1 int
}
parentBuildIDReturnsOnCall map[int]struct {
result1 int
}
ParentJobIDStub func() int
parentJobIDMutex sync.RWMutex
parentJobIDArgsForCall []struct {
}
parentJobIDReturns struct {
result1 int
}
parentJobIDReturnsOnCall map[int]struct {
result1 int
}
PauseStub func() error
pauseMutex sync.RWMutex
pauseArgsForCall []struct {
@ -447,6 +467,18 @@ type FakePipeline struct {
result1 db.Resources
result2 error
}
SetParentIDsStub func(int, int) error
setParentIDsMutex sync.RWMutex
setParentIDsArgsForCall []struct {
arg1 int
arg2 int
}
setParentIDsReturns struct {
result1 error
}
setParentIDsReturnsOnCall map[int]struct {
result1 error
}
TeamIDStub func() int
teamIDMutex sync.RWMutex
teamIDArgsForCall []struct {
@ -1873,6 +1905,110 @@ func (fake *FakePipeline) NameReturnsOnCall(i int, result1 string) {
}{result1}
}
func (fake *FakePipeline) ParentBuildID() int {
fake.parentBuildIDMutex.Lock()
ret, specificReturn := fake.parentBuildIDReturnsOnCall[len(fake.parentBuildIDArgsForCall)]
fake.parentBuildIDArgsForCall = append(fake.parentBuildIDArgsForCall, struct {
}{})
fake.recordInvocation("ParentBuildID", []interface{}{})
fake.parentBuildIDMutex.Unlock()
if fake.ParentBuildIDStub != nil {
return fake.ParentBuildIDStub()
}
if specificReturn {
return ret.result1
}
fakeReturns := fake.parentBuildIDReturns
return fakeReturns.result1
}
func (fake *FakePipeline) ParentBuildIDCallCount() int {
fake.parentBuildIDMutex.RLock()
defer fake.parentBuildIDMutex.RUnlock()
return len(fake.parentBuildIDArgsForCall)
}
func (fake *FakePipeline) ParentBuildIDCalls(stub func() int) {
fake.parentBuildIDMutex.Lock()
defer fake.parentBuildIDMutex.Unlock()
fake.ParentBuildIDStub = stub
}
func (fake *FakePipeline) ParentBuildIDReturns(result1 int) {
fake.parentBuildIDMutex.Lock()
defer fake.parentBuildIDMutex.Unlock()
fake.ParentBuildIDStub = nil
fake.parentBuildIDReturns = struct {
result1 int
}{result1}
}
func (fake *FakePipeline) ParentBuildIDReturnsOnCall(i int, result1 int) {
fake.parentBuildIDMutex.Lock()
defer fake.parentBuildIDMutex.Unlock()
fake.ParentBuildIDStub = nil
if fake.parentBuildIDReturnsOnCall == nil {
fake.parentBuildIDReturnsOnCall = make(map[int]struct {
result1 int
})
}
fake.parentBuildIDReturnsOnCall[i] = struct {
result1 int
}{result1}
}
func (fake *FakePipeline) ParentJobID() int {
fake.parentJobIDMutex.Lock()
ret, specificReturn := fake.parentJobIDReturnsOnCall[len(fake.parentJobIDArgsForCall)]
fake.parentJobIDArgsForCall = append(fake.parentJobIDArgsForCall, struct {
}{})
fake.recordInvocation("ParentJobID", []interface{}{})
fake.parentJobIDMutex.Unlock()
if fake.ParentJobIDStub != nil {
return fake.ParentJobIDStub()
}
if specificReturn {
return ret.result1
}
fakeReturns := fake.parentJobIDReturns
return fakeReturns.result1
}
func (fake *FakePipeline) ParentJobIDCallCount() int {
fake.parentJobIDMutex.RLock()
defer fake.parentJobIDMutex.RUnlock()
return len(fake.parentJobIDArgsForCall)
}
func (fake *FakePipeline) ParentJobIDCalls(stub func() int) {
fake.parentJobIDMutex.Lock()
defer fake.parentJobIDMutex.Unlock()
fake.ParentJobIDStub = stub
}
func (fake *FakePipeline) ParentJobIDReturns(result1 int) {
fake.parentJobIDMutex.Lock()
defer fake.parentJobIDMutex.Unlock()
fake.ParentJobIDStub = nil
fake.parentJobIDReturns = struct {
result1 int
}{result1}
}
func (fake *FakePipeline) ParentJobIDReturnsOnCall(i int, result1 int) {
fake.parentJobIDMutex.Lock()
defer fake.parentJobIDMutex.Unlock()
fake.ParentJobIDStub = nil
if fake.parentJobIDReturnsOnCall == nil {
fake.parentJobIDReturnsOnCall = make(map[int]struct {
result1 int
})
}
fake.parentJobIDReturnsOnCall[i] = struct {
result1 int
}{result1}
}
func (fake *FakePipeline) Pause() error {
fake.pauseMutex.Lock()
ret, specificReturn := fake.pauseReturnsOnCall[len(fake.pauseArgsForCall)]
@ -2584,6 +2720,67 @@ func (fake *FakePipeline) ResourcesReturnsOnCall(i int, result1 db.Resources, re
}{result1, result2}
}
func (fake *FakePipeline) SetParentIDs(arg1 int, arg2 int) error {
fake.setParentIDsMutex.Lock()
ret, specificReturn := fake.setParentIDsReturnsOnCall[len(fake.setParentIDsArgsForCall)]
fake.setParentIDsArgsForCall = append(fake.setParentIDsArgsForCall, struct {
arg1 int
arg2 int
}{arg1, arg2})
fake.recordInvocation("SetParentIDs", []interface{}{arg1, arg2})
fake.setParentIDsMutex.Unlock()
if fake.SetParentIDsStub != nil {
return fake.SetParentIDsStub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
fakeReturns := fake.setParentIDsReturns
return fakeReturns.result1
}
func (fake *FakePipeline) SetParentIDsCallCount() int {
fake.setParentIDsMutex.RLock()
defer fake.setParentIDsMutex.RUnlock()
return len(fake.setParentIDsArgsForCall)
}
func (fake *FakePipeline) SetParentIDsCalls(stub func(int, int) error) {
fake.setParentIDsMutex.Lock()
defer fake.setParentIDsMutex.Unlock()
fake.SetParentIDsStub = stub
}
func (fake *FakePipeline) SetParentIDsArgsForCall(i int) (int, int) {
fake.setParentIDsMutex.RLock()
defer fake.setParentIDsMutex.RUnlock()
argsForCall := fake.setParentIDsArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakePipeline) SetParentIDsReturns(result1 error) {
fake.setParentIDsMutex.Lock()
defer fake.setParentIDsMutex.Unlock()
fake.SetParentIDsStub = nil
fake.setParentIDsReturns = struct {
result1 error
}{result1}
}
func (fake *FakePipeline) SetParentIDsReturnsOnCall(i int, result1 error) {
fake.setParentIDsMutex.Lock()
defer fake.setParentIDsMutex.Unlock()
fake.SetParentIDsStub = nil
if fake.setParentIDsReturnsOnCall == nil {
fake.setParentIDsReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.setParentIDsReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakePipeline) TeamID() int {
fake.teamIDMutex.Lock()
ret, specificReturn := fake.teamIDReturnsOnCall[len(fake.teamIDArgsForCall)]
@ -2908,6 +3105,10 @@ func (fake *FakePipeline) Invocations() map[string][][]interface{} {
defer fake.loadDebugVersionsDBMutex.RUnlock()
fake.nameMutex.RLock()
defer fake.nameMutex.RUnlock()
fake.parentBuildIDMutex.RLock()
defer fake.parentBuildIDMutex.RUnlock()
fake.parentJobIDMutex.RLock()
defer fake.parentJobIDMutex.RUnlock()
fake.pauseMutex.RLock()
defer fake.pauseMutex.RUnlock()
fake.pausedMutex.RLock()
@ -2932,6 +3133,8 @@ func (fake *FakePipeline) Invocations() map[string][][]interface{} {
defer fake.resourceVersionMutex.RUnlock()
fake.resourcesMutex.RLock()
defer fake.resourcesMutex.RUnlock()
fake.setParentIDsMutex.RLock()
defer fake.setParentIDsMutex.RUnlock()
fake.teamIDMutex.RLock()
defer fake.teamIDMutex.RUnlock()
fake.teamNameMutex.RLock()

View File

@ -8,6 +8,7 @@ import (
"github.com/Masterminds/squirrel"
"github.com/concourse/concourse/atc/db"
"github.com/concourse/concourse/atc/db/encryption"
)
type FakeTx struct {
@ -21,6 +22,16 @@ type FakeTx struct {
commitReturnsOnCall map[int]struct {
result1 error
}
EncryptionStrategyStub func() encryption.Strategy
encryptionStrategyMutex sync.RWMutex
encryptionStrategyArgsForCall []struct {
}
encryptionStrategyReturns struct {
result1 encryption.Strategy
}
encryptionStrategyReturnsOnCall map[int]struct {
result1 encryption.Strategy
}
ExecStub func(string, ...interface{}) (sql.Result, error)
execMutex sync.RWMutex
execArgsForCall []struct {
@ -208,6 +219,58 @@ func (fake *FakeTx) CommitReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeTx) EncryptionStrategy() encryption.Strategy {
fake.encryptionStrategyMutex.Lock()
ret, specificReturn := fake.encryptionStrategyReturnsOnCall[len(fake.encryptionStrategyArgsForCall)]
fake.encryptionStrategyArgsForCall = append(fake.encryptionStrategyArgsForCall, struct {
}{})
fake.recordInvocation("EncryptionStrategy", []interface{}{})
fake.encryptionStrategyMutex.Unlock()
if fake.EncryptionStrategyStub != nil {
return fake.EncryptionStrategyStub()
}
if specificReturn {
return ret.result1
}
fakeReturns := fake.encryptionStrategyReturns
return fakeReturns.result1
}
func (fake *FakeTx) EncryptionStrategyCallCount() int {
fake.encryptionStrategyMutex.RLock()
defer fake.encryptionStrategyMutex.RUnlock()
return len(fake.encryptionStrategyArgsForCall)
}
func (fake *FakeTx) EncryptionStrategyCalls(stub func() encryption.Strategy) {
fake.encryptionStrategyMutex.Lock()
defer fake.encryptionStrategyMutex.Unlock()
fake.EncryptionStrategyStub = stub
}
func (fake *FakeTx) EncryptionStrategyReturns(result1 encryption.Strategy) {
fake.encryptionStrategyMutex.Lock()
defer fake.encryptionStrategyMutex.Unlock()
fake.EncryptionStrategyStub = nil
fake.encryptionStrategyReturns = struct {
result1 encryption.Strategy
}{result1}
}
func (fake *FakeTx) EncryptionStrategyReturnsOnCall(i int, result1 encryption.Strategy) {
fake.encryptionStrategyMutex.Lock()
defer fake.encryptionStrategyMutex.Unlock()
fake.EncryptionStrategyStub = nil
if fake.encryptionStrategyReturnsOnCall == nil {
fake.encryptionStrategyReturnsOnCall = make(map[int]struct {
result1 encryption.Strategy
})
}
fake.encryptionStrategyReturnsOnCall[i] = struct {
result1 encryption.Strategy
}{result1}
}
func (fake *FakeTx) Exec(arg1 string, arg2 ...interface{}) (sql.Result, error) {
fake.execMutex.Lock()
ret, specificReturn := fake.execReturnsOnCall[len(fake.execArgsForCall)]
@ -833,6 +896,8 @@ func (fake *FakeTx) Invocations() map[string][][]interface{} {
defer fake.invocationsMutex.RUnlock()
fake.commitMutex.RLock()
defer fake.commitMutex.RUnlock()
fake.encryptionStrategyMutex.RLock()
defer fake.encryptionStrategyMutex.RUnlock()
fake.execMutex.RLock()
defer fake.execMutex.RUnlock()
fake.execContextMutex.RLock()

View File

@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE pipelines DROP COLUMN parent_job_id, DROP COLUMN parent_build_id;
COMMIT;

View File

@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE pipelines ADD COLUMN parent_job_id integer, ADD COLUMN parent_build_id integer;
COMMIT;

View File

@ -62,6 +62,7 @@ type Tx interface {
QueryRowContext(context.Context, string, ...interface{}) squirrel.RowScanner
Rollback() error
Stmt(*sql.Stmt) *sql.Stmt
EncryptionStrategy() encryption.Strategy
}
func Open(logger lager.Logger, sqlDriver string, sqlDataSource string, newKey *encryption.Key, oldKey *encryption.Key, connectionName string, lockFactory lock.LockFactory) (Conn, error) {
@ -394,7 +395,7 @@ func (db *db) Begin() (Tx, error) {
return nil, err
}
return &dbTx{tx, GlobalConnectionTracker.Track()}, nil
return &dbTx{tx, GlobalConnectionTracker.Track(), db.EncryptionStrategy()}, nil
}
func (db *db) Exec(query string, args ...interface{}) (sql.Result, error) {
@ -424,7 +425,7 @@ func (db *db) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) {
return nil, err
}
return &dbTx{tx, GlobalConnectionTracker.Track()}, nil
return &dbTx{tx, GlobalConnectionTracker.Track(), db.EncryptionStrategy()}, nil
}
func (db *db) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
@ -451,7 +452,8 @@ func (db *db) QueryRowContext(ctx context.Context, query string, args ...interfa
type dbTx struct {
*sql.Tx
session *ConnectionSession
session *ConnectionSession
encryptionStrategy encryption.Strategy
}
// to conform to squirrel.Runner interface
@ -473,6 +475,10 @@ func (tx *dbTx) Rollback() error {
return tx.Tx.Rollback()
}
func (tx *dbTx) EncryptionStrategy() encryption.Strategy {
return tx.encryptionStrategy
}
// Rollback ignores errors, and should be used with defer.
// makes errcheck happy that those errs are captured
func Rollback(tx Tx) {

View File

@ -40,6 +40,8 @@ type Pipeline interface {
Name() string
TeamID() int
TeamName() string
ParentJobID() int
ParentBuildID() int
Groups() atc.GroupConfigs
VarSources() atc.VarSourceConfigs
ConfigVersion() ConfigVersion
@ -92,6 +94,8 @@ type Pipeline interface {
Rename(string) error
Variables(lager.Logger, creds.Secrets, creds.VarSourcePool) (vars.Variables, error)
SetParentIDs(jobID, buildID int) error
}
type pipeline struct {
@ -99,6 +103,8 @@ type pipeline struct {
name string
teamID int
teamName string
parentJobID int
parentBuildID int
groups atc.GroupConfigs
varSources atc.VarSourceConfigs
configVersion ConfigVersion
@ -126,7 +132,9 @@ var pipelinesQuery = psql.Select(`
p.paused,
p.public,
p.archived,
p.last_updated
p.last_updated,
p.parent_job_id,
p.parent_build_id
`).
From("pipelines p").
LeftJoin("teams t ON p.team_id = t.id")
@ -142,6 +150,8 @@ func (p *pipeline) ID() int { return p.id }
func (p *pipeline) Name() string { return p.name }
func (p *pipeline) TeamID() int { return p.teamID }
func (p *pipeline) TeamName() string { return p.teamName }
func (p *pipeline) ParentJobID() int { return p.parentJobID }
func (p *pipeline) ParentBuildID() int { return p.parentBuildID }
func (p *pipeline) Groups() atc.GroupConfigs { return p.groups }
func (p *pipeline) VarSources() atc.VarSourceConfigs { return p.varSources }
@ -1136,6 +1146,43 @@ func (p *pipeline) Variables(logger lager.Logger, globalSecrets creds.Secrets, v
return allVars, nil
}
func (p *pipeline) SetParentIDs(jobID, buildID int) error {
if jobID <= 0 || buildID <= 0 {
return errors.New("job and build id cannot be negative or zero-value")
}
tx, err := p.conn.Begin()
if err != nil {
return err
}
defer Rollback(tx)
result, err := psql.Update("pipelines").
Set("parent_job_id", jobID).
Set("parent_build_id", buildID).
Where(sq.Eq{
"id": p.id,
}).
Where(sq.Or{sq.Lt{"parent_build_id": buildID}, sq.Eq{"parent_build_id": nil}}).
RunWith(tx).
Exec()
if err != nil {
return err
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
return ErrSetByNewerBuild
}
return tx.Commit()
}
func getNewBuildNameForJob(tx Tx, jobName string, pipelineID int) (string, int, error) {
var buildName string
var jobID int

View File

@ -2177,6 +2177,52 @@ var _ = Describe("Pipeline", func() {
})
})
Describe("SetParentIDs", func() {
It("sets the parent_job_id and parent_build_id fields", func() {
jobID := 123
buildID := 456
Expect(pipeline.SetParentIDs(jobID, buildID)).To(Succeed())
found, err := pipeline.Reload()
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
Expect(pipeline.ParentJobID()).To(Equal(jobID))
Expect(pipeline.ParentBuildID()).To(Equal(buildID))
})
It("returns an error if job or build ID are less than or equal to zero", func() {
err := pipeline.SetParentIDs(0, 0)
Expect(err).To(MatchError("job and build id cannot be negative or zero-value"))
err = pipeline.SetParentIDs(-1, -6)
Expect(err).To(MatchError("job and build id cannot be negative or zero-value"))
})
Context("pipeline was saved by a newer build", func() {
It("returns ErrSetByNewerBuild", func() {
By("setting the build ID to a high number")
pipeline.SetParentIDs(1, 60)
By("trying to set the build ID to a lower number")
err := pipeline.SetParentIDs(1, 2)
Expect(err).To(MatchError(db.ErrSetByNewerBuild))
})
})
Context("pipeline was previously saved by team.SavePipeline", func() {
It("successfully updates the parent build and job IDs", func() {
By("using the defaultPipeline saved by defaultTeam at the suite level")
Expect(defaultPipeline.ParentJobID()).To(Equal(0), "should be zero if sql value is null")
Expect(defaultPipeline.ParentBuildID()).To(Equal(0), "should be zero if sql value is null")
err := defaultPipeline.SetParentIDs(1, 6)
Expect(err).ToNot(HaveOccurred())
defaultPipeline.Reload()
Expect(defaultPipeline.ParentJobID()).To(Equal(1), "should be zero if sql value is null")
Expect(defaultPipeline.ParentBuildID()).To(Equal(6), "should be zero if sql value is null")
})
})
})
Context("Config", func() {
It("should return config correctly", func() {
Expect(pipeline.Config()).To(Equal(pipelineConfig))

View File

@ -339,6 +339,167 @@ func (t *team) FindCreatedContainerByHandle(
return nil, false, nil
}
func savePipeline(
tx Tx,
pipelineName string,
config atc.Config,
from ConfigVersion,
initiallyPaused bool,
teamID int,
jobID sql.NullInt64,
buildID sql.NullInt64,
) (int, bool, error) {
var existingConfig bool
err := tx.QueryRow(`SELECT EXISTS (
SELECT 1
FROM pipelines
WHERE name = $1
AND team_id = $2
)`, pipelineName, teamID).Scan(&existingConfig)
if err != nil {
return 0, false, err
}
groupsPayload, err := json.Marshal(config.Groups)
if err != nil {
return 0, false, err
}
varSourcesPayload, err := json.Marshal(config.VarSources)
if err != nil {
return 0, false, err
}
encryptedVarSourcesPayload, nonce, err := tx.EncryptionStrategy().Encrypt(varSourcesPayload)
if err != nil {
return 0, false, err
}
var pipelineID int
if !existingConfig {
err = psql.Insert("pipelines").
SetMap(map[string]interface{}{
"name": pipelineName,
"groups": groupsPayload,
"var_sources": encryptedVarSourcesPayload,
"nonce": nonce,
"version": sq.Expr("nextval('config_version_seq')"),
"ordering": sq.Expr("currval('pipelines_id_seq')"),
"paused": initiallyPaused,
"last_updated": sq.Expr("now()"),
"team_id": teamID,
"parent_job_id": jobID,
"parent_build_id": buildID,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().Scan(&pipelineID)
if err != nil {
return 0, false, err
}
} else {
q := psql.Update("pipelines").
Set("archived", false).
Set("groups", groupsPayload).
Set("var_sources", encryptedVarSourcesPayload).
Set("nonce", nonce).
Set("version", sq.Expr("nextval('config_version_seq')")).
Set("last_updated", sq.Expr("now()")).
Set("parent_job_id", jobID).
Set("parent_build_id", buildID).
Where(sq.Eq{
"name": pipelineName,
"version": from,
"team_id": teamID,
})
if buildID.Valid {
q = q.Where(sq.Or{sq.Lt{"parent_build_id": buildID}, sq.Eq{"parent_build_id": nil}})
}
err := q.Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&pipelineID)
if err != nil {
if err == sql.ErrNoRows {
var currentParentBuildID sql.NullInt64
err = tx.QueryRow(`
SELECT parent_build_id
FROM pipelines
WHERE name = $1
AND team_id = $2 `,
pipelineName, teamID).
Scan(&currentParentBuildID)
if err != nil {
return 0, false, err
}
if currentParentBuildID.Valid && int(buildID.Int64) < int(currentParentBuildID.Int64) {
return 0, false, ErrSetByNewerBuild
}
return 0, false, ErrConfigComparisonFailed
}
return 0, false, err
}
err = resetDependentTableStates(tx, pipelineID)
if err != nil {
return 0, false, err
}
}
resourceNameToID, err := saveResources(tx, config.Resources, pipelineID)
if err != nil {
return 0, false, err
}
_, err = psql.Update("resources").
Set("resource_config_id", nil).
Where(sq.Eq{
"pipeline_id": pipelineID,
"active": false,
}).
RunWith(tx).
Exec()
if err != nil {
return 0, false, err
}
err = saveResourceTypes(tx, config.ResourceTypes, pipelineID)
if err != nil {
return 0, false, err
}
err = updateName(tx, config.Jobs, pipelineID)
if err != nil {
return 0, false, err
}
jobNameToID, err := saveJobsAndSerialGroups(tx, config.Jobs, config.Groups, pipelineID)
if err != nil {
return 0, false, err
}
err = removeUnusedWorkerTaskCaches(tx, pipelineID, config.Jobs)
if err != nil {
return 0, false, err
}
err = insertJobPipes(tx, config.Jobs, resourceNameToID, jobNameToID, pipelineID)
if err != nil {
return 0, false, err
}
err = requestScheduleForJobsInPipeline(tx, pipelineID)
if err != nil {
return 0, false, err
}
return pipelineID, !existingConfig, nil
}
func (t *team) SavePipeline(
pipelineName string,
config atc.Config,
@ -352,126 +513,14 @@ func (t *team) SavePipeline(
defer Rollback(tx)
var existingConfig bool
err = tx.QueryRow(`SELECT EXISTS (
SELECT 1
FROM pipelines
WHERE name = $1
AND team_id = $2
)`, pipelineName, t.id).Scan(&existingConfig)
if err != nil {
return nil, false, err
}
groupsPayload, err := json.Marshal(config.Groups)
if err != nil {
return nil, false, err
}
varSourcesPayload, err := json.Marshal(config.VarSources)
if err != nil {
return nil, false, err
}
encryptedVarSourcesPayload, nonce, err := t.conn.EncryptionStrategy().Encrypt(varSourcesPayload)
if err != nil {
return nil, false, err
}
var pipelineID int
if !existingConfig {
err = psql.Insert("pipelines").
SetMap(map[string]interface{}{
"name": pipelineName,
"groups": groupsPayload,
"var_sources": encryptedVarSourcesPayload,
"nonce": nonce,
"version": sq.Expr("nextval('config_version_seq')"),
"ordering": sq.Expr("currval('pipelines_id_seq')"),
"paused": initiallyPaused,
"last_updated": sq.Expr("now()"),
"team_id": t.id,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().Scan(&pipelineID)
if err != nil {
return nil, false, err
}
} else {
err := psql.Update("pipelines").
Set("archived", false).
Set("groups", groupsPayload).
Set("var_sources", encryptedVarSourcesPayload).
Set("nonce", nonce).
Set("version", sq.Expr("nextval('config_version_seq')")).
Set("last_updated", sq.Expr("now()")).
Where(sq.Eq{
"name": pipelineName,
"version": from,
"team_id": t.id,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&pipelineID)
if err != nil {
if err == sql.ErrNoRows {
return nil, false, ErrConfigComparisonFailed
}
return nil, false, err
}
err = t.resetDependentTableStates(tx, pipelineID)
if err != nil {
return nil, false, err
}
}
resourceNameToID, err := t.saveResources(tx, config.Resources, pipelineID)
if err != nil {
return nil, false, err
}
_, err = psql.Update("resources").
Set("resource_config_id", nil).
Where(sq.Eq{
"pipeline_id": pipelineID,
"active": false,
}).
RunWith(tx).
Exec()
if err != nil {
return nil, false, err
}
err = t.saveResourceTypes(tx, config.ResourceTypes, pipelineID)
if err != nil {
return nil, false, err
}
err = t.updateName(tx, config.Jobs, pipelineID)
if err != nil {
return nil, false, err
}
jobNameToID, err := t.saveJobsAndSerialGroups(tx, config.Jobs, config.Groups, pipelineID)
if err != nil {
return nil, false, err
}
err = removeUnusedWorkerTaskCaches(tx, pipelineID, config.Jobs)
if err != nil {
return nil, false, err
}
err = t.insertJobPipes(tx, config.Jobs, resourceNameToID, jobNameToID, pipelineID)
nullID := sql.NullInt64{Valid: false}
pipelineID, isNewPipeline, err := savePipeline(tx, pipelineName, config, from, initiallyPaused, t.id, nullID, nullID)
if err != nil {
return nil, false, err
}
pipeline := newPipeline(t.conn, t.lockFactory)
err = scanPipeline(
pipeline,
pipelinesQuery.
@ -483,17 +532,12 @@ func (t *team) SavePipeline(
return nil, false, err
}
err = requestScheduleForJobsInPipeline(tx, pipelineID)
if err != nil {
return nil, false, err
}
err = tx.Commit()
if err != nil {
return nil, false, err
}
return pipeline, !existingConfig, nil
return pipeline, isNewPipeline, nil
}
func (t *team) Pipeline(pipelineName string) (Pipeline, bool, error) {
@ -844,7 +888,7 @@ type UpdateName struct {
NewName string
}
func (t *team) updateName(tx Tx, jobs []atc.JobConfig, pipelineID int) error {
func updateName(tx Tx, jobs []atc.JobConfig, pipelineID int) error {
jobsToUpdate := []UpdateName{}
for _, job := range jobs {
@ -938,13 +982,13 @@ func sortUpdateNames(jobNames []UpdateName) []UpdateName {
return jobNames
}
func (t *team) saveJob(tx Tx, job atc.JobConfig, pipelineID int, groups []string) (int, error) {
func saveJob(tx Tx, job atc.JobConfig, pipelineID int, groups []string) (int, error) {
configPayload, err := json.Marshal(job)
if err != nil {
return 0, err
}
es := t.conn.EncryptionStrategy()
es := tx.EncryptionStrategy()
encryptedPayload, nonce, err := es.Encrypt(configPayload)
if err != nil {
return 0, err
@ -966,7 +1010,7 @@ func (t *team) saveJob(tx Tx, job atc.JobConfig, pipelineID int, groups []string
return jobID, nil
}
func (t *team) registerSerialGroup(tx Tx, serialGroup string, jobID int) error {
func registerSerialGroup(tx Tx, serialGroup string, jobID int) error {
_, err := psql.Insert("jobs_serial_groups").
Columns("serial_group", "job_id").
Values(serialGroup, jobID).
@ -975,13 +1019,13 @@ func (t *team) registerSerialGroup(tx Tx, serialGroup string, jobID int) error {
return err
}
func (t *team) saveResource(tx Tx, resource atc.ResourceConfig, pipelineID int) (int, error) {
func saveResource(tx Tx, resource atc.ResourceConfig, pipelineID int) (int, error) {
configPayload, err := json.Marshal(resource)
if err != nil {
return 0, err
}
es := t.conn.EncryptionStrategy()
es := tx.EncryptionStrategy()
encryptedPayload, nonce, err := es.Encrypt(configPayload)
if err != nil {
return 0, err
@ -1031,13 +1075,13 @@ func (t *team) saveResource(tx Tx, resource atc.ResourceConfig, pipelineID int)
return resourceID, nil
}
func (t *team) saveResourceType(tx Tx, resourceType atc.ResourceType, pipelineID int) error {
func saveResourceType(tx Tx, resourceType atc.ResourceType, pipelineID int) error {
configPayload, err := json.Marshal(resourceType)
if err != nil {
return err
}
es := t.conn.EncryptionStrategy()
es := tx.EncryptionStrategy()
encryptedPayload, nonce, err := es.Encrypt(configPayload)
if err != nil {
return err
@ -1109,18 +1153,22 @@ func (t *team) findContainer(whereClause sq.Sqlizer) (CreatingContainer, Created
func scanPipeline(p *pipeline, scan scannable) error {
var (
groups sql.NullString
varSources sql.NullString
nonce sql.NullString
nonceStr *string
lastUpdated pq.NullTime
groups sql.NullString
varSources sql.NullString
nonce sql.NullString
nonceStr *string
lastUpdated pq.NullTime
parentJobID sql.NullInt64
parentBuildID sql.NullInt64
)
err := scan.Scan(&p.id, &p.name, &groups, &varSources, &nonce, &p.configVersion, &p.teamID, &p.teamName, &p.paused, &p.public, &p.archived, &lastUpdated)
err := scan.Scan(&p.id, &p.name, &groups, &varSources, &nonce, &p.configVersion, &p.teamID, &p.teamName, &p.paused, &p.public, &p.archived, &lastUpdated, &parentJobID, &parentBuildID)
if err != nil {
return err
}
p.lastUpdated = lastUpdated.Time
p.parentJobID = int(parentJobID.Int64)
p.parentBuildID = int(parentBuildID.Int64)
if groups.Valid {
var pipelineGroups atc.GroupConfigs
@ -1225,7 +1273,7 @@ func (t *team) queryTeam(tx Tx, query string, params ...interface{}) error {
return nil
}
func (t *team) resetDependentTableStates(tx Tx, pipelineID int) error {
func resetDependentTableStates(tx Tx, pipelineID int) error {
_, err := psql.Delete("jobs_serial_groups").
Where(sq.Expr(`job_id in (
SELECT j.id
@ -1240,7 +1288,7 @@ func (t *team) resetDependentTableStates(tx Tx, pipelineID int) error {
tableNames := []string{"jobs", "resources", "resource_types"}
for _, table := range tableNames {
err = t.inactivateTableForPipeline(tx, pipelineID, table)
err = inactivateTableForPipeline(tx, pipelineID, table)
if err != nil {
return err
}
@ -1248,7 +1296,7 @@ func (t *team) resetDependentTableStates(tx Tx, pipelineID int) error {
return err
}
func (t *team) inactivateTableForPipeline(tx Tx, pipelineID int, tableName string) error {
func inactivateTableForPipeline(tx Tx, pipelineID int, tableName string) error {
_, err := psql.Update(tableName).
Set("active", false).
Where(sq.Eq{
@ -1259,10 +1307,10 @@ func (t *team) inactivateTableForPipeline(tx Tx, pipelineID int, tableName strin
return err
}
func (t *team) saveResources(tx Tx, resources atc.ResourceConfigs, pipelineID int) (map[string]int, error) {
func saveResources(tx Tx, resources atc.ResourceConfigs, pipelineID int) (map[string]int, error) {
resourceNameToID := make(map[string]int)
for _, resource := range resources {
resourceID, err := t.saveResource(tx, resource, pipelineID)
resourceID, err := saveResource(tx, resource, pipelineID)
if err != nil {
return nil, err
}
@ -1273,9 +1321,9 @@ func (t *team) saveResources(tx Tx, resources atc.ResourceConfigs, pipelineID in
return resourceNameToID, nil
}
func (t *team) saveResourceTypes(tx Tx, resourceTypes atc.ResourceTypes, pipelineID int) error {
func saveResourceTypes(tx Tx, resourceTypes atc.ResourceTypes, pipelineID int) error {
for _, resourceType := range resourceTypes {
err := t.saveResourceType(tx, resourceType, pipelineID)
err := saveResourceType(tx, resourceType, pipelineID)
if err != nil {
return err
}
@ -1284,7 +1332,7 @@ func (t *team) saveResourceTypes(tx Tx, resourceTypes atc.ResourceTypes, pipelin
return nil
}
func (t *team) saveJobsAndSerialGroups(tx Tx, jobs atc.JobConfigs, groups atc.GroupConfigs, pipelineID int) (map[string]int, error) {
func saveJobsAndSerialGroups(tx Tx, jobs atc.JobConfigs, groups atc.GroupConfigs, pipelineID int) (map[string]int, error) {
jobGroups := make(map[string][]string)
for _, group := range groups {
for _, job := range group.Jobs {
@ -1294,7 +1342,7 @@ func (t *team) saveJobsAndSerialGroups(tx Tx, jobs atc.JobConfigs, groups atc.Gr
jobNameToID := make(map[string]int)
for _, job := range jobs {
jobID, err := t.saveJob(tx, job, pipelineID, jobGroups[job.Name])
jobID, err := saveJob(tx, job, pipelineID, jobGroups[job.Name])
if err != nil {
return nil, err
}
@ -1303,14 +1351,14 @@ func (t *team) saveJobsAndSerialGroups(tx Tx, jobs atc.JobConfigs, groups atc.Gr
if len(job.SerialGroups) != 0 {
for _, sg := range job.SerialGroups {
err = t.registerSerialGroup(tx, sg, jobID)
err = registerSerialGroup(tx, sg, jobID)
if err != nil {
return nil, err
}
}
} else {
if job.Serial || job.RawMaxInFlight > 0 {
err = t.registerSerialGroup(tx, job.Name, jobID)
err = registerSerialGroup(tx, job.Name, jobID)
if err != nil {
return nil, err
}
@ -1321,7 +1369,7 @@ func (t *team) saveJobsAndSerialGroups(tx Tx, jobs atc.JobConfigs, groups atc.Gr
return jobNameToID, nil
}
func (t *team) insertJobPipes(tx Tx, jobConfigs atc.JobConfigs, resourceNameToID map[string]int, jobNameToID map[string]int, pipelineID int) error {
func insertJobPipes(tx Tx, jobConfigs atc.JobConfigs, resourceNameToID map[string]int, jobNameToID map[string]int, pipelineID int) error {
_, err := psql.Delete("job_inputs").
Where(sq.Expr(`job_id in (
SELECT j.id

View File

@ -18,6 +18,7 @@ type stepFactory struct {
client worker.Client
resourceFactory resource.ResourceFactory
teamFactory db.TeamFactory
buildFactory db.BuildFactory
resourceCacheFactory db.ResourceCacheFactory
resourceConfigFactory db.ResourceConfigFactory
defaultLimits atc.ContainerLimits
@ -31,6 +32,7 @@ func NewStepFactory(
client worker.Client,
resourceFactory resource.ResourceFactory,
teamFactory db.TeamFactory,
buildFactory db.BuildFactory,
resourceCacheFactory db.ResourceCacheFactory,
resourceConfigFactory db.ResourceConfigFactory,
defaultLimits atc.ContainerLimits,
@ -43,6 +45,7 @@ func NewStepFactory(
client: client,
resourceFactory: resourceFactory,
teamFactory: teamFactory,
buildFactory: buildFactory,
resourceCacheFactory: resourceCacheFactory,
resourceConfigFactory: resourceConfigFactory,
defaultLimits: defaultLimits,
@ -168,6 +171,7 @@ func (factory *stepFactory) SetPipelineStep(
stepMetadata,
delegate,
factory.teamFactory,
factory.buildFactory,
factory.client,
)

View File

@ -27,13 +27,14 @@ import (
// SetPipelineStep sets a pipeline to current team. This step takes pipeline
// configure file and var files from some resource in the pipeline, like git.
type SetPipelineStep struct {
planID atc.PlanID
plan atc.SetPipelinePlan
metadata StepMetadata
delegate BuildStepDelegate
teamFactory db.TeamFactory
client worker.Client
succeeded bool
planID atc.PlanID
plan atc.SetPipelinePlan
metadata StepMetadata
delegate BuildStepDelegate
teamFactory db.TeamFactory
buildFactory db.BuildFactory
client worker.Client
succeeded bool
}
func NewSetPipelineStep(
@ -42,15 +43,17 @@ func NewSetPipelineStep(
metadata StepMetadata,
delegate BuildStepDelegate,
teamFactory db.TeamFactory,
buildFactory db.BuildFactory,
client worker.Client,
) Step {
return &SetPipelineStep{
planID: planID,
plan: plan,
metadata: metadata,
delegate: delegate,
teamFactory: teamFactory,
client: client,
planID: planID,
plan: plan,
metadata: metadata,
delegate: delegate,
teamFactory: teamFactory,
buildFactory: buildFactory,
client: client,
}
}
@ -194,17 +197,36 @@ func (step *SetPipelineStep) run(ctx context.Context, state RunState) error {
logger.Debug("no-diff")
fmt.Fprintf(stdout, "no diff found.\n")
err := pipeline.SetParentIDs(step.metadata.JobID, step.metadata.BuildID)
if err != nil {
return err
}
step.succeeded = true
step.delegate.Finished(logger, true)
return nil
}
fmt.Fprintf(stdout, "setting pipeline: %s\n", step.plan.Name)
pipeline, _, err = team.SavePipeline(step.plan.Name, atcConfig, fromVersion, false)
parentBuild, found, err := step.buildFactory.Build(step.metadata.BuildID)
if err != nil {
return err
}
if !found {
return fmt.Errorf("set_pipeline step not attached to a buildID")
}
pipeline, _, err = parentBuild.SavePipeline(step.plan.Name, team.ID(), atcConfig, fromVersion, false)
if err != nil {
if err == db.ErrSetByNewerBuild {
fmt.Fprintln(stderr, "\x1b[1;33mWARNING: the pipeline was not saved because it was already saved by a newer build\x1b[0m")
step.succeeded = true
step.delegate.Finished(logger, true)
return nil
}
return err
}
fmt.Fprintf(stdout, "done\n")
logger.Info("saved-pipeline", lager.Data{"team": team.Name(), "pipeline": pipeline.Name()})
step.succeeded = true

View File

@ -3,6 +3,7 @@ package exec_test
import (
"code.cloudfoundry.org/lager/lagerctx"
"code.cloudfoundry.org/lager/lagertest"
"github.com/concourse/concourse/atc/db"
"github.com/concourse/concourse/atc/worker/workerfakes"
"context"
@ -79,10 +80,12 @@ jobs:
cancel func()
testLogger *lagertest.TestLogger
fakeDelegate *execfakes.FakeBuildStepDelegate
fakeTeamFactory *dbfakes.FakeTeamFactory
fakeTeam *dbfakes.FakeTeam
fakePipeline *dbfakes.FakePipeline
fakeDelegate *execfakes.FakeBuildStepDelegate
fakeTeamFactory *dbfakes.FakeTeamFactory
fakeBuildFactory *dbfakes.FakeBuildFactory
fakeBuild *dbfakes.FakeBuild
fakeTeam *dbfakes.FakeTeam
fakePipeline *dbfakes.FakePipeline
fakeWorkerClient *workerfakes.FakeClient
@ -96,7 +99,16 @@ jobs:
credVarsTracker vars.CredVarsTracker
stepMetadata exec.StepMetadata
stepMetadata = exec.StepMetadata{
TeamID: 123,
TeamName: "some-team",
JobID: 87,
JobName: "some-job",
BuildID: 42,
BuildName: "some-build",
PipelineID: 4567,
PipelineName: "some-pipeline",
}
stdout, stderr *gbytes.Buffer
@ -127,6 +139,8 @@ jobs:
fakeDelegate.StderrReturns(stderr)
fakeTeamFactory = new(dbfakes.FakeTeamFactory)
fakeBuildFactory = new(dbfakes.FakeBuildFactory)
fakeBuild = new(dbfakes.FakeBuild)
fakeTeam = new(dbfakes.FakeTeam)
fakePipeline = new(dbfakes.FakePipeline)
@ -144,6 +158,7 @@ jobs:
fakePipeline.NameReturns("some-pipeline")
fakeTeamFactory.GetByIDReturns(fakeTeam)
fakeBuildFactory.BuildReturns(fakeBuild, true, nil)
fakeWorkerClient = new(workerfakes.FakeClient)
@ -169,6 +184,7 @@ jobs:
stepMetadata,
fakeDelegate,
fakeTeamFactory,
fakeBuildFactory,
fakeWorkerClient,
)
@ -240,12 +256,12 @@ jobs:
Context("when specified pipeline not found", func() {
BeforeEach(func() {
fakeTeam.PipelineReturns(nil, false, nil)
fakeTeam.SavePipelineReturns(fakePipeline, true, nil)
fakeBuild.SavePipelineReturns(fakePipeline, true, nil)
})
It("should save the pipeline un-paused", func() {
Expect(fakeTeam.SavePipelineCallCount()).To(Equal(1))
name, _, _, paused := fakeTeam.SavePipelineArgsForCall(0)
It("should save the pipeline", func() {
Expect(fakeBuild.SavePipelineCallCount()).To(Equal(1))
name, _, _, _, paused := fakeBuild.SavePipelineArgsForCall(0)
Expect(name).To(Equal("some-pipeline"))
Expect(paused).To(BeFalse())
})
@ -258,17 +274,25 @@ jobs:
Context("when specified pipeline exists already", func() {
BeforeEach(func() {
fakeTeam.PipelineReturns(fakePipeline, true, nil)
fakeTeam.SavePipelineReturns(fakePipeline, false, nil)
fakeBuild.SavePipelineReturns(fakePipeline, false, nil)
})
Context("when no diff", func() {
BeforeEach(func() {
fakePipeline.ConfigReturns(pipelineObject, nil)
fakePipeline.SetParentIDsReturns(nil)
})
It("should log no-diff", func() {
Expect(stdout).To(gbytes.Say("no diff found."))
})
It("should update the job and build id", func() {
Expect(fakePipeline.SetParentIDsCallCount()).To(Equal(1))
jobID, buildID := fakePipeline.SetParentIDsArgsForCall(0)
Expect(jobID).To(Equal(stepMetadata.JobID))
Expect(buildID).To(Equal(stepMetadata.BuildID))
})
})
Context("when there are some diff", func() {
@ -284,18 +308,31 @@ jobs:
Context("when SavePipeline fails", func() {
BeforeEach(func() {
fakeTeam.SavePipelineReturns(nil, false, errors.New("failed to save"))
fakeBuild.SavePipelineReturns(nil, false, errors.New("failed to save"))
})
It("should return error", func() {
Expect(stepErr).To(HaveOccurred())
Expect(stepErr.Error()).To(Equal("failed to save"))
})
Context("due to the pipeline being set by a newer build", func() {
BeforeEach(func() {
fakeBuild.SavePipelineReturns(nil, false, db.ErrSetByNewerBuild)
})
It("logs a warning", func() {
Expect(stderr).To(gbytes.Say("WARNING: the pipeline was not saved because it was already saved by a newer build"))
})
It("does not fail the step", func() {
Expect(stepErr).ToNot(HaveOccurred())
Expect(spStep.Succeeded()).To(BeTrue())
})
})
})
It("should save the pipeline un-paused", func() {
Expect(fakeTeam.SavePipelineCallCount()).To(Equal(1))
name, _, _, paused := fakeTeam.SavePipelineArgsForCall(0)
Expect(fakeBuild.SavePipelineCallCount()).To(Equal(1))
name, _, _, _, paused := fakeBuild.SavePipelineArgsForCall(0)
Expect(name).To(Equal("some-pipeline"))
Expect(paused).To(BeFalse())
})
@ -333,8 +370,8 @@ jobs:
Context("when team is set to the empty string", func() {
BeforeEach(func() {
fakeTeam.PipelineReturns(fakePipeline, true, nil)
fakeTeam.SavePipelineReturns(fakePipeline, false, nil)
fakeBuild.PipelineReturns(fakePipeline, true, nil)
fakeBuild.SavePipelineReturns(fakePipeline, false, nil)
spPlan.Team = ""
})
@ -369,11 +406,13 @@ jobs:
fakeUserCurrentTeam, true, nil,
)
fakeUserCurrentTeam.PipelineReturns(fakePipeline, true, nil)
fakeUserCurrentTeam.SavePipelineReturns(fakePipeline, false, nil)
fakeBuild.PipelineReturns(fakePipeline, true, nil)
fakeBuild.SavePipelineReturns(fakePipeline, false, nil)
})
It("should finish successfully", func() {
_, teamID, _, _, _ := fakeBuild.SavePipelineArgsForCall(0)
Expect(teamID).To(Equal(fakeUserCurrentTeam.ID()))
Expect(fakeDelegate.FinishedCallCount()).To(Equal(1))
_, succeeded := fakeDelegate.FinishedArgsForCall(0)
Expect(succeeded).To(BeTrue())
@ -399,11 +438,13 @@ jobs:
BeforeEach(func() {
fakeUserCurrentTeam.AdminReturns(true)
fakeTeam.PipelineReturns(fakePipeline, true, nil)
fakeTeam.SavePipelineReturns(fakePipeline, false, nil)
fakeBuild.PipelineReturns(fakePipeline, true, nil)
fakeBuild.SavePipelineReturns(fakePipeline, false, nil)
})
It("should finish successfully", func() {
_, teamID, _, _, _ := fakeBuild.SavePipelineArgsForCall(0)
Expect(teamID).To(Equal(fakeTeam.ID()))
Expect(fakeDelegate.FinishedCallCount()).To(Equal(1))
_, succeeded := fakeDelegate.FinishedArgsForCall(0)
Expect(succeeded).To(BeTrue())