Skip to content

Commit

Permalink
Merge pull request #36 from ganganxiaojiu/main
Browse files Browse the repository at this point in the history
specify the storage pool of minio and olap
  • Loading branch information
nineinfra authored Jan 7, 2024
2 parents 90e3ddf + 3489e9a commit 2814a37
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 24 deletions.
16 changes: 16 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,3 +553,19 @@ func GetReadyDirectPVVolumes(dpclient *directpvv1beta1.DirectpvV1beta1Client, ns
}
return &directpvv1beta1.DirectPVVolumeList{Items: specificDirectPVList}, nil
}

func CheckStoragePoolValid(sp string) bool {
path, _ := rootCmd.Flags().GetString(kubeconfig)
client, err := GetKubeClient(path)
if err != nil {
return false
}
_, err = client.StorageV1().StorageClasses().Get(context.TODO(), sp, metav1.GetOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return false
}
if k8serrors.IsNotFound(err) {
return false
}
return true
}
16 changes: 13 additions & 3 deletions cmd/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ const (
DefaultThriftPortName = "thrift-binary"
DefaultCMDHelm = "helm"
DefaultCMDDirectPV = "kubectl-directpv"
DefaultKyuubiUserName = "hive"
DefaultKyuubiVersion = "1.8.0"
DefaultScalaVersion = "2.12"
DefaultNineInfraPrefix = "nineinfra"
GiMultiplier = 1024 * 1024 * 1024
)
Expand All @@ -34,6 +31,8 @@ const (
DefaultToolNifiUserName = "admin"
DefaultToolNifiUserPWD = "nineinfraadmin"
DefaultZookeeperSVCName = DefaultToolsNamePrefix + "zookeeper-headless"
DefaultAirflowPVCLabelKey = "release"
DefaultZookeeperPVCLabelKey = "app.kubernetes.io/instance"
)

const (
Expand Down Expand Up @@ -73,10 +72,21 @@ var (
DefaultDorisAdminUser = "root"
DefaultDorisAdminPassword = ""
DefaultDorisDatabaseName = "nineinfra"
DefaultDorisFERepo = "selectdb/doris.fe-ubuntu"
DefaultDorisFEVersion = "2.0.2"
DefaultDorisFERepoPullPolicy = "IfNotPresent"
DefaultDorisFEStoragePVSize = 20
DefaultDorisBERepo = "selectdb/doris.be-ubuntu"
DefaultDorisBEVersion = "2.0.2"
DefaultDorisBERepoPullPolicy = "IfNotPresent"
DefaultDorisBEStoragePVSize = 100
DefaultKyuubiUserName = "hive"
DefaultKyuubiVersion = "1.8.0"
DefaultScalaVersion = "2.12"
DefaultMinioRepo = "minio/minio"
DefaultMinioVersion = "RELEASE.2023-09-07T02-05-02Z"
DefaultMinioRepoPullPolicy = "IfNotPresent"
DefaultDataBaseVersion = "v16.0.0"
)

var DefaultChartList = map[string]string{
Expand Down
80 changes: 74 additions & 6 deletions cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ const (

var (
olapsSupported = "doris"
DorisFeClusterInfo = nineinfrav1alpha1.ClusterInfo{
Type: nineinfrav1alpha1.DorisFEClusterType,
Version: DefaultDorisFEVersion,
Configs: nineinfrav1alpha1.ClusterConfig{
Image: nineinfrav1alpha1.ImageConfig{
Repository: DefaultDorisFERepo,
Tag: DefaultDorisFEVersion,
PullPolicy: DefaultDorisFERepoPullPolicy,
},
},
Resource: nineinfrav1alpha1.ResourceConfig{
ResourceRequirements: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
"storage": *resource.NewQuantity(int64(DefaultDorisFEStoragePVSize*GiMultiplier), resource.BinarySI),
},
},
},
}
DorisBeClusterInfo = nineinfrav1alpha1.ClusterInfo{
Type: nineinfrav1alpha1.DorisBEClusterType,
Version: DefaultDorisBEVersion,
Expand All @@ -38,15 +56,37 @@ var (
},
},
}
MinioClusterInfo = nineinfrav1alpha1.ClusterInfo{
Type: nineinfrav1alpha1.MinioClusterType,
Version: DefaultMinioVersion,
Configs: nineinfrav1alpha1.ClusterConfig{
Image: nineinfrav1alpha1.ImageConfig{
Repository: DefaultMinioRepo,
Tag: DefaultMinioVersion,
PullPolicy: DefaultMinioRepoPullPolicy,
},
},
}
PGClusterInfo = nineinfrav1alpha1.ClusterInfo{
Type: nineinfrav1alpha1.DatabaseClusterType,
SubType: nineinfrav1alpha1.DbTypePostgres,
Version: DefaultDataBaseVersion,
Resource: nineinfrav1alpha1.ResourceConfig{
StorageClass: DefaultStorageClass,
},
}
)

