[wip] Stateful sets #262

Open · wants to merge 2 commits into master
2 changes: 1 addition & 1 deletion helm/spark-operator/templates/operator.yaml
@@ -31,7 +31,7 @@ metadata:
 {{- end }}
 rules:
 - apiGroups: [""]
-  resources: ["pods", "replicationcontrollers", "services", "configmaps"]
+  resources: ["pods", "replicationcontrollers", "statefulsets", "services", "configmaps"]
   verbs: ["create", "delete", "deletecollection", "get", "list", "update", "watch", "patch"]
 ---
 apiVersion: rbac.authorization.k8s.io/v1beta1
@@ -91,7 +91,7 @@ spec:
     verbs:
     - "*"
 - apiGroups: [""]
-  resources: ["pods", "replicationcontrollers", "services", "configmaps"]
+  resources: ["pods", "replicationcontrollers", "statefulsets", "services", "configmaps"]
   verbs: ["create", "delete", "deletecollection", "get", "list", "update", "watch", "patch"]
 - apiGroups:
   - apiextensions.k8s.io
2 changes: 1 addition & 1 deletion manifest/operator-cm.yaml
@@ -9,7 +9,7 @@ metadata:
   name: edit-resources
 rules:
 - apiGroups: [""]
-  resources: ["pods", "replicationcontrollers", "services", "configmaps"]
+  resources: ["pods", "replicationcontrollers", "statefulsets", "services", "configmaps"]
   verbs: ["create", "delete", "deletecollection", "get", "list", "update", "watch", "patch"]
 ---
 apiVersion: rbac.authorization.k8s.io/v1beta1
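One point worth double-checking across all three RBAC edits above: `statefulsets` are served by the `apps` API group, so adding them to a rule with `apiGroups: [""]` (the core group) would likely not grant the operator any StatefulSet permissions. A hedged sketch of a separate rule scoping them to `apps`, with the verbs copied from the existing rule:

```yaml
# Sketch only: StatefulSets live in the "apps" API group, not the core ("") group.
- apiGroups: ["apps"]
  resources: ["statefulsets"]
  verbs: ["create", "delete", "deletecollection", "get", "list", "update", "watch", "patch"]
```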
4 changes: 2 additions & 2 deletions manifest/operator.yaml
@@ -37,7 +37,7 @@ spec:
       serviceAccountName: spark-operator
       containers:
       - name: spark-operator
-        image: quay.io/radanalyticsio/spark-operator:latest-released
+        image: jkremser/spark-operator:s-sets
         env:
         - name: WATCH_NAMESPACE  # if not specified all the namespaces will be watched; ~ denotes the same ns as the operator's
           value: "~"
@@ -60,5 +60,5 @@ spec:
           limits:
             memory: "512Mi"
             cpu: "1000m"
-        imagePullPolicy: Never
+        imagePullPolicy: Always

@@ -38,6 +38,7 @@ protected void onDelete(SparkApplication app) {
         String name = app.getName();
         client.services().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete();
         client.replicationControllers().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete();
+        client.apps().statefulSets().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete();
         client.pods().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete();
     }
 }
@@ -1,6 +1,7 @@
 package io.radanalytics.operator.cluster;

 import io.fabric8.kubernetes.api.model.*;
+import io.fabric8.kubernetes.api.model.apps.StatefulSet;
 import io.radanalytics.operator.historyServer.HistoryServerHelper;
 import io.radanalytics.types.*;

@@ -34,16 +35,16 @@ public class InitContainersHelper {
      * </ol>
      *
      *
-     * @param rc ReplicationController instance
+     * @param ss StatefulSet instance
      * @param cluster SparkCluster instance
      * @param cmExists whether config map with overrides exists
-     * @return modified ReplicationController instance
+     * @return modified StatefulSet instance
      */
-    public static final ReplicationController addInitContainers(ReplicationController rc,
-                                                                SparkCluster cluster,
-                                                                boolean cmExists,
-                                                                boolean isMaster) {
-        PodSpec podSpec = rc.getSpec().getTemplate().getSpec();
+    public static final StatefulSet addInitContainers(StatefulSet ss,
+                                                      SparkCluster cluster,
+                                                      boolean cmExists,
+                                                      boolean isMaster) {
+        PodSpec podSpec = ss.getSpec().getTemplate().getSpec();

         if (isMaster && HistoryServerHelper.needsVolume(cluster)) {
             createChmodHistoryServerContainer(cluster, podSpec);
@@ -57,8 +58,8 @@ public static final ReplicationController addInitContainers(ReplicationController rc,
             createConfigOverrideContainer(cluster, podSpec, cmExists);
         }

-        rc.getSpec().getTemplate().setSpec(podSpec);
-        return rc;
+        ss.getSpec().getTemplate().setSpec(podSpec);
+        return ss;
     }

     private static Container createDownloader(SparkCluster cluster, PodSpec podSpec) {
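The mechanical feel of this change comes from the fabric8 model: ReplicationController and StatefulSet expose the same `spec.template.spec` accessor chain, so only the outer type changes. A minimal sketch of the read-mutate-write pattern the helper relies on (names here are illustrative, not from the PR):

```java
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;

public class PodSpecRoundTrip {
    public static void main(String[] args) {
        // A bare StatefulSet with an empty pod template.
        StatefulSet ss = new StatefulSetBuilder()
                .withNewMetadata().withName("demo").endMetadata()
                .withNewSpec()
                    .withNewTemplate().withNewSpec().endSpec().endTemplate()
                .endSpec()
                .build();

        // Same accessor chain the old ReplicationController code used:
        PodSpec podSpec = ss.getSpec().getTemplate().getSpec();
        // ... mutate podSpec here (init containers, volumes, ...) ...
        ss.getSpec().getTemplate().setSpec(podSpec);
    }
}
```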
@@ -1,10 +1,15 @@
 package io.radanalytics.operator.cluster;

 import io.fabric8.kubernetes.api.model.*;
+import io.fabric8.kubernetes.api.model.apps.StatefulSet;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetFluent;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpecFluent;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.radanalytics.operator.historyServer.HistoryServerHelper;
 import io.radanalytics.operator.resource.LabelsHelper;
 import io.radanalytics.types.*;
+import io.radanalytics.types.PersistentVolume;

 import java.util.*;

@@ -34,31 +39,55 @@ public KubernetesResourceList getResourceList(SparkCluster cluster) {
         if (cluster.getMaster() != null && cluster.getMaster().getLabels() != null)
             allMasterLabels.putAll(cluster.getMaster().getLabels());

-        ReplicationController masterRc = getRCforMaster(cluster);
-        ReplicationController workerRc = getRCforWorker(cluster);
+        StatefulSet masterSs = getSSforMaster(cluster);
+        StatefulSet workerSs = getSSforWorker(cluster);
         Service masterService = getService(false, name, 7077, allMasterLabels);
-        List<HasMetadata> list = new ArrayList<>(Arrays.asList(masterRc, workerRc, masterService));
+        List<HasMetadata> list = new ArrayList<>(Arrays.asList(masterSs, workerSs, masterService));
         if (cluster.getSparkWebUI()) {
             Service masterUiService = getService(true, name, 8080, allMasterLabels);
             list.add(masterUiService);
         }

         // pvc for history server (in case of sharedVolume strategy)
         if (HistoryServerHelper.needsVolume(cluster)) {
-            PersistentVolumeClaim pvc = getPersistentVolumeClaim(cluster, getDefaultLabels(name));
+            SharedVolume sharedVolume = Optional.ofNullable(cluster.getHistoryServer().getSharedVolume()).orElse(new SharedVolume());
+            Map<String, String> matchLabels = sharedVolume.getMatchLabels();
+            if (null == matchLabels || matchLabels.isEmpty()) {
+                // if no match labels are specified, we assume the default one: radanalytics.io/SparkCluster: spark-cluster-name
+                matchLabels = new HashMap<>(1);
+                matchLabels.put(prefix + entityName, cluster.getName());
+            }
+            PersistentVolumeClaim pvc = getPersistentVolumeClaim(cluster.getName() + "-hs-claim", getDefaultLabels(name), matchLabels, sharedVolume.getSize(), "ReadWriteMany");
             list.add(pvc);
         }

+        // pvcs for masters/workers
+        List<PersistentVolume> pvcToBeAdded = new ArrayList<>();
+        if (cluster.getMaster() != null) {
+            pvcToBeAdded.addAll(cluster.getMaster().getPersistentVolumes());
+        }
+        if (cluster.getWorker() != null) {
+            pvcToBeAdded.addAll(cluster.getWorker().getPersistentVolumes());
+        }
+
+//        if (!pvcToBeAdded.isEmpty()) {
+//            for (PersistentVolume pv : pvcToBeAdded) {
+//                String pvName = cluster.getName() + "-" + Optional.ofNullable(pv.getName()).orElse(UUID.randomUUID().toString()) + "-claim";
+//                PersistentVolumeClaim pvc = getPersistentVolumeClaim(pvName, getDefaultLabels(name), pv.getMatchLabels(), pv.getSize(), "ReadWriteOnce");
+//                list.add(pvc);
+//            }
+//        }
             KubernetesList resources = new KubernetesListBuilder().withItems(list).build();
             return resources;
         }
     }

-    private ReplicationController getRCforMaster(SparkCluster cluster) {
-        return getRCforMasterOrWorker(true, cluster);
+    private StatefulSet getSSforMaster(SparkCluster cluster) {
+        return getSSforMasterOrWorker(true, cluster);
     }

-    private ReplicationController getRCforWorker(SparkCluster cluster) {
-        return getRCforMasterOrWorker(false, cluster);
+    private StatefulSet getSSforWorker(SparkCluster cluster) {
+        return getSSforMasterOrWorker(false, cluster);
     }

     private Service getService(boolean isUi, String name, int port, Map<String, String> allMasterLabels) {
@@ -78,7 +107,7 @@ public static EnvVar env(String key, String value) {
         return new EnvVarBuilder().withName(key).withValue(value).build();
     }

-    private ReplicationController getRCforMasterOrWorker(boolean isMaster, SparkCluster cluster) {
+    private StatefulSet getSSforMasterOrWorker(boolean isMaster, SparkCluster cluster) {
         String name = cluster.getName();
         String podName = name + (isMaster ? "-m" : "-w");
         Map<String, String> selector = getSelector(name, podName);
@@ -166,7 +195,7 @@ private ReplicationController getRCforMasterOrWorker(boolean isMaster, SparkCluster cluster) {
         podLabels.put(prefix + LabelsHelper.OPERATOR_POD_TYPE_LABEL, isMaster ? OPERATOR_TYPE_MASTER_LABEL : OPERATOR_TYPE_WORKER_LABEL);
         addLabels(podLabels, cluster, isMaster);

-        PodTemplateSpecFluent.SpecNested<ReplicationControllerSpecFluent.TemplateNested<ReplicationControllerFluent.SpecNested<ReplicationControllerBuilder>>> rcBuilder = new ReplicationControllerBuilder().withNewMetadata()
+        PodTemplateSpecFluent.SpecNested<StatefulSetSpecFluent.TemplateNested<StatefulSetFluent.SpecNested<StatefulSetBuilder>>> ssBuilder = new StatefulSetBuilder().withNewMetadata()
                 .withName(podName).withLabels(labels)
                 .endMetadata()
                 .withNewSpec().withReplicas(
@@ -176,11 +205,17 @@ private ReplicationController getRCforMasterOrWorker(boolean isMaster, SparkCluster cluster) {
                         :
                         Optional.ofNullable(cluster.getWorker()).orElse(new Worker()).getInstances()
                 )
-                .withSelector(selector)
+                .withNewSelector().withMatchLabels(selector).endSelector()
                 .withNewTemplate().withNewMetadata().withLabels(podLabels).endMetadata()
                 .withNewSpec().withContainers(containerBuilder.build());

-        ReplicationController rc = rcBuilder.endSpec().endTemplate().endSpec().build();
+//        // TODO: transform the replication controllers into stateful sets if the PV is asked
+//        List<Volume> volumes = getVolumes(cluster, isMaster);
+//        if (volumes != null) {
+//            rcBuilder.withVolumes(volumes);
+//        }
+
+        StatefulSet ss = ssBuilder.endSpec().endTemplate().endSpec().build();

         // history server
         if (isMaster && null != cluster.getHistoryServer()) {
@@ -189,23 +224,36 @@ private ReplicationController getRCforMasterOrWorker(boolean isMaster, SparkCluster cluster) {

         // add init containers that will prepare the data on the nodes or override the configuration
         if (!cluster.getDownloadData().isEmpty() || !cluster.getSparkConfiguration().isEmpty() || cmExists) {
-            InitContainersHelper.addInitContainers(rc, cluster, cmExists, isMaster);
+            InitContainersHelper.addInitContainers(ss, cluster, cmExists, isMaster);
         }
-        return rc;
+        return ss;

     }

-    private PersistentVolumeClaim getPersistentVolumeClaim(SparkCluster cluster, Map<String, String> labels) {
-        SharedVolume sharedVolume = Optional.ofNullable(cluster.getHistoryServer().getSharedVolume()).orElse(new SharedVolume());
-        Map<String,Quantity> requests = new HashMap<>();
-        requests.put("storage", new QuantityBuilder().withAmount(sharedVolume.getSize()).build());
-        Map<String, String> matchLabels = sharedVolume.getMatchLabels();
-        if (null == matchLabels || matchLabels.isEmpty()) {
-            // if no match labels are specified, we assume the default one: radanalytics.io/SparkCluster: spark-cluster-name
-            matchLabels = new HashMap<>(1);
-            matchLabels.put(prefix + entityName, cluster.getName());
+    private List<Volume> getVolumes(SparkCluster cluster, boolean isMaster) {
+        if (isMaster) {
+            if (cluster.getMaster() == null) {
+                return null;
+            }
+            // cluster.getMaster().getPersistentVolumes()
+        } else {
+            if (cluster.getWorker() == null) {
+                return null;
+            }
         }
-        PersistentVolumeClaim pvc = new PersistentVolumeClaimBuilder().withNewMetadata().withName(cluster.getName() + "-claim").withLabels(labels).endMetadata()
-                .withNewSpec().withAccessModes("ReadWriteMany")
+        return null;
+    }
+
+    private PersistentVolumeClaim getPersistentVolumeClaim(String name,
+                                                           Map<String, String> labels,
+                                                           Map<String, String> matchLabels,
+                                                           String size,
+                                                           String accessMode) {
+        Map<String,Quantity> requests = new HashMap<>();
+        requests.put("storage", new QuantityBuilder().withAmount(size).build());
+
+        PersistentVolumeClaim pvc = new PersistentVolumeClaimBuilder().withNewMetadata().withName(name).withLabels(labels).endMetadata()
+                .withNewSpec().withAccessModes(accessMode)
                 .withNewSelector().withMatchLabels(matchLabels).endSelector()
                 .withNewResources().withRequests(requests).endResources().endSpec().build();
         return pvc;
Expand Down
@@ -3,10 +3,10 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Functions;
 import com.google.common.collect.Sets;
-import io.fabric8.kubernetes.api.model.DoneableReplicationController;
 import io.fabric8.kubernetes.api.model.KubernetesResourceList;
-import io.fabric8.kubernetes.api.model.ReplicationController;
-import io.fabric8.kubernetes.api.model.ReplicationControllerList;
+import io.fabric8.kubernetes.api.model.apps.DoneableStatefulSet;
+import io.fabric8.kubernetes.api.model.apps.StatefulSet;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetList;
 import io.fabric8.kubernetes.client.Watch;
 import io.fabric8.kubernetes.client.Watcher;
 import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable;
@@ -65,7 +65,7 @@ protected void onAdd(SparkCluster cluster) {
     protected void onDelete(SparkCluster cluster) {
         String name = cluster.getName();
         client.services().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete();
-        client.replicationControllers().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete();
+        client.apps().statefulSets().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete();
         client.pods().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete();
         client.persistentVolumeClaims().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete();
         getClusters().delete(name);
@@ -93,7 +93,7 @@ protected void onModify(SparkCluster newCluster) {
         if (isOnlyScale(existingCluster, newCluster)) {
             log.info("{}scaling{} from {}{}{} worker replicas to {}{}{}", re(), xx(), ye(),
                     existingCluster.getWorker().getInstances(), xx(), ye(), newWorkers, xx());
-            client.replicationControllers().inNamespace(namespace).withName(name + "-w").scale(newWorkers);
+            client.apps().statefulSets().inNamespace(namespace).withName(name + "-w").scale(newWorkers);

             // update metrics
             MetricsHelper.workers.labels(newCluster.getName(), namespace).set(newCluster.getWorker().getInstances());
@@ -199,18 +199,19 @@ public void fullReconciliation() {
     }

     private Map<String, Integer> getActual() {
-        MixedOperation<ReplicationController, ReplicationControllerList, DoneableReplicationController, RollableScalableResource<ReplicationController, DoneableReplicationController>> aux1 =
-                client.replicationControllers();
-        FilterWatchListMultiDeletable<ReplicationController, ReplicationControllerList, Boolean, Watch, Watcher<ReplicationController>> aux2 =
+
+        MixedOperation<StatefulSet, StatefulSetList, DoneableStatefulSet, RollableScalableResource<StatefulSet, DoneableStatefulSet>> aux1 =
+                client.apps().statefulSets();
+        FilterWatchListMultiDeletable<StatefulSet, StatefulSetList, Boolean, Watch, Watcher<StatefulSet>> aux2 =
                 "*".equals(namespace) ? aux1.inAnyNamespace() : aux1.inNamespace(namespace);
         Map<String, String> labels = new HashMap<>(2);
         labels.put(prefix + OPERATOR_KIND_LABEL, entityName);
         labels.put(prefix + OPERATOR_RC_TYPE_LABEL, "worker");
-        List<ReplicationController> workerRcs = aux2.withLabels(labels).list().getItems();
-        Map<String, Integer> retMap = workerRcs
+        List<StatefulSet> workerSss = aux2.withLabels(labels).list().getItems();
+        Map<String, Integer> retMap = workerSss
                 .stream()
-                .collect(Collectors.toMap(rc -> rc.getMetadata().getLabels().get(prefix + entityName),
-                        rc -> rc.getSpec().getReplicas()));
+                .collect(Collectors.toMap(ss -> ss.getMetadata().getLabels().get(prefix + entityName),
+                        ss -> ss.getSpec().getReplicas()));
         return retMap;
     }

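For readers less familiar with the fabric8 DSL: `client.apps().statefulSets()` returns the same mixed-operation shape that `replicationControllers()` did, which is why `withLabels`, `list`, and `scale` carry over unchanged above. A self-contained sketch (the namespace, resource names, and label key are assumptions for illustration):

```java
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.util.List;

public class StatefulSetDslSketch {
    public static void main(String[] args) {
        // Resolves the connection from kubeconfig or the in-cluster service account.
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            // List worker stateful sets by label, mirroring getActual() above.
            List<StatefulSet> workers = client.apps().statefulSets()
                    .inNamespace("myproject")                      // namespace assumed
                    .withLabel("radanalytics.io/rcType", "worker") // label key assumed
                    .list().getItems();
            workers.forEach(ss -> System.out.printf("%s -> %d replicas%n",
                    ss.getMetadata().getName(), ss.getSpec().getReplicas()));

            // Scale one of them, mirroring the onModify() path.
            client.apps().statefulSets().inNamespace("myproject")
                    .withName("my-cluster-w").scale(3);
        }
    }
}
```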
39 changes: 39 additions & 0 deletions src/main/resources/schema/sparkCluster.json
@@ -38,6 +38,25 @@
         "items": {
           "type": "string"
         }
+      },
+      "persistentVolumes": {
+        "type": "array",
+        "items": {
+          "type": "object",
+          "properties": {
+            "mountPath": {
+              "type": "string"
+            },
+            "size": {
+              "type": "string",
+              "default": "0.3Gi"
+            },
+            "matchLabels": {
+              "type": "object",
+              "existingJavaType": "java.util.Map<String,String>"
+            }
+          }
+        }
       }
     }
   },
@@ -71,6 +90,26 @@
         "items": {
           "type": "string"
         }
+      },
+      "persistentVolumes": {
+        "type": "array",
+        "items": {
+          "existingJavaType": "io.radanalytics.types.PersistentVolume",
+          "type": "object",
+          "properties": {
+            "mountPath": {
+              "type": "string"
+            },
+            "size": {
+              "type": "string",
+              "default": "0.3Gi"
+            },
+            "matchLabels": {
+              "type": "object",
+              "existingJavaType": "java.util.Map<String,String>"
+            }
+          }
+        }
       }
     }
   },
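To make the schema change concrete, here is a hedged sketch of a SparkCluster custom resource exercising the new `persistentVolumes` field (field placement is inferred from the schema; as the commented-out deployer code shows, the operator does not actually create these PVCs yet in this WIP):

```yaml
apiVersion: radanalytics.io/v1
kind: SparkCluster
metadata:
  name: my-spark-cluster
spec:
  worker:
    instances: 2
    persistentVolumes:
      - mountPath: /tmp/spark-data   # where the volume would be mounted
        size: "0.3Gi"                # schema default
        matchLabels:                 # PV selector; defaulting behavior assumed
          radanalytics.io/SparkCluster: my-spark-cluster
```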