Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ECK IPv6 compatible #3654

Merged
merged 20 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
controllerscheme "github.com/elastic/cloud-on-k8s/pkg/controller/common/scheme"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings"
"github.com/elastic/cloud-on-k8s/pkg/controller/enterprisesearch"
"github.com/elastic/cloud-on-k8s/pkg/controller/kibana"
"github.com/elastic/cloud-on-k8s/pkg/controller/license"
Expand All @@ -52,6 +53,7 @@ import (
"github.com/spf13/viper"
"go.elastic.co/apm"
"go.uber.org/automaxprocs/maxprocs"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -184,6 +186,11 @@ func Command() *cobra.Command {
false,
"Enables a validating webhook server in the operator process.",
)
cmd.Flags().String(
operator.IPFamilyFlag,
"",
"Set the IP family to use. Possible values: IPv4, IPv6, \"\" (= auto-detect) ",
)
cmd.Flags().Bool(
operator.ManageWebhookCertsFlag,
true,
Expand Down Expand Up @@ -440,6 +447,12 @@ func startOperator(stopChan <-chan struct{}) error {

log.V(1).Info("Using certificate rotation parameters", operator.CertValidityFlag, certValidity, operator.CertRotateBeforeFlag, certRotateBefore)

ipFamily, err := chooseAndValidateIPFamily(viper.GetString(operator.IPFamilyFlag), net.ToIPFamily(os.Getenv(settings.EnvPodIP)))
if err != nil {
log.Error(err, "Invalid IP family parameter")
return err
}

// Setup a client to set the operator uuid config map
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
Expand All @@ -460,6 +473,7 @@ func startOperator(stopChan <-chan struct{}) error {
}
params := operator.Parameters{
Dialer: dialer,
IPFamily: ipFamily,
OperatorNamespace: operatorNamespace,
OperatorInfo: operatorInfo,
CACertRotation: certificates.RotationParams{
Expand Down Expand Up @@ -523,6 +537,19 @@ func asyncTasks(mgr manager.Manager, cfg *rest.Config, managedNamespaces []strin
garbageCollectUsers(cfg, managedNamespaces)
}

// chooseAndValidateIPFamily resolves the IP family from the user-supplied flag value.
// The comparison is case-insensitive: an empty value selects ipFamilyDefault (auto-detection),
// "ipv4" and "ipv6" select the corresponding protocol, and any other value yields an error
// together with ipFamilyDefault.
func chooseAndValidateIPFamily(ipFamilyStr string, ipFamilyDefault corev1.IPFamily) (corev1.IPFamily, error) {
	// normalize once so that "IPv4", "ipv4" and "IPV4" are all treated the same
	normalized := strings.ToLower(ipFamilyStr)

	if normalized == "" {
		// nothing requested explicitly: fall back to the detected family
		return ipFamilyDefault, nil
	}
	if normalized == "ipv4" {
		return corev1.IPv4Protocol, nil
	}
	if normalized == "ipv6" {
		return corev1.IPv6Protocol, nil
	}

	// report the value exactly as the user typed it, not the normalized form
	return ipFamilyDefault, fmt.Errorf("IP family can be one of: IPv4, IPv6 or \"\" to auto-detect, but was %s", ipFamilyStr)
}

func registerControllers(mgr manager.Manager, params operator.Parameters, accessReviewer rbac.AccessReviewer) error {
controllers := []struct {
name string
Expand Down
4 changes: 4 additions & 0 deletions config/e2e/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ spec:
- name: ELASTIC_APM_ENVIRONMENT
value: {{ .Pipeline }}-{{ .BuildNumber }}-{{ .Provider }}-{{ .ClusterName }}-{{ .KubernetesVersion }}-{{ .ElasticStackVersion }}
{{end}}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
resources:
limits:
cpu: 1
Expand Down
1 change: 1 addition & 0 deletions docs/operating-eck/operator-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ECK can be configured using either command line flags or environment variables.
|enable-tracing | false | Enable APM tracing in the operator process. Use environment variables to configure APM server URL, credentials, and so on. See link:https://www.elastic.co/guide/en/apm/agent/go/1.x/configuration.html[Apm Go Agent reference] for details.
|enable-webhook | false | Enables a validating webhook server in the operator process.
|enforce-rbac-on-refs| false | Enables restrictions on cross-namespace resource association through RBAC.
|ip-family |"" | Set the IP family to use. Possible values: IPv4, IPv6, "" (= auto-detect).
|log-verbosity |0 |Verbosity level of logs. `-2`=Error, `-1`=Warn, `0`=Info, `0` and above=Debug.
|manage-webhook-certs |true |Enables automatic webhook certificate management.
|max-concurrent-reconciles |3 | Maximum number of concurrent reconciles per controller (Elasticsearch, Kibana, APM Server). Affects the ability of the operator to process changes concurrently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
{{- if .Values.config.webhook.enabled }}
- name: WEBHOOK_SECRET
value: "{{ .Values.config.webhook.certsSecret }}"
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/elasticsearch/v1/validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func validSanIP(es *Elasticsearch) field.ErrorList {
if selfSignedCerts != nil {
for _, san := range selfSignedCerts.SubjectAlternativeNames {
if san.IP != "" {
ip := netutil.MaybeIPTo4(net.ParseIP(san.IP))
ip := netutil.IPToRFCForm(net.ParseIP(san.IP))
if ip == nil {
errs = append(errs, field.Invalid(field.NewPath("spec").Child("http", "tls", "selfSignedCertificate", "subjectAlternativeNames"), san.IP, invalidSanIPErrMsg))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/elasticsearch/v1beta1/validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func validSanIP(es *Elasticsearch) field.ErrorList {
if selfSignedCerts != nil {
for _, san := range selfSignedCerts.SubjectAlternativeNames {
if san.IP != "" {
ip := netutil.MaybeIPTo4(net.ParseIP(san.IP))
ip := netutil.IPToRFCForm(net.ParseIP(san.IP))
if ip == nil {
errs = append(errs, field.Invalid(field.NewPath("spec").Child("http", "tls", "selfSignedCertificate", "subjectAlternativeNames"), san.IP, invalidSanIPErrMsg))
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/controller/common/certificates/http_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ import (
"strings"
"time"

commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/name"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
netutil "github.com/elastic/cloud-on-k8s/pkg/utils/net"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/name"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
netutil "github.com/elastic/cloud-on-k8s/pkg/utils/net"
)

// ReconcilePublicHTTPCerts reconciles the Secret containing the HTTP Certificate currently in use, and the CA of
Expand Down Expand Up @@ -362,7 +361,7 @@ func createValidatedHTTPCertificateTemplate(
dnsNames = append(dnsNames, san.DNS)
}
if san.IP != "" {
ipAddresses = append(ipAddresses, netutil.MaybeIPTo4(net.ParseIP(san.IP)))
ipAddresses = append(ipAddresses, netutil.IPToRFCForm(net.ParseIP(san.IP)))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/common/operator/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
EnableTracingFlag = "enable-tracing"
EnableWebhookFlag = "enable-webhook"
EnforceRBACOnRefsFlag = "enforce-rbac-on-refs"
IPFamilyFlag = "ip-family"
ManageWebhookCertsFlag = "manage-webhook-certs"
MaxConcurrentReconcilesFlag = "max-concurrent-reconciles"
MetricsPortFlag = "metrics-port"
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/common/operator/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package operator

import (
"go.elastic.co/apm"
corev1 "k8s.io/api/core/v1"

"github.com/elastic/cloud-on-k8s/pkg/about"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates"
Expand All @@ -20,6 +21,8 @@ type Parameters struct {
OperatorInfo about.OperatorInfo
// Dialer is used to create the Elasticsearch HTTP client.
Dialer net.Dialer
// IPFamily represents the IP family to use when creating configuration and services.
IPFamily corev1.IPFamily
// CACertRotation defines the rotation params for CA certificates.
CACertRotation certificates.RotationParams
// CertRotation defines the rotation params for non-CA certificates.
Expand Down
16 changes: 11 additions & 5 deletions pkg/controller/elasticsearch/certificates/transport/csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (
"net"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec"
netutil "github.com/elastic/cloud-on-k8s/pkg/utils/net"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
)

// createValidatedCertificateTemplate validates a CSR and creates a certificate template.
Expand Down Expand Up @@ -71,6 +72,9 @@ func buildGeneralNames(
return nil, errors.Errorf("pod currently has no valid IP, found: [%s]", pod.Status.PodIP)
}

ssetName := pod.Labels[label.StatefulSetNameLabelName]
svcName := nodespec.HeadlessServiceName(ssetName)

commonName := buildCertificateCommonName(pod, cluster.Name, cluster.Namespace)

commonNameUTF8OtherName := &certificates.UTF8StringValuedOtherName{
Expand All @@ -90,8 +94,10 @@ func buildGeneralNames(
// add the transport service name for remote cluster connections initially connecting through the service
// the DNS name has to match the seed hosts configured in the remote cluster settings
{DNSName: fmt.Sprintf("%s.%s.svc", esv1.TransportService(cluster.Name), cluster.Namespace)},
{IPAddress: netutil.MaybeIPTo4(podIP)},
{IPAddress: net.ParseIP("127.0.0.1").To4()},
// add the resolvable DNS name of the Pod as published by Elasticsearch
{DNSName: fmt.Sprintf("%s.%s", pod.Name, svcName)},
{IPAddress: netutil.IPToRFCForm(podIP)},
{IPAddress: netutil.IPToRFCForm(netutil.LoopbackFor(netutil.ToIPFamily(podIP.String())))},
}

return generalNames, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"net"
"testing"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates"
)

// roundTripSerialize does a serialization round-trip of the certificate in order to make sure any extra extensions
Expand Down Expand Up @@ -93,6 +92,7 @@ func Test_buildGeneralNames(t *testing.T) {
{OtherName: *otherName},
{DNSName: expectedCommonName},
{DNSName: expectedTransportSvcName},
{DNSName: "test-pod-name.test-sset"},
{IPAddress: net.ParseIP(testIP).To4()},
{IPAddress: net.ParseIP("127.0.0.1").To4()},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"crypto/x509/pkix"
"encoding/pem"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// fixtures
Expand All @@ -34,6 +34,9 @@ var (
testPod = corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-name",
Labels: map[string]string{
label.StatefulSetNameLabelName: "test-sset",
},
},
Status: corev1.PodStatus{
PodIP: testIP,
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/elasticsearch/nodespec/podspec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings"
"github.com/elastic/cloud-on-k8s/pkg/utils/pointer"
"github.com/go-test/deep"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -225,7 +224,7 @@ func TestBuildPodTemplateSpec(t *testing.T) {
Labels: map[string]string{
"common.k8s.elastic.co/type": "elasticsearch",
"elasticsearch.k8s.elastic.co/cluster-name": "name",
"elasticsearch.k8s.elastic.co/config-hash": "2311754148",
"elasticsearch.k8s.elastic.co/config-hash": "3785137207",
"elasticsearch.k8s.elastic.co/http-scheme": "https",
"elasticsearch.k8s.elastic.co/node-data": "false",
"elasticsearch.k8s.elastic.co/node-ingest": "true",
Expand Down
12 changes: 10 additions & 2 deletions pkg/controller/elasticsearch/nodespec/readiness_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,17 @@ else
BASIC_AUTH=''
fi

# Pick the loopback address matching the Pod's IP family.
# An IPv6 address always contains a colon; an IPv4 address never does.
if [[ $POD_IP =~ .*:.* ]]; then
  # IPv6 literals must be wrapped in brackets when used in a URL
  LOOPBACK="[::1]"
else
  # canonical dotted-quad form, not relying on legacy short-form address parsing
  LOOPBACK=127.0.0.1
fi

# request Elasticsearch on /
ENDPOINT="${READINESS_PROBE_PROTOCOL:-https}://127.0.0.1:9200/"
status=$(curl -o /dev/null -w "%{http_code}" --max-time ${READINESS_PROBE_TIMEOUT} -XGET -s -k ${BASIC_AUTH} $ENDPOINT)
# we are turning globbing off to allow for unescaped [] in case of IPv6
ENDPOINT="${READINESS_PROBE_PROTOCOL:-https}://${LOOPBACK}:9200/"
status=$(curl -o /dev/null -w "%{http_code}" --max-time ${READINESS_PROBE_TIMEOUT} -XGET -g -s -k ${BASIC_AUTH} $ENDPOINT)
curl_rc=$?

if [[ ${curl_rc} -ne 0 ]]; then
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/elasticsearch/nodespec/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func HeadlessService(es *esv1.Elasticsearch, ssetName string) corev1.Service {
Port: network.HTTPPort,
},
},
// allow nodes to discover themselves via DNS while they are booting up, i.e. while they are not ready yet
PublishNotReadyAddresses: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will want to flag this in the release notes since it could cause issues if people were relying on the old behavior

},
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/elasticsearch/settings/masters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package settings

import (
"context"
"fmt"
"net"
"reflect"
"sort"
"strconv"
"strings"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
Expand Down Expand Up @@ -58,7 +59,7 @@ func UpdateSeedHostsConfigMap(
if len(master.Status.PodIP) > 0 { // do not add pod with no IPs
seedHosts = append(
seedHosts,
fmt.Sprintf("%s:%d", master.Status.PodIP, network.TransportPort),
net.JoinHostPort(master.Status.PodIP, strconv.Itoa(network.TransportPort)),
)
}
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/controller/elasticsearch/settings/masters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ func TestUpdateSeedHostsConfigMap(t *testing.T) {
wantErr: false,
expectedContent: "10.0.3.3:9300\n10.0.6.5:9300\n10.0.9.2:9300",
},
{
name: "Can handle IPv6 addresses",
args: args{
pods: []corev1.Pod{ //
newPodWithIP("master2", "fd00:10:244:0:2::3", true),
newPodWithIP("master3", "fd00:10:244:0:2::5", true),
newPodWithIP("master1", "fd00:10:244:0:2::2", true),
pebrc marked this conversation as resolved.
Show resolved Hide resolved
},
c: k8s.WrappedFakeClient(),
es: es,
},
wantErr: false,
expectedContent: "[fd00:10:244:0:2::2]:9300\n[fd00:10:244:0:2::3]:9300\n[fd00:10:244:0:2::5]:9300",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/elasticsearch/settings/merged_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func baseConfig(clusterName string, ver version.Version) *CanonicalConfig {
esv1.NodeName: "${" + EnvPodName + "}",
esv1.ClusterName: clusterName,

// derive IP dynamically from the pod IP, injected as env var
esv1.NetworkPublishHost: "${" + EnvPodIP + "}",
esv1.NetworkHost: "0.0.0.0",
// use the DNS name as the publish host
esv1.NetworkPublishHost: fmt.Sprintf("${%s}.${%s}", EnvPodName, HeadlessServiceName),
esv1.NetworkHost: "0",

// allow ES to be aware of k8s node the pod is running on when allocating shards
esv1.ShardAwarenessAttributes: nodeAttrK8sNodeName,
Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/elasticsearch/settings/merged_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"bytes"
"testing"

"github.com/stretchr/testify/require"

commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1"
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
"github.com/stretchr/testify/require"
)

func TestNewMergedESConfig(t *testing.T) {
Expand Down Expand Up @@ -159,7 +158,7 @@ func TestNewMergedESConfig(t *testing.T) {
cfgBytes, err := cfg.Render()
require.NoError(t, err)
// default config is still there
require.True(t, bytes.Contains(cfgBytes, []byte("publish_host: ${POD_IP}")))
require.True(t, bytes.Contains(cfgBytes, []byte("publish_host: ${POD_NAME}.${HEADLESS_SERVICE_NAME}")))
// but has been overridden
require.True(t, bytes.Contains(cfgBytes, []byte("seed_providers: something-else")))
require.Equal(t, 1, bytes.Count(cfgBytes, []byte("seed_providers:")))
Expand Down
Loading