diff --git a/test/sigma/apiserver/pdfc_admission.go b/test/sigma/apiserver/pdfc_admission.go new file mode 100644 index 0000000000000..fff29ffaf6623 --- /dev/null +++ b/test/sigma/apiserver/pdfc_admission.go @@ -0,0 +1,179 @@ +package apiserver + +import ( + "fmt" + "path/filepath" + "strconv" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/plugin/pkg/admission/poddeletionflowcontrol" + "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/sigma/util" +) + +var _ = Describe("[kube-apiserver][admission][pdfc]", func() { + image := "reg.docker.alibaba-inc.com/k8s-test/nginx:1.15.3" + f := framework.NewDefaultFramework("sigma-apiserver") + + It("[smoke][test-flow-control] test pod deletion limited by pdfc [Serial]", func() { + By("load pdfc template") + pdfcConfigFile := filepath.Join(util.TestDataDir, "pdfc-config.json") + pdfcConfig, err := util.LoadConfigMapFromFile(pdfcConfigFile) + Expect(err).NotTo(HaveOccurred(), "load pdfc template failed") + + By("load pod template") + podFile := filepath.Join(util.TestDataDir, "pod-base.json") + podCfg, err := util.LoadPodFromFile(podFile) + Expect(err).NotTo(HaveOccurred(), "load pod template failed") + + By("create pdfc rules") + pdfcConfig.Data[poddeletionflowcontrol.PdfcConfigRuleKey] = + `[{"duration":"1m","deleteLimit":10}]` + _, err = f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Create(pdfcConfig) + Expect(err).NotTo(HaveOccurred(), "create pdfc failed") + defer f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Delete(poddeletionflowcontrol.PdfcConfigName, nil) + + By("create 11 pods") + var pods []*v1.Pod + podCfg.Spec.Containers[0].Image = image + for i := 0; i <= 10; i++ { + pod := podCfg.DeepCopy() + pod.Name = pod.Name + strconv.Itoa(i) + pod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(pod) + Expect(err).NotTo(HaveOccurred(), "create pod failed") + pods = append(pods, 
pod) + } + + second := time.Now().Second() + if second > 55 || second < 5 { + count := time.Duration((65 - second) % 60) + time.Sleep(count * time.Second) + } + By(fmt.Sprintf("delete time date: %v", time.Now())) + + By("try to delete 11 pods in one minute") + for i := range pods { + err := f.ClientSet.CoreV1().Pods(pods[i].Namespace).Delete(pods[i].Name, nil) + if i < 3 { + Expect(err).NotTo(HaveOccurred(), "delete pod failed") + } else if i == 10 { + Expect(err.Error()).To(ContainSubstring("rejected by flow control")) + } + } + + By("wait until next minute") + expectRecords := fmt.Sprintf(`{"%v":{"deleteCount":10}}`, time.Unix(time.Now().Unix(), 0).Format("200601021504")) + if time.Now().Second() != 0 { + sleepSecs := time.Duration(70 - time.Now().Second()) + time.Sleep(sleepSecs * time.Second) + } + + By("check pod deletion record") + pdfcCm, err := f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Get(poddeletionflowcontrol.PdfcConfigName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "get pdfc failed") + framework.Logf("pdfc: %+v", pdfcCm) + Expect(pdfcCm.Data[poddeletionflowcontrol.PdfcConfigRecordKey]).To(Equal(expectRecords)) + }) + + It("[test-counter-reload] test counter reload when apiserver restart [Serial]", func() { + By("load pdfc template") + pdfcConfigFile := filepath.Join(util.TestDataDir, "pdfc-config.json") + pdfcConfig, err := util.LoadConfigMapFromFile(pdfcConfigFile) + Expect(err).NotTo(HaveOccurred(), "load pdfc template failed") + + By("load pod template") + podFile := filepath.Join(util.TestDataDir, "pod-base.json") + podCfg, err := util.LoadPodFromFile(podFile) + Expect(err).NotTo(HaveOccurred(), "load pod template failed") + + By("create pdfc rules") + pdfcConfig.Data[poddeletionflowcontrol.PdfcConfigRuleKey] = + `[{"duration":"2m","deleteLimit":15}]` + pdfcConfig.Data[poddeletionflowcontrol.PdfcConfigRecordKey] = + fmt.Sprintf(`{"%v":{"deleteCount":10}}`, time.Unix(time.Now().Add(-time.Minute).Unix(), 
0).Format("200601021504")) + _, err = f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Create(pdfcConfig) + Expect(err).NotTo(HaveOccurred(), "create pdfc failed") + defer f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Delete(poddeletionflowcontrol.PdfcConfigName, nil) + + By("create 6 pods") + var pods []*v1.Pod + podCfg.Spec.Containers[0].Image = image + for i := 0; i <= 6; i++ { + pod := podCfg.DeepCopy() + pod.Name = pod.Name + strconv.Itoa(i) + pod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(pod) + Expect(err).NotTo(HaveOccurred(), "create pod failed") + pods = append(pods, pod) + } + + By("try to delete 6 pods in one minute") + for i := range pods { + err := f.ClientSet.CoreV1().Pods(pods[i].Namespace).Delete(pods[i].Name, nil) + if i < 5 { + Expect(err).NotTo(HaveOccurred(), "delete pod failed") + } else { + // Todo: fix panic + Expect(err.Error()).To(ContainSubstring("rejected by flow control")) + } + } + }) + + It("[test-rules-change] test rules change when apiserver running [Serial]", func() { + By("load pdfc template") + pdfcConfigFile := filepath.Join(util.TestDataDir, "pdfc-config.json") + pdfcConfig, err := util.LoadConfigMapFromFile(pdfcConfigFile) + Expect(err).NotTo(HaveOccurred(), "load pdfc template failed") + + By("load pod template") + podFile := filepath.Join(util.TestDataDir, "pod-base.json") + podCfg, err := util.LoadPodFromFile(podFile) + Expect(err).NotTo(HaveOccurred(), "load pod template failed") + + By("create pdfc rules") + pdfcConfig.Data[poddeletionflowcontrol.PdfcConfigRuleKey] = + `[{"duration":"5m","deleteLimit":5}]` + _, err = f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Create(pdfcConfig) + Expect(err).NotTo(HaveOccurred(), "create pdfc failed") + defer f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Delete(poddeletionflowcontrol.PdfcConfigName, nil) + + By("create 6 pods") + var pods []*v1.Pod + podCfg.Spec.Containers[0].Image = image + for i := 0; i <= 6; i++ { + pod := podCfg.DeepCopy() + pod.Name = 
pod.Name + strconv.Itoa(i) + pod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(pod) + Expect(err).NotTo(HaveOccurred(), "create pod failed") + pods = append(pods, pod) + } + + By("try to delete 6 pods in one minute") + for i := range pods { + err := f.ClientSet.CoreV1().Pods(pods[i].Namespace).Delete(pods[i].Name, nil) + if i < 5 { + Expect(err).NotTo(HaveOccurred(), "delete pod failed") + } else { + Expect(err.Error()).To(ContainSubstring("rejected by flow control")) + } + } + + By("update pdfc rules") + pdfcCm, err := f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Get(poddeletionflowcontrol.PdfcConfigName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "get pdfc failed") + pdfcCm.Data[poddeletionflowcontrol.PdfcConfigRuleKey] = `[{"duration":"5m","deleteLimit":15}]` + _, err = f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Update(pdfcCm) + Expect(err).NotTo(HaveOccurred(), "update pdfc rules failed") + + By("wait one minute") + time.Sleep(70 * time.Second) + + By("try to delete last pod") + err = f.ClientSet.CoreV1().Pods(pods[5].Namespace).Delete(pods[5].Name, nil) + Expect(err).NotTo(HaveOccurred(), "delete pod failed") + }) +}) diff --git a/test/sigma/sigma_suite_test.go b/test/sigma/sigma_suite_test.go index a392028925a16..095dfafed048f 100644 --- a/test/sigma/sigma_suite_test.go +++ b/test/sigma/sigma_suite_test.go @@ -19,6 +19,7 @@ import ( // test sources _ "k8s.io/kubernetes/test/sigma/ant-sigma-bvt" + _ "k8s.io/kubernetes/test/sigma/apiserver" _ "k8s.io/kubernetes/test/sigma/cni" _ "k8s.io/kubernetes/test/sigma/common" _ "k8s.io/kubernetes/test/sigma/controller" diff --git a/test/sigma/testdata/cgroup-parent-configmap.json b/test/sigma/testdata/cgroup-parent-configmap.json new file mode 100644 index 0000000000000..a8d4e3d49abff --- /dev/null +++ b/test/sigma/testdata/cgroup-parent-configmap.json @@ -0,0 +1,11 @@ +{ + "apiVersion": "v1", + "data": { + "custom-cgroup-parents": "/fakecgroup/test;/fakecgroup/test1" + }, + "kind": 
"ConfigMap", + "metadata": { + "name": "custom-cgroup-parents", + "namespace": "kube-system" + } +} diff --git a/test/sigma/util/armory_util.go b/test/sigma/util/armory_util.go index ebac14709b0d5..6b7e1c75eda56 100644 --- a/test/sigma/util/armory_util.go +++ b/test/sigma/util/armory_util.go @@ -1,21 +1,12 @@ package util import ( - "encoding/json" - "errors" - "html" - "io/ioutil" - "net/http" - "net/url" - "strings" - "fmt" + "strings" + "time" "github.com/golang/glog" -) - -const ( - namingBackend = "armory" + "k8s.io/kubernetes/test/sigma/util/skyline" ) // ArmoryInfo copy from sigma-k8s-controller/pkg/util/naming/ns.go @@ -42,61 +33,86 @@ type ArmoryQueryResult struct { Data []ArmoryInfo `json:"result"` } -// QueryArmory query the daily armory. -func QueryArmory(query string) ([]ArmoryInfo, error) { - selectRows := "[default],rack,location_in_rack,logic_region_flag,product_name,app_use_type,product_id,product_name,hw_cpu,hw_harddisk,hw_mem,hw_raid,room,sm_name,app_use_type,create_time,modify_time,parent_service_tag" - - paramMap := map[string]string{ - "_username": "zeus", - "key": "iabSU71PfURu90Lz6LE5vg==", - "select": selectRows, - "q": query, +// QueryArmory query skyline and covert to armory info. 
+func QueryArmory(queryString string) ([]ArmoryInfo, error) { + if strings.Contains(queryString, "dns_ip") { + queryString = strings.Replace(queryString, "dns_ip", "ip", -1) } - - // hardcoding daily armory url - armoryURL := "http://gapi.a.alibaba-inc.com/page/api/free/opsfreeInterface/search.htm" - - values := url.Values{} - for k, v := range paramMap { - values.Set(k, v) + queryString = strings.Replace(queryString, "==", "=", -1) + + skylineManager := skyline.NewSkylineManager() + queryItem := &skyline.QueryItem{ + From: "server", + Select: strings.Join([]string{skyline.SelectDiskSize, skyline.SelectAppUseType, skyline.SelectParentSn, + skyline.SelectSn, skyline.SelectIp, skyline.SelectAppGroup, skyline.SelectAppName, + skyline.SelectParentSn, skyline.SelectHostName, skyline.SelectAppServerState, + skyline.SelectSecurityDomain, skyline.SelectSite, skyline.SelectModel}, ","), + Condition: queryString, + Page: 1, + Num: 100, } - - //glog.Info("Query armory args:%v", values) - - resp, err := http.PostForm(armoryURL, values) + result, err := skylineManager.Query(queryItem) if err != nil { return nil, err } - - defer resp.Body.Close() - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - armoryResult := &ArmoryQueryResult{} - if err := json.Unmarshal(data, armoryResult); err != nil { - return nil, err + if result == nil { + // try one more + time.Sleep(5 * time.Second) + result, err = skylineManager.Query(queryItem) + if err != nil { + return nil, err + } + if result == nil { + return nil, fmt.Errorf("no skyline record is found for %s", queryString) + } } - if armoryResult.Error != "" { - info, _ := url.QueryUnescape(armoryResult.Message) - return nil, errors.New(html.UnescapeString(info)) + armoryInfos := make([]ArmoryInfo, 0) + for _, item := range result.Value.ItemList { + armoryInfo := ArmoryInfo{} + val, ok := item[skyline.SelectIp] + if ok { + armoryInfo.IP = val.(string) + } + val, ok = item[skyline.SelectSn] + if ok { + 
armoryInfo.ServiceTag = val.(string) + } + val, ok = item[skyline.SelectHostName] + if ok { + armoryInfo.NodeName = val.(string) + } + val, ok = item[skyline.SelectAppGroup] + if ok { + armoryInfo.NodeGroup = val.(string) + } + val, ok = item[skyline.SelectParentSn] + if ok { + armoryInfo.ParentServiceTag = val.(string) + } + val, ok = item[skyline.SelectAppServerState] + if ok { + armoryInfo.State = val.(string) + } + val, ok = item[skyline.SelectSite] + if ok { + armoryInfo.Site = val.(string) + } + val, ok = item[skyline.SelectModel] + if ok { + armoryInfo.Model = val.(string) + } + val, ok = item[skyline.SelectAppName] + if ok { + armoryInfo.ProductName = val.(string) + } + + armoryInfos = append(armoryInfos, armoryInfo) } - - //glog.Info("armoryresult :%v ", armoryResult) - if armoryResult.Num <= 0 { - return nil, nil - } - - for _, data := range armoryResult.Data { - data.Site = strings.ToLower(data.Site) - } - - return armoryResult.Data, nil + return armoryInfos, nil } -// GetHostSnFromIp get the host SN from host IP. +// GetHostSnFromHostIp get the host SN from host IP. 
// Tag names used for the CPU-set mode of a server.
const (
	SkylineCpuSetModeNameSpace = "cpu_set_mode"
	SkylineCpuSetModeCpuSet    = "cpu_set_mode.cpuset"
	SkylineCpuSetModeCpuShare  = "cpu_set_mode.cpushare"
)

