Support "nozip" for volume streaming.

Signed-off-by: Evan <chaol@vmware.com>
This commit is contained in:
Evan 2023-03-13 10:42:51 +08:00
parent deafb28da3
commit 593ce5604d
9 changed files with 84 additions and 29 deletions

View File

@ -157,7 +157,7 @@ type RunCommand struct {
ContainerPlacementStrategyOptions worker.PlacementOptions `group:"Container Placement Strategy"`
BaggageclaimResponseHeaderTimeout time.Duration `long:"baggageclaim-response-header-timeout" default:"1m" description:"How long to wait for Baggageclaim to send the response header."`
StreamingArtifactsCompression string `long:"streaming-artifacts-compression" default:"gzip" choice:"gzip" choice:"zstd" description:"Compression algorithm for internal streaming."`
StreamingArtifactsCompression string `long:"streaming-artifacts-compression" default:"gzip" choice:"gzip" choice:"zstd" description:"Compression algorithm for internal streaming." choice:"nozip" description:"No compression"`
GardenRequestTimeout time.Duration `long:"garden-request-timeout" default:"5m" description:"How long to wait for requests to Garden to complete. 0 means no timeout."`
@ -1172,6 +1172,8 @@ func (cmd *RunCommand) backendComponents(
func (cmd *RunCommand) compression() compression.Compression {
if cmd.StreamingArtifactsCompression == "zstd" {
return compression.NewZstdCompression()
} else if cmd.StreamingArtifactsCompression == "nozip" {
return compression.NewNoZipCompression()
} else {
return compression.NewGzipCompression()
}

33
atc/compression/nozip.go Normal file
View File

@ -0,0 +1,33 @@
package compression
import (
"io"
"github.com/concourse/concourse/worker/baggageclaim"
)
type nozipCompression struct{}
func NewNoZipCompression() Compression {
return &nozipCompression{}
}
func (c *nozipCompression) NewReader(reader io.ReadCloser) (io.ReadCloser, error) {
return &zozipReader{reader: reader}, nil
}
func (c *nozipCompression) Encoding() baggageclaim.Encoding {
return baggageclaim.NoZipEncoding
}
type zozipReader struct {
reader io.ReadCloser
}
func (zr *zozipReader) Read(p []byte) (int, error) {
return zr.reader.Read(p)
}
func (zr *zozipReader) Close() error {
return zr.reader.Close()
}

View File

@ -65,4 +65,4 @@ services:
CONCOURSE_BAGGAGECLAIM_DRIVER: overlay
# work with docker-compose's dns
CONCOURSE_CONTAINERD_DNS_PROXY_ENABLE: "true"
CONCOURSE_CONTAINERD_DNS_PROXY_ENABLE: "true"

View File

@ -11,4 +11,4 @@ var Version = "0.0.0-dev"
//
// New features that are otherwise backwards-compatible should result in a
// minor version bump.
var WorkerVersion = "2.4"
var WorkerVersion = "2.5"

View File

@ -481,7 +481,7 @@ func (vs *VolumeServer) StreamIn(w http.ResponseWriter, req *http.Request) {
subPath = queryPath[0]
}
badStream, err := vs.volumeRepo.StreamIn(ctx, handle, subPath, req.Header.Get("Content-Encoding"), req.Body)
badStream, err := vs.volumeRepo.StreamIn(ctx, handle, subPath, baggageclaim.Encoding(req.Header.Get("Content-Encoding")), req.Body)
if err != nil {
if err == volume.ErrVolumeDoesNotExist {
hLog.Info("volume-not-found")
@ -527,7 +527,7 @@ func (vs *VolumeServer) StreamOut(w http.ResponseWriter, req *http.Request) {
subPath = queryPath[0]
}
err := vs.volumeRepo.StreamOut(ctx, handle, subPath, req.Header.Get("Accept-Encoding"), w)
err := vs.volumeRepo.StreamOut(ctx, handle, subPath, baggageclaim.Encoding(req.Header.Get("Accept-Encoding")), w)
if err != nil {
if err == volume.ErrVolumeDoesNotExist {
hLog.Info("volume-not-found")
@ -604,7 +604,7 @@ func (vs *VolumeServer) StreamP2pOut(w http.ResponseWriter, req *http.Request) {
doneChan := make(chan error, 1)
go func(doneChan chan<- error) {
err := vs.volumeRepo.StreamP2pOut(ctx, handle, subPath, encoding, streamInURL)
err := vs.volumeRepo.StreamP2pOut(ctx, handle, subPath, baggageclaim.Encoding(encoding), streamInURL)
if err != nil {
hLog.Error("failed-to-stream-out", err)
doneChan <- fmt.Errorf("%s: %w", ErrStreamP2pOutFailed, err)

View File

@ -10,6 +10,7 @@ type Encoding string
const GzipEncoding Encoding = "gzip"
const ZstdEncoding Encoding = "zstd"
const NoZipEncoding Encoding = "nozip"
const (
StrategyEmpty = "empty"

View File

@ -12,6 +12,7 @@ import (
"code.cloudfoundry.org/lager"
"code.cloudfoundry.org/lager/lagerctx"
"github.com/concourse/concourse/tracing"
"github.com/concourse/concourse/worker/baggageclaim"
"github.com/concourse/concourse/worker/baggageclaim/uidgid"
)
@ -19,9 +20,6 @@ var ErrVolumeDoesNotExist = errors.New("volume does not exist")
var ErrVolumeIsCorrupted = errors.New("volume is corrupted")
var ErrUnsupportedStreamEncoding = errors.New("unsupported stream encoding")
const GzipEncoding string = "gzip"
const ZstdEncoding string = "zstd"
//go:generate counterfeiter . Repository
type Repository interface {
@ -35,10 +33,10 @@ type Repository interface {
GetPrivileged(ctx context.Context, handle string) (bool, error)
SetPrivileged(ctx context.Context, handle string, privileged bool) error
StreamIn(ctx context.Context, handle string, path string, encoding string, stream io.Reader) (bool, error)
StreamOut(ctx context.Context, handle string, path string, encoding string, dest io.Writer) error
StreamIn(ctx context.Context, handle string, path string, encoding baggageclaim.Encoding, stream io.Reader) (bool, error)
StreamOut(ctx context.Context, handle string, path string, encoding baggageclaim.Encoding, dest io.Writer) error
StreamP2pOut(ctx context.Context, handle string, path string, encoding string, streamInURL string) error
StreamP2pOut(ctx context.Context, handle string, path string, encoding baggageclaim.Encoding, streamInURL string) error
VolumeParent(ctx context.Context, handle string) (Volume, bool, error)
}
@ -48,9 +46,10 @@ type repository struct {
locker LockManager
gzipStreamer Streamer
zstdStreamer Streamer
namespacer func(bool) uidgid.Namespacer
gzipStreamer Streamer
zstdStreamer Streamer
nozipStreamer Streamer
namespacer func(bool) uidgid.Namespacer
}
func NewRepository(
@ -63,6 +62,11 @@ func NewRepository(
filesystem: filesystem,
locker: locker,
nozipStreamer: &tarGzipStreamer{
namespacer: unprivilegedNamespacer,
skipGzip: true,
},
gzipStreamer: &tarGzipStreamer{
namespacer: unprivilegedNamespacer,
},
@ -365,11 +369,11 @@ func (repo *repository) SetPrivileged(ctx context.Context, handle string, privil
return nil
}
func (repo *repository) StreamIn(ctx context.Context, handle string, path string, encoding string, stream io.Reader) (bool, error) {
func (repo *repository) StreamIn(ctx context.Context, handle string, path string, encoding baggageclaim.Encoding, stream io.Reader) (bool, error) {
ctx, span := tracing.StartSpan(ctx, "volumeRepository.StreamIn", tracing.Attrs{
"volume": handle,
"sub-path": path,
"encoding": encoding,
"encoding": string(encoding),
})
defer span.End()
@ -415,16 +419,18 @@ func (repo *repository) StreamIn(ctx context.Context, handle string, path string
}
switch encoding {
case ZstdEncoding:
case baggageclaim.ZstdEncoding:
return repo.zstdStreamer.In(stream, destinationPath, privileged)
case GzipEncoding:
case baggageclaim.GzipEncoding:
return repo.gzipStreamer.In(stream, destinationPath, privileged)
case baggageclaim.NoZipEncoding:
return repo.nozipStreamer.In(stream, destinationPath, privileged)
}
return false, ErrUnsupportedStreamEncoding
}
func (repo *repository) StreamOut(ctx context.Context, handle string, path string, encoding string, dest io.Writer) error {
func (repo *repository) StreamOut(ctx context.Context, handle string, path string, encoding baggageclaim.Encoding, dest io.Writer) error {
ctx, span := tracing.StartSpan(ctx, "volumeRepository.StreamOut", tracing.Attrs{
"volume": handle,
"sub-path": path,
@ -460,20 +466,22 @@ func (repo *repository) StreamOut(ctx context.Context, handle string, path strin
}
switch encoding {
case ZstdEncoding:
case baggageclaim.ZstdEncoding:
return repo.zstdStreamer.Out(dest, srcPath, isPrivileged)
case GzipEncoding:
case baggageclaim.GzipEncoding:
return repo.gzipStreamer.Out(dest, srcPath, isPrivileged)
case baggageclaim.NoZipEncoding:
return repo.nozipStreamer.Out(dest, srcPath, isPrivileged)
}
return ErrUnsupportedStreamEncoding
}
func (repo *repository) StreamP2pOut(ctx context.Context, handle string, path string, encoding string, streamInURL string) error {
func (repo *repository) StreamP2pOut(ctx context.Context, handle string, path string, encoding baggageclaim.Encoding, streamInURL string) error {
ctx, span := tracing.StartSpan(ctx, "volumeRepository.StreamP2pOut", tracing.Attrs{
"volume": handle,
"sub-path": path,
"encoding": encoding,
"encoding": string(encoding),
})
defer span.End()
@ -515,10 +523,12 @@ func (repo *repository) StreamP2pOut(ctx context.Context, handle string, path st
go func() {
var err error
switch encoding {
case ZstdEncoding:
case baggageclaim.ZstdEncoding:
err = repo.zstdStreamer.Out(writer, srcPath, isPrivileged)
case GzipEncoding:
case baggageclaim.GzipEncoding:
err = repo.gzipStreamer.Out(writer, srcPath, isPrivileged)
case baggageclaim.NoZipEncoding:
err = repo.nozipStreamer.Out(writer, srcPath, isPrivileged)
default:
err = ErrUnsupportedStreamEncoding
}
@ -536,7 +546,7 @@ func (repo *repository) StreamP2pOut(ctx context.Context, handle string, path st
return err
}
req.Header.Set("Content-Encoding", encoding)
req.Header.Set("Content-Encoding", string(encoding))
resp, err := client.Do(req)
if err != nil {
return err

View File

@ -19,4 +19,5 @@ type tarZstdStreamer struct {
type tarGzipStreamer struct {
namespacer uidgid.Namespacer
skipGzip bool
}

View File

@ -89,7 +89,11 @@ func (streamer *tarZstdStreamer) Out(tzstOutput io.Writer, src string, privilege
}
func (streamer *tarGzipStreamer) In(tgzStream io.Reader, dest string, privileged bool) (bool, error) {
tarCommand, dirFd, err := tarCmd(streamer.namespacer, privileged, dest, "-xz")
tarOptions := "-xz"
if streamer.skipGzip {
tarOptions = "-x"
}
tarCommand, dirFd, err := tarCmd(streamer.namespacer, privileged, dest, tarOptions)
if err != nil {
return false, err
}
@ -128,7 +132,11 @@ func (streamer *tarGzipStreamer) Out(w io.Writer, src string, privileged bool) e
tarCommandDir = filepath.Dir(src)
}
tarCommand, dirFd, err := tarCmd(streamer.namespacer, privileged, tarCommandDir, "-cz", tarCommandPath)
tarOptions := "-cz"
if streamer.skipGzip {
tarOptions = "-c"
}
tarCommand, dirFd, err := tarCmd(streamer.namespacer, privileged, tarCommandDir, tarOptions, tarCommandPath)
if err != nil {
return err
}