Skip to content

Commit

Permalink
Merge pull request #288 from volcano-sh/scheduler-in-tree
Browse files Browse the repository at this point in the history
Scheduler in tree
  • Loading branch information
k82cn authored Jul 6, 2019
2 parents 82b4ec4 + 2bf4884 commit 37251c4
Show file tree
Hide file tree
Showing 111 changed files with 12,996 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cmd/admission/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"net/http"

"github.com/golang/glog"
"github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/client/clientset/versioned"

"k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down
2 changes: 1 addition & 1 deletion cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
kbver "volcano.sh/volcano/pkg/client/clientset/versioned"

"volcano.sh/volcano/cmd/controllers/app/options"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
Expand Down
95 changes: 95 additions & 0 deletions cmd/kube-batch/app/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"fmt"
"time"

"github.com/spf13/pflag"
)

const (
defaultSchedulerName = "volcano"
defaultSchedulerPeriod = time.Second
defaultQueue = "default"
defaultListenAddress = ":8080"

defaultQPS = 50.0
defaultBurst = 100
)

// ServerOption is the main context object for the controller manager.
type ServerOption struct {
Master string
Kubeconfig string
SchedulerName string
SchedulerConf string
SchedulePeriod time.Duration
EnableLeaderElection bool
LockObjectNamespace string
DefaultQueue string
PrintVersion bool
ListenAddress string
EnablePriorityClass bool
KubeAPIBurst int
KubeAPIQPS float32
}

// ServerOpts server options
var ServerOpts *ServerOption

// NewServerOption creates a new CMServer with a default config.
func NewServerOption() *ServerOption {
s := ServerOption{}
return &s
}

// AddFlags adds flags for a specific CMServer to the specified FlagSet
func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information")
// kube-batch will ignore pods with scheduler names other than specified with the option
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "kube-batch will handle pods whose .spec.SchedulerName is same as scheduler-name")
fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file")
fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle")
fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job")
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection,
"Start a leader election client and gain leadership before "+
"executing the main loop. Enable this when running replicated kube-batch for high availability")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object that is used for leader election")
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.BoolVar(&s.EnablePriorityClass, "priority-class", true,
"Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
}

// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled
func (s *ServerOption) CheckOptionOrDie() error {
if s.EnableLeaderElection && s.LockObjectNamespace == "" {
return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled")
}

return nil
}

// RegisterOptions registers options
func (s *ServerOption) RegisterOptions() {
ServerOpts = s
}
51 changes: 51 additions & 0 deletions cmd/kube-batch/app/options/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"reflect"
"testing"
"time"

"github.com/spf13/pflag"
)

func TestAddFlags(t *testing.T) {
fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
s := NewServerOption()
s.AddFlags(fs)

args := []string{
"--schedule-period=5m",
"--priority-class=false",
}
fs.Parse(args)

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
SchedulerName: defaultSchedulerName,
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
KubeAPIBurst: defaultBurst,
KubeAPIQPS: defaultQPS,
}

if !reflect.DeepEqual(expected, s) {
t.Errorf("Got different run options than expected.\nGot: %+v\nExpected: %+v\n", s, expected)
}
}
153 changes: 153 additions & 0 deletions cmd/kube-batch/app/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

import (
"context"
"fmt"
"net/http"
"os"
"time"

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"volcano.sh/volcano/cmd/kube-batch/app/options"
"volcano.sh/volcano/pkg/scheduler"
"volcano.sh/volcano/pkg/version"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

// Register gcp auth
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
)

const (
leaseDuration = 15 * time.Second
renewDeadline = 10 * time.Second
retryPeriod = 5 * time.Second
apiVersion = "v1alpha1"
)

func buildConfig(opt *options.ServerOption) (*rest.Config, error) {
var cfg *rest.Config
var err error

master := opt.Master
kubeconfig := opt.Kubeconfig
if master != "" || kubeconfig != "" {
cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
} else {
cfg, err = rest.InClusterConfig()
}
if err != nil {
return nil, err
}
cfg.QPS = opt.KubeAPIQPS
cfg.Burst = opt.KubeAPIBurst

return cfg, nil
}

// Run the kubeBatch scheduler
func Run(opt *options.ServerOption) error {
if opt.PrintVersion {
version.PrintVersionAndExit()
}

config, err := buildConfig(opt)
if err != nil {
return err
}

// Start policy controller to allocate resources.
sched, err := scheduler.NewScheduler(config,
opt.SchedulerName,
opt.SchedulerConf,
opt.SchedulePeriod,
opt.DefaultQueue)
if err != nil {
panic(err)
}

go func() {
http.Handle("/metrics", promhttp.Handler())
glog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()

run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
}

if !opt.EnableLeaderElection {
run(context.TODO())
return fmt.Errorf("finished without leader elect")
}

leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(config, "leader-election"))
if err != nil {
return err
}

// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LockObjectNamespace)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: opt.SchedulerName})

hostname, err := os.Hostname()
if err != nil {
return fmt.Errorf("unable to get hostname: %v", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := hostname + "_" + string(uuid.NewUUID())

rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
opt.LockObjectNamespace,
"kube-batch",
leaderElectionClient.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: eventRecorder,
})
if err != nil {
return fmt.Errorf("couldn't create resource lock: %v", err)
}

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
return fmt.Errorf("lost lease")
}
20 changes: 15 additions & 5 deletions cmd/kube-batch/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2018 The Vulcan Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -18,24 +18,34 @@ package main
import (
"fmt"
"os"
"runtime"
"time"

// init pprof server
_ "net/http/pprof"

"github.com/golang/glog"
"github.com/spf13/pflag"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/flag"

_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions"
_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins"
"volcano.sh/volcano/cmd/kube-batch/app"
"volcano.sh/volcano/cmd/kube-batch/app/options"

// Import default actions/plugins.
_ "volcano.sh/volcano/pkg/scheduler/actions"
_ "volcano.sh/volcano/pkg/scheduler/plugins"

"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app"
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
// init assert
_ "volcano.sh/volcano/pkg/scheduler/util/assert"
)

var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())

s := options.NewServerOption()
s.AddFlags(pflag.CommandLine)
s.RegisterOptions()
Expand Down
7 changes: 7 additions & 0 deletions hack/.golint_failures
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
volcano.sh/volcano/pkg/apis/scheduling/v1alpha1
volcano.sh/volcano/pkg/apis/utils
volcano.sh/volcano/pkg/scheduler/actions/allocate
volcano.sh/volcano/pkg/scheduler/actions/backfill
volcano.sh/volcano/pkg/scheduler/actions/enqueue
volcano.sh/volcano/pkg/scheduler/actions/preempt
volcano.sh/volcano/pkg/scheduler/actions/reclaim
volcano.sh/volcano/test/e2e
2 changes: 1 addition & 1 deletion hack/update-gencode.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${SCRIPT_ROOT}; ls -d -1 ./vendor/k8s.io/code-ge
# instead of the $GOPATH directly. For normal projects this can be dropped.
${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \
volcano.sh/volcano/pkg/client volcano.sh/volcano/pkg/apis \
"batch:v1alpha1 bus:v1alpha1" \
"batch:v1alpha1 bus:v1alpha1 scheduling:v1alpha1" \
--go-header-file ${SCRIPT_ROOT}/hack/boilerplate/boilerplate.go.txt

# To use your own boilerplate text use:
Expand Down
2 changes: 1 addition & 1 deletion pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"

"github.com/golang/glog"
"github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/client/clientset/versioned"

"k8s.io/api/admission/v1beta1"
"k8s.io/api/core/v1"
Expand Down
Loading

0 comments on commit 37251c4

Please sign in to comment.