// Operator names.
const (
	EQ = "EQ"
	IN = "IN"
)

// URL templates of the skyline endpoints; %s is replaced by the base URL.
const (
	addTagUri      = "%s/h/server/tag_add"             // add a tag
	delTagUri      = "%s/h/server/tag_del"             // remove a tag
	vmAddUri       = "%s/openapi/device/vm/add"        // add
	vmDeleteUri    = "%s/openapi/device/vm/delete"     // delete
	queryUri       = "%s/item/query"                   // query
	updateUri      = "%s/openapi/device/server/update" // update
	timeOutDefault = time.Duration(60) * time.Second   // rate limiter lock timeout
)

const (
	updateNodeOperatorType = 1      // operator type used for update
	defaultOperatorType    = "USER" // general-purpose operator type
)

// Status strings referenced by the state maps below.
const (
	NsUnknownStatus  = "unknown"
	NsCreatedStatus  = "allocated"
	NsPreparedStatus = "prepared"
	NsRunningStatus  = "running"
	NsStoppedStatus  = "stopped"
)

// Column names usable in the select clause of a query.
// See https://yuque.antfin-inc.com/at7ocb/qbn0oy/dwuaod#9c1zin
const (
	SelectCabinetNum      = "cabinet.cabinet_num"     // cabinet number
	SelectCabinetPosition = "cabinet_position"        // position of the device within the cabinet
	SelectAppName         = "app.name"                // application name
	SelectAppGroup        = "app_group.name"          // application group
	SelectAppUseType      = "app_use_type"            // application use type
	SelectCpuCount        = "total_cpu_count"         // total number of CPU cores
	SelectDiskSize        = "total_disk_size"         // disk size
	SelectMemorySize      = "total_memory_size"       // memory size
	SelectAbb             = "room.abbreviation"       // abbreviation of the physical room
	SelectSmName          = "standard_model.sm_name"  // standard machine model name
	SelectCreate          = "gmt_create"              // creation time
	SelectModify          = "gmt_modified"            // last modification time
	SelectParentSn        = "parent_service_tag"      // parent SN: the chassis SN for a physical machine, the server SN for a virtual machine
	SelectSn              = "sn"                      // SN of the container or host
	SelectIp              = "ip"                      // IP of the container or host
	SelectHostName        = "host_name"               // hostname
	SelectAppServerState  = "app_server_state"        // application state
	SelectSecurityDomain  = "security_domain"         // security domain
	SelectSite            = "cabinet.logic_region"    // machine room (logical region)
	SelectModel           = "device_model.model_name" // device model name
)