// ClusterOptions encapsulates the CLI options for a NineCluster
type ClusterOptions struct {
Name string
NS string
DataVolume int
OlapVolume int
Olap string
Name string
NS string
DataVolume int
StoragePool string
OlapVolume int
OlapStoragePool string
MetastoreStoragePool string
Olap string
}

type createCmd struct {
Expand All @@ -67,6 +107,21 @@ func (t ClusterOptions) Validate() error {
if t.OlapVolume <= 10 {
return errors.New("olap volume size should not be less than 10")
}
if t.StoragePool != "" {
if !CheckStoragePoolValid(t.StoragePool) {
return errors.New(fmt.Sprintf("storage pool %s may be not exist", t.StoragePool))
}
}
if t.OlapStoragePool != "" {
if !CheckStoragePoolValid(t.OlapStoragePool) {
return errors.New(fmt.Sprintf("olap storage pool %s may be not exist", t.OlapStoragePool))
}
}
if t.MetastoreStoragePool != "" {
if !CheckStoragePoolValid(t.MetastoreStoragePool) {
return errors.New(fmt.Sprintf("metastore storage pool %s may be not exist", t.MetastoreStoragePool))
}
}
return nil
}

Expand Down Expand Up @@ -94,6 +149,9 @@ func newClusterCreateCmd(out io.Writer, errOut io.Writer) *cobra.Command {
f.IntVarP(&c.clusterOpts.DataVolume, "data-volume", "v", 32, "total raw data volumes of the ninecluster,the unit is Gi, e.g. 16")
f.StringVarP(&c.clusterOpts.Olap, "olap", "a", "", fmt.Sprintf("add olap to the ninecluster,support [%s]", olapsSupported))
f.IntVar(&c.clusterOpts.OlapVolume, "olap-volume", 100, "olap storage volume size")
f.StringVarP(&c.clusterOpts.StoragePool, "storage-pool", "s", "", "storage pool for the ninecluster")
f.StringVarP(&c.clusterOpts.OlapStoragePool, "olap-storage-pool", "o", "", "storage pool for olap")
f.StringVarP(&c.clusterOpts.MetastoreStoragePool, "metastore-storage-pool", "m", "", "storage pool for metastore")
f.BoolVar(&DEBUG, "debug", false, "print debug information")
f.StringVarP(&c.clusterOpts.NS, "namespace", "n", "", "k8s namespace for this ninecluster")
return cmd
Expand Down Expand Up @@ -131,10 +189,20 @@ func (c *createCmd) run(_ []string) error {
var userClusterSet []nineinfrav1alpha1.ClusterInfo
if c.clusterOpts.Olap != "" {
features[FeaturesOlapKey] = c.clusterOpts.Olap
userClusterSet = make([]nineinfrav1alpha1.ClusterInfo, 0)
DorisBeClusterInfo.Resource.ResourceRequirements.Requests["storage"] =
*resource.NewQuantity(int64(c.clusterOpts.OlapVolume*GiMultiplier), resource.BinarySI)
DorisBeClusterInfo.Resource.StorageClass = c.clusterOpts.OlapStoragePool
userClusterSet = append(userClusterSet, DorisBeClusterInfo)
DorisFeClusterInfo.Resource.StorageClass = c.clusterOpts.OlapStoragePool
userClusterSet = append(userClusterSet, DorisFeClusterInfo)
}
if c.clusterOpts.StoragePool != "" {
MinioClusterInfo.Resource.StorageClass = c.clusterOpts.StoragePool
userClusterSet = append(userClusterSet, MinioClusterInfo)
}
if c.clusterOpts.MetastoreStoragePool != "" {
PGClusterInfo.Resource.StorageClass = c.clusterOpts.MetastoreStoragePool
userClusterSet = append(userClusterSet, PGClusterInfo)
}
desiredNineCluster := &nineinfrav1alpha1.NineCluster{
ObjectMeta: metav1.ObjectMeta{
Expand Down
4 changes: 0 additions & 4 deletions cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ func constructPVCLabel(name string) string {
return DefaultPVCLabelKey + "=" + name
}

func constructOlabPVCLabel(name string) string {
return DefaultOlapPVCLabelKey + "=" + name + DefaultDorisBENameSuffix
}

func deleteNineInfraPVC(name string, namespace string) error {
if name == "" || namespace == "" {
return nil
Expand Down
40 changes: 29 additions & 11 deletions cmd/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type toolsCmd struct {
toolkitArgs []string // --nodes flag
deletePVC bool
chartPath string
storagepool string
}

type DatabasesConnection struct {
Expand Down Expand Up @@ -89,7 +90,7 @@ func newToolsCmd(out io.Writer, errOut io.Writer) *cobra.Command {
f.StringVar(&DefaultToolNifiSvcType, "nifi-svctype", "NodePort", "service type for nifi ui")
f.StringVar(&DefaultToolAirflowRepository, "airflow-repo", "nineinfra/airflow", "airflow image repository")
f.StringVar(&DefaultToolAirflowTag, "airflow-tag", "2.7.3", "airflow image tag")
f.StringVarP(&DefaultStorageClass, "storage-pool", "s", "nineinfra-default", "storage pool fo tools")
f.StringVarP(&c.storagepool, "storage-pool", "s", DefaultStorageClass, "storage pool for tools")
f.BoolVar(&c.deletePVC, "delete-pvc", false, "delete the ninecluster tools pvcs")
f.StringVarP(&c.chartPath, "chart-path", "p", "", "local path of the charts")
f.StringVarP(&c.ns, "namespace", "n", "", "k8s namespace for tools")
Expand All @@ -115,20 +116,31 @@ func (t *toolsCmd) validate(args []string) error {
return fmt.Errorf("invalid access host %s", DefaultAccessHost)
}
}
if t.storagepool != DefaultStorageClass {
if !CheckStoragePoolValid(t.storagepool) {
return errors.New(fmt.Sprintf("tools storage pool %s may be not exist", t.storagepool))
}
}
return nil
}

func (t *toolsCmd) deleteToolsPVC(name string, namespace string) error {
if name == "" || namespace == "" {
return nil
func (t *toolsCmd) deleteToolsPVC(namespace string) error {
if namespace == "" {
return errors.New("namespace should be supplied when deleting pvc")
}
path, _ := rootCmd.Flags().GetString(kubeconfig)
c, err := GetKubeClient(path)
if err != nil {
return err
}

err = c.CoreV1().PersistentVolumeClaims(namespace).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: constructPVCLabel(name)})
toolsPvcLabel := DefaultAirflowPVCLabelKey + "=" + DefaultToolsNamePrefix + DefaultToolAirflowName
err = c.CoreV1().PersistentVolumeClaims(namespace).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: toolsPvcLabel})
if err != nil {
return err
}
toolsPvcLabel = DefaultZookeeperPVCLabelKey + "=" + DefaultToolsNamePrefix + DefaultToolZookeeperName
err = c.CoreV1().PersistentVolumeClaims(namespace).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: toolsPvcLabel})
if err != nil {
return err
}
Expand Down Expand Up @@ -252,7 +264,7 @@ func (t *toolsCmd) genSupersetParameters(relName string, parameters []string) []

func (t *toolsCmd) genZookeeperParameters(relName string, parameters []string) []string {
params := append(parameters, []string{"--set", fmt.Sprintf("fullnameOverride=%s", relName)}...)
params = append(params, []string{"--set", fmt.Sprintf("persistence.storageClass=%s", DefaultStorageClass)}...)
params = append(params, []string{"--set", fmt.Sprintf("persistence.storageClass=%s", t.storagepool)}...)
params = append(params, []string{"--set", "replicaCount=3"}...)
params = append(params, []string{"--set", "podAntiAffinityPreset=hard"}...)
return params
Expand All @@ -272,7 +284,7 @@ func (t *toolsCmd) genNifiParameters(relName string, parameters []string) []stri
}
params := append(parameters, []string{"--set", "fullnameOverride=" + relName}...)
params = append(params, []string{"--set", "auth.enabled=false"}...)
params = append(params, []string{"--set", fmt.Sprintf("master.persistence.storageClass=%s", DefaultStorageClass)}...)
params = append(params, []string{"--set", fmt.Sprintf("master.persistence.storageClass=%s", t.storagepool)}...)
params = append(params, []string{"--set", fmt.Sprintf("service.type=%s", DefaultToolNifiSvcType)}...)
params = append(params, []string{"--set", fmt.Sprintf("service.nodePort=%d", DefaultToolNifiSvcNodePort)}...)
params = append(params, []string{"--set", fmt.Sprintf("properties.webProxyHost=%s:%d", nodePortIp, DefaultToolNifiSvcNodePort)}...)
Expand All @@ -298,9 +310,9 @@ func (t *toolsCmd) genAirflowParameters(relName string, parameters []string) []s
params = append(params, []string{"--set", fmt.Sprintf("logs.persistence.size=%s", DefaultToolAirflowDiskSize)}...)
params = append(params, []string{"--set", fmt.Sprintf("workers.persistence.size=%s", DefaultToolAirflowDiskSize)}...)
params = append(params, []string{"--set", fmt.Sprintf("triggerer.persistence.size=%s", DefaultToolAirflowDiskSize)}...)
params = append(params, []string{"--set", fmt.Sprintf("workers.persistence.storageClassName=%s", DefaultStorageClass)}...)
params = append(params, []string{"--set", fmt.Sprintf("triggerer.persistence.storageClassName=%s", DefaultStorageClass)}...)
params = append(params, []string{"--set", fmt.Sprintf("dags.persistence.storageClassName=%s", DefaultStorageClass)}...)
params = append(params, []string{"--set", fmt.Sprintf("workers.persistence.storageClassName=%s", t.storagepool)}...)
params = append(params, []string{"--set", fmt.Sprintf("triggerer.persistence.storageClassName=%s", t.storagepool)}...)
params = append(params, []string{"--set", fmt.Sprintf("dags.persistence.storageClassName=%s", t.storagepool)}...)
params = append(params, []string{"--set", "statsd.enabled=false"}...)
params = append(params, []string{"--set", "redis.enabled=false"}...)
params = append(params, []string{"--set", "statsd.enabled=false"}...)
Expand All @@ -310,7 +322,7 @@ func (t *toolsCmd) genAirflowParameters(relName string, parameters []string) []s

func (t *toolsCmd) genRedisParameters(relName string, parameters []string) []string {
params := append(parameters, []string{"--set", "fullnameOverride=" + relName}...)
params = append(params, []string{"--set", fmt.Sprintf("storage.className=%s", DefaultStorageClass)}...)
params = append(params, []string{"--set", fmt.Sprintf("storage.className=%s", t.storagepool)}...)
return params
}

Expand Down Expand Up @@ -458,6 +470,12 @@ func (t *toolsCmd) uninstall(parameters []string) error {
return err
}
}

if t.deletePVC {
if err := t.deleteToolsPVC(t.ns); err != nil {
return err
}
}
return nil
}

Expand Down

0 comments on commit 2814a37

Please sign in to comment.