Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peering: generate new token only on user-triggered events #1399

Merged
merged 7 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ commands:
${ENABLE_ENTERPRISE:+-enable-enterprise} \
-enable-multi-cluster \
-debug-directory="$TEST_RESULTS/debug" \
-consul-image=docker.mirror.hashicorp.services/hashicorppreview/consul-enterprise:1.13-dev \
-consul-k8s-image=<< parameters.consul-k8s-image >>
then
echo "Tests in ${pkg} failed, aborting early"
Expand Down Expand Up @@ -132,6 +133,7 @@ commands:
-enable-multi-cluster \
${ENABLE_ENTERPRISE:+-enable-enterprise} \
-debug-directory="$TEST_RESULTS/debug" \
-consul-image=docker.mirror.hashicorp.services/hashicorppreview/consul-enterprise:1.13-dev \
-consul-k8s-image=<< parameters.consul-k8s-image >>

jobs:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ FEATURES:
* Support new expose-servers Kubernetes Service deployed by Helm chart to expose the Consul servers, and using the service address in the peering token. [[GH-1378](https://github.com/hashicorp/consul-k8s/pull/1378)]
* Support non-default partitions by using `externalServers.hosts` as the server addresses in the peering token. [[GH-1384](https://github.com/hashicorp/consul-k8s/pull/1384)]
* Support arbitrary addresses as the server addresses in the peering token via `global.peering.tokenGeneration.source="static"` and `global.peering.tokenGeneration.static=["sample-server-address:8502"]`. [[GH-1392](https://github.com/hashicorp/consul-k8s/pull/1392)]
* Generate new peering token only on user-triggered events. [[GH-1399](https://github.com/hashicorp/consul-k8s/pull/1399)]

## 0.46.1 (July 26, 2022)

Expand Down
4 changes: 2 additions & 2 deletions acceptance/framework/k8s/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ func KubectlApplyK(t *testing.T, options *k8s.KubectlOptions, kustomizeDir strin
// deletes it from the cluster by running 'kubectl delete -f'.
// If there's an error deleting the file, fail the test.
func KubectlDelete(t *testing.T, options *k8s.KubectlOptions, configPath string) {
_, err := RunKubectlAndGetOutputE(t, options, "delete", "-f", configPath)
_, err := RunKubectlAndGetOutputE(t, options, "delete", "--timeout=60s", "-f", configPath)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding these so that we don't hang forever in case CRD deletion hands

require.NoError(t, err)
}

// KubectlDeleteK takes a path to a kustomize directory and
// deletes it from the cluster by running 'kubectl delete -k'.
// If there's an error deleting the file, fail the test.
func KubectlDeleteK(t *testing.T, options *k8s.KubectlOptions, kustomizeDir string) {
_, err := RunKubectlAndGetOutputE(t, options, "delete", "-k", kustomizeDir)
_, err := RunKubectlAndGetOutputE(t, options, "delete", "--timeout=60s", "-k", kustomizeDir)
require.NoError(t, err)
}

Expand Down
12 changes: 5 additions & 7 deletions acceptance/tests/peering/peering_connect_namespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ func TestPeering_ConnectNamespaces(t *testing.T) {
false,
},
{
"default destination namespace",
"default destination namespace; secure",
defaultNamespace,
false,
true,
},
{
"single destination namespace",
"single destination namespace; secure",
staticServerNamespace,
false,
true,
},
{
"mirror k8s namespaces",
"mirror k8s namespaces; secure",
staticServerNamespace,
true,
true,
Expand All @@ -96,8 +96,6 @@ func TestPeering_ConnectNamespaces(t *testing.T) {
"global.peering.enabled": "true",
"global.enableConsulNamespaces": "true",

"global.image": "ndhanushkodi/consul-dev:ent-backoff-fix",

"global.tls.enabled": "true",
"global.tls.httpsOnly": strconv.FormatBool(c.ACLsAndAutoEncryptEnabled),
"global.tls.enableAutoEncrypt": strconv.FormatBool(c.ACLsAndAutoEncryptEnabled),
Expand Down Expand Up @@ -177,9 +175,9 @@ func TestPeering_ConnectNamespaces(t *testing.T) {
// Ensure the secret is created.
timer := &retry.Timer{Timeout: 1 * time.Minute, Wait: 1 * time.Second}
retry.RunWith(timer, t, func(r *retry.R) {
acceptorSecretResourceVersion, err := k8s.RunKubectlAndGetOutputE(t, staticClientPeerClusterContext.KubectlOptions(t), "get", "peeringacceptor", "server", "-o", "jsonpath={.status.secret.resourceVersion}")
acceptorSecretName, err := k8s.RunKubectlAndGetOutputE(t, staticClientPeerClusterContext.KubectlOptions(t), "get", "peeringacceptor", "server", "-o", "jsonpath={.status.secret.name}")
require.NoError(r, err)
require.NotEmpty(r, acceptorSecretResourceVersion)
require.NotEmpty(r, acceptorSecretName)
})

// Copy secret from client peer to server peer.
Expand Down
6 changes: 2 additions & 4 deletions acceptance/tests/peering/peering_connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ func TestPeering_Connect(t *testing.T) {
commonHelmValues := map[string]string{
"global.peering.enabled": "true",

"global.image": "ndhanushkodi/consul-dev:ent-backoff-fix",

"global.tls.enabled": "true",
"global.tls.httpsOnly": strconv.FormatBool(c.ACLsAndAutoEncryptEnabled),
"global.tls.enableAutoEncrypt": strconv.FormatBool(c.ACLsAndAutoEncryptEnabled),
Expand Down Expand Up @@ -132,9 +130,9 @@ func TestPeering_Connect(t *testing.T) {
// Ensure the secret is created.
timer := &retry.Timer{Timeout: 1 * time.Minute, Wait: 1 * time.Second}
retry.RunWith(timer, t, func(r *retry.R) {
acceptorSecretResourceVersion, err := k8s.RunKubectlAndGetOutputE(t, staticClientPeerClusterContext.KubectlOptions(t), "get", "peeringacceptor", "server", "-o", "jsonpath={.status.secret.resourceVersion}")
acceptorSecretName, err := k8s.RunKubectlAndGetOutputE(t, staticClientPeerClusterContext.KubectlOptions(t), "get", "peeringacceptor", "server", "-o", "jsonpath={.status.secret.name}")
require.NoError(r, err)
require.NotEmpty(r, acceptorSecretResourceVersion)
require.NotEmpty(r, acceptorSecretName)
})

// Copy secret from client peer to server peer.
Expand Down
164 changes: 72 additions & 92 deletions control-plane/connect-inject/peering_acceptor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ const (
// - If the resource exists, and a peering does exist in Consul, it should be reconciled.
// - If the status of the resource does not match the current state of the specified secret, generate a new token
// and store it according to the spec.
//
// NOTE: It is possible that Reconcile is called multiple times concurrently because we're watching
// two different resource kinds. As a result, we need to make sure that the code in this method
// is thread-safe. For example, we may need to fetch the resource again before writing because another
// call to Reconcile could have modified it, and so we need to make sure that we're updating the latest version.
func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Log.Info("received request for PeeringAcceptor", "name", req.Name, "ns", req.Namespace)

Expand Down Expand Up @@ -92,7 +97,7 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ
r.Log.Info("PeeringAcceptor was deleted, deleting from Consul", "name", req.Name, "ns", req.Namespace)
err := r.deletePeering(ctx, req.Name)
if acceptor.Secret().Backend == "kubernetes" {
err = r.deleteK8sSecret(ctx, acceptor)
err = r.deleteK8sSecret(ctx, acceptor.Secret().Name, acceptor.Namespace)
}
if err != nil {
return ctrl.Result{}, err
Expand All @@ -116,20 +121,14 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ
serverExternalAddresses = r.TokenServerAddresses
}

statusSecretSet := acceptor.SecretRef() != nil

// existingStatusSecret will be nil if it doesn't exist, and have the contents of the secret if it does exist.
var existingStatusSecret *corev1.Secret
if statusSecretSet {
existingStatusSecret, err = r.getExistingSecret(ctx, acceptor.SecretRef().Name, acceptor.Namespace)
if err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
// existingSecret will be nil if it doesn't exist, and have the contents of the secret if it does exist.
existingSecret, err := r.getExistingSecret(ctx, acceptor.Secret().Name, acceptor.Namespace)
if err != nil {
r.Log.Error(err, "error retrieving existing secret", "name", acceptor.Secret().Name)
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}

var secretResourceVersion string

// Read the peering from Consul.
peering, _, err := r.ConsulClient.Peerings().Read(ctx, acceptor.Name, nil)
if err != nil {
Expand All @@ -142,14 +141,11 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ
if peering == nil {
r.Log.Info("peering doesn't exist in Consul; creating new peering", "name", acceptor.Name)

if statusSecretSet {
if existingStatusSecret != nil {
r.Log.Info("stale secret in status; deleting stale secret", "name", acceptor.Name)
err := r.Client.Delete(ctx, existingStatusSecret)
if err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
if acceptor.SecretRef() != nil {
r.Log.Info("stale secret in status; deleting stale secret", "name", acceptor.Name, "secret-name", acceptor.SecretRef().Name)
if err := r.deleteK8sSecret(ctx, acceptor.SecretRef().Name, acceptor.Namespace); err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
}
// Generate and store the peering token.
Expand All @@ -159,62 +155,50 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}
if acceptor.Secret().Backend == "kubernetes" {
secretResourceVersion, err = r.createOrUpdateK8sSecret(ctx, acceptor, resp)
if err != nil {
if err := r.createOrUpdateK8sSecret(ctx, acceptor, resp); err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
}
// Store the state in the status.
err := r.updateStatus(ctx, acceptor, secretResourceVersion)
return ctrl.Result{}, err
} else if err != nil {
r.Log.Error(err, "failed to get Peering from Consul", "name", req.Name)
err := r.updateStatus(ctx, req.NamespacedName)
return ctrl.Result{}, err
}

// TODO(peering): Verify that the existing peering in Consul is an acceptor peer. If it is a dialing peer, an error should be thrown.

// If the peering does exist in Consul, figure out whether to generate and store a new token by comparing the secret
// in the status to the resource version of the secret. If no secret is specified in the status, shouldGenerate will
// be set to true.
var shouldGenerate bool
var nameChanged bool
if statusSecretSet {
shouldGenerate, nameChanged, err = shouldGenerateToken(acceptor, existingStatusSecret)
if err != nil {
r.updateStatusError(ctx, acceptor, InternalError, err)
return ctrl.Result{}, err
}
} else {
shouldGenerate = true
r.Log.Info("peering exists in Consul")

// If the peering does exist in Consul, figure out whether to generate and store a new token.
shouldGenerate, nameChanged, err := shouldGenerateToken(acceptor, existingSecret)
if err != nil {
r.updateStatusError(ctx, acceptor, InternalError, err)
return ctrl.Result{}, err
}

if shouldGenerate {
// Generate and store the peering token.
var resp *api.PeeringGenerateTokenResponse
r.Log.Info("generating new token for an existing peering")
if resp, err = r.generateToken(ctx, acceptor.Name, serverExternalAddresses); err != nil {
return ctrl.Result{}, err
}
if acceptor.Secret().Backend == "kubernetes" {
secretResourceVersion, err = r.createOrUpdateK8sSecret(ctx, acceptor, resp)
if err != nil {
if err = r.createOrUpdateK8sSecret(ctx, acceptor, resp); err != nil {
return ctrl.Result{}, err
}
}
// Delete the existing secret if the name changed. This needs to come before updating the status if we do generate a new token.
if nameChanged {
if existingStatusSecret != nil {
err := r.Client.Delete(ctx, existingStatusSecret)
if err != nil {
r.updateStatusError(ctx, acceptor, ConsulAgentError, err)
return ctrl.Result{}, err
}
if nameChanged && acceptor.SecretRef() != nil {
r.Log.Info("stale secret in status; deleting stale secret", "name", acceptor.Name, "secret-name", acceptor.SecretRef().Name)
if err = r.deleteK8sSecret(ctx, acceptor.SecretRef().Name, acceptor.Namespace); err != nil {
r.updateStatusError(ctx, acceptor, KubernetesError, err)
return ctrl.Result{}, err
}
}

// Store the state in the status.
err := r.updateStatus(ctx, acceptor, secretResourceVersion)
err := r.updateStatus(ctx, req.NamespacedName)
return ctrl.Result{}, err
}

Expand All @@ -223,48 +207,47 @@ func (r *PeeringAcceptorController) Reconcile(ctx context.Context, req ctrl.Requ

// shouldGenerateToken returns whether a token should be generated, and whether the name of the secret has changed. It
// compares the spec secret's name/key/backend and resource version with the name/key/backend and resource version of the status secret's.
func shouldGenerateToken(acceptor *consulv1alpha1.PeeringAcceptor, existingStatusSecret *corev1.Secret) (shouldGenerate bool, nameChanged bool, err error) {
if acceptor.SecretRef() == nil {
return false, false, errors.New("shouldGenerateToken was called with an empty fields in the existing status")
}
// Compare the existing name, key, and backend.
if acceptor.SecretRef().Name != acceptor.Secret().Name {
return true, true, nil
}
if acceptor.SecretRef().Key != acceptor.Secret().Key {
return true, false, nil
}
// TODO(peering): remove this when validation webhook exists.
if acceptor.SecretRef().Backend != acceptor.Secret().Backend {
return false, false, errors.New("PeeringAcceptor backend cannot be changed")
}
if peeringVersionString, ok := acceptor.Annotations[annotationPeeringVersion]; ok {
peeringVersion, err := strconv.ParseUint(peeringVersionString, 10, 64)
if err != nil {
return false, false, err
func shouldGenerateToken(acceptor *consulv1alpha1.PeeringAcceptor, existingSecret *corev1.Secret) (shouldGenerate bool, nameChanged bool, err error) {
if acceptor.SecretRef() != nil {
// Compare the existing name, key, and backend.
if acceptor.SecretRef().Name != acceptor.Secret().Name {
return true, true, nil
}
if acceptor.Status.LatestPeeringVersion == nil || *acceptor.Status.LatestPeeringVersion < peeringVersion {
if acceptor.SecretRef().Key != acceptor.Secret().Key {
return true, false, nil
}
}
// Compare the existing secret resource version.
// Get the secret specified by the status, make sure it matches the status' secret.ResourceVersion.
if existingStatusSecret != nil {
if existingStatusSecret.ResourceVersion != acceptor.SecretRef().ResourceVersion {
return true, false, nil
// TODO(peering): remove this when validation webhook exists.
if acceptor.SecretRef().Backend != acceptor.Secret().Backend {
return false, false, errors.New("PeeringAcceptor backend cannot be changed")
}
if peeringVersionString, ok := acceptor.Annotations[annotationPeeringVersion]; ok {
peeringVersion, err := strconv.ParseUint(peeringVersionString, 10, 64)
if err != nil {
return false, false, err
}
if acceptor.Status.LatestPeeringVersion == nil || *acceptor.Status.LatestPeeringVersion < peeringVersion {
return true, false, nil
}
}
}

} else {
if existingSecret == nil {
return true, false, nil
}

return false, false, nil
}

// updateStatus updates the peeringAcceptor's secret in the status.
func (r *PeeringAcceptorController) updateStatus(ctx context.Context, acceptor *consulv1alpha1.PeeringAcceptor, secretResourceVersion string) error {
func (r *PeeringAcceptorController) updateStatus(ctx context.Context, acceptorObjKey types.NamespacedName) error {
// Get the latest resource before we update it.
acceptor := &consulv1alpha1.PeeringAcceptor{}
err := r.Client.Get(ctx, acceptorObjKey, acceptor)
if err != nil {
return fmt.Errorf("error fetching acceptor resource before status update: %w", err)
}
acceptor.Status.SecretRef = &consulv1alpha1.SecretRefStatus{
Secret: *acceptor.Secret(),
ResourceVersion: secretResourceVersion,
Secret: *acceptor.Secret(),
}
acceptor.Status.LastSyncedTime = &metav1.Time{Time: time.Now()}
acceptor.SetSyncedCondition(corev1.ConditionTrue, "", "")
Expand All @@ -278,7 +261,7 @@ func (r *PeeringAcceptorController) updateStatus(ctx context.Context, acceptor *
acceptor.Status.LatestPeeringVersion = pointerToUint64(peeringVersion)
}
}
err := r.Status().Update(ctx, acceptor)
err = r.Status().Update(ctx, acceptor)
if err != nil {
r.Log.Error(err, "failed to update PeeringAcceptor status", "name", acceptor.Name, "namespace", acceptor.Namespace)
}
Expand Down Expand Up @@ -311,37 +294,34 @@ func (r *PeeringAcceptorController) getExistingSecret(ctx context.Context, name

// createOrUpdateK8sSecret creates a secret and uses the controller's K8s client to apply the secret. It checks if
// there's an existing secret with the same name and makes sure to update the existing secret if so.
func (r *PeeringAcceptorController) createOrUpdateK8sSecret(ctx context.Context, acceptor *consulv1alpha1.PeeringAcceptor, resp *api.PeeringGenerateTokenResponse) (string, error) {
func (r *PeeringAcceptorController) createOrUpdateK8sSecret(ctx context.Context, acceptor *consulv1alpha1.PeeringAcceptor, resp *api.PeeringGenerateTokenResponse) error {
secretName := acceptor.Secret().Name
secretNamespace := acceptor.Namespace
secret := createSecret(secretName, secretNamespace, acceptor.Secret().Key, resp.PeeringToken)
existingSecret, err := r.getExistingSecret(ctx, secretName, secretNamespace)
if err != nil {
return "", err
return err
}
if existingSecret != nil {
if err := r.Client.Update(ctx, secret); err != nil {
return "", err
return err
}

} else {
if err := r.Client.Create(ctx, secret); err != nil {
return "", err
return err
}
}
return secret.ResourceVersion, nil
return nil
}

func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, acceptor *consulv1alpha1.PeeringAcceptor) error {
secretName := acceptor.Secret().Name
secretNamespace := acceptor.Namespace
secret := createSecret(secretName, secretNamespace, "", "")
existingSecret, err := r.getExistingSecret(ctx, secretName, secretNamespace)
func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, name, namespace string) error {
existingSecret, err := r.getExistingSecret(ctx, name, namespace)
if err != nil {
return err
}
if existingSecret != nil {
if err := r.Client.Delete(ctx, secret); err != nil {
if err := r.Client.Delete(ctx, existingSecret); err != nil {
return err
}
}
Expand Down
Loading