// Property names accepted by the server API.
// See https://yuque.antfin-inc.com/at7ocb/qbn0oy/qbdtlf#serverapiparamconstantserver
const (
	ApiParamSn               = "sn"               // SN
	ApiParamParentSn         = "parentSn"         // parent SN
	ApiParamAppGroup         = "appGroup"         // application group
	ApiParamHostName         = "hostName"         // host name
	ApiParamDeviceModel      = "deviceModel"      // device model
	ApiParamDeviceType       = "device_type"      // device type
	ApiParamIp               = "ip"               // IP
	ApiParamIpSecurityDomain = "ipSecurityDomain" // security domain
	ApiParamAppServerState   = "appServerState"   // server state
	ApiParamTotalCpuCount    = "totalCpuCount"    // total number of CPU cores
	ApiParamTotalMemorySize  = "totalMemorySize"  // memory size, unit M
	ApiParamTotalDiskSize    = "totalDiskSize"    // disk size, unit G
	ApiParamAppUseType       = "appUseType"       // application use type
	ApiParamResourceOwner    = "resourceOwner"    // resource owner
	ApiParamContainerState   = "containerState"   // container state
)

// SelectDefault is a ready-made, comma-separated select clause built from a
// subset of the Select* columns above.
var SelectDefault = strings.Join([]string{SelectCabinetNum, SelectCabinetPosition, SelectAppName, SelectAppUseType, SelectCpuCount,
	SelectDiskSize, SelectMemorySize, SelectAbb, SelectSmName, SelectCreate, SelectModify,
	SelectParentSn}, ",")

