peering: generate new token only on user-triggered events (#1399)
Previously, we would call the generate-token endpoint whenever the secret contents changed,
i.e. whenever the resource version in the status of the acceptor CR didn't match
the resource version of the existing secret. However, this resulted in a race
condition when multiple reconciles ran concurrently: one of them could end up with an
outdated CR object and, because the resource versions didn't match, decide that it needed
to re-generate the peering token. This is also undesirable because peering tokens now
contain an establishment secret that can only be used once; if we re-generate tokens
without the user knowing, the dialer may fail because it is still using an outdated secret.

Also, make sure we always update the latest version of the acceptor object by re-fetching
it before updating.
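
A minimal sketch of that re-fetch-before-update pattern, assuming a controller-runtime client; the function name and import paths mirror the diff below but are illustrative, not the controller's exact code:

```go
package connectinject

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	consulv1alpha1 "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" // assumed import path
)

// updateAcceptorStatus re-fetches the PeeringAcceptor immediately before
// writing its status, so a concurrent Reconcile that already updated the
// object can't be overwritten by a stale in-memory copy.
func updateAcceptorStatus(ctx context.Context, c client.Client, key types.NamespacedName) error {
	acceptor := &consulv1alpha1.PeeringAcceptor{}
	if err := c.Get(ctx, key, acceptor); err != nil {
		return fmt.Errorf("error fetching acceptor resource before status update: %w", err)
	}
	acceptor.Status.LastSyncedTime = &metav1.Time{Time: time.Now()}
	// If the object changed between the Get and this write, the API server
	// rejects the update with a conflict; returning that error lets the
	// controller requeue and retry against the newer version.
	return c.Status().Update(ctx, acceptor)
}
```

Relying on the API server's optimistic concurrency to reject stale writes, rather than diffing secret resource versions in the status, is what eliminates the spurious token regeneration.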
ishustava authored Aug 9, 2022
1 parent 1abda49 commit bfd6407
Showing 9 changed files with 162 additions and 185 deletions.
2 changes: 2 additions & 0 deletions .circleci/config.yml
@@ -101,6 +101,7 @@ commands:
${ENABLE_ENTERPRISE:+-enable-enterprise} \
-enable-multi-cluster \
-debug-directory="$TEST_RESULTS/debug" \
-consul-image=docker.mirror.hashicorp.services/hashicorppreview/consul-enterprise:1.13-dev \
-consul-k8s-image=<< parameters.consul-k8s-image >>
then
echo "Tests in ${pkg} failed, aborting early"
@@ -132,6 +133,7 @@ commands:
-enable-multi-cluster \
${ENABLE_ENTERPRISE:+-enable-enterprise} \
-debug-directory="$TEST_RESULTS/debug" \
-consul-image=docker.mirror.hashicorp.services/hashicorppreview/consul-enterprise:1.13-dev \
-consul-k8s-image=<< parameters.consul-k8s-image >>
jobs:
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ FEATURES:
* Support new expose-servers Kubernetes Service deployed by Helm chart to expose the Consul servers, and using the service address in the peering token. [[GH-1378](https://github.com/hashicorp/consul-k8s/pull/1378)]
* Support non-default partitions by using `externalServers.hosts` as the server addresses in the peering token. [[GH-1384](https://github.com/hashicorp/consul-k8s/pull/1384)]
* Support arbitrary addresses as the server addresses in the peering token via `global.peering.tokenGeneration.source="static"` and `global.peering.tokenGeneration.static=["sample-server-address:8502"]`. [[GH-1392](https://github.com/hashicorp/consul-k8s/pull/1392)]
* Generate new peering token only on user-triggered events. [[GH-1399](https://github.com/hashicorp/consul-k8s/pull/1399)]

IMPROVEMENTS:
* Helm
4 changes: 2 additions & 2 deletions acceptance/framework/k8s/kubectl.go
@@ -94,15 +94,15 @@ func KubectlApplyK(t *testing.T, options *k8s.KubectlOptions, kustomizeDir strin
// deletes it from the cluster by running 'kubectl delete -f'.
// If there's an error deleting the file, fail the test.
func KubectlDelete(t *testing.T, options *k8s.KubectlOptions, configPath string) {
_, err := RunKubectlAndGetOutputE(t, options, "delete", "-f", configPath)
_, err := RunKubectlAndGetOutputE(t, options, "delete", "--timeout=60s", "-f", configPath)
require.NoError(t, err)
}

// KubectlDeleteK takes a path to a kustomize directory and
// deletes it from the cluster by running 'kubectl delete -k'.
// If there's an error deleting the file, fail the test.
func KubectlDeleteK(t *testing.T, options *k8s.KubectlOptions, kustomizeDir string) {
_, err := RunKubectlAndGetOutputE(t, options, "delete", "-k", kustomizeDir)
_, err := RunKubectlAndGetOutputE(t, options, "delete", "--timeout=60s", "-k", kustomizeDir)
require.NoError(t, err)
}

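Illustrative usage only, not part of this commit; the import paths and fixture path are assumptions. With the `--timeout=60s` flag added above, a delete blocked by a stuck finalizer fails the test after a minute instead of hanging indefinitely:

```go
package example

import (
	"testing"

	terratestk8s "github.com/gruntwork-io/terratest/modules/k8s"

	"github.com/hashicorp/consul-k8s/acceptance/framework/k8s" // assumed import path
)

func TestDeleteFixtureExample(t *testing.T) {
	// Empty context and config path fall back to the current kubeconfig defaults.
	options := terratestk8s.NewKubectlOptions("", "", "default")
	// Fails the test via require.NoError if the delete errors or times out.
	k8s.KubectlDelete(t, options, "../fixtures/bases/static-server.yaml") // assumed fixture path
}
```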
12 changes: 5 additions & 7 deletions acceptance/tests/peering/peering_connect_namespaces_test.go
@@ -68,19 +68,19 @@ func TestPeering_ConnectNamespaces(t *testing.T) {
false,
},
{
"default destination namespace",
"default destination namespace; secure",
defaultNamespace,
false,
true,
},
{
"single destination namespace",
"single destination namespace; secure",
staticServerNamespace,
false,
true,
},
{
"mirror k8s namespaces",
"mirror k8s namespaces; secure",
staticServerNamespace,
true,
true,
@@ -96,8 +96,6 @@ func TestPeering_ConnectNamespaces(t *testing.T) {
"global.peering.enabled": "true",
"global.enableConsulNamespaces": "true",

"global.image": "ndhanushkodi/consul-dev:ent-backoff-fix",

"global.tls.enabled": "true",
"global.tls.httpsOnly": strconv.FormatBool(c.ACLsAndAutoEncryptEnabled),
"global.tls.enableAutoEncrypt": strconv.FormatBool(c.ACLsAndAutoEncryptEnabled),
@@ -177,9 +175,9 @@ func TestPeering_ConnectNamespaces(t *testing.T) {
// Ensure the secret is created.
timer := &retry.Timer{Timeout: 1 * time.Minute, Wait: 1 * time.Second}
retry.RunWith(timer, t, func(r *retry.R) {
acceptorSecretResourceVersion, err := k8s.RunKubectlAndGetOutputE(t, staticClientPeerClusterContext.KubectlOptions(t), "get", "peeringacceptor", "server", "-o", "jsonpath={.status.secret.resourceVersion}")
acceptorSecretName, err := k8s.RunKubectlAndGetOutputE(t, staticClientPeerClusterContext.KubectlOptions(t), "get", "peeringacceptor", "server", "-o", "jsonpath={.status.secret.name}")
require.NoError(r, err)
require.NotEmpty(r, acceptorSecretResourceVersion)
require.NotEmpty(r, acceptorSecretName)
})

// Copy secret from client peer to server peer.
6 changes: 2 additions & 4 deletions acceptance/tests/peering/peering_connect_test.go
@@ -55,8 +55,6 @@ func TestPeering_Connect(t *testing.T) {
commonHelmValues := map[string]string{
"global.peering.enabled": "true",

"global.image": "ndhanushkodi/consul-dev:ent-backoff-fix",

"global.tls.enabled": "true",
"global.tls.httpsOnly": strconv.FormatBool(c.ACLsAndAutoEncryptEnabled),
"global.tls.enableAutoEncrypt": strconv.FormatBool(c.ACLsAndAutoEncryptEnabled),
@@ -132,9 +130,9 @@ func TestPeering_Connect(t *testing.T) {
// Ensure the secret is created.
timer := &retry.Timer{Timeout: 1 * time.Minute, Wait: 1 * time.Second}
retry.RunWith(timer, t, func(r *retry.R) {
acceptorSecretResourceVersion, err := k8s.RunKubectlAndGetOutputE(t, staticClientPeerClusterContext.KubectlOptions(t), "get", "peeringacceptor", "server", "-o", "jsonpath={.status.secret.resourceVersion}")
acceptorSecretName, err := k8s.RunKubectlAndGetOutputE(t, staticClientPeerClusterContext.KubectlOptions(t), "get", "peeringacceptor", "server", "-o", "jsonpath={.status.secret.name}")
require.NoError(r, err)
require.NotEmpty(r, acceptorSecretResourceVersion)
require.NotEmpty(r, acceptorSecretName)
})

// Copy secret from client peer to server peer.
161 changes: 70 additions & 91 deletions control-plane/connect-inject/peering_acceptor_controller.go
@@ -60,6 +60,11 @@ const (
// - If the resource exists, and a peering does exist in Consul, it should be reconciled.
// - If the status of the resource does not match the current state of the specified secret, generate a new token
// and store it according to the spec.
//
// NOTE: It is possible that Reconcile is called multiple times concurrently because we're watching
// two different resource kinds. As a result, we need to make sure that the code in this method
// is thread-safe. For example, we may need to fetch the resource again before writing because another
// call to Reconcile could have modified it, and so we need to make sure that we're updating the latest version.
func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Log.Info("received request for PeeringAcceptor", "name", req.Name, "ns", req.Namespace)

@@ -92,7 +97,7 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ
r.Log.Info("PeeringAcceptor was deleted, deleting from Consul", "name", req.Name, "ns", req.Namespace)
err := r.deletePeering(ctx, req.Name)
if acceptor.Secret().Backend == "kubernetes" {
err = r.deleteK8sSecret(ctx, acceptor)
err = r.deleteK8sSecret(ctx, acceptor.Secret().Name, acceptor.Namespace)
}
if err != nil {
return ctrl.Result{}, err
@@ -116,20 +121,14 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ
serverExternalAddresses = r.TokenServerAddresses
}

statusSecretSet := acceptor.SecretRef() != nil

// existingStatusSecret will be nil if it doesn't exist, and have the contents of the secret if it does exist.
var existingStatusSecret *corev1.Secret
if statusSecretSet {
existingStatusSecret, err = r.getExistingSecret(ctx, acceptor.SecretRef().Name, acceptor.Namespace)
if err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
// existingSecret will be nil if it doesn't exist, and have the contents of the secret if it does exist.
existingSecret, err := r.getExistingSecret(ctx, acceptor.Secret().Name, acceptor.Namespace)
if err != nil {
r.Log.Error(err, "error retrieving existing secret", "name", acceptor.Secret().Name)
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}

var secretResourceVersion string

// Read the peering from Consul.
peering, _, err := r.ConsulClient.Peerings().Read(ctx, acceptor.Name, nil)
if err != nil {
@@ -142,14 +141,11 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ
if peering == nil {
r.Log.Info("peering doesn't exist in Consul; creating new peering", "name", acceptor.Name)

if statusSecretSet {
if existingStatusSecret != nil {
r.Log.Info("stale secret in status; deleting stale secret", "name", acceptor.Name)
err := r.Client.Delete(ctx, existingStatusSecret)
if err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
if acceptor.SecretRef() != nil {
r.Log.Info("stale secret in status; deleting stale secret", "name", acceptor.Name, "secret-name", acceptor.SecretRef().Name)
if err := r.deleteK8sSecret(ctx, acceptor.SecretRef().Name, acceptor.Namespace); err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
}
// Generate and store the peering token.
@@ -159,62 +155,50 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}
if acceptor.Secret().Backend == "kubernetes" {
secretResourceVersion, err = r.createOrUpdateK8sSecret(ctx, acceptor, resp)
if err != nil {
if err := r.createOrUpdateK8sSecret(ctx, acceptor, resp); err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
}
// Store the state in the status.
err := r.updateStatus(ctx, acceptor, secretResourceVersion)
return ctrl.Result{}, err
} else if err != nil {
r.Log.Error(err, "failed to get Peering from Consul", "name", req.Name)
err := r.updateStatus(ctx, req.NamespacedName)
return ctrl.Result{}, err
}

// TODO(peering): Verify that the existing peering in Consul is an acceptor peer. If it is a dialing peer, an error should be thrown.

// If the peering does exist in Consul, figure out whether to generate and store a new token by comparing the secret
// in the status to the resource version of the secret. If no secret is specified in the status, shouldGenerate will
// be set to true.
var shouldGenerate bool
var nameChanged bool
if statusSecretSet {
shouldGenerate, nameChanged, err = shouldGenerateToken(acceptor, existingStatusSecret)
if err != nil {
r.updateStatusError(ctx, acceptor, InternalError, err)
return ctrl.Result{}, err
}
} else {
shouldGenerate = true
r.Log.Info("peering exists in Consul")

// If the peering does exist in Consul, figure out whether to generate and store a new token.
shouldGenerate, nameChanged, err := shouldGenerateToken(acceptor, existingSecret)
if err != nil {
r.updateStatusError(ctx, acceptor, InternalError, err)
return ctrl.Result{}, err
}

if shouldGenerate {
// Generate and store the peering token.
var resp *api.PeeringGenerateTokenResponse
r.Log.Info("generating new token for an existing peering")
if resp, err = r.generateToken(ctx, acceptor.Name, serverExternalAddresses); err != nil {
return ctrl.Result{}, err
}
if acceptor.Secret().Backend == "kubernetes" {
secretResourceVersion, err = r.createOrUpdateK8sSecret(ctx, acceptor, resp)
if err != nil {
if err = r.createOrUpdateK8sSecret(ctx, acceptor, resp); err != nil {
return ctrl.Result{}, err
}
}
// Delete the existing secret if the name changed. This needs to come before updating the status if we do generate a new token.
if nameChanged {
if existingStatusSecret != nil {
err := r.Client.Delete(ctx, existingStatusSecret)
if err != nil {
r.updateStatusError(ctx, acceptor, ConsulAgentError, err)
return ctrl.Result{}, err
}
if nameChanged && acceptor.SecretRef() != nil {
r.Log.Info("stale secret in status; deleting stale secret", "name", acceptor.Name, "secret-name", acceptor.SecretRef().Name)
if err = r.deleteK8sSecret(ctx, acceptor.SecretRef().Name, acceptor.Namespace); err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
}

// Store the state in the status.
err := r.updateStatus(ctx, acceptor, secretResourceVersion)
err := r.updateStatus(ctx, req.NamespacedName)
return ctrl.Result{}, err
}

@@ -223,48 +207,46 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ

// shouldGenerateToken returns whether a token should be generated, and whether the name of the secret has changed. It
// compares the spec secret's name/key/backend and resource version with the name/key/backend and resource version of the status secret's.
func shouldGenerateToken(acceptor *consulv1alpha1.PeeringAcceptor, existingStatusSecret *corev1.Secret) (shouldGenerate bool, nameChanged bool, err error) {
if acceptor.SecretRef() == nil {
return false, false, errors.New("shouldGenerateToken was called with an empty fields in the existing status")
}
// Compare the existing name, key, and backend.
if acceptor.SecretRef().Name != acceptor.Secret().Name {
return true, true, nil
}
if acceptor.SecretRef().Key != acceptor.Secret().Key {
return true, false, nil
}
// TODO(peering): remove this when validation webhook exists.
if acceptor.SecretRef().Backend != acceptor.Secret().Backend {
return false, false, errors.New("PeeringAcceptor backend cannot be changed")
}
if peeringVersionString, ok := acceptor.Annotations[annotationPeeringVersion]; ok {
peeringVersion, err := strconv.ParseUint(peeringVersionString, 10, 64)
if err != nil {
return false, false, err
func shouldGenerateToken(acceptor *consulv1alpha1.PeeringAcceptor, existingSecret *corev1.Secret) (shouldGenerate bool, nameChanged bool, err error) {
if acceptor.SecretRef() != nil {
// Compare the existing name, key, and backend.
if acceptor.SecretRef().Name != acceptor.Secret().Name {
return true, true, nil
}
if acceptor.Status.LatestPeeringVersion == nil || *acceptor.Status.LatestPeeringVersion < peeringVersion {
if acceptor.SecretRef().Key != acceptor.Secret().Key {
return true, false, nil
}
}
// Compare the existing secret resource version.
// Get the secret specified by the status, make sure it matches the status' secret.ResourceVersion.
if existingStatusSecret != nil {
if existingStatusSecret.ResourceVersion != acceptor.SecretRef().ResourceVersion {
return true, false, nil
// TODO(peering): remove this when validation webhook exists.
if acceptor.SecretRef().Backend != acceptor.Secret().Backend {
return false, false, errors.New("PeeringAcceptor backend cannot be changed")
}
if peeringVersionString, ok := acceptor.Annotations[annotationPeeringVersion]; ok {
peeringVersion, err := strconv.ParseUint(peeringVersionString, 10, 64)
if err != nil {
return false, false, err
}
if acceptor.Status.LatestPeeringVersion == nil || *acceptor.Status.LatestPeeringVersion < peeringVersion {
return true, false, nil
}
}
}

} else {
if existingSecret == nil {
return true, false, nil
}

return false, false, nil
}

// updateStatus updates the peeringAcceptor's secret in the status.
func (r *PeeringAcceptorController) updateStatus(ctx context.Context, acceptor *consulv1alpha1.PeeringAcceptor, secretResourceVersion string) error {
func (r *PeeringAcceptorController) updateStatus(ctx context.Context, acceptorObjKey types.NamespacedName) error {
// Get the latest resource before we update it.
acceptor := &consulv1alpha1.PeeringAcceptor{}
if err := r.Client.Get(ctx, acceptorObjKey, acceptor); err != nil {
return fmt.Errorf("error fetching acceptor resource before status update: %w", err)
}
acceptor.Status.SecretRef = &consulv1alpha1.SecretRefStatus{
Secret: *acceptor.Secret(),
ResourceVersion: secretResourceVersion,
Secret: *acceptor.Secret(),
}
acceptor.Status.LastSyncedTime = &metav1.Time{Time: time.Now()}
acceptor.SetSyncedCondition(corev1.ConditionTrue, "", "")
@@ -311,37 +293,34 @@ func (r *PeeringAcceptorController) getExistingSecret(ctx context.Context, name

// createOrUpdateK8sSecret creates a secret and uses the controller's K8s client to apply the secret. It checks if
// there's an existing secret with the same name and makes sure to update the existing secret if so.
func (r *PeeringAcceptorController) createOrUpdateK8sSecret(ctx context.Context, acceptor *consulv1alpha1.PeeringAcceptor, resp *api.PeeringGenerateTokenResponse) (string, error) {
func (r *PeeringAcceptorController) createOrUpdateK8sSecret(ctx context.Context, acceptor *consulv1alpha1.PeeringAcceptor, resp *api.PeeringGenerateTokenResponse) error {
secretName := acceptor.Secret().Name
secretNamespace := acceptor.Namespace
secret := createSecret(secretName, secretNamespace, acceptor.Secret().Key, resp.PeeringToken)
existingSecret, err := r.getExistingSecret(ctx, secretName, secretNamespace)
if err != nil {
return "", err
return err
}
if existingSecret != nil {
if err := r.Client.Update(ctx, secret); err != nil {
return "", err
return err
}

} else {
if err := r.Client.Create(ctx, secret); err != nil {
return "", err
return err
}
}
return secret.ResourceVersion, nil
return nil
}

func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, acceptor *consulv1alpha1.PeeringAcceptor) error {
secretName := acceptor.Secret().Name
secretNamespace := acceptor.Namespace
secret := createSecret(secretName, secretNamespace, "", "")
existingSecret, err := r.getExistingSecret(ctx, secretName, secretNamespace)
func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, name, namespace string) error {
existingSecret, err := r.getExistingSecret(ctx, name, namespace)
if err != nil {
return err
}
if existingSecret != nil {
if err := r.Client.Delete(ctx, secret); err != nil {
if err := r.Client.Delete(ctx, existingSecret); err != nil {
return err
}
}
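After this change, a token for an existing peering is regenerated only on user-triggered events: renaming the secret, changing its key, or bumping the peering-version annotation above `status.latestPeeringVersion`. A hedged sketch of that annotation trigger follows; the annotation key string is an assumption, since the diff only shows the Go constant name `annotationPeeringVersion`:

```go
package example

import (
	"context"
	"strconv"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	consulv1alpha1 "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" // assumed import path
)

// Assumed annotation key; check the controller's annotation constants for the
// canonical name.
const annotationPeeringVersion = "consul.hashicorp.com/peering-version"

// bumpPeeringVersion increments the peering-version annotation so the next
// reconcile sees a version greater than Status.LatestPeeringVersion and
// shouldGenerateToken returns true.
func bumpPeeringVersion(ctx context.Context, c client.Client, key types.NamespacedName) error {
	acceptor := &consulv1alpha1.PeeringAcceptor{}
	if err := c.Get(ctx, key, acceptor); err != nil {
		return err
	}
	next := uint64(1)
	if v, ok := acceptor.Annotations[annotationPeeringVersion]; ok {
		cur, err := strconv.ParseUint(v, 10, 64)
		if err != nil {
			return err
		}
		next = cur + 1
	}
	if acceptor.Annotations == nil {
		acceptor.Annotations = map[string]string{}
	}
	acceptor.Annotations[annotationPeeringVersion] = strconv.FormatUint(next, 10)
	return c.Update(ctx, acceptor)
}
```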
