Merge pull request #3830 from concourse/feature/3811-task-cache

atc: separate worker_task_cache table
This commit is contained in:
Alex Suraci 2019-05-22 09:43:00 -04:00 committed by GitHub
commit d61d5dab80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 991 additions and 255 deletions

View File

@ -531,6 +531,7 @@ func (cmd *RunCommand) constructAPIMembers(
dbWorkerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(dbConn)
dbWorkerTaskCacheFactory := db.NewWorkerTaskCacheFactory(dbConn)
dbTaskCacheFactory := db.NewTaskCacheFactory(dbConn)
dbVolumeRepository := db.NewVolumeRepository(dbConn)
dbWorkerFactory := db.NewWorkerFactory(dbConn)
workerVersion, err := workerVersion()
@ -545,6 +546,7 @@ func (cmd *RunCommand) constructAPIMembers(
dbResourceCacheFactory,
dbResourceConfigFactory,
dbWorkerBaseResourceTypeFactory,
dbTaskCacheFactory,
dbWorkerTaskCacheFactory,
dbVolumeRepository,
teamFactory,
@ -707,6 +709,7 @@ func (cmd *RunCommand) constructBackendMembers(
)
dbWorkerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(dbConn)
dbTaskCacheFactory := db.NewTaskCacheFactory(dbConn)
dbWorkerTaskCacheFactory := db.NewWorkerTaskCacheFactory(dbConn)
dbVolumeRepository := db.NewVolumeRepository(dbConn)
dbWorkerFactory := db.NewWorkerFactory(dbConn)
@ -722,6 +725,7 @@ func (cmd *RunCommand) constructBackendMembers(
dbResourceCacheFactory,
dbResourceConfigFactory,
dbWorkerBaseResourceTypeFactory,
dbTaskCacheFactory,
dbWorkerTaskCacheFactory,
dbVolumeRepository,
teamFactory,

View File

@ -38,6 +38,7 @@ var (
resourceConfigCheckSessionLifecycle db.ResourceConfigCheckSessionLifecycle
resourceConfigFactory db.ResourceConfigFactory
resourceCacheFactory db.ResourceCacheFactory
taskCacheFactory db.TaskCacheFactory
workerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory
workerTaskCacheFactory db.WorkerTaskCacheFactory
@ -101,6 +102,7 @@ var _ = BeforeEach(func() {
resourceConfigCheckSessionLifecycle = db.NewResourceConfigCheckSessionLifecycle(dbConn)
resourceConfigFactory = db.NewResourceConfigFactory(dbConn, lockFactory)
resourceCacheFactory = db.NewResourceCacheFactory(dbConn, lockFactory)
taskCacheFactory = db.NewTaskCacheFactory(dbConn)
workerBaseResourceTypeFactory = db.NewWorkerBaseResourceTypeFactory(dbConn)
workerTaskCacheFactory = db.NewWorkerTaskCacheFactory(dbConn)

View File

@ -0,0 +1,206 @@
// Code generated by counterfeiter. DO NOT EDIT.
package dbfakes
import (
sync "sync"
db "github.com/concourse/concourse/atc/db"
)
type FakeTaskCacheFactory struct {
FindStub func(int, string, string) (db.UsedTaskCache, bool, error)
findMutex sync.RWMutex
findArgsForCall []struct {
arg1 int
arg2 string
arg3 string
}
findReturns struct {
result1 db.UsedTaskCache
result2 bool
result3 error
}
findReturnsOnCall map[int]struct {
result1 db.UsedTaskCache
result2 bool
result3 error
}
FindOrCreateStub func(int, string, string) (db.UsedTaskCache, error)
findOrCreateMutex sync.RWMutex
findOrCreateArgsForCall []struct {
arg1 int
arg2 string
arg3 string
}
findOrCreateReturns struct {
result1 db.UsedTaskCache
result2 error
}
findOrCreateReturnsOnCall map[int]struct {
result1 db.UsedTaskCache
result2 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeTaskCacheFactory) Find(arg1 int, arg2 string, arg3 string) (db.UsedTaskCache, bool, error) {
fake.findMutex.Lock()
ret, specificReturn := fake.findReturnsOnCall[len(fake.findArgsForCall)]
fake.findArgsForCall = append(fake.findArgsForCall, struct {
arg1 int
arg2 string
arg3 string
}{arg1, arg2, arg3})
fake.recordInvocation("Find", []interface{}{arg1, arg2, arg3})
fake.findMutex.Unlock()
if fake.FindStub != nil {
return fake.FindStub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2, ret.result3
}
fakeReturns := fake.findReturns
return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3
}
func (fake *FakeTaskCacheFactory) FindCallCount() int {
fake.findMutex.RLock()
defer fake.findMutex.RUnlock()
return len(fake.findArgsForCall)
}
func (fake *FakeTaskCacheFactory) FindCalls(stub func(int, string, string) (db.UsedTaskCache, bool, error)) {
fake.findMutex.Lock()
defer fake.findMutex.Unlock()
fake.FindStub = stub
}
func (fake *FakeTaskCacheFactory) FindArgsForCall(i int) (int, string, string) {
fake.findMutex.RLock()
defer fake.findMutex.RUnlock()
argsForCall := fake.findArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeTaskCacheFactory) FindReturns(result1 db.UsedTaskCache, result2 bool, result3 error) {
fake.findMutex.Lock()
defer fake.findMutex.Unlock()
fake.FindStub = nil
fake.findReturns = struct {
result1 db.UsedTaskCache
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeTaskCacheFactory) FindReturnsOnCall(i int, result1 db.UsedTaskCache, result2 bool, result3 error) {
fake.findMutex.Lock()
defer fake.findMutex.Unlock()
fake.FindStub = nil
if fake.findReturnsOnCall == nil {
fake.findReturnsOnCall = make(map[int]struct {
result1 db.UsedTaskCache
result2 bool
result3 error
})
}
fake.findReturnsOnCall[i] = struct {
result1 db.UsedTaskCache
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeTaskCacheFactory) FindOrCreate(arg1 int, arg2 string, arg3 string) (db.UsedTaskCache, error) {
fake.findOrCreateMutex.Lock()
ret, specificReturn := fake.findOrCreateReturnsOnCall[len(fake.findOrCreateArgsForCall)]
fake.findOrCreateArgsForCall = append(fake.findOrCreateArgsForCall, struct {
arg1 int
arg2 string
arg3 string
}{arg1, arg2, arg3})
fake.recordInvocation("FindOrCreate", []interface{}{arg1, arg2, arg3})
fake.findOrCreateMutex.Unlock()
if fake.FindOrCreateStub != nil {
return fake.FindOrCreateStub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2
}
fakeReturns := fake.findOrCreateReturns
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeTaskCacheFactory) FindOrCreateCallCount() int {
fake.findOrCreateMutex.RLock()
defer fake.findOrCreateMutex.RUnlock()
return len(fake.findOrCreateArgsForCall)
}
func (fake *FakeTaskCacheFactory) FindOrCreateCalls(stub func(int, string, string) (db.UsedTaskCache, error)) {
fake.findOrCreateMutex.Lock()
defer fake.findOrCreateMutex.Unlock()
fake.FindOrCreateStub = stub
}
func (fake *FakeTaskCacheFactory) FindOrCreateArgsForCall(i int) (int, string, string) {
fake.findOrCreateMutex.RLock()
defer fake.findOrCreateMutex.RUnlock()
argsForCall := fake.findOrCreateArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeTaskCacheFactory) FindOrCreateReturns(result1 db.UsedTaskCache, result2 error) {
fake.findOrCreateMutex.Lock()
defer fake.findOrCreateMutex.Unlock()
fake.FindOrCreateStub = nil
fake.findOrCreateReturns = struct {
result1 db.UsedTaskCache
result2 error
}{result1, result2}
}
func (fake *FakeTaskCacheFactory) FindOrCreateReturnsOnCall(i int, result1 db.UsedTaskCache, result2 error) {
fake.findOrCreateMutex.Lock()
defer fake.findOrCreateMutex.Unlock()
fake.FindOrCreateStub = nil
if fake.findOrCreateReturnsOnCall == nil {
fake.findOrCreateReturnsOnCall = make(map[int]struct {
result1 db.UsedTaskCache
result2 error
})
}
fake.findOrCreateReturnsOnCall[i] = struct {
result1 db.UsedTaskCache
result2 error
}{result1, result2}
}
func (fake *FakeTaskCacheFactory) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.findMutex.RLock()
defer fake.findMutex.RUnlock()
fake.findOrCreateMutex.RLock()
defer fake.findOrCreateMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *FakeTaskCacheFactory) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ db.TaskCacheFactory = new(FakeTaskCacheFactory)

View File

@ -2,10 +2,10 @@
package dbfakes
import (
"sync"
"time"
sync "sync"
time "time"
"github.com/concourse/concourse/atc/db"
db "github.com/concourse/concourse/atc/db"
)
type FakeVolumeRepository struct {
@ -173,20 +173,21 @@ type FakeVolumeRepository struct {
result2 db.CreatedVolume
result3 error
}
FindTaskCacheVolumeStub func(int, *db.UsedWorkerTaskCache) (db.CreatingVolume, db.CreatedVolume, error)
FindTaskCacheVolumeStub func(int, string, db.UsedTaskCache) (db.CreatedVolume, bool, error)
findTaskCacheVolumeMutex sync.RWMutex
findTaskCacheVolumeArgsForCall []struct {
arg1 int
arg2 *db.UsedWorkerTaskCache
arg2 string
arg3 db.UsedTaskCache
}
findTaskCacheVolumeReturns struct {
result1 db.CreatingVolume
result2 db.CreatedVolume
result1 db.CreatedVolume
result2 bool
result3 error
}
findTaskCacheVolumeReturnsOnCall map[int]struct {
result1 db.CreatingVolume
result2 db.CreatedVolume
result1 db.CreatedVolume
result2 bool
result3 error
}
FindVolumesForContainerStub func(db.CreatedContainer) ([]db.CreatedVolume, error)
@ -995,17 +996,18 @@ func (fake *FakeVolumeRepository) FindResourceCertsVolumeReturnsOnCall(i int, re
}{result1, result2, result3}
}
func (fake *FakeVolumeRepository) FindTaskCacheVolume(arg1 int, arg2 *db.UsedWorkerTaskCache) (db.CreatingVolume, db.CreatedVolume, error) {
func (fake *FakeVolumeRepository) FindTaskCacheVolume(arg1 int, arg2 string, arg3 db.UsedTaskCache) (db.CreatedVolume, bool, error) {
fake.findTaskCacheVolumeMutex.Lock()
ret, specificReturn := fake.findTaskCacheVolumeReturnsOnCall[len(fake.findTaskCacheVolumeArgsForCall)]
fake.findTaskCacheVolumeArgsForCall = append(fake.findTaskCacheVolumeArgsForCall, struct {
arg1 int
arg2 *db.UsedWorkerTaskCache
}{arg1, arg2})
fake.recordInvocation("FindTaskCacheVolume", []interface{}{arg1, arg2})
arg2 string
arg3 db.UsedTaskCache
}{arg1, arg2, arg3})
fake.recordInvocation("FindTaskCacheVolume", []interface{}{arg1, arg2, arg3})
fake.findTaskCacheVolumeMutex.Unlock()
if fake.FindTaskCacheVolumeStub != nil {
return fake.FindTaskCacheVolumeStub(arg1, arg2)
return fake.FindTaskCacheVolumeStub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2, ret.result3
@ -1020,44 +1022,44 @@ func (fake *FakeVolumeRepository) FindTaskCacheVolumeCallCount() int {
return len(fake.findTaskCacheVolumeArgsForCall)
}
func (fake *FakeVolumeRepository) FindTaskCacheVolumeCalls(stub func(int, *db.UsedWorkerTaskCache) (db.CreatingVolume, db.CreatedVolume, error)) {
func (fake *FakeVolumeRepository) FindTaskCacheVolumeCalls(stub func(int, string, db.UsedTaskCache) (db.CreatedVolume, bool, error)) {
fake.findTaskCacheVolumeMutex.Lock()
defer fake.findTaskCacheVolumeMutex.Unlock()
fake.FindTaskCacheVolumeStub = stub
}
func (fake *FakeVolumeRepository) FindTaskCacheVolumeArgsForCall(i int) (int, *db.UsedWorkerTaskCache) {
func (fake *FakeVolumeRepository) FindTaskCacheVolumeArgsForCall(i int) (int, string, db.UsedTaskCache) {
fake.findTaskCacheVolumeMutex.RLock()
defer fake.findTaskCacheVolumeMutex.RUnlock()
argsForCall := fake.findTaskCacheVolumeArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeVolumeRepository) FindTaskCacheVolumeReturns(result1 db.CreatingVolume, result2 db.CreatedVolume, result3 error) {
func (fake *FakeVolumeRepository) FindTaskCacheVolumeReturns(result1 db.CreatedVolume, result2 bool, result3 error) {
fake.findTaskCacheVolumeMutex.Lock()
defer fake.findTaskCacheVolumeMutex.Unlock()
fake.FindTaskCacheVolumeStub = nil
fake.findTaskCacheVolumeReturns = struct {
result1 db.CreatingVolume
result2 db.CreatedVolume
result1 db.CreatedVolume
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeVolumeRepository) FindTaskCacheVolumeReturnsOnCall(i int, result1 db.CreatingVolume, result2 db.CreatedVolume, result3 error) {
func (fake *FakeVolumeRepository) FindTaskCacheVolumeReturnsOnCall(i int, result1 db.CreatedVolume, result2 bool, result3 error) {
fake.findTaskCacheVolumeMutex.Lock()
defer fake.findTaskCacheVolumeMutex.Unlock()
fake.FindTaskCacheVolumeStub = nil
if fake.findTaskCacheVolumeReturnsOnCall == nil {
fake.findTaskCacheVolumeReturnsOnCall = make(map[int]struct {
result1 db.CreatingVolume
result2 db.CreatedVolume
result1 db.CreatedVolume
result2 bool
result3 error
})
}
fake.findTaskCacheVolumeReturnsOnCall[i] = struct {
result1 db.CreatingVolume
result2 db.CreatedVolume
result1 db.CreatedVolume
result2 bool
result3 error
}{result1, result2, result3}
}

View File

@ -2,19 +2,16 @@
package dbfakes
import (
"sync"
sync "sync"
"github.com/concourse/concourse/atc/db"
db "github.com/concourse/concourse/atc/db"
)
type FakeWorkerTaskCacheFactory struct {
FindStub func(int, string, string, string) (*db.UsedWorkerTaskCache, bool, error)
FindStub func(db.WorkerTaskCache) (*db.UsedWorkerTaskCache, bool, error)
findMutex sync.RWMutex
findArgsForCall []struct {
arg1 int
arg2 string
arg3 string
arg4 string
arg1 db.WorkerTaskCache
}
findReturns struct {
result1 *db.UsedWorkerTaskCache
@ -26,13 +23,10 @@ type FakeWorkerTaskCacheFactory struct {
result2 bool
result3 error
}
FindOrCreateStub func(int, string, string, string) (*db.UsedWorkerTaskCache, error)
FindOrCreateStub func(db.WorkerTaskCache) (*db.UsedWorkerTaskCache, error)
findOrCreateMutex sync.RWMutex
findOrCreateArgsForCall []struct {
arg1 int
arg2 string
arg3 string
arg4 string
arg1 db.WorkerTaskCache
}
findOrCreateReturns struct {
result1 *db.UsedWorkerTaskCache
@ -46,19 +40,16 @@ type FakeWorkerTaskCacheFactory struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeWorkerTaskCacheFactory) Find(arg1 int, arg2 string, arg3 string, arg4 string) (*db.UsedWorkerTaskCache, bool, error) {
func (fake *FakeWorkerTaskCacheFactory) Find(arg1 db.WorkerTaskCache) (*db.UsedWorkerTaskCache, bool, error) {
fake.findMutex.Lock()
ret, specificReturn := fake.findReturnsOnCall[len(fake.findArgsForCall)]
fake.findArgsForCall = append(fake.findArgsForCall, struct {
arg1 int
arg2 string
arg3 string
arg4 string
}{arg1, arg2, arg3, arg4})
fake.recordInvocation("Find", []interface{}{arg1, arg2, arg3, arg4})
arg1 db.WorkerTaskCache
}{arg1})
fake.recordInvocation("Find", []interface{}{arg1})
fake.findMutex.Unlock()
if fake.FindStub != nil {
return fake.FindStub(arg1, arg2, arg3, arg4)
return fake.FindStub(arg1)
}
if specificReturn {
return ret.result1, ret.result2, ret.result3
@ -73,17 +64,17 @@ func (fake *FakeWorkerTaskCacheFactory) FindCallCount() int {
return len(fake.findArgsForCall)
}
func (fake *FakeWorkerTaskCacheFactory) FindCalls(stub func(int, string, string, string) (*db.UsedWorkerTaskCache, bool, error)) {
func (fake *FakeWorkerTaskCacheFactory) FindCalls(stub func(db.WorkerTaskCache) (*db.UsedWorkerTaskCache, bool, error)) {
fake.findMutex.Lock()
defer fake.findMutex.Unlock()
fake.FindStub = stub
}
func (fake *FakeWorkerTaskCacheFactory) FindArgsForCall(i int) (int, string, string, string) {
func (fake *FakeWorkerTaskCacheFactory) FindArgsForCall(i int) db.WorkerTaskCache {
fake.findMutex.RLock()
defer fake.findMutex.RUnlock()
argsForCall := fake.findArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1
}
func (fake *FakeWorkerTaskCacheFactory) FindReturns(result1 *db.UsedWorkerTaskCache, result2 bool, result3 error) {
@ -115,19 +106,16 @@ func (fake *FakeWorkerTaskCacheFactory) FindReturnsOnCall(i int, result1 *db.Use
}{result1, result2, result3}
}
func (fake *FakeWorkerTaskCacheFactory) FindOrCreate(arg1 int, arg2 string, arg3 string, arg4 string) (*db.UsedWorkerTaskCache, error) {
func (fake *FakeWorkerTaskCacheFactory) FindOrCreate(arg1 db.WorkerTaskCache) (*db.UsedWorkerTaskCache, error) {
fake.findOrCreateMutex.Lock()
ret, specificReturn := fake.findOrCreateReturnsOnCall[len(fake.findOrCreateArgsForCall)]
fake.findOrCreateArgsForCall = append(fake.findOrCreateArgsForCall, struct {
arg1 int
arg2 string
arg3 string
arg4 string
}{arg1, arg2, arg3, arg4})
fake.recordInvocation("FindOrCreate", []interface{}{arg1, arg2, arg3, arg4})
arg1 db.WorkerTaskCache
}{arg1})
fake.recordInvocation("FindOrCreate", []interface{}{arg1})
fake.findOrCreateMutex.Unlock()
if fake.FindOrCreateStub != nil {
return fake.FindOrCreateStub(arg1, arg2, arg3, arg4)
return fake.FindOrCreateStub(arg1)
}
if specificReturn {
return ret.result1, ret.result2
@ -142,17 +130,17 @@ func (fake *FakeWorkerTaskCacheFactory) FindOrCreateCallCount() int {
return len(fake.findOrCreateArgsForCall)
}
func (fake *FakeWorkerTaskCacheFactory) FindOrCreateCalls(stub func(int, string, string, string) (*db.UsedWorkerTaskCache, error)) {
func (fake *FakeWorkerTaskCacheFactory) FindOrCreateCalls(stub func(db.WorkerTaskCache) (*db.UsedWorkerTaskCache, error)) {
fake.findOrCreateMutex.Lock()
defer fake.findOrCreateMutex.Unlock()
fake.FindOrCreateStub = stub
}
func (fake *FakeWorkerTaskCacheFactory) FindOrCreateArgsForCall(i int) (int, string, string, string) {
func (fake *FakeWorkerTaskCacheFactory) FindOrCreateArgsForCall(i int) db.WorkerTaskCache {
fake.findOrCreateMutex.RLock()
defer fake.findOrCreateMutex.RUnlock()
argsForCall := fake.findOrCreateArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1
}
func (fake *FakeWorkerTaskCacheFactory) FindOrCreateReturns(result1 *db.UsedWorkerTaskCache, result2 error) {

View File

@ -563,7 +563,7 @@ func (j *job) ClearTaskCache(stepName string, cachePath string) (int64, error) {
defer Rollback(tx)
var sqlBuilder sq.DeleteBuilder = psql.Delete("worker_task_caches").
var sqlBuilder sq.DeleteBuilder = psql.Delete("task_caches").
Where(sq.Eq{
"job_id": j.id,
"step_name": stepName,

View File

@ -1319,8 +1319,8 @@ var _ = Describe("Job", func() {
})
})
Describe("Clear worker task cache", func() {
Context("when worker task cache exists", func() {
Describe("Clear task cache", func() {
Context("when task cache exists", func() {
var (
someOtherJob db.Job
rowsDeleted int64
@ -1332,7 +1332,13 @@ var _ = Describe("Job", func() {
found bool
)
_, err = workerTaskCacheFactory.FindOrCreate(job.ID(), "some-task", "some-path", defaultWorker.Name())
usedTaskCache, err := taskCacheFactory.FindOrCreate(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
_, err = workerTaskCacheFactory.FindOrCreate(db.WorkerTaskCache{
TaskCache: usedTaskCache,
WorkerName: defaultWorker.Name(),
})
Expect(err).ToNot(HaveOccurred())
someOtherJob, found, err = pipeline.Job("some-other-job")
@ -1340,7 +1346,13 @@ var _ = Describe("Job", func() {
Expect(found).To(BeTrue())
Expect(someOtherJob).ToNot(BeNil())
_, err = workerTaskCacheFactory.FindOrCreate(someOtherJob.ID(), "some-other-task", "some-other-path", defaultWorker.Name())
otherUsedTaskCache, err := taskCacheFactory.FindOrCreate(someOtherJob.ID(), "some-other-task", "some-other-path")
Expect(err).ToNot(HaveOccurred())
_, err = workerTaskCacheFactory.FindOrCreate(db.WorkerTaskCache{
TaskCache: otherUsedTaskCache,
WorkerName: defaultWorker.Name(),
})
Expect(err).ToNot(HaveOccurred())
})
@ -1352,20 +1364,28 @@ var _ = Describe("Job", func() {
Expect(err).NotTo(HaveOccurred())
})
It("deletes a row from the worker_task_caches table", func() {
It("deletes a row from the task_caches table", func() {
Expect(rowsDeleted).To(Equal(int64(1)))
})
It("removes the task cache", func() {
_, found, err := workerTaskCacheFactory.Find(job.ID(), "some-task", "some-path", defaultWorker.Name())
Expect(found).To(BeFalse())
usedTaskCache, found, err := taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(usedTaskCache).To(BeNil())
Expect(found).To(BeFalse())
})
It("doesn't remove other jobs caches", func() {
_, found, err := workerTaskCacheFactory.Find(someOtherJob.ID(), "some-other-task", "some-other-path", defaultWorker.Name())
otherUsedTaskCache, found, err := taskCacheFactory.Find(someOtherJob.ID(), "some-other-task", "some-other-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
Expect(err).ToNot(HaveOccurred())
_, err = workerTaskCacheFactory.FindOrCreate(db.WorkerTaskCache{
TaskCache: otherUsedTaskCache,
WorkerName: defaultWorker.Name(),
})
Expect(err).ToNot(HaveOccurred())
})
Context("but the cache path doesn't exist", func() {
@ -1389,12 +1409,20 @@ var _ = Describe("Job", func() {
Expect(err).NotTo(HaveOccurred())
})
It("does not delete any rows from the worker_task_caches table", func() {
It("does not delete any rows from the task_caches table", func() {
Expect(rowsDeleted).To(BeZero())
})
It("should not delete any other task steps", func() {
_, found, err := workerTaskCacheFactory.Find(job.ID(), "some-task", "some-path", defaultWorker.Name())
It("should not delete any task steps", func() {
usedTaskCache, found, err := taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
Expect(err).ToNot(HaveOccurred())
_, found, err = workerTaskCacheFactory.Find(db.WorkerTaskCache{
TaskCache: usedTaskCache,
WorkerName: defaultWorker.Name(),
})
Expect(found).To(BeTrue())
Expect(err).ToNot(HaveOccurred())
})
@ -1408,18 +1436,18 @@ var _ = Describe("Job", func() {
Expect(err).NotTo(HaveOccurred())
})
It("deletes a row from the worker_task_caches table", func() {
It("deletes a row from the task_caches table", func() {
Expect(rowsDeleted).To(Equal(int64(1)))
})
It("removes the task cache", func() {
_, found, err := workerTaskCacheFactory.Find(job.ID(), "some-task", "some-path", defaultWorker.Name())
_, found, err := taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(found).To(BeFalse())
Expect(err).ToNot(HaveOccurred())
})
It("doesn't remove other jobs caches", func() {
_, found, err := workerTaskCacheFactory.Find(someOtherJob.ID(), "some-other-task", "some-other-path", defaultWorker.Name())
_, found, err := taskCacheFactory.Find(someOtherJob.ID(), "some-other-task", "some-other-path")
Expect(found).To(BeTrue())
Expect(err).ToNot(HaveOccurred())
})

View File

@ -0,0 +1,36 @@
BEGIN;
ALTER TABLE worker_task_caches
ADD job_id integer,
ADD step_name text,
ADD path text;
UPDATE worker_task_caches
SET job_id=tc.job_id,
step_name=tc.step_name,
path=tc.path
FROM task_caches tc
WHERE worker_task_caches.task_cache_id=tc.id;
ALTER TABLE worker_task_caches
ALTER COLUMN step_name SET NOT NULL,
ALTER COLUMN path SET NOT NULL;
DROP INDEX task_caches_job_id_step_name_path_uniq;
DROP INDEX worker_task_caches_worker_name_task_cache_id_uniq;
CREATE UNIQUE INDEX worker_task_caches_uniq
ON worker_task_caches (job_id, step_name, worker_name, path);
CREATE INDEX worker_task_caches_job_id ON worker_task_caches USING btree (job_id);
ALTER TABLE ONLY worker_task_caches
ADD CONSTRAINT worker_task_caches_job_id_fkey FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE;
ALTER TABLE worker_task_caches DROP CONSTRAINT worker_task_caches_task_cache_fkey;
ALTER TABLE ONLY worker_task_caches
DROP COLUMN task_cache_id;
DROP TABLE task_caches;
COMMIT;

View File

@ -0,0 +1,52 @@
BEGIN;
CREATE TABLE task_caches (
id serial,
worker_name text,
job_id integer,
step_name text NOT NULL,
path text NOT NULL,
PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX task_caches_job_id_step_name_path_uniq
ON task_caches (job_id, step_name, path);
CREATE INDEX task_caches_job_id ON task_caches USING btree (job_id);
ALTER TABLE ONLY task_caches
ADD CONSTRAINT task_caches_job_id_fkey FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE;
ALTER TABLE worker_task_caches
ADD COLUMN task_cache_id INTEGER;
ALTER TABLE worker_task_caches
ADD CONSTRAINT worker_task_caches_task_cache_fkey FOREIGN KEY (task_cache_id) REFERENCES task_caches (id) ON DELETE CASCADE;
CREATE INDEX worker_task_caches_task_cache_id ON worker_task_caches (task_cache_id);
WITH ins AS (
INSERT INTO task_caches (worker_name, job_id, step_name, path)
SELECT DISTINCT wtc.worker_name, wtc.job_id, wtc.step_name, wtc.path
FROM worker_task_caches wtc
RETURNING *
)
UPDATE worker_task_caches wtc
SET task_cache_id = ins.id
FROM ins
WHERE ins.worker_name = wtc.worker_name
AND ins.job_id = wtc.job_id
AND ins.step_name = wtc.step_name
AND ins.path = wtc.path;
DROP INDEX worker_task_caches_uniq;
CREATE UNIQUE INDEX worker_task_caches_worker_name_task_cache_id_uniq
ON worker_task_caches (worker_name, task_cache_id);
ALTER TABLE worker_task_caches
DROP COLUMN job_id,
DROP COLUMN step_name,
DROP COLUMN path;
ALTER TABLE task_caches
DROP COLUMN worker_name;
COMMIT;

98
atc/db/task_cache.go Normal file
View File

@ -0,0 +1,98 @@
package db
import (
"database/sql"
sq "github.com/Masterminds/squirrel"
)
type usedTaskCache struct {
id int
jobID int
stepName string
path string
}
type UsedTaskCache interface {
ID() int
JobID() int
StepName() string
Path() string
}
func (tc *usedTaskCache) ID() int { return tc.id }
func (tc *usedTaskCache) JobID() int { return tc.jobID }
func (tc *usedTaskCache) StepName() string { return tc.stepName }
func (tc *usedTaskCache) Path() string { return tc.path }
func (f usedTaskCache) findOrCreate(tx Tx) (UsedTaskCache, error) {
utc, found, err := f.find(tx)
if err != nil {
return nil, err
}
if !found {
var id int
err = psql.Insert("task_caches").
Columns(
"job_id",
"step_name",
"path",
).
Values(
f.jobID,
f.stepName,
f.path,
).
Suffix(`
ON CONFLICT (job_id, step_name, path) DO UPDATE SET
job_id = ?
RETURNING id
`, f.jobID).
RunWith(tx).
QueryRow().
Scan(&id)
if err != nil {
return nil, err
}
return &usedTaskCache{
id: id,
jobID: f.jobID,
stepName: f.stepName,
path: f.path,
}, nil
}
return utc, nil
}
func (f usedTaskCache) find(runner sq.Runner) (UsedTaskCache, bool, error) {
var id int
err := psql.Select("id").
From("task_caches").
Where(sq.Eq{
"job_id": f.jobID,
"step_name": f.stepName,
"path": f.path,
}).
RunWith(runner).
QueryRow().
Scan(&id)
if err != nil {
if err == sql.ErrNoRows {
return nil, false, nil
}
return nil, false, err
}
return &usedTaskCache{
id: id,
jobID: f.jobID,
stepName: f.stepName,
path: f.path,
}, true, nil
}

View File

@ -0,0 +1,52 @@
package db
//go:generate counterfeiter . TaskCacheFactory
type TaskCacheFactory interface {
Find(jobID int, stepName string, path string) (UsedTaskCache, bool, error)
FindOrCreate(jobID int, stepName string, path string) (UsedTaskCache, error)
}
type taskCacheFactory struct {
conn Conn
}
func NewTaskCacheFactory(conn Conn) TaskCacheFactory {
return &taskCacheFactory{
conn: conn,
}
}
func (f *taskCacheFactory) Find(jobID int, stepName string, path string) (UsedTaskCache, bool, error) {
return usedTaskCache{
jobID: jobID,
stepName: stepName,
path: path,
}.find(f.conn)
}
func (f *taskCacheFactory) FindOrCreate(jobID int, stepName string, path string) (UsedTaskCache, error) {
tx, err := f.conn.Begin()
if err != nil {
return nil, err
}
defer Rollback(tx)
utc, err := usedTaskCache{
jobID: jobID,
stepName: stepName,
path: path,
}.findOrCreate(tx)
if err != nil {
return nil, err
}
err = tx.Commit()
if err != nil {
return nil, err
}
return utc, nil
}

View File

@ -0,0 +1,89 @@
package db_test
import (
"github.com/concourse/concourse/atc/db"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("TaskCacheFactory", func() {
Describe("FindOrCreate", func() {
Context("when there is no existing task cache", func() {
It("creates resource cache in database", func() {
usedTaskCache, err := taskCacheFactory.FindOrCreate(
defaultJob.ID(),
"some-step",
"some-path",
)
Expect(err).ToNot(HaveOccurred())
Expect(usedTaskCache.ID()).ToNot(BeNil())
})
})
Context("when there is existing task cache", func() {
var (
usedTaskCache db.UsedTaskCache
err error
)
BeforeEach(func() {
usedTaskCache, err = taskCacheFactory.FindOrCreate(
defaultJob.ID(),
"some-step",
"some-path",
)
Expect(err).ToNot(HaveOccurred())
})
It("creates a new task cache for another task", func() {
otherTaskCache, err := taskCacheFactory.FindOrCreate(
defaultJob.ID(),
"some-other-step",
"some-path",
)
Expect(err).ToNot(HaveOccurred())
Expect(otherTaskCache.ID()).ToNot(Equal(usedTaskCache.ID()))
})
})
})
Describe("Find", func() {
Context("when there is no existing task cache", func() {
It("returns no found", func() {
usedTaskCache, found, err := taskCacheFactory.Find(
defaultJob.ID(),
"some-step",
"some-path",
)
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeFalse())
Expect(usedTaskCache).To(BeNil())
})
})
Context("when there is existing task cache", func() {
var (
usedTaskCache db.UsedTaskCache
err error
)
BeforeEach(func() {
usedTaskCache, err = taskCacheFactory.FindOrCreate(
defaultJob.ID(),
"some-step",
"some-path",
)
Expect(err).ToNot(HaveOccurred())
})
It("finds task cache in database", func() {
utc, found, err := taskCacheFactory.Find(defaultJob.ID(), "some-step", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
Expect(utc.ID()).To(Equal(usedTaskCache.ID()))
})
})
})
})

View File

@ -2058,7 +2058,7 @@ var _ = Describe("Team", func() {
})
})
It("removes worker task caches for jobs that are no longer in pipeline", func() {
It("removes task caches for jobs that are no longer in pipeline", func() {
pipeline, _, err := team.SavePipeline(pipelineName, config, 0, db.PipelineNoChange)
Expect(err).ToNot(HaveOccurred())
@ -2066,10 +2066,10 @@ var _ = Describe("Team", func() {
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
_, err = workerTaskCacheFactory.FindOrCreate(job.ID(), "some-task", "some-path", defaultWorker.Name())
_, err = taskCacheFactory.FindOrCreate(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
_, found, err = workerTaskCacheFactory.Find(job.ID(), "some-task", "some-path", defaultWorker.Name())
_, found, err = taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
@ -2078,12 +2078,12 @@ var _ = Describe("Team", func() {
_, _, err = team.SavePipeline(pipelineName, config, pipeline.ConfigVersion(), db.PipelineNoChange)
Expect(err).ToNot(HaveOccurred())
_, found, err = workerTaskCacheFactory.Find(job.ID(), "some-task", "some-path", defaultWorker.Name())
_, found, err = taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeFalse())
})
It("removes worker task caches for tasks that are no longer exist", func() {
It("removes task caches for tasks that are no longer exist", func() {
pipeline, _, err := team.SavePipeline(pipelineName, config, 0, db.PipelineNoChange)
Expect(err).ToNot(HaveOccurred())
@ -2091,10 +2091,10 @@ var _ = Describe("Team", func() {
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
_, err = workerTaskCacheFactory.FindOrCreate(job.ID(), "some-task", "some-path", defaultWorker.Name())
_, err = taskCacheFactory.FindOrCreate(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
_, found, err = workerTaskCacheFactory.Find(job.ID(), "some-task", "some-path", defaultWorker.Name())
_, found, err = taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
@ -2113,11 +2113,64 @@ var _ = Describe("Team", func() {
_, _, err = team.SavePipeline(pipelineName, config, pipeline.ConfigVersion(), db.PipelineNoChange)
Expect(err).ToNot(HaveOccurred())
_, found, err = workerTaskCacheFactory.Find(job.ID(), "some-task", "some-path", defaultWorker.Name())
_, found, err = taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeFalse())
})
It("should not remove task caches in other pipeline", func() {
pipeline, _, err := team.SavePipeline(pipelineName, config, 0, db.PipelineNoChange)
Expect(err).ToNot(HaveOccurred())
otherPipeline, _, err := team.SavePipeline("other-pipeline", config, 0, db.PipelineNoChange)
Expect(err).ToNot(HaveOccurred())
job, found, err := pipeline.Job("some-job")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
_, err = taskCacheFactory.FindOrCreate(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
_, found, err = taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
otherJob, found, err := otherPipeline.Job("some-job")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
_, err = taskCacheFactory.FindOrCreate(otherJob.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
_, found, err = taskCacheFactory.Find(otherJob.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
config.Jobs = []atc.JobConfig{
{
Name: "some-job",
Plan: atc.PlanSequence{
{
Task: "some-other-task",
TaskConfigPath: "some/config/path.yml",
},
},
},
}
_, _, err = team.SavePipeline(pipelineName, config, pipeline.ConfigVersion(), db.PipelineNoChange)
Expect(err).ToNot(HaveOccurred())
_, found, err = taskCacheFactory.Find(job.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeFalse())
_, found, err = taskCacheFactory.Find(otherJob.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
})
It("creates all of the serial groups from the jobs in the database", func() {
savedPipeline, _, err := team.SavePipeline(pipelineName, config, 0, db.PipelineNoChange)
Expect(err).ToNot(HaveOccurred())

View File

@ -153,7 +153,7 @@ type CreatedVolume interface {
InitializeResourceCache(UsedResourceCache) error
InitializeArtifact(name string, buildID int) (WorkerArtifact, error)
InitializeTaskCache(int, string, string) error
InitializeTaskCache(jobID int, stepName string, path string) error
ContainerHandle() string
ParentHandle() string
@ -219,9 +219,10 @@ func (volume *createdVolume) TaskIdentifier() (string, string, string, error) {
var jobName string
var stepName string
err := psql.Select("p.name, j.name, wtc.step_name").
err := psql.Select("p.name, j.name, tc.step_name").
From("worker_task_caches wtc").
LeftJoin("jobs j ON j.id = wtc.job_id").
LeftJoin("task_caches tc on tc.id = wtc.task_cache_id").
LeftJoin("jobs j ON j.id = tc.job_id").
LeftJoin("pipelines p ON p.id = j.pipeline_id").
Where(sq.Eq{
"wtc.id": volume.workerTaskCacheID,
@ -444,12 +445,19 @@ func (volume *createdVolume) InitializeTaskCache(jobID int, stepName string, pat
defer Rollback(tx)
usedTaskCache, err := usedTaskCache{
jobID: jobID,
stepName: stepName,
path: path,
}.findOrCreate(tx)
if err != nil {
return err
}
usedWorkerTaskCache, err := WorkerTaskCache{
JobID: jobID,
StepName: stepName,
WorkerName: volume.WorkerName(),
Path: path,
}.FindOrCreate(tx)
TaskCache: usedTaskCache,
}.findOrCreate(tx)
if err != nil {
return err
}

View File

@ -6,7 +6,7 @@ import (
"time"
sq "github.com/Masterminds/squirrel"
"github.com/nu7hatch/gouuid"
uuid "github.com/nu7hatch/gouuid"
)
//go:generate counterfeiter . VolumeRepository
@ -22,7 +22,7 @@ type VolumeRepository interface {
FindResourceCacheVolume(workerName string, resourceCache UsedResourceCache) (CreatedVolume, bool, error)
FindTaskCacheVolume(teamID int, uwtc *UsedWorkerTaskCache) (CreatingVolume, CreatedVolume, error)
FindTaskCacheVolume(teamID int, workerName string, taskCache UsedTaskCache) (CreatedVolume, bool, error)
CreateTaskCacheVolume(teamID int, uwtc *UsedWorkerTaskCache) (CreatingVolume, error)
FindResourceCertsVolume(workerName string, uwrc *UsedWorkerResourceCerts) (CreatingVolume, CreatedVolume, error)
@ -339,10 +339,32 @@ func (repository *volumeRepository) FindBaseResourceTypeVolume(uwbrt *UsedWorker
})
}
func (repository *volumeRepository) FindTaskCacheVolume(teamID int, uwtc *UsedWorkerTaskCache) (CreatingVolume, CreatedVolume, error) {
return repository.findVolume(teamID, uwtc.WorkerName, map[string]interface{}{
"v.worker_task_cache_id": uwtc.ID,
func (repository *volumeRepository) FindTaskCacheVolume(teamID int, workerName string, taskCache UsedTaskCache) (CreatedVolume, bool, error) {
usedWorkerTaskCache, found, err := WorkerTaskCache{
WorkerName: workerName,
TaskCache: taskCache,
}.find(repository.conn)
if err != nil {
return nil, false, err
}
if !found {
return nil, false, nil
}
_, createdVolume, err := repository.findVolume(teamID, workerName, map[string]interface{}{
"v.worker_task_cache_id": usedWorkerTaskCache.ID,
})
if err != nil {
return nil, false, err
}
if createdVolume == nil {
return nil, false, nil
}
return createdVolume, true, nil
}
func (repository *volumeRepository) CreateTaskCacheVolume(teamID int, uwtc *UsedWorkerTaskCache) (CreatingVolume, error) {

View File

@ -60,10 +60,16 @@ var _ = Describe("VolumeFactory", func() {
)
It("returns task cache volumes", func() {
taskCache, err := workerTaskCacheFactory.FindOrCreate(defaultJob.ID(), "some-step", "some-path", defaultWorker.Name())
taskCache, err := taskCacheFactory.FindOrCreate(defaultJob.ID(), "some-step", "some-path")
Expect(err).NotTo(HaveOccurred())
creatingVolume, err := volumeRepository.CreateTaskCacheVolume(defaultTeam.ID(), taskCache)
usedWorkerTaskCache, err := workerTaskCacheFactory.FindOrCreate(db.WorkerTaskCache{
TaskCache: taskCache,
WorkerName: defaultWorker.Name(),
})
Expect(err).NotTo(HaveOccurred())
creatingVolume, err := volumeRepository.CreateTaskCacheVolume(defaultTeam.ID(), usedWorkerTaskCache)
Expect(err).NotTo(HaveOccurred())
createdVolume, err := creatingVolume.Created()

View File

@ -327,21 +327,20 @@ var _ = Describe("Volume", func() {
})
It("sets current volume as worker task cache volume", func() {
uwtc, err := workerTaskCacheFactory.FindOrCreate(defaultJob.ID(), "some-step", "some-cache-path", defaultWorker.Name())
taskCache, err := taskCacheFactory.FindOrCreate(defaultJob.ID(), "some-step", "some-cache-path")
Expect(err).ToNot(HaveOccurred())
creatingVolume, createdVolume, err := volumeRepository.FindTaskCacheVolume(defaultTeam.ID(), uwtc)
createdVolume, found, err := volumeRepository.FindTaskCacheVolume(defaultTeam.ID(), defaultWorker.Name(), taskCache)
Expect(err).ToNot(HaveOccurred())
Expect(creatingVolume).To(BeNil())
Expect(found).To(BeTrue())
Expect(createdVolume).ToNot(BeNil())
Expect(createdVolume.Handle()).To(Equal(existingTaskCacheVolume.Handle()))
err = volume.InitializeTaskCache(defaultJob.ID(), "some-step", "some-cache-path")
Expect(err).ToNot(HaveOccurred())
creatingVolume, createdVolume, err = volumeRepository.FindTaskCacheVolume(defaultTeam.ID(), uwtc)
createdVolume, found, err = volumeRepository.FindTaskCacheVolume(defaultTeam.ID(), defaultWorker.Name(), taskCache)
Expect(err).ToNot(HaveOccurred())
Expect(creatingVolume).To(BeNil())
Expect(createdVolume).ToNot(BeNil())
Expect(createdVolume.Handle()).To(Equal(volume.Handle()))
@ -504,7 +503,13 @@ var _ = Describe("Volume", func() {
Describe("Task cache volumes", func() {
It("returns volume type and task identifier", func() {
uwtc, err := workerTaskCacheFactory.FindOrCreate(defaultJob.ID(), "some-task", "some-path", defaultWorker.Name())
taskCache, err := taskCacheFactory.FindOrCreate(defaultJob.ID(), "some-task", "some-path")
Expect(err).ToNot(HaveOccurred())
uwtc, err := workerTaskCacheFactory.FindOrCreate(db.WorkerTaskCache{
WorkerName: defaultWorker.Name(),
TaskCache: taskCache,
})
Expect(err).ToNot(HaveOccurred())
creatingVolume, err := volumeRepository.CreateTaskCacheVolume(defaultTeam.ID(), uwtc)

111
atc/db/worker_task_cache.go Normal file
View File

@ -0,0 +1,111 @@
package db
import (
"database/sql"
sq "github.com/Masterminds/squirrel"
"github.com/concourse/concourse/atc"
)
type WorkerTaskCache struct {
WorkerName string
TaskCache UsedTaskCache
}
type UsedWorkerTaskCache struct {
ID int
WorkerName string
}
func (wtc WorkerTaskCache) findOrCreate(
tx Tx,
) (*UsedWorkerTaskCache, error) {
uwtc, found, err := wtc.find(tx)
if err != nil {
return nil, err
}
if !found {
var id int
err = psql.Insert("worker_task_caches").
Columns(
"worker_name",
"task_cache_id",
).
Values(wtc.WorkerName, wtc.TaskCache.ID()).
Suffix(`
ON CONFLICT (worker_name, task_cache_id) DO UPDATE SET
task_cache_id = EXCLUDED.task_cache_id
RETURNING id
`).
RunWith(tx).
QueryRow().
Scan(&id)
if err != nil {
return nil, err
}
return &UsedWorkerTaskCache{
ID: id,
WorkerName: wtc.WorkerName,
}, nil
}
return uwtc, err
}
func (workerTaskCache WorkerTaskCache) find(runner sq.Runner) (*UsedWorkerTaskCache, bool, error) {
var id int
err := psql.Select("id").
From("worker_task_caches").
Where(sq.Eq{
"worker_name": workerTaskCache.WorkerName,
"task_cache_id": workerTaskCache.TaskCache.ID(),
}).
RunWith(runner).
QueryRow().
Scan(&id)
if err != nil {
if err == sql.ErrNoRows {
return nil, false, nil
}
return nil, false, err
}
return &UsedWorkerTaskCache{
ID: id,
WorkerName: workerTaskCache.WorkerName,
}, true, nil
}
func removeUnusedWorkerTaskCaches(tx Tx, pipelineID int, jobConfigs []atc.JobConfig) error {
steps := make(map[string][]string)
for _, jobConfig := range jobConfigs {
for _, jobConfigPlan := range jobConfig.Plan {
if jobConfigPlan.Task != "" {
steps[jobConfig.Name] = append(steps[jobConfig.Name], jobConfigPlan.Task)
}
}
}
query := sq.Or{}
for jobName, stepNames := range steps {
query = append(query, sq.And{sq.Eq{"j.name": jobName}, sq.NotEq{"tc.step_name": stepNames}})
}
_, err := psql.Delete("task_caches tc USING jobs j").
Where(
sq.Or{
query,
sq.Eq{
"j.active": false,
},
}).
Where(sq.Expr("j.id = tc.job_id")).
Where(sq.Eq{"j.pipeline_id": pipelineID}).
RunWith(tx).
Exec()
return err
}

View File

@ -1,22 +1,10 @@
package db
import (
"database/sql"
sq "github.com/Masterminds/squirrel"
"github.com/concourse/concourse/atc"
)
type UsedWorkerTaskCache struct {
ID int
WorkerName string
}
//go:generate counterfeiter . WorkerTaskCacheFactory
type WorkerTaskCacheFactory interface {
Find(jobID int, stepName string, path string, workerName string) (*UsedWorkerTaskCache, bool, error)
FindOrCreate(jobID int, stepName string, path string, workerName string) (*UsedWorkerTaskCache, error)
FindOrCreate(WorkerTaskCache) (*UsedWorkerTaskCache, error)
Find(WorkerTaskCache) (*UsedWorkerTaskCache, bool, error)
}
type workerTaskCacheFactory struct {
@ -29,41 +17,11 @@ func NewWorkerTaskCacheFactory(conn Conn) WorkerTaskCacheFactory {
}
}
func (f *workerTaskCacheFactory) Find(jobID int, stepName string, path string, workerName string) (*UsedWorkerTaskCache, bool, error) {
var id int
err := psql.Select("id").
From("worker_task_caches").
Where(sq.Eq{
"job_id": jobID,
"step_name": stepName,
"worker_name": workerName,
"path": path,
}).
RunWith(f.conn).
QueryRow().
Scan(&id)
if err != nil {
if err == sql.ErrNoRows {
return nil, false, nil
}
return nil, false, err
}
return &UsedWorkerTaskCache{
ID: id,
WorkerName: workerName,
}, true, nil
func (f *workerTaskCacheFactory) Find(workerTaskCache WorkerTaskCache) (*UsedWorkerTaskCache, bool, error) {
return workerTaskCache.find(f.conn)
}
func (f *workerTaskCacheFactory) FindOrCreate(jobID int, stepName string, path string, workerName string) (*UsedWorkerTaskCache, error) {
workerTaskCache := WorkerTaskCache{
JobID: jobID,
StepName: stepName,
WorkerName: workerName,
Path: path,
}
func (f *workerTaskCacheFactory) FindOrCreate(workerTaskCache WorkerTaskCache) (*UsedWorkerTaskCache, error) {
tx, err := f.conn.Begin()
if err != nil {
return nil, err
@ -71,7 +29,7 @@ func (f *workerTaskCacheFactory) FindOrCreate(jobID int, stepName string, path s
defer Rollback(tx)
usedWorkerTaskCache, err := workerTaskCache.FindOrCreate(tx)
usedWorkerTaskCache, err := workerTaskCache.findOrCreate(tx)
if err != nil {
return nil, err
}
@ -83,97 +41,3 @@ func (f *workerTaskCacheFactory) FindOrCreate(jobID int, stepName string, path s
return usedWorkerTaskCache, nil
}
type WorkerTaskCache struct {
JobID int
StepName string
WorkerName string
Path string
}
func (wtc WorkerTaskCache) FindOrCreate(
tx Tx,
) (*UsedWorkerTaskCache, error) {
var id int
err := psql.Select("id").
From("worker_task_caches").
Where(sq.Eq{
"job_id": wtc.JobID,
"step_name": wtc.StepName,
"worker_name": wtc.WorkerName,
"path": wtc.Path,
}).
RunWith(tx).
QueryRow().
Scan(&id)
if err != nil {
if err == sql.ErrNoRows {
err = psql.Insert("worker_task_caches").
Columns(
"job_id",
"step_name",
"worker_name",
"path",
).
Values(
wtc.JobID,
wtc.StepName,
wtc.WorkerName,
wtc.Path,
).
Suffix(`
ON CONFLICT (job_id, step_name, worker_name, path) DO UPDATE SET
path = ?
RETURNING id
`, wtc.Path).
RunWith(tx).
QueryRow().
Scan(&id)
if err != nil {
return nil, err
}
return &UsedWorkerTaskCache{
ID: id,
WorkerName: wtc.WorkerName,
}, nil
}
return nil, err
}
return &UsedWorkerTaskCache{
ID: id,
WorkerName: wtc.WorkerName,
}, nil
}
func removeUnusedWorkerTaskCaches(tx Tx, pipelineID int, jobConfigs []atc.JobConfig) error {
steps := make(map[string][]string)
for _, jobConfig := range jobConfigs {
for _, jobConfigPlan := range jobConfig.Plan {
if jobConfigPlan.Task != "" {
steps[jobConfig.Name] = append(steps[jobConfig.Name], jobConfigPlan.Task)
}
}
}
query := sq.Or{}
for jobName, stepNames := range steps {
query = append(query, sq.And{sq.Eq{"j.name": jobName}, sq.NotEq{"wtc.step_name": stepNames}})
}
_, err := psql.Delete("worker_task_caches wtc USING jobs j").
Where(sq.Or{
query,
sq.Eq{
"j.pipeline_id": pipelineID,
"j.active": false,
},
}).
Where(sq.Expr("j.id = wtc.job_id")).
RunWith(tx).
Exec()
return err
}

View File

@ -0,0 +1,82 @@
package db_test
import (
"github.com/concourse/concourse/atc/db"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("WorkerTaskCache", func() {
var workerTaskCache db.WorkerTaskCache
BeforeEach(func() {
taskCache, err := taskCacheFactory.FindOrCreate(1, "some-step", "some-path")
Expect(err).ToNot(HaveOccurred())
workerTaskCache = db.WorkerTaskCache{
WorkerName: defaultWorker.Name(),
TaskCache: taskCache,
}
})
Describe("FindOrCreate", func() {
Context("when there is no existing worker task cache", func() {
It("creates worker task cache", func() {
usedWorkerTaskCache, err := workerTaskCacheFactory.FindOrCreate(workerTaskCache)
Expect(err).ToNot(HaveOccurred())
Expect(usedWorkerTaskCache.ID).To(Equal(1))
})
})
Context("when there is existing worker task caches", func() {
BeforeEach(func() {
var err error
_, err = workerTaskCacheFactory.FindOrCreate(workerTaskCache)
Expect(err).ToNot(HaveOccurred())
})
It("finds worker task cache", func() {
usedWorkerTaskCache, err := workerTaskCacheFactory.FindOrCreate(workerTaskCache)
Expect(err).ToNot(HaveOccurred())
Expect(usedWorkerTaskCache.ID).To(Equal(1))
})
})
})
Describe("Find", func() {
var uwtc *db.UsedWorkerTaskCache
var found bool
var findErr error
JustBeforeEach(func() {
uwtc, found, findErr = workerTaskCacheFactory.Find(workerTaskCache)
})
Context("when there are no existing worker task caches", func() {
It("returns false and no error", func() {
Expect(findErr).ToNot(HaveOccurred())
Expect(found).To(BeFalse())
Expect(uwtc).To(BeNil())
})
})
Context("when there is existing worker task caches", func() {
var createdWorkerTaskCache *db.UsedWorkerTaskCache
BeforeEach(func() {
var err error
createdWorkerTaskCache, err = workerTaskCacheFactory.FindOrCreate(workerTaskCache)
Expect(err).ToNot(HaveOccurred())
})
It("finds worker task cache", func() {
Expect(findErr).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
Expect(uwtc.ID).To(Equal(createdWorkerTaskCache.ID))
})
})
})
})

View File

@ -470,7 +470,12 @@ func (action *TaskStep) registerOutputs(logger lager.Logger, repository *artifac
if volumeMount.MountPath == filepath.Join(action.artifactsRoot, cacheConfig.Path) {
logger.Debug("initializing-cache", lager.Data{"path": volumeMount.MountPath})
err := volumeMount.Volume.InitializeTaskCache(logger, action.jobID, action.stepName, cacheConfig.Path, bool(action.privileged))
err := volumeMount.Volume.InitializeTaskCache(
logger,
action.jobID,
action.stepName,
cacheConfig.Path,
bool(action.privileged))
if err != nil {
return err
}

View File

@ -22,6 +22,7 @@ type dbWorkerProvider struct {
dbResourceCacheFactory db.ResourceCacheFactory
dbResourceConfigFactory db.ResourceConfigFactory
dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory
dbTaskCacheFactory db.TaskCacheFactory
dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory
dbVolumeRepository db.VolumeRepository
dbTeamFactory db.TeamFactory
@ -37,6 +38,7 @@ func NewDBWorkerProvider(
dbResourceCacheFactory db.ResourceCacheFactory,
dbResourceConfigFactory db.ResourceConfigFactory,
dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory,
dbTaskCacheFactory db.TaskCacheFactory,
dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory,
dbVolumeRepository db.VolumeRepository,
dbTeamFactory db.TeamFactory,
@ -51,6 +53,7 @@ func NewDBWorkerProvider(
dbResourceCacheFactory: dbResourceCacheFactory,
dbResourceConfigFactory: dbResourceConfigFactory,
dbWorkerBaseResourceTypeFactory: dbWorkerBaseResourceTypeFactory,
dbTaskCacheFactory: dbTaskCacheFactory,
dbWorkerTaskCacheFactory: dbWorkerTaskCacheFactory,
dbVolumeRepository: dbVolumeRepository,
dbTeamFactory: dbTeamFactory,
@ -194,6 +197,7 @@ func (provider *dbWorkerProvider) NewGardenWorker(logger lager.Logger, tikTok cl
provider.lockFactory,
provider.dbVolumeRepository,
provider.dbWorkerBaseResourceTypeFactory,
provider.dbTaskCacheFactory,
provider.dbWorkerTaskCacheFactory,
)

View File

@ -51,6 +51,7 @@ var _ = Describe("DBProvider", func() {
fakeDBTeamFactory *dbfakes.FakeTeamFactory
fakeDBWorkerBaseResourceTypeFactory *dbfakes.FakeWorkerBaseResourceTypeFactory
fakeDBWorkerTaskCacheFactory *dbfakes.FakeWorkerTaskCacheFactory
fakeDBTaskCacheFactory *dbfakes.FakeTaskCacheFactory
fakeDBResourceCacheFactory *dbfakes.FakeResourceCacheFactory
fakeDBResourceConfigFactory *dbfakes.FakeResourceConfigFactory
fakeCreatingContainer *dbfakes.FakeCreatingContainer
@ -149,6 +150,7 @@ var _ = Describe("DBProvider", func() {
fakeDBResourceCacheFactory = new(dbfakes.FakeResourceCacheFactory)
fakeDBResourceConfigFactory = new(dbfakes.FakeResourceConfigFactory)
fakeDBWorkerBaseResourceTypeFactory = new(dbfakes.FakeWorkerBaseResourceTypeFactory)
fakeDBTaskCacheFactory = new(dbfakes.FakeTaskCacheFactory)
fakeDBWorkerTaskCacheFactory = new(dbfakes.FakeWorkerTaskCacheFactory)
fakeLock := new(lockfakes.FakeLock)
@ -167,6 +169,7 @@ var _ = Describe("DBProvider", func() {
fakeDBResourceCacheFactory,
fakeDBResourceConfigFactory,
fakeDBWorkerBaseResourceTypeFactory,
fakeDBTaskCacheFactory,
fakeDBWorkerTaskCacheFactory,
fakeDBVolumeRepository,
fakeDBTeamFactory,

View File

@ -25,7 +25,7 @@ type Volume interface {
COWStrategy() baggageclaim.COWStrategy
InitializeResourceCache(db.UsedResourceCache) error
InitializeTaskCache(lager.Logger, int, string, string, bool) error
InitializeTaskCache(logger lager.Logger, jobID int, stepName string, path string, privileged bool) error
InitializeArtifact(name string, buildID int) (db.WorkerArtifact, error)
CreateChildForContainer(db.CreatingContainer, string) (db.CreatingVolume, error)

View File

@ -105,6 +105,7 @@ type volumeClient struct {
lockFactory lock.LockFactory
dbVolumeRepository db.VolumeRepository
dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory
dbTaskCacheFactory db.TaskCacheFactory
dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory
clock clock.Clock
dbWorker db.Worker
@ -118,6 +119,7 @@ func NewVolumeClient(
lockFactory lock.LockFactory,
dbVolumeRepository db.VolumeRepository,
dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory,
dbTaskCacheFactory db.TaskCacheFactory,
dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory,
) VolumeClient {
return &volumeClient{
@ -125,6 +127,7 @@ func NewVolumeClient(
lockFactory: lockFactory,
dbVolumeRepository: dbVolumeRepository,
dbWorkerBaseResourceTypeFactory: dbWorkerBaseResourceTypeFactory,
dbTaskCacheFactory: dbTaskCacheFactory,
dbWorkerTaskCacheFactory: dbWorkerTaskCacheFactory,
clock: clock,
dbWorker: dbWorker,
@ -252,12 +255,19 @@ func (c *volumeClient) CreateVolumeForTaskCache(
stepName string,
path string,
) (Volume, error) {
taskCache, err := c.dbWorkerTaskCacheFactory.FindOrCreate(jobID, stepName, path, c.dbWorker.Name())
usedTaskCache, err := c.dbTaskCacheFactory.FindOrCreate(jobID, stepName, path)
if err != nil {
logger.Error("failed-to-find-or-create-task-cache-in-db", err)
return nil, err
}
workerTaskCache := db.WorkerTaskCache{
WorkerName: c.dbWorker.Name(),
TaskCache: usedTaskCache,
}
usedWorkerTaskCache, err := c.dbWorkerTaskCacheFactory.FindOrCreate(workerTaskCache)
return c.findOrCreateVolume(
logger.Session("find-or-create-volume-for-container"),
volumeSpec,
@ -265,7 +275,7 @@ func (c *volumeClient) CreateVolumeForTaskCache(
return nil, nil, nil
},
func() (db.CreatingVolume, error) {
return c.dbVolumeRepository.CreateTaskCacheVolume(teamID, taskCache)
return c.dbVolumeRepository.CreateTaskCacheVolume(teamID, usedWorkerTaskCache)
},
)
}
@ -316,7 +326,7 @@ func (c *volumeClient) FindVolumeForTaskCache(
stepName string,
path string,
) (Volume, bool, error) {
taskCache, found, err := c.dbWorkerTaskCacheFactory.Find(jobID, stepName, path, c.dbWorker.Name())
usedTaskCache, found, err := c.dbTaskCacheFactory.Find(jobID, stepName, path)
if err != nil {
logger.Error("failed-to-lookup-task-cache-in-db", err)
return nil, false, err
@ -326,13 +336,13 @@ func (c *volumeClient) FindVolumeForTaskCache(
return nil, false, nil
}
_, dbVolume, err := c.dbVolumeRepository.FindTaskCacheVolume(teamID, taskCache)
dbVolume, found, err := c.dbVolumeRepository.FindTaskCacheVolume(teamID, c.dbWorker.Name(), usedTaskCache)
if err != nil {
logger.Error("failed-to-lookup-tasl-cache-volume-in-db", err)
logger.Error("failed-to-lookup-task-cache-volume-in-db", err)
return nil, false, err
}
if dbVolume == nil {
if !found {
return nil, false, nil
}

View File

@ -29,6 +29,7 @@ var _ = Describe("VolumeClient", func() {
fakeDBVolumeRepository *dbfakes.FakeVolumeRepository
fakeWorkerBaseResourceTypeFactory *dbfakes.FakeWorkerBaseResourceTypeFactory
fakeWorkerTaskCacheFactory *dbfakes.FakeWorkerTaskCacheFactory
fakeTaskCacheFactory *dbfakes.FakeTaskCacheFactory
fakeClock *fakeclock.FakeClock
dbWorker *dbfakes.FakeWorker
@ -46,6 +47,7 @@ var _ = Describe("VolumeClient", func() {
fakeDBVolumeRepository = new(dbfakes.FakeVolumeRepository)
fakeWorkerBaseResourceTypeFactory = new(dbfakes.FakeWorkerBaseResourceTypeFactory)
fakeTaskCacheFactory = new(dbfakes.FakeTaskCacheFactory)
fakeWorkerTaskCacheFactory = new(dbfakes.FakeWorkerTaskCacheFactory)
fakeLock = new(lockfakes.FakeLock)
@ -57,6 +59,7 @@ var _ = Describe("VolumeClient", func() {
fakeLockFactory,
fakeDBVolumeRepository,
fakeWorkerBaseResourceTypeFactory,
fakeTaskCacheFactory,
fakeWorkerTaskCacheFactory,
)
})
@ -567,7 +570,8 @@ var _ = Describe("VolumeClient", func() {
Describe("FindVolumeForTaskCache", func() {
Context("when worker task cache does not exist", func() {
BeforeEach(func() {
fakeWorkerTaskCacheFactory.FindReturns(nil, false, nil)
// fakeWorkerTaskCacheFactory.FindReturns(nil, false, nil)
fakeTaskCacheFactory.FindReturns(nil, false, nil)
})
It("returns false", func() {
@ -589,7 +593,7 @@ var _ = Describe("VolumeClient", func() {
Context("when task cache volume does not exist in db", func() {
BeforeEach(func() {
fakeDBVolumeRepository.FindTaskCacheVolumeReturns(nil, nil, nil)
fakeDBVolumeRepository.FindTaskCacheVolumeReturns(nil, false, nil)
})
It("returns false", func() {
@ -604,7 +608,7 @@ var _ = Describe("VolumeClient", func() {
BeforeEach(func() {
dbVolume = new(dbfakes.FakeCreatedVolume)
fakeDBVolumeRepository.FindTaskCacheVolumeReturns(nil, dbVolume, nil)
fakeDBVolumeRepository.FindTaskCacheVolumeReturns(dbVolume, true, nil)
})
Context("when task cache volume does not exist in baggageclaim", func() {
@ -625,6 +629,7 @@ var _ = Describe("VolumeClient", func() {
BeforeEach(func() {
bcVolume = new(baggageclaimfakes.FakeVolume)
fakeBaggageclaimClient.LookupVolumeReturns(bcVolume, true, nil)
fakeTaskCacheFactory.FindReturns(nil, true, nil)
})
It("returns volume", func() {
@ -764,6 +769,7 @@ var _ = Describe("VolumeClient", func() {
fakeLockFactory,
fakeDBVolumeRepository,
fakeWorkerBaseResourceTypeFactory,
fakeTaskCacheFactory,
fakeWorkerTaskCacheFactory,
).LookupVolume(testLogger, handle)
})