// ArmoryAppState is the application state string of a server record.
type ArmoryAppState string

var (
	Armory_UNKNOWN         = ArmoryAppState("unknown")         // unknown
	Armory_WAIT_ONLINE     = ArmoryAppState("wait_online")     // application waiting to go online
	Armory_WORKING_ONLINE  = ArmoryAppState("working_online")  // application online
	Armory_WORKING_OFFLINE = ArmoryAppState("working_offline") // application offline
	Armory_READY           = ArmoryAppState("ready")           // being prepared
	Armory_BUFFER          = ArmoryAppState("buffer")          // idle
	Armory_BROKEN          = ArmoryAppState("broken")          // broken, under repair
	Armory_LOCK            = ArmoryAppState("lock")            // locked
	Armory_UNUSE           = ArmoryAppState("unuse")           // disabled

	// ArmoryRegisterStateMap is a set of application states; every listed
	// state maps to 1.
	// NOTE(review): presumably the states in which a server counts as
	// registered; confirm against the callers.
	ArmoryRegisterStateMap = map[ArmoryAppState]int{
		Armory_READY:           1,
		Armory_WAIT_ONLINE:     1,
		Armory_WORKING_ONLINE:  1,
		Armory_WORKING_OFFLINE: 1,
		Armory_BUFFER:          1,
	}

	// armoryStateMap translates an Ns*Status string into an ArmoryAppState.
	armoryStateMap = map[string]ArmoryAppState{
		NsCreatedStatus:  Armory_WORKING_OFFLINE,
		NsPreparedStatus: Armory_WAIT_ONLINE,
		NsStoppedStatus:  Armory_WORKING_OFFLINE,
		NsRunningStatus:  Armory_WORKING_ONLINE,
		NsUnknownStatus:  Armory_UNKNOWN,
	}

	// gnsStateMap translates an ArmoryAppState back into an Ns*Status string.
	// NOTE(review): it is not the inverse of armoryStateMap: "prepared" maps
	// to Armory_WAIT_ONLINE above, while here Armory_READY maps to "prepared".
	// Confirm this asymmetry is intended.
	gnsStateMap = map[ArmoryAppState]string{
		Armory_WORKING_OFFLINE: NsCreatedStatus,
		Armory_READY:           NsPreparedStatus,
		Armory_WORKING_ONLINE:  NsRunningStatus,
		Armory_UNKNOWN:         NsUnknownStatus,
	}
)

