Merge pull request #5845 from concourse/use-concourse-gclient
TSA uses concourse's gclient with a configurable timeout instead of the cloudfoundry/gardent/client
This commit is contained in:
commit
eb37872de6
|
@ -22,3 +22,7 @@
|
|||
#### <sub><sup><a name="5846" href="#5810">:link:</a></sup></sub> feature
|
||||
|
||||
* @evanchaoli Enhanced build log page as well as `fly watch` to display worker name for `get/put/task` steps. #5846
|
||||
|
||||
#### <sub><sup><a name="5146" href="#5146">:link:</a></sup></sub> feature
|
||||
|
||||
* Refactor TSA to use Concourse's gclient which has a configurable timeout Issue: #5146 PR: #5845
|
|
@ -433,7 +433,8 @@ var _ = Describe("Register", func() {
|
|||
Expect(res.StatusCode).To(Equal(http.StatusTeapot))
|
||||
|
||||
By("exiting successfully")
|
||||
Eventually(registerErr).Should(Receive(BeNil()))
|
||||
// https://golang.org/src/net/http/transport.go -> IdleConnTimeout is 90s in the DefaultTransport used by gclient.BasicGardenClientWithRequestTimeout
|
||||
Eventually(registerErr, time.Second*100).Should(Receive(BeNil()))
|
||||
})
|
||||
|
||||
Context("with a drain timeout", func() {
|
||||
|
|
|
@ -55,6 +55,8 @@ var (
|
|||
heartbeatInterval = 1 * time.Second
|
||||
tsaProcess ifrit.Process
|
||||
|
||||
gardenRequestTimeout = 3 * time.Second
|
||||
|
||||
gardenAddr string
|
||||
fakeBackend *gfakes.FakeBackend
|
||||
|
||||
|
@ -158,6 +160,7 @@ var _ = BeforeEach(func() {
|
|||
"--client-secret", "some-client-secret",
|
||||
"--token-url", authServer.URL()+"/token",
|
||||
"--atc-url", atcServer.URL(),
|
||||
"--garden-request-timeout", gardenRequestTimeout.String(),
|
||||
"--heartbeat-interval", heartbeatInterval.String(),
|
||||
)
|
||||
|
||||
|
|
|
@ -10,11 +10,11 @@ import (
|
|||
"time"
|
||||
|
||||
"code.cloudfoundry.org/clock"
|
||||
"code.cloudfoundry.org/garden"
|
||||
"code.cloudfoundry.org/lager"
|
||||
"code.cloudfoundry.org/lager/lagerctx"
|
||||
"github.com/concourse/baggageclaim"
|
||||
"github.com/concourse/concourse/atc"
|
||||
"github.com/concourse/concourse/atc/worker/gclient"
|
||||
"github.com/tedsuo/rata"
|
||||
)
|
||||
|
||||
|
@ -28,7 +28,7 @@ type Heartbeater struct {
|
|||
interval time.Duration
|
||||
cprInterval time.Duration
|
||||
|
||||
gardenClient garden.Client
|
||||
gardenClient gclient.Client
|
||||
baggageclaimClient baggageclaim.Client
|
||||
|
||||
atcEndpointPicker EndpointPicker
|
||||
|
@ -42,7 +42,7 @@ func NewHeartbeater(
|
|||
clock clock.Clock,
|
||||
interval time.Duration,
|
||||
cprInterval time.Duration,
|
||||
gardenClient garden.Client,
|
||||
gardenClient gclient.Client,
|
||||
baggageclaimClient baggageclaim.Client,
|
||||
atcEndpointPicker EndpointPicker,
|
||||
httpClient *http.Client,
|
||||
|
|
|
@ -8,13 +8,14 @@ import (
|
|||
|
||||
"code.cloudfoundry.org/clock/fakeclock"
|
||||
"code.cloudfoundry.org/garden"
|
||||
"code.cloudfoundry.org/garden/gardenfakes"
|
||||
"code.cloudfoundry.org/lager"
|
||||
"code.cloudfoundry.org/lager/lagerctx"
|
||||
"code.cloudfoundry.org/lager/lagertest"
|
||||
"github.com/concourse/baggageclaim"
|
||||
"github.com/concourse/baggageclaim/baggageclaimfakes"
|
||||
"github.com/concourse/concourse/atc"
|
||||
"github.com/concourse/concourse/atc/worker/gclient"
|
||||
"github.com/concourse/concourse/atc/worker/gclient/gclientfakes"
|
||||
. "github.com/concourse/concourse/tsa"
|
||||
"github.com/concourse/concourse/tsa/tsafakes"
|
||||
. "github.com/onsi/ginkgo"
|
||||
|
@ -42,7 +43,7 @@ var _ = Describe("Heartbeater", func() {
|
|||
resourceTypes []atc.WorkerResourceType
|
||||
|
||||
expectedWorker atc.Worker
|
||||
fakeGardenClient *gardenfakes.FakeClient
|
||||
fakeGardenClient *gclientfakes.FakeClient
|
||||
fakeBaggageclaimClient *baggageclaimfakes.FakeClient
|
||||
fakeATC1 *ghttp.Server
|
||||
fakeATC2 *ghttp.Server
|
||||
|
@ -130,7 +131,7 @@ var _ = Describe("Heartbeater", func() {
|
|||
},
|
||||
)
|
||||
|
||||
fakeGardenClient = new(gardenfakes.FakeClient)
|
||||
fakeGardenClient = new(gclientfakes.FakeClient)
|
||||
fakeBaggageclaimClient = new(baggageclaimfakes.FakeClient)
|
||||
|
||||
clientWriter = gbytes.NewBuffer()
|
||||
|
@ -181,12 +182,12 @@ var _ = Describe("Heartbeater", func() {
|
|||
|
||||
Context("when Garden returns containers and Baggageclaim returns volumes", func() {
|
||||
BeforeEach(func() {
|
||||
containers := make(chan []garden.Container, 4)
|
||||
containers := make(chan []gclient.Container, 4)
|
||||
volumes := make(chan []baggageclaim.Volume, 4)
|
||||
|
||||
containers <- []garden.Container{
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
containers <- []gclient.Container{
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
}
|
||||
|
||||
volumes <- []baggageclaim.Volume{
|
||||
|
@ -195,12 +196,12 @@ var _ = Describe("Heartbeater", func() {
|
|||
new(baggageclaimfakes.FakeVolume),
|
||||
}
|
||||
|
||||
containers <- []garden.Container{
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
containers <- []gclient.Container{
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
}
|
||||
|
||||
volumes <- []baggageclaim.Volume{
|
||||
|
@ -208,21 +209,21 @@ var _ = Describe("Heartbeater", func() {
|
|||
new(baggageclaimfakes.FakeVolume),
|
||||
}
|
||||
|
||||
containers <- []garden.Container{
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
containers <- []gclient.Container{
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
}
|
||||
|
||||
volumes <- []baggageclaim.Volume{
|
||||
new(baggageclaimfakes.FakeVolume),
|
||||
}
|
||||
|
||||
containers <- []garden.Container{
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
new(gardenfakes.FakeContainer),
|
||||
containers <- []gclient.Container{
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
new(gclientfakes.FakeContainer),
|
||||
}
|
||||
|
||||
volumes <- []baggageclaim.Volume{}
|
||||
|
@ -230,7 +231,7 @@ var _ = Describe("Heartbeater", func() {
|
|||
close(containers)
|
||||
close(volumes)
|
||||
|
||||
fakeGardenClient.ContainersStub = func(garden.Properties) ([]garden.Container, error) {
|
||||
fakeGardenClient.ContainersStub = func(garden.Properties) ([]gclient.Container, error) {
|
||||
return <-containers, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,8 @@ type TSACommand struct {
|
|||
TokenURL flag.URL `long:"token-url" required:"true" description:"Token endpoint of the auth server"`
|
||||
Scopes []string `long:"scope" description:"Scopes to request from the auth server"`
|
||||
|
||||
HeartbeatInterval time.Duration `long:"heartbeat-interval" default:"30s" description:"interval on which to heartbeat workers to the ATC"`
|
||||
HeartbeatInterval time.Duration `long:"heartbeat-interval" default:"30s" description:"interval on which to heartbeat workers to the ATC"`
|
||||
GardenRequestTimeout time.Duration `long:"garden-request-timeout" default:"5m" description:"How long to wait for requests to Garden to complete. 0 means no timeout."`
|
||||
|
||||
ClusterName string `long:"cluster-name" description:"A name for this Concourse cluster, to be displayed on the dashboard page."`
|
||||
LogClusterName bool `long:"log-cluster-name" description:"Log cluster name."`
|
||||
|
@ -133,14 +134,15 @@ func (cmd *TSACommand) Runner(args []string) (ifrit.Runner, error) {
|
|||
httpClient := oauth2.NewClient(ctx, tokenSource)
|
||||
|
||||
server := &server{
|
||||
logger: logger,
|
||||
heartbeatInterval: cmd.HeartbeatInterval,
|
||||
cprInterval: 1 * time.Second,
|
||||
atcEndpointPicker: atcEndpointPicker,
|
||||
forwardHost: cmd.PeerAddress,
|
||||
config: config,
|
||||
httpClient: httpClient,
|
||||
sessionTeam: sessionAuthTeam,
|
||||
logger: logger,
|
||||
heartbeatInterval: cmd.HeartbeatInterval,
|
||||
cprInterval: 1 * time.Second,
|
||||
atcEndpointPicker: atcEndpointPicker,
|
||||
forwardHost: cmd.PeerAddress,
|
||||
config: config,
|
||||
httpClient: httpClient,
|
||||
sessionTeam: sessionAuthTeam,
|
||||
gardenRequestTimeout: cmd.GardenRequestTimeout,
|
||||
}
|
||||
// Starts a goroutine whose purpose is to listen to the
|
||||
// SIGHUP syscall and reload configuration upon receiving the signal.
|
||||
|
|
|
@ -4,17 +4,15 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"code.cloudfoundry.org/clock"
|
||||
gclient "code.cloudfoundry.org/garden/client"
|
||||
gconn "code.cloudfoundry.org/garden/client/connection"
|
||||
"code.cloudfoundry.org/lager"
|
||||
"code.cloudfoundry.org/lager/lagerctx"
|
||||
bclient "github.com/concourse/baggageclaim/client"
|
||||
"github.com/concourse/concourse/atc"
|
||||
"github.com/concourse/concourse/atc/worker/gclient"
|
||||
"github.com/concourse/concourse/tsa"
|
||||
"golang.org/x/crypto/ssh"
|
||||
)
|
||||
|
@ -76,11 +74,10 @@ func (req forwardWorkerRequest) Handle(ctx context.Context, state ConnState, cha
|
|||
clock.NewClock(),
|
||||
req.server.heartbeatInterval,
|
||||
req.server.cprInterval,
|
||||
gclient.New(
|
||||
gconn.NewWithDialerAndLogger(
|
||||
keepaliveDialerFactory("tcp", worker.GardenAddr),
|
||||
lagerctx.WithSession(ctx, "garden-connection"),
|
||||
),
|
||||
gclient.BasicGardenClientWithRequestTimeout(
|
||||
lagerctx.WithSession(ctx, "garden-connection"),
|
||||
req.server.gardenRequestTimeout,
|
||||
gardenURL(worker.GardenAddr),
|
||||
),
|
||||
bclient.NewWithHTTPClient(worker.BaggageclaimURL, &http.Client{
|
||||
Transport: &http.Transport{
|
||||
|
@ -330,12 +327,6 @@ func (req reportVolumesRequest) Handle(ctx context.Context, state ConnState, cha
|
|||
}).WorkerStatus(ctx, worker, tsa.ReportVolumes)
|
||||
}
|
||||
|
||||
func keepaliveDialerFactory(network string, address string) gconn.DialerFunc {
|
||||
dialer := &net.Dialer{
|
||||
KeepAlive: 15 * time.Second,
|
||||
}
|
||||
|
||||
return func(string, string) (net.Conn, error) {
|
||||
return dialer.Dial(network, address)
|
||||
}
|
||||
func gardenURL(addr string) string {
|
||||
return fmt.Sprintf("http://%s", addr)
|
||||
}
|
||||
|
|
|
@ -20,14 +20,15 @@ import (
|
|||
const maxForwards = 2
|
||||
|
||||
type server struct {
|
||||
logger lager.Logger
|
||||
atcEndpointPicker tsa.EndpointPicker
|
||||
heartbeatInterval time.Duration
|
||||
cprInterval time.Duration
|
||||
forwardHost string
|
||||
config *ssh.ServerConfig
|
||||
httpClient *http.Client
|
||||
sessionTeam *sessionTeam
|
||||
logger lager.Logger
|
||||
atcEndpointPicker tsa.EndpointPicker
|
||||
heartbeatInterval time.Duration
|
||||
cprInterval time.Duration
|
||||
gardenRequestTimeout time.Duration
|
||||
forwardHost string
|
||||
config *ssh.ServerConfig
|
||||
httpClient *http.Client
|
||||
sessionTeam *sessionTeam
|
||||
}
|
||||
|
||||
type sessionTeam struct {
|
||||
|
|
Loading…
Reference in New Issue