Rename Parallel to InParallel and nest yaml configuration

Signed-off-by: itsdalmo <kristian@doingit.no>
This commit is contained in:
itsdalmo 2019-04-09 13:25:24 +02:00
parent 7543b49364
commit e0a4ee4b91
17 changed files with 158 additions and 139 deletions

View File

@ -292,6 +292,12 @@ func (c InputsConfig) MarshalJSON() ([]byte, error) {
return json.Marshal("")
}
type InParallelConfig struct {
Steps PlanSequence `yaml:"steps,omitempty" json:"steps" mapstructure:"steps"`
Limit int `yaml:"limit,omitempty" json:"limit,omitempty" mapstructure:"limit"`
FailFast bool `yaml:"fail_fast,omitempty" json:"fail_fast,omitempty" mapstructure:"fail_fast"`
}
// A PlanConfig is a flattened set of configuration corresponding to
// a particular Plan, where Source and Version are populated lazily.
type PlanConfig struct {
@ -309,11 +315,7 @@ type PlanConfig struct {
Aggregate *PlanSequence `yaml:"aggregate,omitempty" json:"aggregate,omitempty" mapstructure:"aggregate"`
// a nested chain of steps to run in parallel
Parallel *PlanSequence `yaml:"in_parallel,omitempty" json:"in_parallel,omitempty" mapstructure:"in_parallel"`
// limit parallelism for a Parallel plan
MaxInParallel int `yaml:"max_in_parallel,omitempty" json:"max_in_parallel,omitempty" mapstructure:"max_in_parallel"`
// cancel a parallel step on first error
FailFast bool `yaml:"fail_fast,omitempty" json:"fail_fast,omitempty" mapstructure:"fail_fast"`
InParallel *InParallelConfig `yaml:"in_parallel,omitempty" json:"in_parallel,omitempty" mapstructure:"in_parallel"`
// corresponds to Get and Put resource plans, respectively
// name of 'input', e.g. bosh-stemcell

View File

@ -32,7 +32,7 @@ func (build *execBuild) buildParallelStep(logger lager.Logger, plan atc.Plan) ex
steps = append(steps, step)
}
return exec.Parallel(steps, plan.InParallel.MaxInParallel, plan.InParallel.FailFast)
return exec.InParallel(steps, plan.InParallel.Limit, plan.InParallel.FailFast)
}
func (build *execBuild) buildDoStep(logger lager.Logger, plan atc.Plan) exec.Step {

View File

@ -464,8 +464,8 @@ var _ = Describe("Exec Engine With Hooks", func() {
}),
}),
},
MaxInParallel: 1,
FailFast: true,
Limit: 1,
FailFast: true,
}),
Next: planFactory.NewPlan(atc.GetPlan{
Name: "some-unused-step",

View File

@ -211,8 +211,8 @@ var _ = Describe("ExecEngine", func() {
Next: otherDependentGetPlan,
}),
},
MaxInParallel: 1,
FailFast: true,
Limit: 1,
FailFast: true,
})
})
@ -296,9 +296,9 @@ var _ = Describe("ExecEngine", func() {
aggregatePlan = planFactory.NewPlan(atc.AggregatePlan{retryPlanTwo})
parallelPlan = planFactory.NewPlan(atc.InParallelPlan{
Steps: []atc.Plan{aggregatePlan},
MaxInParallel: 1,
FailFast: true,
Steps: []atc.Plan{aggregatePlan},
Limit: 1,
FailFast: true,
})
doPlan = planFactory.NewPlan(atc.DoPlan{parallelPlan})

View File

@ -7,22 +7,22 @@ import (
"sync"
)
// ParallelStep is a step of steps to run in parallel.
type ParallelStep struct {
steps []Step
maxInParallel int
failFast bool
// InParallelStep is a step of steps to run in parallel.
type InParallelStep struct {
steps []Step
limit int
failFast bool
}
// Parallel constructs a ParallelStep.
func Parallel(steps []Step, maxInParallel int, failFast bool) ParallelStep {
if maxInParallel < 1 {
maxInParallel = len(steps)
// InParallel constructs a ParallelStep.
func InParallel(steps []Step, limit int, failFast bool) InParallelStep {
if limit < 1 {
limit = len(steps)
}
return ParallelStep{
steps: steps,
maxInParallel: maxInParallel,
failFast: failFast,
return InParallelStep{
steps: steps,
limit: limit,
failFast: failFast,
}
}
@ -36,11 +36,11 @@ func Parallel(steps []Step, maxInParallel int, failFast bool) ParallelStep {
// Cancelling a parallel step means that any outstanding steps will not be scheduled to run.
// After all steps finish, their errors (if any) will be collected and returned as a
// single error.
func (step ParallelStep) Run(ctx context.Context, state RunState) error {
func (step InParallelStep) Run(ctx context.Context, state RunState) error {
var (
wg sync.WaitGroup
errs = make(chan error, len(step.steps))
sem = make(chan bool, step.maxInParallel)
sem = make(chan bool, step.limit)
)
runCtx, cancel := context.WithCancel(ctx)
@ -90,7 +90,7 @@ func (step ParallelStep) Run(ctx context.Context, state RunState) error {
}
// Succeeded is true if all of the steps' Succeeded is true
func (step ParallelStep) Succeeded() bool {
func (step InParallelStep) Succeeded() bool {
succeeded := true
for _, step := range step.steps {

View File

@ -36,7 +36,7 @@ var _ = Describe("Parallel", func() {
fakeStepB = new(execfakes.FakeStep)
fakeSteps = []Step{fakeStepA, fakeStepB}
step = Parallel(fakeSteps, len(fakeSteps), false)
step = InParallel(fakeSteps, len(fakeSteps), false)
repo = artifact.NewRepository()
state = new(execfakes.FakeRunState)
@ -92,7 +92,7 @@ var _ = Describe("Parallel", func() {
Context("when maxInParallel is 1", func() {
BeforeEach(func() {
step = Parallel(fakeSteps, 1, false)
step = InParallel(fakeSteps, 1, false)
ch := make(chan struct{}, 1)
fakeStepA.RunStub = func(context.Context, RunState) error {
@ -151,7 +151,7 @@ var _ = Describe("Parallel", func() {
Context("when there are steps pending execution", func() {
BeforeEach(func() {
step = Parallel(fakeSteps, 1, false)
step = InParallel(fakeSteps, 1, false)
fakeStepA.RunStub = func(context.Context, RunState) error {
cancel()
@ -187,7 +187,7 @@ var _ = Describe("Parallel", func() {
Context("and fail fast is false", func() {
BeforeEach(func() {
step = Parallel(fakeSteps, 1, false)
step = InParallel(fakeSteps, 1, false)
})
It("lets all steps finish before exiting", func() {
Expect(fakeStepA.RunCallCount()).To(Equal(1))
@ -201,7 +201,7 @@ var _ = Describe("Parallel", func() {
Context("and fail fast is true", func() {
BeforeEach(func() {
step = Parallel(fakeSteps, 1, true)
step = InParallel(fakeSteps, 1, true)
})
It("it cancels remaining steps", func() {
Expect(fakeStepA.RunCallCount()).To(Equal(1))
@ -250,7 +250,7 @@ var _ = Describe("Parallel", func() {
Context("when there are no steps", func() {
BeforeEach(func() {
step = ParallelStep{}
step = InParallelStep{}
})
It("returns true", func() {

View File

@ -100,8 +100,8 @@ func collectPlans(plan PlanConfig) []PlanConfig {
}
}
if plan.Parallel != nil {
for _, p := range *plan.Parallel {
if plan.InParallel != nil {
for _, p := range plan.InParallel.Steps {
plans = append(plans, collectPlans(p)...)
}
}

View File

@ -561,14 +561,16 @@ var _ = Describe("JobConfig", func() {
BeforeEach(func() {
jobConfig.Plan = atc.PlanSequence{
{
Parallel: &atc.PlanSequence{
{Get: "a"},
{Put: "y"},
{Get: "b", Resource: "some-resource", Passed: []string{"x"}},
{Get: "c", Trigger: true},
InParallel: &atc.InParallelConfig{
Steps: atc.PlanSequence{
{Get: "a"},
{Put: "y"},
{Get: "b", Resource: "some-resource", Passed: []string{"x"}},
{Get: "c", Trigger: true},
},
Limit: 1,
FailFast: true,
},
MaxInParallel: 1,
FailFast: true,
},
}
})
@ -600,18 +602,22 @@ var _ = Describe("JobConfig", func() {
BeforeEach(func() {
jobConfig.Plan = atc.PlanSequence{
{
Parallel: &atc.PlanSequence{
{
Parallel: &atc.PlanSequence{
{Get: "a"},
InParallel: &atc.InParallelConfig{
Steps: atc.PlanSequence{
{
InParallel: &atc.InParallelConfig{
Steps: atc.PlanSequence{
{Get: "a"},
},
Limit: 1,
},
},
MaxInParallel: 1,
{Get: "b", Resource: "some-resource", Passed: []string{"x"}},
{Get: "c", Trigger: true},
},
{Get: "b", Resource: "some-resource", Passed: []string{"x"}},
{Get: "c", Trigger: true},
Limit: 2,
FailFast: true,
},
MaxInParallel: 2,
FailFast: true,
},
}
})
@ -687,8 +693,10 @@ var _ = Describe("JobConfig", func() {
{
Aggregate: &atc.PlanSequence{
{
Parallel: &atc.PlanSequence{
{Put: "a"},
InParallel: &atc.InParallelConfig{
Steps: atc.PlanSequence{
{Put: "a"},
},
},
},
{Put: "b", Resource: "some-resource"},

View File

@ -75,9 +75,9 @@ type TryPlan struct {
type AggregatePlan []Plan
type InParallelPlan struct {
Steps []Plan `json:"steps"`
MaxInParallel int `json:"max_in_parallel,omitempty"`
FailFast bool `json:"fail_fast,omitempty"`
Steps []Plan `json:"steps"`
Limit int `json:"limit,omitempty"`
FailFast bool `json:"fail_fast,omitempty"`
}
type DoPlan []Plan

View File

@ -116,13 +116,13 @@ func (plan InParallelPlan) Public() *json.RawMessage {
}
return enc(struct {
Steps []*json.RawMessage `json:"steps"`
MaxInParallel int `json:"max_in_parallel,omitempty"`
FailFast bool `json:"fail_fast,omitempty"`
Steps []*json.RawMessage `json:"steps"`
Limit int `json:"limit,omitempty"`
FailFast bool `json:"fail_fast,omitempty"`
}{
Steps: steps,
MaxInParallel: plan.MaxInParallel,
FailFast: plan.FailFast,
Steps: steps,
Limit: plan.Limit,
FailFast: plan.FailFast,
})
}

View File

@ -324,8 +324,8 @@ var _ = Describe("Plan", func() {
atc.Plan{
ID: "36",
InParallel: &atc.InParallelPlan{
MaxInParallel: 1,
FailFast: true,
Limit: 1,
FailFast: true,
Steps: []atc.Plan{
atc.Plan{
ID: "37",
@ -588,7 +588,7 @@ var _ = Describe("Plan", func() {
}
}
],
"max_in_parallel": 1,
"limit": 1,
"fail_fast": true
}
}

View File

@ -278,10 +278,10 @@ func (factory *buildFactory) constructUnhookedPlan(
plan = factory.planFactory.NewPlan(aggregate)
case planConfig.Parallel != nil:
case planConfig.InParallel != nil:
var steps []atc.Plan
for _, planConfig := range *planConfig.Parallel {
for _, planConfig := range planConfig.InParallel.Steps {
step, err := factory.constructPlanFromConfig(
planConfig,
resources,
@ -296,9 +296,9 @@ func (factory *buildFactory) constructUnhookedPlan(
}
plan = factory.planFactory.NewPlan(atc.InParallelPlan{
Steps: steps,
MaxInParallel: planConfig.MaxInParallel,
FailFast: planConfig.FailFast,
Steps: steps,
Limit: planConfig.InParallel.Limit,
FailFast: planConfig.InParallel.FailFast,
})
}

View File

@ -48,16 +48,18 @@ var _ = Describe("Factory Parallel", func() {
actual, err := buildFactory.Create(atc.JobConfig{
Plan: atc.PlanSequence{
{
Parallel: &atc.PlanSequence{
{
Task: "some thing",
},
{
Task: "some other thing",
InParallel: &atc.InParallelConfig{
Steps: atc.PlanSequence{
{
Task: "some thing",
},
{
Task: "some other thing",
},
},
Limit: 1,
FailFast: true,
},
MaxInParallel: 1,
FailFast: true,
},
},
}, resources, resourceTypes, nil)
@ -74,8 +76,8 @@ var _ = Describe("Factory Parallel", func() {
VersionedResourceTypes: resourceTypes,
}),
},
MaxInParallel: 1,
FailFast: true,
Limit: 1,
FailFast: true,
})
Expect(actual).To(Equal(expected))
})

View File

@ -297,16 +297,18 @@ var _ = Describe("Factory Put", func() {
input = atc.JobConfig{
Plan: atc.PlanSequence{
{
Parallel: &atc.PlanSequence{
{
Task: "some thing",
},
{
Put: "some-resource",
InParallel: &atc.InParallelConfig{
Steps: atc.PlanSequence{
{
Task: "some thing",
},
{
Put: "some-resource",
},
},
Limit: 1,
FailFast: true,
},
MaxInParallel: 1,
FailFast: true,
},
},
}
@ -346,8 +348,8 @@ var _ = Describe("Factory Put", func() {
}),
}),
},
MaxInParallel: 1,
FailFast: true,
Limit: 1,
FailFast: true,
})
Expect(actual).To(testhelpers.MatchPlan(expected))
})

View File

@ -334,7 +334,7 @@ func validatePlan(c Config, identifier string, plan PlanConfig) ([]ConfigWarning
foundTypes.Find("aggregate")
}
if plan.Parallel != nil {
if plan.InParallel != nil {
foundTypes.Find("parallel")
}
@ -366,8 +366,8 @@ func validatePlan(c Config, identifier string, plan PlanConfig) ([]ConfigWarning
errorMessages = append(errorMessages, planErrMessages...)
}
case plan.Parallel != nil:
for i, plan := range *plan.Parallel {
case plan.InParallel != nil:
for i, plan := range plan.InParallel.Steps {
subIdentifier := fmt.Sprintf("%s.parallel[%d]", identifier, i)
planWarnings, planErrMessages := validatePlan(c, subIdentifier, plan)
warnings = append(warnings, planWarnings...)

View File

@ -308,13 +308,15 @@ var _ = Describe("ValidateConfig", func() {
},
},
{
Parallel: &PlanSequence{
{
Get: "parallel",
InParallel: &InParallelConfig{
Steps: PlanSequence{
{
Get: "parallel",
},
},
Limit: 1,
FailFast: true,
},
MaxInParallel: 1,
FailFast: true,
},
{
Task: "some-task",
@ -571,13 +573,15 @@ var _ = Describe("ValidateConfig", func() {
Get: "some-resource",
})
job.Plan = append(job.Plan, PlanConfig{
Parallel: &PlanSequence{
{
Get: "some-resource",
InParallel: &InParallelConfig{
Steps: PlanSequence{
{
Get: "some-resource",
},
},
Limit: 1,
FailFast: true,
},
MaxInParallel: 1,
FailFast: true,
})
config.Jobs = append(config.Jobs, job)
@ -595,12 +599,12 @@ var _ = Describe("ValidateConfig", func() {
Context("when it's not just Get and Put", func() {
BeforeEach(func() {
job.Plan = append(job.Plan, PlanConfig{
Get: "some-resource",
Put: "some-resource",
Task: "some-resource",
Do: &PlanSequence{},
Aggregate: &PlanSequence{},
Parallel: &PlanSequence{},
Get: "some-resource",
Put: "some-resource",
Task: "some-resource",
Do: &PlanSequence{},
Aggregate: &PlanSequence{},
InParallel: &InParallelConfig{},
})
config.Jobs = append(config.Jobs, job)
@ -616,12 +620,12 @@ var _ = Describe("ValidateConfig", func() {
Context("when it's just Get and Put (this was valid at one point)", func() {
BeforeEach(func() {
job.Plan = append(job.Plan, PlanConfig{
Get: "some-resource",
Put: "some-resource",
Task: "",
Do: nil,
Aggregate: nil,
Parallel: nil,
Get: "some-resource",
Put: "some-resource",
Task: "",
Do: nil,
Aggregate: nil,
InParallel: nil,
})
config.Jobs = append(config.Jobs, job)

View File

@ -30,25 +30,26 @@ jobs:
- get: every-minute
trigger: true
- in_parallel:
- task: echo1
config: *config
- task: echo2
config:
platform: linux
image_resource:
type: registry-image
source:
repository: busybox
run:
path: /bin/sh
args:
- -cex
- |
sleep 5
eccho
- task: echo3
config: *config
- task: echo4
config: *config
max_in_parallel: 3
fail_fast: true
limit: 3
steps:
- task: echo1
config: *config
- task: echo2
config:
platform: linux
image_resource:
type: registry-image
source:
repository: busybox
run:
path: /bin/sh
args:
- -cex
- |
sleep 5
eccho
- task: echo3
config: *config
- task: echo4
config: *config
fail_fast: true