diff --git a/atc/worker/artifact_source.go b/atc/worker/artifact_source.go
index 0ba500194..021b27e12 100644
--- a/atc/worker/artifact_source.go
+++ b/atc/worker/artifact_source.go
@@ -8,6 +8,7 @@ import (
"code.cloudfoundry.org/lager"
"github.com/concourse/concourse/atc/compression"
"github.com/concourse/concourse/atc/runtime"
+ "github.com/concourse/concourse/tracing"
"github.com/hashicorp/go-multierror"
)
@@ -60,18 +61,26 @@ func (source *artifactSource) StreamTo(
logger lager.Logger,
destination ArtifactDestination,
) error {
+ ctx, span := tracing.StartSpan(ctx, "artifactSource.StreamTo", nil)
+ defer span.End()
+
+ _, outSpan := tracing.StartSpan(ctx, "volume.StreamOut", tracing.Attrs{
+ "origin-volume": source.volume.Handle(),
+ "origin-worker": source.volume.WorkerName(),
+ })
+ defer outSpan.End()
out, err := source.volume.StreamOut(ctx, ".", source.compression.Encoding())
+
if err != nil {
+ tracing.End(outSpan, err)
return err
}
defer out.Close()
err = destination.StreamIn(ctx, ".", source.compression.Encoding(), out)
- if err != nil {
- return err
- }
- return nil
+
+ return err
}
// TODO: figure out if we want logging before and after streams, I remove logger from private methods
diff --git a/atc/worker/image/image.go b/atc/worker/image/image.go
index cdc4421cb..fa498e73d 100644
--- a/atc/worker/image/image.go
+++ b/atc/worker/image/image.go
@@ -2,6 +2,7 @@ package image
import (
"context"
+ "github.com/concourse/concourse/tracing"
"io"
"net/url"
"path"
@@ -77,6 +78,9 @@ func (i *imageProvidedByPreviousStepOnDifferentWorker) FetchForContainer(
logger lager.Logger,
container db.CreatingContainer,
) (worker.FetchedImage, error) {
+ ctx, span := tracing.StartSpan(ctx, "imageProvidedByPreviousStepOnDifferentWorker.FetchForContainer", tracing.Attrs{"container_id": container.Handle()})
+ defer span.End()
+
imageVolume, err := i.volumeClient.FindOrCreateVolumeForContainer(
logger,
worker.VolumeSpec{
diff --git a/atc/worker/volume.go b/atc/worker/volume.go
index ebb6f032f..36eafc102 100644
--- a/atc/worker/volume.go
+++ b/atc/worker/volume.go
@@ -2,6 +2,7 @@ package worker
import (
"context"
+ "github.com/concourse/concourse/tracing"
"io"
"code.cloudfoundry.org/lager"
@@ -86,7 +87,15 @@ func (v *volume) SetPrivileged(privileged bool) error {
}
func (v *volume) StreamIn(ctx context.Context, path string, encoding baggageclaim.Encoding, tarStream io.Reader) error {
- return v.bcVolume.StreamIn(ctx, path, encoding, tarStream)
+ _, span := tracing.StartSpan(ctx, "volume.StreamIn", tracing.Attrs{
+ "destination-volume": v.Handle(),
+ "destination-worker": v.WorkerName(),
+ })
+
+ err := v.bcVolume.StreamIn(ctx, path, encoding, tarStream)
+ tracing.End(span, err)
+
+ return err
}
func (v *volume) StreamOut(ctx context.Context, path string, encoding baggageclaim.Encoding) (io.ReadCloser, error) {
diff --git a/atc/worker/worker.go b/atc/worker/worker.go
index 84b23ec04..48d919d44 100644
--- a/atc/worker/worker.go
+++ b/atc/worker/worker.go
@@ -22,6 +22,7 @@ import (
"github.com/concourse/concourse/atc/resource"
"github.com/concourse/concourse/atc/runtime"
"github.com/concourse/concourse/atc/worker/gclient"
+ "github.com/concourse/concourse/tracing"
"github.com/cppforlife/go-semi-semantic/version"
"golang.org/x/sync/errgroup"
)
@@ -595,7 +596,15 @@ func (worker *gardenWorker) cloneRemoteVolumes(
container db.CreatingContainer,
nonLocals []mountableRemoteInput,
) ([]VolumeMount, error) {
+
mounts := make([]VolumeMount, len(nonLocals))
+ if len(nonLocals) <= 0 {
+ return mounts, nil
+ }
+
+ ctx, span := tracing.StartSpan(ctx, "worker.cloneRemoteVolumes", tracing.Attrs{"container_id": container.Handle()})
+ defer span.End()
+
g, groupCtx := errgroup.WithContext(ctx)
for i, nonLocalInput := range nonLocals {
@@ -615,8 +624,8 @@ func (worker *gardenWorker) cloneRemoteVolumes(
return []VolumeMount{}, err
}
destData := lager.Data{
- "dest-volume": inputVolume.Handle(),
- "dest-worker": inputVolume.WorkerName(),
+ "destination-volume": inputVolume.Handle(),
+ "destination-worker": inputVolume.WorkerName(),
}
g.Go(func() error {
@@ -639,9 +648,7 @@ func (worker *gardenWorker) cloneRemoteVolumes(
return nil, err
}
- if len(nonLocals) > 0 {
- logger.Debug("streamed-non-local-volumes", lager.Data{"volumes-streamed": len(nonLocals)})
- }
+ logger.Debug("streamed-non-local-volumes", lager.Data{"volumes-streamed": len(nonLocals)})
return mounts, nil
}
diff --git a/release-notes/latest.md b/release-notes/latest.md
index 8559ccb95..9842bd94a 100644
--- a/release-notes/latest.md
+++ b/release-notes/latest.md
@@ -81,3 +81,7 @@ Currently the only API action that can be limited in this way is `ListAllJobs` -
#### :link: feature
* Proxy support for NewRelic emitter
+
+#### :link: feature
+
+* Add tracing to allow users and developers to observe volume streaming from source to destination volumes. #5579
diff --git a/tracing/tracer.go b/tracing/tracer.go
index 3d41d88bd..5c8b3e900 100644
--- a/tracing/tracer.go
+++ b/tracing/tracer.go
@@ -190,7 +190,7 @@ func End(span trace.Span, err error) {
if err != nil {
span.SetStatus(codes.Internal)
span.SetAttributes(
- key.New("error").String(err.Error()),
+ key.New("error-message").String(err.Error()),
)
}