// auth is the authentication section carried by every request body.
type auth struct {
	Account   string `json:"account"`
	AppName   string `json:"appName"`
	Signature string `json:"signature"`
	Timestamp int64  `json:"timestamp"`
}

// TagParam is the request body for adding or deleting a tag.
type TagParam struct {
	Auth        *auth        `json:"auth"`
	SkyOperator *skyOperator `json:"operator"`
	Sn          string       `json:"sn"`
	Tag         string       `json:"tag"`
	TagValue    string       `json:"tagValue"`
}

// skyOperator is the "operator" section of a request body.
type skyOperator struct {
	Type     interface{} `json:"type"`
	Nick     string      `json:"nick"`
	WorkerId string      `json:"workerId"`
}

// VmAddParam is the request body of the vm add call.
type VmAddParam struct {
	Auth        *auth        `json:"auth"`
	SkyOperator *skyOperator `json:"operator"`
	SkyItem     *skyItem     `json:"item"`
}

// skyItem is the "item" section of a vm add request.
type skyItem struct {
	DeviceType  string                 `json:"deviceType"`
	PropertyMap map[string]interface{} `json:"propertyMap"`
}

// VmDeleteParam is the request body of the vm delete call.
type VmDeleteParam struct {
	Auth        *auth        `json:"auth"`
	SkyOperator *skyOperator `json:"operator"`
	SkyItem     string       `json:"item"`
}

