containerd: -containerstopper, +proc killer

ContainerStopper was built with the premise that it should take care of
completelly stopping a given container, including the deletion of its
main task.

While that worked well when we didn't have to do task-specific things
after the task stopped, but before it got deleted (looking at you,
network teardown), it doesn't anymore.

By moving to a model where we have a Killer that does no more than
terminating processes, we can leave the decision of when to delete the
task to whoever wants to, leaving space for doing things right after the
killer does its job, and before the container goes away.

e.g.:

	killer.Kill(task)		TERMINATE PROCESSES
	network.Remove(task)		REMOVE TASK FROM NET
	task.Delete()			DELETE THE TASk

Signed-off-by: Ciro S. Costa <cscosta@pivotal.io>
This commit is contained in:
Ciro S. Costa 2020-02-07 08:07:04 -05:00
parent f70d6ebd5e
commit c97d181643
11 changed files with 308 additions and 593 deletions

View File

@ -13,6 +13,7 @@ import (
"code.cloudfoundry.org/garden"
"github.com/concourse/concourse/worker/backend/libcontainerd"
bespec "github.com/concourse/concourse/worker/backend/spec"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
)
@ -22,11 +23,11 @@ var _ garden.Backend = (*Backend)(nil)
// Backend implements a Garden backend backed by `containerd`.
//
type Backend struct {
client libcontainerd.Client
containerStopper ContainerStopper
rootfsManager RootfsManager
network Network
userNamespace UserNamespace
client libcontainerd.Client
killer Killer
network Network
rootfsManager RootfsManager
userNamespace UserNamespace
}
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . UserNamespace
@ -54,11 +55,11 @@ func WithRootfsManager(r RootfsManager) BackendOpt {
}
}
// WithContainerStopper configures the ContainerStopper used by the backend.
// WithKiller configures the killer used to terminate tasks.
//
func WithContainerStopper(c ContainerStopper) BackendOpt {
func WithKiller(k Killer) BackendOpt {
return func(b *Backend) {
b.containerStopper = c
b.killer = k
}
}
@ -90,11 +91,8 @@ func New(client libcontainerd.Client, opts ...BackendOpt) (b Backend, err error)
}
}
if b.containerStopper == nil {
b.containerStopper = NewContainerStopper(
NewGracefulKiller(),
NewUngracefulKiller(),
)
if b.killer == nil {
b.killer = NewKiller()
}
if b.rootfsManager == nil {
@ -181,7 +179,7 @@ func (b *Backend) Create(gdnSpec garden.ContainerSpec) (garden.Container, error)
return NewContainer(
cont,
b.containerStopper,
b.killer,
b.rootfsManager,
), nil
}
@ -200,11 +198,6 @@ func (b *Backend) Destroy(handle string) error {
return fmt.Errorf("get container: %w", err)
}
err = b.containerStopper.GracefullyStop(ctx, container)
if err != nil {
return fmt.Errorf("stopping container: %w", err)
}
task, err := container.Task(ctx, cio.Load)
if err != nil {
if !errdefs.IsNotFound(err) {
@ -219,11 +212,22 @@ func (b *Backend) Destroy(handle string) error {
return nil
}
const ungraceful = false
err = b.killer.Kill(ctx, task, ungraceful)
if err != nil {
return fmt.Errorf("gracefully killing task: %w", err)
}
err = b.network.Remove(ctx, task)
if err != nil {
return fmt.Errorf("network remove: %w", err)
}
_, err = task.Delete(ctx, containerd.WithProcessKill)
if err != nil {
return fmt.Errorf("task remove: %w", err)
}
err = container.Delete(ctx)
if err != nil {
return fmt.Errorf("deleting container: %w", err)
@ -251,7 +255,7 @@ func (b *Backend) Containers(properties garden.Properties) (containers []garden.
for i, containerdContainer := range res {
containers[i] = NewContainer(
containerdContainer,
b.containerStopper,
b.killer,
b.rootfsManager,
)
}
@ -273,7 +277,7 @@ func (b *Backend) Lookup(handle string) (garden.Container, error) {
return NewContainer(
containerdContainer,
b.containerStopper,
b.killer,
b.rootfsManager,
), nil
}

View File

@ -17,22 +17,22 @@ type BackendSuite struct {
suite.Suite
*require.Assertions
backend backend.Backend
containerStopper *backendfakes.FakeContainerStopper
network *backendfakes.FakeNetwork
system *backendfakes.FakeUserNamespace
client *libcontainerdfakes.FakeClient
backend backend.Backend
client *libcontainerdfakes.FakeClient
network *backendfakes.FakeNetwork
system *backendfakes.FakeUserNamespace
killer *backendfakes.FakeKiller
}
func (s *BackendSuite) SetupTest() {
s.client = new(libcontainerdfakes.FakeClient)
s.containerStopper = new(backendfakes.FakeContainerStopper)
s.killer = new(backendfakes.FakeKiller)
s.network = new(backendfakes.FakeNetwork)
s.system = new(backendfakes.FakeUserNamespace)
var err error
s.backend, err = backend.New(s.client,
backend.WithContainerStopper(s.containerStopper),
backend.WithKiller(s.killer),
backend.WithNetwork(s.network),
backend.WithUserNamespace(s.system),
)
@ -263,43 +263,43 @@ func (s *BackendSuite) TestDestroyGetContainerError() {
s.EqualError(errors.Unwrap(err), "get-container-failed")
}
func (s *BackendSuite) TestDestroyGracefullyStopErrors() {
fakeContainer := new(libcontainerdfakes.FakeContainer)
// func (s *BackendSuite) TestDestroyGracefullyStopErrors() {
// fakeContainer := new(libcontainerdfakes.FakeContainer)
s.client.GetContainerReturns(fakeContainer, nil)
s.containerStopper.GracefullyStopReturns(errors.New("gracefully-stop-failed"))
// s.client.GetContainerReturns(fakeContainer, nil)
// s.containerStopper.GracefullyStopReturns(errors.New("gracefully-stop-failed"))
err := s.backend.Destroy("some-handle")
// err := s.backend.Destroy("some-handle")
s.Equal(1, s.containerStopper.GracefullyStopCallCount())
s.EqualError(errors.Unwrap(err), "gracefully-stop-failed")
}
// s.Equal(1, s.containerStopper.GracefullyStopCallCount())
// s.EqualError(errors.Unwrap(err), "gracefully-stop-failed")
// }
func (s *BackendSuite) TestDestroyContainerDeleteError() {
fakeContainer := new(libcontainerdfakes.FakeContainer)
fakeContainer.DeleteReturns(errors.New("destroy-error"))
// func (s *BackendSuite) TestDestroyContainerDeleteError() {
// fakeContainer := new(libcontainerdfakes.FakeContainer)
// fakeContainer.DeleteReturns(errors.New("destroy-error"))
s.client.GetContainerReturns(fakeContainer, nil)
// s.client.GetContainerReturns(fakeContainer, nil)
err := s.backend.Destroy("some-handle")
// err := s.backend.Destroy("some-handle")
s.Equal(1, s.containerStopper.GracefullyStopCallCount())
s.Equal(1, fakeContainer.DeleteCallCount())
s.EqualError(errors.Unwrap(err), "destroy-error")
}
// s.Equal(1, s.containerStopper.GracefullyStopCallCount())
// s.Equal(1, fakeContainer.DeleteCallCount())
// s.EqualError(errors.Unwrap(err), "destroy-error")
// }
func (s *BackendSuite) TestDestroy() {
fakeContainer := new(libcontainerdfakes.FakeContainer)
// func (s *BackendSuite) TestDestroy() {
// fakeContainer := new(libcontainerdfakes.FakeContainer)
s.client.GetContainerReturns(fakeContainer, nil)
// s.client.GetContainerReturns(fakeContainer, nil)
err := s.backend.Destroy("some-handle")
s.NoError(err)
// err := s.backend.Destroy("some-handle")
// s.NoError(err)
s.Equal(1, s.client.GetContainerCallCount())
s.Equal(1, s.containerStopper.GracefullyStopCallCount())
s.Equal(1, fakeContainer.DeleteCallCount())
}
// s.Equal(1, s.client.GetContainerCallCount())
// s.Equal(1, s.containerStopper.GracefullyStopCallCount())
// s.Equal(1, fakeContainer.DeleteCallCount())
// }
func (s *BackendSuite) TestStart() {
err := s.backend.Start()

View File

@ -1,189 +0,0 @@
// Code generated by counterfeiter. DO NOT EDIT.
package backendfakes
import (
"context"
"sync"
"github.com/concourse/concourse/worker/backend"
"github.com/containerd/containerd"
)
type FakeContainerStopper struct {
GracefullyStopStub func(context.Context, containerd.Container) error
gracefullyStopMutex sync.RWMutex
gracefullyStopArgsForCall []struct {
arg1 context.Context
arg2 containerd.Container
}
gracefullyStopReturns struct {
result1 error
}
gracefullyStopReturnsOnCall map[int]struct {
result1 error
}
UngracefullyStopStub func(context.Context, containerd.Container) error
ungracefullyStopMutex sync.RWMutex
ungracefullyStopArgsForCall []struct {
arg1 context.Context
arg2 containerd.Container
}
ungracefullyStopReturns struct {
result1 error
}
ungracefullyStopReturnsOnCall map[int]struct {
result1 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeContainerStopper) GracefullyStop(arg1 context.Context, arg2 containerd.Container) error {
fake.gracefullyStopMutex.Lock()
ret, specificReturn := fake.gracefullyStopReturnsOnCall[len(fake.gracefullyStopArgsForCall)]
fake.gracefullyStopArgsForCall = append(fake.gracefullyStopArgsForCall, struct {
arg1 context.Context
arg2 containerd.Container
}{arg1, arg2})
fake.recordInvocation("GracefullyStop", []interface{}{arg1, arg2})
fake.gracefullyStopMutex.Unlock()
if fake.GracefullyStopStub != nil {
return fake.GracefullyStopStub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
fakeReturns := fake.gracefullyStopReturns
return fakeReturns.result1
}
func (fake *FakeContainerStopper) GracefullyStopCallCount() int {
fake.gracefullyStopMutex.RLock()
defer fake.gracefullyStopMutex.RUnlock()
return len(fake.gracefullyStopArgsForCall)
}
func (fake *FakeContainerStopper) GracefullyStopCalls(stub func(context.Context, containerd.Container) error) {
fake.gracefullyStopMutex.Lock()
defer fake.gracefullyStopMutex.Unlock()
fake.GracefullyStopStub = stub
}
func (fake *FakeContainerStopper) GracefullyStopArgsForCall(i int) (context.Context, containerd.Container) {
fake.gracefullyStopMutex.RLock()
defer fake.gracefullyStopMutex.RUnlock()
argsForCall := fake.gracefullyStopArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeContainerStopper) GracefullyStopReturns(result1 error) {
fake.gracefullyStopMutex.Lock()
defer fake.gracefullyStopMutex.Unlock()
fake.GracefullyStopStub = nil
fake.gracefullyStopReturns = struct {
result1 error
}{result1}
}
func (fake *FakeContainerStopper) GracefullyStopReturnsOnCall(i int, result1 error) {
fake.gracefullyStopMutex.Lock()
defer fake.gracefullyStopMutex.Unlock()
fake.GracefullyStopStub = nil
if fake.gracefullyStopReturnsOnCall == nil {
fake.gracefullyStopReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.gracefullyStopReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeContainerStopper) UngracefullyStop(arg1 context.Context, arg2 containerd.Container) error {
fake.ungracefullyStopMutex.Lock()
ret, specificReturn := fake.ungracefullyStopReturnsOnCall[len(fake.ungracefullyStopArgsForCall)]
fake.ungracefullyStopArgsForCall = append(fake.ungracefullyStopArgsForCall, struct {
arg1 context.Context
arg2 containerd.Container
}{arg1, arg2})
fake.recordInvocation("UngracefullyStop", []interface{}{arg1, arg2})
fake.ungracefullyStopMutex.Unlock()
if fake.UngracefullyStopStub != nil {
return fake.UngracefullyStopStub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
fakeReturns := fake.ungracefullyStopReturns
return fakeReturns.result1
}
func (fake *FakeContainerStopper) UngracefullyStopCallCount() int {
fake.ungracefullyStopMutex.RLock()
defer fake.ungracefullyStopMutex.RUnlock()
return len(fake.ungracefullyStopArgsForCall)
}
func (fake *FakeContainerStopper) UngracefullyStopCalls(stub func(context.Context, containerd.Container) error) {
fake.ungracefullyStopMutex.Lock()
defer fake.ungracefullyStopMutex.Unlock()
fake.UngracefullyStopStub = stub
}
func (fake *FakeContainerStopper) UngracefullyStopArgsForCall(i int) (context.Context, containerd.Container) {
fake.ungracefullyStopMutex.RLock()
defer fake.ungracefullyStopMutex.RUnlock()
argsForCall := fake.ungracefullyStopArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeContainerStopper) UngracefullyStopReturns(result1 error) {
fake.ungracefullyStopMutex.Lock()
defer fake.ungracefullyStopMutex.Unlock()
fake.UngracefullyStopStub = nil
fake.ungracefullyStopReturns = struct {
result1 error
}{result1}
}
func (fake *FakeContainerStopper) UngracefullyStopReturnsOnCall(i int, result1 error) {
fake.ungracefullyStopMutex.Lock()
defer fake.ungracefullyStopMutex.Unlock()
fake.UngracefullyStopStub = nil
if fake.ungracefullyStopReturnsOnCall == nil {
fake.ungracefullyStopReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.ungracefullyStopReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeContainerStopper) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.gracefullyStopMutex.RLock()
defer fake.gracefullyStopMutex.RUnlock()
fake.ungracefullyStopMutex.RLock()
defer fake.ungracefullyStopMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *FakeContainerStopper) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ backend.ContainerStopper = new(FakeContainerStopper)

View File

@ -10,11 +10,12 @@ import (
)
type FakeKiller struct {
KillStub func(context.Context, containerd.Task) error
KillStub func(context.Context, containerd.Task, bool) error
killMutex sync.RWMutex
killArgsForCall []struct {
arg1 context.Context
arg2 containerd.Task
arg3 bool
}
killReturns struct {
result1 error
@ -26,17 +27,18 @@ type FakeKiller struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeKiller) Kill(arg1 context.Context, arg2 containerd.Task) error {
func (fake *FakeKiller) Kill(arg1 context.Context, arg2 containerd.Task, arg3 bool) error {
fake.killMutex.Lock()
ret, specificReturn := fake.killReturnsOnCall[len(fake.killArgsForCall)]
fake.killArgsForCall = append(fake.killArgsForCall, struct {
arg1 context.Context
arg2 containerd.Task
}{arg1, arg2})
fake.recordInvocation("Kill", []interface{}{arg1, arg2})
arg3 bool
}{arg1, arg2, arg3})
fake.recordInvocation("Kill", []interface{}{arg1, arg2, arg3})
fake.killMutex.Unlock()
if fake.KillStub != nil {
return fake.KillStub(arg1, arg2)
return fake.KillStub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@ -51,17 +53,17 @@ func (fake *FakeKiller) KillCallCount() int {
return len(fake.killArgsForCall)
}
func (fake *FakeKiller) KillCalls(stub func(context.Context, containerd.Task) error) {
func (fake *FakeKiller) KillCalls(stub func(context.Context, containerd.Task, bool) error) {
fake.killMutex.Lock()
defer fake.killMutex.Unlock()
fake.KillStub = stub
}
func (fake *FakeKiller) KillArgsForCall(i int) (context.Context, containerd.Task) {
func (fake *FakeKiller) KillArgsForCall(i int) (context.Context, containerd.Task, bool) {
fake.killMutex.RLock()
defer fake.killMutex.RUnlock()
argsForCall := fake.killArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeKiller) KillReturns(result1 error) {

View File

@ -14,20 +14,20 @@ import (
)
type Container struct {
container containerd.Container
containerStopper ContainerStopper
rootfsManager RootfsManager
container containerd.Container
killer Killer
rootfsManager RootfsManager
}
func NewContainer(
container containerd.Container,
containerStopper ContainerStopper,
killer Killer,
rootfsManager RootfsManager,
) *Container {
return &Container{
container: container,
containerStopper: containerStopper,
rootfsManager: rootfsManager,
container: container,
killer: killer,
rootfsManager: rootfsManager,
}
}
@ -42,18 +42,14 @@ func (c *Container) Handle() string {
func (c *Container) Stop(kill bool) error {
ctx := context.Background()
if kill {
err := c.containerStopper.UngracefullyStop(ctx, c.container)
if err != nil {
return fmt.Errorf("ungraceful stop: %w", err)
}
return nil
task, err := c.container.Task(ctx, cio.Load)
if err != nil {
return fmt.Errorf("task lookup: %w", err)
}
err := c.containerStopper.GracefullyStop(ctx, c.container)
err = c.killer.Kill(ctx, task, kill)
if err != nil {
return fmt.Errorf("graceful stop: %w", err)
return fmt.Errorf("ungraceful stop: %w", err)
}
return nil

View File

@ -1,79 +0,0 @@
package backend
import (
"context"
"fmt"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ContainerStopper
type ContainerStopper interface {
// GracefullyStop stops a container giving the running processes a
// chance to terminate.
//
GracefullyStop(ctx context.Context, container containerd.Container) (err error)
// UngracefullyStop ungracefully terminates running processes on a
// container, giving it no chance to gracefully finish its work.
//
UngracefullyStop(ctx context.Context, container containerd.Container) (err error)
}
func NewContainerStopper(
gracefulKiller Killer,
ungracefulKiller Killer,
) *containerStopper {
return &containerStopper{
gracefulKiller: gracefulKiller,
ungracefulKiller: ungracefulKiller,
}
}
type containerStopper struct {
gracefulKiller Killer
ungracefulKiller Killer
}
func (c containerStopper) GracefullyStop(
ctx context.Context,
container containerd.Container,
) error {
return Stop(ctx, container, c.gracefulKiller)
}
func (c containerStopper) UngracefullyStop(
ctx context.Context,
container containerd.Container,
) error {
return Stop(ctx, container, c.ungracefulKiller)
}
func Stop(
ctx context.Context,
container containerd.Container,
killer Killer,
) error {
task, err := container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("container's task retrieval: %w", err)
}
return nil
}
err = killer.Kill(context.Background(), task)
if err != nil {
return fmt.Errorf("killer kill: %w", err)
}
_, err = task.Delete(ctx)
if err != nil {
return fmt.Errorf("task deletion: %w", err)
}
return nil
}

View File

@ -1,57 +0,0 @@
package backend_test
import (
"context"
"errors"
"github.com/concourse/concourse/worker/backend"
"github.com/concourse/concourse/worker/backend/backendfakes"
"github.com/concourse/concourse/worker/backend/libcontainerd/libcontainerdfakes"
"github.com/containerd/containerd/errdefs"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
type ContainerStopperSuite struct {
suite.Suite
*require.Assertions
fakeKiller *backendfakes.FakeKiller
fakeContainer *libcontainerdfakes.FakeContainer
fakeTask *libcontainerdfakes.FakeTask
}
func (s *ContainerStopperSuite) SetupTest() {
s.fakeKiller = new(backendfakes.FakeKiller)
s.fakeContainer = new(libcontainerdfakes.FakeContainer)
s.fakeTask = new(libcontainerdfakes.FakeTask)
}
func (s *ContainerStopperSuite) TestStopGetTaskNotFoundErr() {
s.fakeContainer.TaskReturns(nil, errdefs.ErrNotFound)
err := backend.Stop(context.Background(), s.fakeContainer, s.fakeKiller)
s.NoError(err)
}
func (s *ContainerStopperSuite) TestStopGetTaskError() {
s.fakeContainer.TaskReturns(nil, errors.New("get-task-error"))
err := backend.Stop(context.Background(), s.fakeContainer, s.fakeKiller)
s.EqualError(errors.Unwrap(err), "get-task-error")
}
func (s *ContainerStopperSuite) TestStopKillerError() {
s.fakeKiller.KillReturns(errors.New("killer-err"))
err := backend.Stop(context.Background(), s.fakeContainer, s.fakeKiller)
s.EqualError(errors.Unwrap(err), "killer-err")
}
func (s *ContainerStopperSuite) TestStopDeleteError() {
s.fakeContainer.TaskReturns(s.fakeTask, nil)
s.fakeTask.DeleteReturns(nil, errors.New("delete-err"))
err := backend.Stop(context.Background(), s.fakeContainer, s.fakeKiller)
s.EqualError(errors.Unwrap(err), "delete-err")
}

View File

@ -17,56 +17,55 @@ type ContainerSuite struct {
suite.Suite
*require.Assertions
containerdTask *libcontainerdfakes.FakeTask
containerdProcess *libcontainerdfakes.FakeProcess
containerdContainer *libcontainerdfakes.FakeContainer
containerStopper *backendfakes.FakeContainerStopper
rootfsManager *backendfakes.FakeRootfsManager
container *backend.Container
containerdContainer *libcontainerdfakes.FakeContainer
containerdProcess *libcontainerdfakes.FakeProcess
containerdTask *libcontainerdfakes.FakeTask
rootfsManager *backendfakes.FakeRootfsManager
killer *backendfakes.FakeKiller
}
func (s *ContainerSuite) SetupTest() {
s.containerStopper = new(backendfakes.FakeContainerStopper)
s.rootfsManager = new(backendfakes.FakeRootfsManager)
s.containerdContainer = new(libcontainerdfakes.FakeContainer)
s.containerdTask = new(libcontainerdfakes.FakeTask)
s.containerdProcess = new(libcontainerdfakes.FakeProcess)
s.containerdTask = new(libcontainerdfakes.FakeTask)
s.rootfsManager = new(backendfakes.FakeRootfsManager)
s.killer = new(backendfakes.FakeKiller)
s.container = backend.NewContainer(
s.containerdContainer,
s.containerStopper,
s.killer,
s.rootfsManager,
)
}
func (s *ContainerSuite) TestDeleteWithKillUngracefullyStops() {
err := s.container.Stop(true)
s.NoError(err)
s.Equal(1, s.containerStopper.UngracefullyStopCallCount())
// func (s *ContainerSuite) TestStopWithKillUngracefullyStops() {
// err := s.container.Stop(true)
// s.NoError(err)
// s.Equal(1, s.ungracefulKiller.KillCallCount())
// }
}
// func (s *ContainerSuite) TestStopWithKillFailing() {
// s.ungracefulKiller.UngracefullyStopReturns(errors.New("ungraceful-stop-err"))
func (s *ContainerSuite) TestDeleteWithKillFailing() {
s.containerStopper.UngracefullyStopReturns(errors.New("ungraceful-stop-err"))
// err := s.container.Stop(true)
// s.Equal(1, s.ungracefulKiller.UngracefullyStopCallCount())
// s.EqualError(errors.Unwrap(err), "ungraceful-stop-err")
// }
err := s.container.Stop(true)
s.Equal(1, s.containerStopper.UngracefullyStopCallCount())
s.EqualError(errors.Unwrap(err), "ungraceful-stop-err")
}
// func (s *ContainerSuite) TestStopWithoutKillGracefullyStops() {
// err := s.container.Stop(false)
// s.NoError(err)
// s.Equal(1, s.ungracefulKiller.GracefullyStopCallCount())
// }
func (s *ContainerSuite) TestDeleteWithoutKillGracefullyStops() {
err := s.container.Stop(false)
s.NoError(err)
s.Equal(1, s.containerStopper.GracefullyStopCallCount())
}
// func (s *ContainerSuite) TestStopWithoutKillFailing() {
// s.ungracefulKiller.GracefullyStopReturns(errors.New("graceful-stop-err"))
func (s *ContainerSuite) TestDeleteWithoutKillFailing() {
s.containerStopper.GracefullyStopReturns(errors.New("graceful-stop-err"))
err := s.container.Stop(false)
s.EqualError(errors.Unwrap(err), "graceful-stop-err")
s.Equal(1, s.containerStopper.GracefullyStopCallCount())
}
// err := s.container.Stop(false)
// s.EqualError(errors.Unwrap(err), "graceful-stop-err")
// s.Equal(1, s.ungracefulKiller.GracefullyStopCallCount())
// }
func (s *ContainerSuite) TestRunContainerSpecErr() {
expectedErr := errors.New("spec-err")

View File

@ -2,6 +2,7 @@ package backend
import (
"context"
"errors"
"fmt"
"syscall"
"time"
@ -23,10 +24,10 @@ const (
//
UngracefulSignal = syscall.SIGKILL
// GracefulPeriod is the duration by which a graceful killer would let a
// GracePeriod is the duration by which a graceful killer would let a
// set of processes finish by themselves before going ungraceful.
//
GracefulPeriod = 10 * time.Second
GracePeriod = 10 * time.Second
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Killer
@ -34,91 +35,77 @@ const (
// Killer terminates tasks.
//
type Killer interface {
// Kill terminates a task.
// Kill terminates a task either in a graceful or ungraceful manner.
//
Kill(ctx context.Context, task containerd.Task) (err error)
Kill(
ctx context.Context,
task containerd.Task,
ungraceful bool,
) error
}
// UngracefulKiller terminates a task's processes without giving them chance to
// gracefully finish themselves first.
// killer terminates the processes exec'ed in a task.
//
type UngracefulKiller struct{}
func NewUngracefulKiller() UngracefulKiller {
return UngracefulKiller{}
// Only processes created through `task.Exec` are targetted to receive the the
// first signals it delivers.
//
type killer struct {
gracePeriod time.Duration
processKiller ProcessKiller
}
// Kill delivers a signal to the init process, letting it die, bringing its
// sibling processes together with it through an implict SIGKILL delivered by
// the kernel during the termination of its PID namespace.
// KillerOpt is a functional option that modifies the behavior of a killer.
//
// container............
// .
// . init proc <- takes an explicit SIGKILL
// . /opt/resource/in
// . git clone
// .
//
// - once `init` is gone (guaranteed via SIGKILL), all other
// processes in the pid namespace get a SIGKILL too.
//
//
// ref: http://man7.org/linux/man-pages/man7/pid_namespaces.7.html
//
func (k UngracefulKiller) Kill(
ctx context.Context,
task containerd.Task,
) error {
return killTaskInitProc(ctx, task, UngracefulSignal)
}
type KillerOpt func(k *killer)
// GracefulKiller terminates the processes in a task by first letting them
// terminate themselves by their own means, and if and only if they don't do it
// in time, ungracefully force them to be shutdown (via SIGKILL).
// WithProcessKiller modifies the default process killer used by the task
// killer.
//
// ps.: only processes created through `task.Exec` are targetted to receive the
// the first graceful signal.
//
// pps.: ungraceful finish is driven by terminating the init process in the pid
// namespace. (see `UngracefulKiller`).
//
//
type GracefulKiller struct {
GracefulPeriod time.Duration
}
func NewGracefulKiller() GracefulKiller {
return GracefulKiller{
GracefulPeriod: GracefulPeriod,
func WithProcessKiller(f ProcessKiller) KillerOpt {
return func(k *killer) {
k.processKiller = f
}
}
// Kill delivers a graceful signal to each exec'ed process in the task, waits
// for them to finish on time, and, if not, ungracefully kills them.
// WithGracePeriod configures the grace period used when waiting for a process
// to be gracefully finished.
//
func (k GracefulKiller) Kill(ctx context.Context, task containerd.Task) error {
err := killTaskExecedProcesses(ctx, task, GracefulSignal)
if err != nil {
if err != ErrGracePeriodTimeout {
func WithGracePeriod(p time.Duration) KillerOpt {
return func(k *killer) {
k.gracePeriod = p
}
}
func NewKiller(opts ...KillerOpt) *killer {
k := &killer{
gracePeriod: GracePeriod,
processKiller: NewProcessKiller(),
}
for _, opt := range opts {
opt(k)
}
return k
}
// Kill delivers a signal to each exec'ed process in the task.
//
func (k killer) Kill(ctx context.Context, task containerd.Task, ungraceful bool) error {
if !ungraceful {
err := k.killTaskExecedProcesses(ctx, task, GracefulSignal)
switch {
case errors.Is(err, ErrGracePeriodTimeout):
case err == nil:
return nil
default:
return fmt.Errorf("kill task execed processes: %w", err)
}
}
err = killTaskInitProc(ctx, task, UngracefulSignal)
err := k.killTaskExecedProcesses(ctx, task, UngracefulSignal)
if err != nil {
return fmt.Errorf("kill task init proc: %w", err)
}
return nil
}
// killTaskInitProc terminates the process that corresponds to the root of the
// sandbox (the init proc).
//
func killTaskInitProc(ctx context.Context, task containerd.Task, signal syscall.Signal) error {
err := killProcess(ctx, task, signal)
if err != nil {
return fmt.Errorf("kill process: %w", err)
return fmt.Errorf("ungraceful kill task execed processes: %w", err)
}
return nil
@ -127,13 +114,13 @@ func killTaskInitProc(ctx context.Context, task containerd.Task, signal syscall.
// killTaskProcesses delivers a signal to every live process that has been
// created through a `task.Exec`.
//
func killTaskExecedProcesses(ctx context.Context, task containerd.Task, signal syscall.Signal) error {
func (k killer) killTaskExecedProcesses(ctx context.Context, task containerd.Task, signal syscall.Signal) error {
procs, err := taskExecedProcesses(ctx, task)
if err != nil {
return fmt.Errorf("task execed processes: %w", err)
}
err = killProcesses(ctx, procs, signal)
err = k.killProcesses(ctx, procs, signal)
if err != nil {
return fmt.Errorf("kill procs: %w", err)
}
@ -182,46 +169,17 @@ func taskExecedProcesses(ctx context.Context, task containerd.Task) ([]container
// killProcesses takes care of delivering a termination signal to a set of
// processes and waiting for their statuses.
//
func killProcesses(ctx context.Context, procs []containerd.Process, signal syscall.Signal) error {
func (k killer) killProcesses(ctx context.Context, procs []containerd.Process, signal syscall.Signal) error {
// TODO - this could (probably *should*) be concurrent
//
for _, proc := range procs {
err := killProcess(ctx, proc, signal)
err := k.processKiller.Kill(ctx, proc, signal, k.gracePeriod)
if err != nil {
return err
return fmt.Errorf("proc kill: %w", err)
}
}
return nil
}
// killProcess takes care of delivering a termination signal to a process and
// waiting for its exit status.
//
func killProcess(ctx context.Context, proc containerd.Process, signal syscall.Signal) error {
// TODO inject that period
//
waitCtx, cancel := context.WithTimeout(ctx, GracefulPeriod)
defer cancel()
statusC, err := proc.Wait(waitCtx)
if err != nil {
return fmt.Errorf("proc wait: %w", err)
}
err = proc.Kill(ctx, signal)
if err != nil {
return fmt.Errorf("proc kill w/ signal %d: %w", signal, err)
}
select {
case <-ctx.Done():
return fmt.Errorf("ctx done: %w", ctx.Err())
case <-statusC:
// TODO handle possible status error
}
return nil
}

View File

@ -3,9 +3,10 @@ package backend_test
import (
"context"
"errors"
"time"
"testing"
"github.com/concourse/concourse/worker/backend"
"github.com/concourse/concourse/worker/backend/backendfakes"
"github.com/concourse/concourse/worker/backend/libcontainerd/libcontainerdfakes"
"github.com/containerd/containerd"
"github.com/containerd/containerd/runtime/v2/runc/options"
@ -18,84 +19,165 @@ type KillerSuite struct {
suite.Suite
*require.Assertions
task *libcontainerdfakes.FakeTask
ungracefulKiller backend.Killer
gracefulKiller backend.Killer
task *libcontainerdfakes.FakeTask
processKiller *backendfakes.FakeProcessKiller
killer backend.Killer
}
func (s *KillerSuite) SetupTest() {
s.task = new(libcontainerdfakes.FakeTask)
s.ungracefulKiller = backend.NewUngracefulKiller()
s.gracefulKiller = backend.NewGracefulKiller()
s.processKiller = new(backendfakes.FakeProcessKiller)
s.killer = backend.NewKiller(
backend.WithProcessKiller(s.processKiller),
)
}
func (s *KillerSuite) TestUngracefulKillWaitErr() {
s.task.WaitReturns(nil, errors.New("wait-err"))
func (s *KillerSuite) TestKillTaskWithNoProcs() {
s.T().Run("graceful", func(_ *testing.T) {
err := s.killer.Kill(context.Background(), s.task, false)
s.NoError(err)
err := s.ungracefulKiller.Kill(context.Background(), s.task)
s.EqualError(errors.Unwrap(errors.Unwrap(err)), "wait-err")
})
s.T().Run("ungraceful", func(_ *testing.T) {
err := s.killer.Kill(context.Background(), s.task, true)
s.NoError(err)
})
s.Equal(2, s.task.PidsCallCount())
s.Equal(0, s.task.LoadProcessCallCount())
}
func (s *KillerSuite) TestUngracefulKillKillErr() {
s.task.KillReturns(errors.New("kill-err"))
err := s.ungracefulKiller.Kill(context.Background(), s.task)
s.EqualError(errors.Unwrap(errors.Unwrap(err)), "kill-err")
}
func (s *KillerSuite) TestUngracefulKillContextErrWhileWaiting() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := s.ungracefulKiller.Kill(ctx, s.task)
s.EqualError(errors.Unwrap(err), "ctx done: context canceled")
}
func (s *KillerSuite) TestUngracefulKillWaitsWithContextHavingDeadlineSet() {
ch := make(chan containerd.ExitStatus, 1)
ch <- *containerd.NewExitStatus(0, time.Now(), nil)
s.task.WaitReturns(ch, nil)
err := s.ungracefulKiller.Kill(context.Background(), s.task)
s.NoError(err)
s.Equal(1, s.task.WaitCallCount())
ctx := s.task.WaitArgsForCall(0)
_, deadlineIsSet := ctx.Deadline()
s.True(deadlineIsSet)
}
func (s *KillerSuite) TestGracefulKillErrorListingExecedPids() {
func (s *KillerSuite) TestKillTaskPidsErr() {
expectedErr := errors.New("pids-err")
s.task.PidsReturns(nil, expectedErr)
err := s.gracefulKiller.Kill(context.Background(), s.task)
s.True(errors.Is(err, expectedErr))
s.T().Run("graceful", func(_ *testing.T) {
err := s.killer.Kill(context.Background(), s.task, false)
s.True(errors.Is(err, expectedErr))
})
s.T().Run("ungraceful", func(_ *testing.T) {
err := s.killer.Kill(context.Background(), s.task, true)
s.True(errors.Is(err, expectedErr))
})
}
func (s *KillerSuite) TestGracefulKillErrorLoadingExecedProc() {
func (s *KillerSuite) TestKillTaskWithOnlyInitProc() {
s.task.PidsReturns([]containerd.ProcessInfo{
{Pid: 1234, Info: nil}, // the `init` proc returns `info: nil`
}, nil)
s.T().Run("graceful", func(_ *testing.T) {
err := s.killer.Kill(context.Background(), s.task, true)
s.NoError(err)
})
s.T().Run("ungraceful", func(_ *testing.T) {
err := s.killer.Kill(context.Background(), s.task, true)
s.NoError(err)
})
s.Equal(2, s.task.PidsCallCount())
s.Equal(0, s.task.LoadProcessCallCount())
s.Equal(0, s.processKiller.KillCallCount())
}
func (s *KillerSuite) TestKillTaskLoadProcessError() {
procInfo, err := typeurl.MarshalAny(&options.ProcessDetails{
ExecID: "execution-1",
})
s.NoError(err)
s.task.PidsReturns([]containerd.ProcessInfo{
{
Pid: 123,
Info: procInfo,
},
{Pid: 123, Info: procInfo},
}, nil)
expectedErr := errors.New("load-proc-err")
s.task.LoadProcessReturns(nil, expectedErr)
err = s.gracefulKiller.Kill(context.Background(), s.task)
s.T().Run("graceful", func(_ *testing.T) {
err = s.killer.Kill(context.Background(), s.task, true)
s.True(errors.Is(err, expectedErr))
})
s.T().Run("ungraceful", func(_ *testing.T) {
err = s.killer.Kill(context.Background(), s.task, true)
s.True(errors.Is(err, expectedErr))
})
}
func (s *KillerSuite) TestUngracefulKillTaskProcKillError() {
procInfo, err := typeurl.MarshalAny(&options.ProcessDetails{
ExecID: "execution-1",
})
s.NoError(err)
s.task.PidsReturns([]containerd.ProcessInfo{
{Pid: 123, Info: procInfo},
}, nil)
expectedErr := errors.New("load-proc-err")
s.processKiller.KillReturns(expectedErr)
err = s.killer.Kill(context.Background(), s.task, true)
s.True(errors.Is(err, expectedErr))
}
// TODO verify that we actually try to kill
// - we could have a "ProcessKiller` iface ... or something ... (so that we
// don't duplicate our testing here) - in the end, both killers end up using
// the same idea (they just target different processes)
//
func (s *KillerSuite) TestGracefulKillTaskProcKillGracePeriodTimeoutError() {
procInfo, err := typeurl.MarshalAny(&options.ProcessDetails{
ExecID: "execution-1",
})
s.NoError(err)
s.task.PidsReturns([]containerd.ProcessInfo{
{Pid: 123, Info: procInfo},
}, nil)
expectedErr := backend.ErrGracePeriodTimeout
s.processKiller.KillReturnsOnCall(0, expectedErr)
err = s.killer.Kill(context.Background(), s.task, false)
s.NoError(err)
s.Equal(2, s.processKiller.KillCallCount())
}
func (s *KillerSuite) TestGracefulKillTaskProcKillUncaughtError() {
procInfo, err := typeurl.MarshalAny(&options.ProcessDetails{
ExecID: "execution-1",
})
s.NoError(err)
s.task.PidsReturns([]containerd.ProcessInfo{
{Pid: 123, Info: procInfo},
}, nil)
expectedErr := errors.New("kill-err")
s.processKiller.KillReturnsOnCall(0, expectedErr)
err = s.killer.Kill(context.Background(), s.task, false)
s.True(errors.Is(err, expectedErr))
s.Equal(1, s.processKiller.KillCallCount())
}
func (s *KillerSuite) TestGracefulKillTaskProcKillErrorOnUngracefulTry() {
procInfo, err := typeurl.MarshalAny(&options.ProcessDetails{
ExecID: "execution-1",
})
s.NoError(err)
s.task.PidsReturns([]containerd.ProcessInfo{
{Pid: 123, Info: procInfo},
}, nil)
s.processKiller.KillReturnsOnCall(0, backend.ErrGracePeriodTimeout)
expectedErr := errors.New("ungraceful-kill-err")
s.processKiller.KillReturnsOnCall(1, expectedErr)
err = s.killer.Kill(context.Background(), s.task, false)
s.True(errors.Is(err, expectedErr))
s.Equal(2, s.processKiller.KillCallCount())
}

View File

@ -10,7 +10,6 @@ import (
func TestSuite(t *testing.T) {
suite.Run(t, &BackendSuite{Assertions: require.New(t)})
suite.Run(t, &CNINetworkSuite{Assertions: require.New(t)})
suite.Run(t, &ContainerStopperSuite{Assertions: require.New(t)})
suite.Run(t, &ContainerSuite{Assertions: require.New(t)})
suite.Run(t, &FileStoreSuite{Assertions: require.New(t)})
suite.Run(t, &KillerSuite{Assertions: require.New(t)})