// QueryParam is the request body of the query call.
type QueryParam struct {
	Auth *auth      `json:"auth"`
	Item *QueryItem `json:"item"`
}

// QueryItem describes a single query.
type QueryItem struct {
	From      string `json:"from"`      // table to query; the backend joins automatically based on the selected fields
	Select    string `json:"select"`    // columns to return; columns of the target table use the bare attribute name, columns of joined tables must be prefixed with the category name and a dot, e.g. "sn,ip,node_type,app_group.name"
	Condition string `json:"condition"` // filter; only AND is supported for now; the left operand is a field name; operators: =, !=, >, >=, <, <=, IN, LIKE; the right operand is a string or a number; strings are wrapped in ''; other types such as boolean are treated as strings
	Page      int    `json:"page"`      // page number, starting from 1
	Num       int    `json:"num"`       // page size
	NeedTotal bool   `json:"needTotal"` // whether the total count is needed; an accurate total is only promised when true; leaving it false (the default) helps the backend optimize
}

// Result is the common structure returned by all APIs.
type Result struct {
	Success      bool         `json:"success"`
	Value        *ResultValue `json:"value"`
	ErrorCode    int          `json:"errorCode"`
	ErrorMessage string       `json:"errorMessage"`
}

// ResultValue is the "value" section of a Result.
type ResultValue struct {
	TotalCount int                      `json:"totalCount"`
	HasMore    bool                     `json:"hasMore"`
	ItemList   []map[string]interface{} `json:"itemList"`
}

// ResultItem is a typed view of one server record.
type ResultItem struct {
	Ip         string // IP of the container or physical machine
	Sn         string // SN of the container or physical machine
	HostName   string // host name
	NodeGroup  string // application group
	ParentSn   string // parent SN: the chassis SN for a physical machine, the server SN for a virtual machine
	State      string // application state
	Site       string // logical machine room
	Model      string // model name
	AppName    string // application name
	AppUseType string // application use type
}

// UpdateParam is the request body of the update call.
type UpdateParam struct {
	Auth        *auth        `json:"auth"`
	SkyOperator *skyOperator `json:"operator"`
	UpdateItem  *updateItem  `json:"item"`
}

// updateItem is the "item" section of an update request.
type updateItem struct {
	Sn          string                 `json:"sn"`
	PropertyMap map[string]interface{} `json:"propertyMap"`
}
+ "bytes" + "crypto/md5" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "time" + + "github.com/golang/glog" +) + +type SkyConfig struct { + App string + Url string + User string + Key string + Concurrency int + Switch bool +} + +type SkylineManager struct { + config *SkyConfig +} + +func NewSkylineManager() *SkylineManager { + return &SkylineManager{ + config: &SkyConfig{ + App: "sigma-apiserver", + Url: "http://sky.alibaba-inc.com", + User: "sigma_engin_app", + Key: "bqtABUMCUoV2J9XR", + Concurrency: 100, + Switch: true, + }, + } +} + +// 查询 +// FIXME 当error==nil时,Result还是可能为nil +func (skyline *SkylineManager) Query(queryItem *QueryItem) (*Result, error) { + queryParam := &QueryParam{ + Auth: skyline.buildAuth(), + Item: queryItem, + } + body, err := json.Marshal(queryParam) + if err != nil { + return nil, err + } + url := fmt.Sprintf(queryUri, skyline.config.Url) + result, err := skyline.httpRequestToSky(url, body) + if err != nil { + return nil, err + } + if !result.Success { + return nil, fmt.Errorf("skyline query queryItem:%v, request url: %v, body: %v, message: %v", + queryItem, url, string(body), result.ErrorMessage) + } + return result, nil +} + +// 简单的认证 +func (skyline *SkylineManager) buildAuth() *auth { + auth := &auth{ + Account: skyline.config.User, + AppName: skyline.config.App, + Timestamp: time.Now().Unix(), + } + md5Cal := md5.New() + io.WriteString(md5Cal, auth.Account) + io.WriteString(md5Cal, skyline.config.Key) + io.WriteString(md5Cal, fmt.Sprintf("%v", auth.Timestamp)) + auth.Signature = hex.EncodeToString(md5Cal.Sum(nil)) + return auth +} + +// 绑定用户信息 +// FIXME 先写死 +func (skyline *SkylineManager) buildSkyOperator(operatorType interface{}) *skyOperator { + skyOperator := &skyOperator{ + Type: operatorType, + Nick: "fengxiu.fl", + WorkerId: "169572", + } + return skyOperator +} + +const ( + DEFAULT_DIAL_TIMEOUT int = 10 + DEFAULT_END2END_TIMEOUT int = 120 + + RETRY_COUNT = 2 + 
RETRY_INTERVAL = 10 + RETRY_INTERVAL_INCREMENT = 10 +) + +// 通用的http-post +func (skyline *SkylineManager) httpRequestToSky(url string, body []byte) (*Result, error) { + glog.Infof("skylineHttpRequestToSky. request body: %s", string(body)) + //fmt.Println(fmt.Sprintf("skylineHttpRequestToSky. request body: %s", string(body))) + + resByte, rErr := httpPostJsonWithHeadersWithTime(url, body, nil, nil, DEFAULT_DIAL_TIMEOUT, DEFAULT_END2END_TIMEOUT) + if rErr != nil { + glog.Errorf("httpRequestToSky failed: %s", rErr.Error()) + return nil, rErr + } + result := &Result{} + pErr := json.Unmarshal(resByte, result) + if pErr != nil { + glog.Errorf("skylineHttpRequestToSkyUnmarshal failed: %s", pErr.Error()) + return nil, pErr + } + glog.Infof("skylineHttpRequestToSky success. response: %s", string(resByte)) + //fmt.Println(fmt.Sprintf("skylineHttpRequestToSky success. response: %s", string(resByte))) + + return result, nil +} + +func httpPostJsonWithHeadersWithTime(httpUrl string, body []byte, headers map[string]string, params map[string]string, + timeoutInSecond int, end2endTimeoutInSecond int) ([]byte, error) { + var ( + err error + req *http.Request + resp *http.Response + ) + + var netTransport = &http.Transport{ + Dial: (&net.Dialer{ + Timeout: time.Duration(timeoutInSecond) * time.Second, + }).Dial, + } + var client = &http.Client{ + Timeout: time.Duration(end2endTimeoutInSecond) * time.Second, + Transport: netTransport, + } + + values := url.Values{} + for k, v := range params { + values.Set(k, v) + } + + var data []byte + err = RetryInc(func() (err error) { + req, err = http.NewRequest("POST", fmt.Sprintf("%v?%v", httpUrl, values.Encode()), bytes.NewReader(body)) + if err != nil { + return err + } + + req.Header.Add("Content-Type", "application/json") + for k, v := range headers { + req.Header.Add(k, v) + } + + resp, err = client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + data, err = ioutil.ReadAll(resp.Body) + + if resp.StatusCode != 200 
{ + if err != nil { + return errors.New(fmt.Sprintf("request %v with json %v and Header %v failed, StatusCode:%v, parse body error:%v", + httpUrl, string(body), req.Header, resp.StatusCode, err.Error())) + } + return errors.New(fmt.Sprintf("request %v with json %v and Header %v failed, Status:%v, msg:%v", + httpUrl, string(body), req.Header, resp.Status, string(data))) + } + return nil + }, "HttpPostJsonWithHeaders", RETRY_COUNT, RETRY_INTERVAL, RETRY_INTERVAL_INCREMENT) + + return data, err +} + +func RetryInc(operation func() error, name string, attempts int, retryWaitSeconds int, retryWaitIncSeconds int) (err error) { + for i := 0; ; i++ { + err = operation() + if err == nil { + if i > 0 { + glog.Infof("retry #%d %v finally succeed", i, name) + } + return nil + } + glog.Errorf("retry #%d %v, error: %s", i, name, err) + + if i >= (attempts - 1) { + break + } + + time.Sleep(time.Second * time.Duration(retryWaitSeconds)) + retryWaitSeconds = retryWaitSeconds + retryWaitIncSeconds + } + return err +}