From 7e55e810156880c2569168de58127fbcde02a688 Mon Sep 17 00:00:00 2001 From: Cairry <115769353+Cairry@users.noreply.github.com> Date: Fri, 27 Dec 2024 15:09:08 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=80=20=E9=87=87=E9=9B=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🚀 采集逻辑重构 --- .github/workflows/ci.yaml | 56 +++-- Dockerfile | 24 +- README.md | 29 --- assets/entrypoint | 6 +- assets/filebeat/filebeat.tpl | 7 - controller/containerd.go | 168 +++++++++++++ controller/docker.go | 147 +++++++++++ controller/runtime.go | 101 ++++++++ deploy/README.md | 13 + go.mod | 42 ++-- go.sum | 95 ++++--- log/config/logConfig.go | 181 ++++---------- log/log.go | 80 ++++++ log/nodeInfo/n.go | 35 +-- log/point.go | 39 --- log/worker/worker.go | 61 ----- main.go | 62 +++-- pkg/client/filebeat/filebeat.go | 315 ------------------------ pkg/client/filebeat/types.go | 91 ------- pkg/controller/container.go | 223 ----------------- pkg/controller/docker.go | 184 -------------- pkg/controller/process.go | 27 -- pkg/ctx/ctx.go | 45 ++-- pkg/provider/filebeat.go | 196 +++++++++++++++ pkg/provider/filebeat_types.go | 49 ++++ pkg/runtime/container.go | 17 ++ pkg/runtime/container/c.go | 37 --- pkg/runtime/docker.go | 15 ++ pkg/runtime/docker/c.go | 37 --- pkg/runtime/types.go | 18 ++ pkg/{util/fotmat.go => tools/format.go} | 3 +- pkg/{util => tools}/utils.go | 2 +- pkg/types/types.go | 95 ------- 33 files changed, 1043 insertions(+), 1457 deletions(-) delete mode 100644 README.md create mode 100644 controller/containerd.go create mode 100644 controller/docker.go create mode 100644 controller/runtime.go create mode 100644 log/log.go delete mode 100644 log/point.go delete mode 100644 log/worker/worker.go delete mode 100644 pkg/client/filebeat/filebeat.go delete mode 100644 pkg/client/filebeat/types.go delete mode 100644 pkg/controller/container.go delete mode 100644 pkg/controller/docker.go delete mode 100644 pkg/controller/process.go create mode 100644 pkg/provider/filebeat.go create mode 100644 pkg/provider/filebeat_types.go create mode 100644 pkg/runtime/container.go delete mode 100644 pkg/runtime/container/c.go create mode 100644 pkg/runtime/docker.go delete mode 100644 pkg/runtime/docker/c.go rename pkg/{util/fotmat.go => tools/format.go} (99%) rename pkg/{util => tools}/utils.go (95%) delete mode 100644 pkg/types/types.go diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d22e26b..a204fc6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -13,24 +13,52 @@ jobs: - name: Checkout code uses: actions/checkout@v3 - - name: 1. Set env - run: | - echo "BRANCH_NAME=$(echo ${GITHUB_REF#refs/heads/})" >> $GITHUB_ENV - echo "SHORT_SHA=$(echo ${GITHUB_SHA:0:4})" >> $GITHUB_ENV - echo "DATE=$(TZ=Asia/Shanghai date +%Y-%m-%d.%H-%M-%S)" >> $GITHUB_ENV + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 - - name: 2. Build Docker image - run: | - docker build -t cairry/watchlog:$BRANCH_NAME.$DATE.$SHORT_SHA . - docker tag cairry/watchlog:$BRANCH_NAME.$DATE.$SHORT_SHA cairry/watchlog:latest + - name: Inject slug/short variables + uses: rlespinasse/github-slug-action@v4 + + - name: Set up Docker Buildx + id: buildx + uses: docker/setup-buildx-action@v3 + + - name: Available platforms + run: echo ${{ steps.buildx.outputs.platforms }} - - name: 3. Login to Docker Hub - uses: docker/login-action@v1 + - name: Login to Docker Hub + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USER }} password: ${{ secrets.DOCKERHUB_TOKEN }} - - name: 4. Push Docker image to Docker Hub + - name: Login to Aliyun Hub + uses: docker/login-action@v3 + with: + registry: registry.cn-hangzhou.aliyuncs.com + username: ${{ secrets.OPSRE_ALIHUB_USERNAME }} + password: ${{ secrets.OPSRE_ALIHUB_TOKEN }} + + - name: Set env variables + id: set_env run: | - docker push cairry/watchlog:latest - docker push cairry/watchlog:$BRANCH_NAME.$DATE.$SHORT_SHA \ No newline at end of file + echo "BRANCH_NAME=${GITHUB_REF#refs/heads/}" + echo "SHORT_SHA=${GITHUB_SHA:0:4}" + echo "DATE=$(TZ=Asia/Shanghai date +%Y-%m-%d.%H-%M-%S)" + + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . + file: ./Dockerfile + # 所需要的体系结构,可以在 Available platforms 步骤中获取所有的可用架构 + platforms: linux/arm64,linux/amd64 + # 镜像推送时间 + push: ${{ github.event_name != 'pull_request' }} + build-args: | + VERSION=${{ env.DATE }} + tags: | + cairry/watchlog:latest + cairry/watchlog:${{ env.BRANCH_NAME }}_${{ env.DATE }}_${{ env.SHORT_SHA }} + registry.cn-hangzhou.aliyuncs.com/opsre/watchlog:latest + registry.cn-hangzhou.aliyuncs.com/opsre/watchlog:${{ env.BRANCH_NAME }}_${{ env.DATE }}_${{ env.SHORT_SHA }} diff --git a/Dockerfile b/Dockerfile index 31a4f8a..7674ff9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM registry.js.design/base/golang:1.18-alpine3.16 AS build +FROM registry.cn-hangzhou.aliyuncs.com/opsre/golang:1.21.9-alpine3.19 AS build ENV GO111MODULE=on \ CGO_ENABLED=0 \ @@ -11,29 +11,21 @@ COPY . /workspace WORKDIR /workspace RUN go mod tidy && \ - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o point ./main.go && \ - chmod 777 point + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o watchlog ./main.go && \ + chmod 777 watchlog -FROM elastic/filebeat:7.17.10 +FROM registry.js.design/library/filebeat:7.17.10_python2 -USER root +COPY --from=build /workspace/watchlog /usr/share/filebeat/watchlog/watchlog -COPY --from=build /workspace/point /usr/share/filebeat/point/point +COPY assets/entrypoint assets/filebeat/ assets/healthz /usr/share/filebeat/watchlog/ -COPY assets/entrypoint assets/filebeat/ assets/healthz /usr/share/filebeat/point/ - -RUN apt-get update && \ - apt-get -y install python2 && \ - /usr/bin/chmod +x /usr/share/filebeat/point/point /usr/share/filebeat/point/healthz /usr/share/filebeat/point/config.filebeat +RUN /usr/bin/chmod +x /usr/share/filebeat/watchlog/watchlog /usr/share/filebeat/watchlog/healthz /usr/share/filebeat/watchlog/config.filebeat HEALTHCHECK CMD /usr/share/filebeat/healthz -VOLUME /var/log/filebeat - -VOLUME /var/lib/filebeat - WORKDIR /usr/share/filebeat/ ENV PILOT_TYPE=filebeat -ENTRYPOINT ["python2", "/usr/share/filebeat/point/entrypoint"] +ENTRYPOINT ["python2", "/usr/share/filebeat/watchlog/entrypoint"] diff --git a/README.md b/README.md deleted file mode 100644 index 7a25dc2..0000000 --- a/README.md +++ /dev/null @@ -1,29 +0,0 @@ -WatchLog -======== - -`WatchLog`是一个云原生容器日志采集工具。你可以使用它来收集`Docker`、`Containerd`的容器日志并发送到集中式日志管理系统中,例如`elasticsearch` `kafka` `redis`等。 - -版本兼容 -======== -**Input** - -| Service | Version | -|------------|-------------| -| Docker | 推荐 20.x ➕ | -| Containerd | 推荐 1.20.x ➕ | - -**Output** - -| Service | Version | -|---------------|-------------| -| Elasticsearch | 推荐 7.10.x ➕ | -| Kafka | 推荐 2.x | -| Redis | 推荐 6.x | - -快速开始 -======== -➡️ [访问我](deploy/README.md) - -贡献 -======== -欢迎你提出新问题并提出请求。 \ No newline at end of file diff --git a/assets/entrypoint b/assets/entrypoint index bbaf4d1..6e40b7b 100644 --- a/assets/entrypoint +++ b/assets/entrypoint @@ -35,9 +35,9 @@ def cleanup(): def run(): pilot_type = os.environ.get(ENV_PILOT_TYPE) if pilot_filebeat == pilot_type: - tpl_config = "/usr/share/filebeat/point/filebeat.tpl" + tpl_config = "/usr/share/filebeat/watchlog/filebeat.tpl" - os.execve('/usr/share/filebeat/point/point', ['/usr/share/filebeat/point/point', '-template', tpl_config, '-base', base, '-log-level', 'debug'], + os.execve('/usr/share/filebeat/watchlog/watchlog', ['/usr/share/filebeat/watchlog/watchlog', '-template', tpl_config], os.environ) @@ -45,7 +45,7 @@ def config(): pilot_type = os.environ.get(ENV_PILOT_TYPE) if pilot_filebeat == pilot_type: print "start log-pilot:", pilot_filebeat - subprocess.check_call(['/usr/share/filebeat/point/config.filebeat']) + subprocess.check_call(['/usr/share/filebeat/watchlog/config.filebeat']) if __name__ == '__main__': diff --git a/assets/filebeat/filebeat.tpl b/assets/filebeat/filebeat.tpl index 8e6ba9c..037afad 100644 --- a/assets/filebeat/filebeat.tpl +++ b/assets/filebeat/filebeat.tpl @@ -10,23 +10,16 @@ json.keys_under_root: true {{end}} fields: - {{range $key, $value := .CustomFields}} - {{ $key }}: {{ $value }} - {{end}} {{range $key, $value := .Tags}} {{ $key }}: {{ $value }} {{end}} {{range $key, $value := $.container}} {{ $key }}: {{ $value }} {{end}} - {{range $key, $value := .CustomConfigs}} - {{ $key }}: {{ $value }} - {{end}} tail_files: false close_inactive: 2h close_eof: false close_removed: true clean_removed: true close_renamed: false - {{end}} \ No newline at end of file diff --git a/controller/containerd.go b/controller/containerd.go new file mode 100644 index 0000000..a753c8b --- /dev/null +++ b/controller/containerd.go @@ -0,0 +1,168 @@ +package controller + +import ( + "context" + "encoding/json" + "fmt" + "github.com/containerd/containerd" + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/oci" + "github.com/zeromicro/go-zero/core/logc" + "io" + "regexp" + "strings" + "watchlog/pkg/ctx" + "watchlog/pkg/runtime" +) + +var ( + create = "containerd.events.ContainerCreate" + delete = "containerd.events.ContainerDelete" +) + +type Containerd struct { + ctx *ctx.Context +} + +func NewContainerInterface(ctx *ctx.Context) InterRuntime { + return &Containerd{ + ctx: ctx, + } +} + +func (c Containerd) ProcessContainers() error { + c.ctx.Lock() + defer c.ctx.Unlock() + + containerCtx := namespaces.WithNamespace(c.ctx.Context, "k8s.io") + c.watchEvent(c.ctx, containerCtx) + + containers, err := c.ctx.ContainerdCli.Containers(containerCtx) + if err != nil { + logc.Errorf(context.Background(), fmt.Sprintf("get containers failed, %s", err.Error())) + return err + } + + for _, container := range containers { + if err := c.processContainer(containerCtx, container); err != nil { + logc.Errorf(context.Background(), "process container failed: %v", err) + } + } + + return nil +} + +func (c Containerd) processContainer(containerCtx context.Context, container containerd.Container) error { + meta, err := container.Info(containerCtx) + if err != nil { + return fmt.Errorf("get container meta info failed: %s", err.Error()) + } + + spec, err := container.Spec(containerCtx) + if err != nil { + return fmt.Errorf("get container spec failed: %s", err.Error()) + } + + return processCollectFile(c.ctx, spec.Process.Env, meta) +} + +func (c Containerd) watchEvent(ctx *ctx.Context, containerCtx context.Context) { + msgs, errs := c.ctx.ContainerdCli.EventService().Subscribe(containerCtx, "") + + go func() { + defer logc.Info(context.Background(), "finish to watch containerd event") + logc.Infof(context.Background(), "begin to watch containerd event") + + for { + select { + case msg := <-msgs: + if err := c.processEvent(ctx, containerCtx, msg); err != nil { + logc.Errorf(context.Background(), "process event failed: %v", err) + } + case err := <-errs: + if err == io.EOF || err == io.ErrUnexpectedEOF { + return + } + logc.Errorf(context.Background(), "event subscription error: %v", err) + } + } + }() +} + +func (c Containerd) processEvent(ctx *ctx.Context, containerCtx context.Context, msg *events.Envelope) error { + v := string(msg.Event.GetValue()) + s := strings.TrimPrefix(v, "\n@") + containerId := removeSpecialChars(s) + containerId = strings.Split(containerId, "-")[0] + + t := msg.Event.GetTypeUrl() + switch t { + case create: + if Exists(ctx, containerId) { + return nil + } + + _, err := ctx.ContainerdCli.LoadContainer(containerCtx, containerId) + if err != nil { + if errdefs.IsNotFound(err) { + _, err = ctx.ContainerdCli.LoadContainer(containerCtx, containerId) + } + + return err + } + + meta, err := ctx.ContainerdCli.ContainerService().Get(containerCtx, containerId) + if err != nil { + return err + } + + var spec oci.Spec + err = json.Unmarshal(meta.Spec.GetValue(), &spec) + if err != nil { + return err + } + + err = processCollectFile(ctx, spec.Process.Env, meta) + if err != nil { + return err + } + + return err + + case delete: + logc.Infof(context.Background(), "Process container destroy event: %s", containerId) + + err := DelContainerLogFile(ctx, containerId) + if err != nil { + logc.Errorf(context.Background(), fmt.Sprintf("Process container destroy event error: %s, %s", containerId, err.Error())) + } + } + return nil +} + +func processCollectFile(c *ctx.Context, envs []string, meta containers.Container) error { + // 符合条件的 Env + var logEnvs []string + for _, envVar := range envs { + // LogPrefix: aliyun_logs_tencent-prod-hermione=stdout ,envVar: aliyun_logs + if strings.HasPrefix(envVar, c.LogPrefix) { + logEnvs = append(logEnvs, envVar) + } + } + + fields := CollectFields{ + Id: meta.ID, + Env: logEnvs, + Labels: meta.Labels, + LogPath: fmt.Sprintf("%s_%s_*/%s/*.log", meta.Labels[runtime.KubernetesContainerNamespace], meta.Labels[runtime.KubernetesPodName], meta.Labels[runtime.KubernetesContainerName]), + } + return NewCollectFile(c, fields) +} + +func removeSpecialChars(str string) string { + re := regexp.MustCompile(`[^a-zA-Z0-9]+`) + return re.ReplaceAllString(str, "-") +} diff --git a/controller/docker.go b/controller/docker.go new file mode 100644 index 0000000..6ff8ae2 --- /dev/null +++ b/controller/docker.go @@ -0,0 +1,147 @@ +package controller + +import ( + "context" + "fmt" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + "github.com/zeromicro/go-zero/core/logc" + "io" + "strings" + "watchlog/pkg/ctx" +) + +type Docker struct { + ctx *ctx.Context + f filters.Args +} + +// NewDockerInterface creates a new Docker interface. +func NewDockerInterface(ctx *ctx.Context, f filters.Args) InterRuntime { + return &Docker{ctx: ctx, f: f} +} + +// ProcessContainers processes the Docker containers and logs. +func (d *Docker) ProcessContainers() error { + d.ctx.Lock() + defer d.ctx.Unlock() + + d.watchEvent(d.f) + containers, err := d.listContainers() + if err != nil { + return err + } + + for _, c := range containers { + if c.State == "removing" { + continue + } + + if Exists(d.ctx, c.ID) { + continue + } + + if err := d.processContainer(c.ID); err != nil { + logc.Errorf(context.Background(), fmt.Sprintf("Error processing container %s: %v", c.ID, err)) + } + } + return nil +} + +// listContainers retrieves the list of Docker containers. +func (d *Docker) listContainers() ([]types.Container, error) { + opts := types.ContainerListOptions{} + containers, err := d.ctx.DockerCli.ContainerList(d.ctx, opts) + if err != nil { + logc.Errorf(context.Background(), fmt.Sprintf("Failed to list containers: %s", err.Error())) + return nil, err + } + return containers, nil +} + +// processContainer inspects and processes an individual container. +func (d *Docker) processContainer(containerID string) error { + containerJSON, err := d.ctx.DockerCli.ContainerInspect(d.ctx, containerID) + if err != nil { + logc.Errorf(context.Background(), fmt.Sprintf("Failed to inspect container %s: %v", containerID, err)) + return err + } + + if !Collect(containerJSON.Config.Env, d.ctx.LogPrefix) { + return nil + } + + // 符合条件的 Env + var logEnvs []string + for _, envVar := range containerJSON.Config.Env { + // LogPrefix: aliyun_logs_tencent-prod-hermione=stdout ,envVar: aliyun_logs + if strings.HasPrefix(envVar, d.ctx.LogPrefix) { + logEnvs = append(logEnvs, envVar) + } + } + + fields := CollectFields{ + Id: containerJSON.ID, + Env: logEnvs, + Labels: containerJSON.Config.Labels, + LogPath: containerJSON.LogPath, + } + return NewCollectFile(d.ctx, fields) +} + +// watchEvent listens for Docker events and processes them. +func (d *Docker) watchEvent(filter filters.Args) { + options := types.EventsOptions{Filters: filter} + msgs, errs := d.ctx.DockerCli.Events(d.ctx.Context, options) + + go func() { + logc.Infof(context.Background(), "Beginning to watch docker events") + for { + select { + case msg := <-msgs: + if err := d.processEvent(msg); err != nil { + logc.Errorf(context.Background(), fmt.Sprintf("Error processing event: %v", err)) + } + case err := <-errs: + logc.Errorf(context.Background(), fmt.Sprintf("Error in event stream: %v", err)) + if err == io.EOF || err == io.ErrUnexpectedEOF { + return + } + } + } + }() +} + +// processEvent handles Docker events for containers. +func (d *Docker) processEvent(msg events.Message) error { + containerID := msg.Actor.ID + switch msg.Action { + case "start", "restart": + return d.handleStartRestartEvent(containerID) + case "destroy", "die": + return d.handleDestroyDieEvent(containerID) + default: + return nil + } +} + +// handleStartRestartEvent processes container start/restart events. +func (d *Docker) handleStartRestartEvent(containerID string) error { + logc.Debugf(context.Background(), "Processing container start/restart event: %s", containerID) + if Exists(d.ctx, containerID) { + logc.Debugf(context.Background(), "Container %s already exists, skipping", containerID) + return nil + } + return d.processContainer(containerID) +} + +// handleDestroyDieEvent processes container destroy/die events. +func (d *Docker) handleDestroyDieEvent(containerID string) error { + if !Exists(d.ctx, containerID) { + return nil + } + + logc.Debugf(context.Background(), "Processing container destroy event: %s", containerID) + return DelContainerLogFile(d.ctx, containerID) +} diff --git a/controller/runtime.go b/controller/runtime.go new file mode 100644 index 0000000..9a602b3 --- /dev/null +++ b/controller/runtime.go @@ -0,0 +1,101 @@ +package controller + +import ( + "context" + "fmt" + "github.com/zeromicro/go-zero/core/logc" + "io/ioutil" + "os" + "path/filepath" + "strings" + logtypes "watchlog/log/config" + "watchlog/pkg/ctx" + "watchlog/pkg/runtime" +) + +type InterRuntime interface { + ProcessContainers() error +} + +// Collect 判断是否需要收集日志 +func Collect(env []string, logPrefix string) bool { + var exist bool + for _, e := range env { + if strings.HasPrefix(e, logPrefix) { + exist = true + } + } + + return exist +} + +// Exists 判断采集容器日志的配置是否存在 +func Exists(ctx *ctx.Context, containId string) bool { + if _, err := os.Stat(ctx.FilebeatPointer.GetConfPath(containId)); os.IsNotExist(err) { + return false + } + return true +} + +// DelContainerLogFile 销毁采集容器日志文件 +func DelContainerLogFile(ctx *ctx.Context, id string) error { + logc.Infof(context.Background(), "Try removing log config %s", id) + if err := os.Remove(ctx.FilebeatPointer.GetConfPath(id)); err != nil { + return fmt.Errorf("removing %s log config failure, err: %s", id, err.Error()) + } + + return nil +} + +type CollectFields struct { + Id string + Env []string + Labels map[string]string + LogPath string +} + +// NewCollectFile 创建Filebeat采集配置 +func NewCollectFile(ctx *ctx.Context, cf CollectFields) error { + id := cf.Id + env := cf.Env + labels := cf.Labels + jsonLogPath := cf.LogPath + ct := runtime.BuildContainerLabels(labels) + logEnvs := getLogEnvs(env) + + logPath := filepath.Join(ctx.BaseDir, jsonLogPath) // /host/var/lib/containerd/log/pods/intl_diagon-alley-5cf4c7cddc-7nd94_*/diagon-alley/*.log + logConfigs, err := logtypes.GetLogConfigs(ctx.LogPrefix, logPath, logEnvs) + if err != nil { + return fmt.Errorf("GetLogConfigs failed, err: %s", err.Error()) + } + + if len(logConfigs) == 0 { + return nil + } + + //生成 filebeat 采集配置 + logConfig, err := ctx.FilebeatPointer.RenderLogConfig(id, ct, logConfigs) + if err != nil { + return fmt.Errorf("RenderLogConfig failed, err: %s", err.Error()) + } + + //TODO validate config before save + logc.Infof(context.Background(), fmt.Sprintf("Write Log config, path: %s", ctx.FilebeatPointer.GetConfPath(id))) + if err = ioutil.WriteFile(ctx.FilebeatPointer.GetConfPath(id), []byte(logConfig), os.FileMode(0644)); err != nil { + return fmt.Errorf("WriteFile failed, err: %s", err.Error()) + } + + return nil +} + +// getLogEnvs 获取关键 Envs +func getLogEnvs(env []string) map[string]string { + var logEnv = map[string]string{} // map[aliyun_logs_tencent-prod-diagon-alley:stdout] + for _, e := range env { + envLabel := strings.SplitN(e, "=", 2) // [aliyun_logs_tencent-prod-diagon-alley stdout] 2 + if len(envLabel) == 2 { + logEnv[envLabel[0]] = envLabel[1] // aliyun_logs_tencent-prod-diagon-alley stdout + } + } + return logEnv +} diff --git a/deploy/README.md b/deploy/README.md index 63db46b..68ef378 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -1,9 +1,22 @@ # 运行 WatchLog ## 确定参数配置 - LOG_PREFIX:日志前缀标识, 默认是watchlog, 支持自定义 +- LOG_BASE_DIR:日志存储目录(挂载到WatchLog容器内的路径),默认 `/host/var/log/pods` - RUNTIME_TYPE:运行时类型,支持`docker` `containerd` - LOGGING_OUTPUT:日志输出类型,支持主流的`kafka` `elasticsearch` `redis` `file`等 +**LOG_PREFIX 详细** +```yaml + - name: LOG_PREFIX + value: watchlog +``` + +**LOG_BASE_DIR 详细** +```yaml + - name: LOG_BASE_DIR + value: "/host/var/log/pods" +``` + **RUNTIME_TYPE 详细** ```yaml - name: RUNTIME_TYPE diff --git a/go.mod b/go.mod index 39b99a3..2d4431a 100644 --- a/go.mod +++ b/go.mod @@ -1,18 +1,13 @@ module watchlog -go 1.18 +go 1.21 require ( github.com/containerd/containerd v1.7.7 github.com/docker/docker v23.0.3+incompatible github.com/elastic/go-ucfg v0.8.8 github.com/sirupsen/logrus v1.9.3 -) - -require ( - go.opentelemetry.io/otel v1.16.0 // indirect - go.opentelemetry.io/otel/metric v1.16.0 // indirect - go.opentelemetry.io/otel/trace v1.16.0 // indirect + github.com/zeromicro/go-zero v1.7.4 ) require ( @@ -30,15 +25,16 @@ require ( github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/fatih/color v1.18.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/uuid v1.3.0 // indirect - github.com/klauspost/compress v1.16.0 // indirect - github.com/kr/text v0.2.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/moby/locker v1.0.1 // indirect github.com/moby/sys/mountinfo v0.6.2 // indirect github.com/moby/sys/sequential v0.5.0 // indirect @@ -51,19 +47,19 @@ require ( github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/opencontainers/selinux v1.11.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/stretchr/testify v1.9.0 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/mod v0.11.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect - golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.10.0 // indirect + go.uber.org/automaxprocs v1.6.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/net v0.31.0 // indirect + golang.org/x/sync v0.9.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect + golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect - google.golang.org/grpc v1.56.3 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect + google.golang.org/grpc v1.65.0 // indirect + google.golang.org/protobuf v1.35.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gotest.tools/v3 v3.5.1 // indirect ) diff --git a/go.sum b/go.sum index 7e445dc..e6bf579 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,7 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 h1:59MxjQVfjXsBpLy+dbd2/ELV5ofnUkUZBvWSC85sheA= github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0/go.mod h1:OahwfttHWG6eJ0clwcfBAHoDI6X/LV/15hx/wlMZSrU= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= @@ -31,7 +32,6 @@ github.com/containerd/typeurl/v2 v2.1.1 h1:3Q4Pt7i8nYwy2KmQWIw2+1hTvwTE/6w9Fqctt github.com/containerd/typeurl/v2 v2.1.1/go.mod h1:IDp2JFvbwZ31H8dQbEIY7sDl2L3o3HZj1hsSQlywkQ0= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -53,10 +53,12 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -79,8 +81,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -92,17 +92,21 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= -github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= github.com/moby/sys/mountinfo v0.5.0/go.mod h1:3bMD3Rg+zkqx8MRYPi7Pyb0Ie97QEBmdxbhnCLlSvSU= @@ -135,18 +139,25 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -161,14 +172,20 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zeromicro/go-zero v1.7.4 h1:lyIUsqbpVRzM4NmXu5pRM3XrdRdUuWOkQmHiNmJF0VU= +github.com/zeromicro/go-zero v1.7.4/go.mod h1:jmv4hTdUBkDn6kxgI+WrKQw0q6LKxDElGPMfCLOeeEY= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= -go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= -go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= -go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= -go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= -go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= +go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -178,8 +195,8 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= -golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -190,8 +207,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -199,8 +216,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -215,16 +232,18 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -233,8 +252,8 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg= -golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -247,16 +266,16 @@ google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 h1:Z0hjGZePRE0ZBWotvtrwxFNrNE9CUAGtplaDK5NNI/g= google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02OqCJTD0E1OiQy1F72PWFB4bZJ87cAtLPYgDR0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 h1:AgADTJarZTBqgjiUzRgfaBchgYB3/WFTC80GPwsMcRI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= -google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -268,8 +287,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/log/config/logConfig.go b/log/config/logConfig.go index f5bb410..b0df4be 100644 --- a/log/config/logConfig.go +++ b/log/config/logConfig.go @@ -2,63 +2,32 @@ package config import ( "fmt" - log "github.com/sirupsen/logrus" - "os" "path/filepath" - "sort" "strings" - "watchlog/log/nodeInfo" - "watchlog/pkg/util" ) // LogConfig log configuration type LogConfig struct { - Name string - HostDir string - ContainerDir string - Format string - FormatConfig map[string]string - File string - Tags map[string]string - Target string - EstimateTime bool - Stdout bool - LogType string - CustomFields map[string]string - CustomConfigs map[string]string + Name string + HostDir string + ContainerDir string + Format string + FormatConfig map[string]string + File string + Tags map[string]string + EstimateTime bool + Stdout bool + LogType string } -const LabelServiceLogsTmpl = "%s." +const LabelServiceLogsTmpl = "%s_" -func GetLogConfigs(logPrefix string, jsonLogPath string, labels map[string]string) ([]*LogConfig, error) { - var ret []*LogConfig - - var labelNames []string - //sort keys - for k := range labels { - labelNames = append(labelNames, k) - } - sort.Strings(labelNames) - - root := nodeInfo.NewLogInfoNode("") - - for _, label := range labelNames { - prefix := fmt.Sprintf(LabelServiceLogsTmpl, logPrefix) - newLogPrefix := strings.Replace(prefix, "_", ".", -1) - if !strings.HasPrefix(label, newLogPrefix) { - continue - } - - logLabel := strings.TrimPrefix(label, newLogPrefix) - key := strings.Split(logLabel, ".") - if err := root.Insert(key, labels[label]); err != nil { - log.Errorf("%s", err.Error()) - return nil, err - } - } - - for name, node := range root.Children { - logConfig, err := parseLogConfig(name, node, jsonLogPath) +func GetLogConfigs(logPrefix string, jsonLogPath string, labels map[string]string) ([]LogConfig, error) { + var ret []LogConfig + for label, _ := range labels { + p := fmt.Sprintf(LabelServiceLogsTmpl, logPrefix) + logTopicName := strings.TrimPrefix(label, p) // watchlog_default, logTopicName = default + logConfig, err := parseLogConfig(logTopicName, labels[label], jsonLogPath) if err != nil { return nil, err } @@ -67,101 +36,37 @@ func GetLogConfigs(logPrefix string, jsonLogPath string, labels map[string]strin return ret, nil } -func parseLogConfig(name string, info *nodeInfo.LogInfoNode, jsonLogPath string) (*LogConfig, error) { - path := strings.TrimSpace(info.Value) - if path == "" { - return nil, fmt.Errorf("path for %s is empty", name) - } - - tags := info.Get("tags") - tagMap, err := parseTags(tags) - if err != nil { - return nil, fmt.Errorf("parse tags for %s error: %v", name, err) - } - - target := info.Get("target") - // add default index or topic - if _, ok := tagMap["index"]; !ok { - if target != "" { - tagMap["index"] = target - } else { - tagMap["index"] = name - } - } - - if _, ok := tagMap["topic"]; !ok { - if target != "" { - tagMap["topic"] = target - } else { - tagMap["topic"] = name - } - } - - format := info.Children["format"] - if format == nil || format.Value == "none" { - format = nodeInfo.NewLogInfoNode("nonex") - } - - formatConfig, err := util.Convert(format) - if err != nil { - return nil, fmt.Errorf("in log %s: format error: %v", name, err) - } - - // 特殊处理regex - if format.Value == "regexp" { - format.Value = fmt.Sprintf("/%s/", formatConfig["pattern"]) - delete(formatConfig, "pattern") - } - - rt := os.Getenv("RUNTIME_TYPE") - var lt string - switch rt { - case "docker", "containerd": - lt = "container" +//func getLabelNames(logPrefix string, labels map[string]string) []string { +// var labelNames []string +// for k := range labels { +// if strings.HasPrefix(k, logPrefix) { +// labelNames = append(labelNames, k) +// } +// } +// //sort keys +// sort.Strings(labelNames) +// return labelNames +//} + +func parseLogConfig(label, value string, jsonLogPath string) (LogConfig, error) { + cfg := new(LogConfig) + if value == "" { + return *cfg, fmt.Errorf("env %s value don't is null", label) } - cfg := new(LogConfig) // 标准输出日志 - if path == "stdout" { + if value == "stdout" { logFile := filepath.Base(jsonLogPath) + "*" - cfg = &LogConfig{ - File: logFile, - Name: name, - HostDir: filepath.Dir(jsonLogPath), - Format: format.Value, - Tags: tagMap, - FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"}, - Target: target, - EstimateTime: false, - Stdout: true, - LogType: lt, + File: logFile, + Name: label, + HostDir: filepath.Dir(jsonLogPath), + Tags: map[string]string{ + "index": label, + "topic": label, + }, + LogType: "container", } } - - return cfg, nil -} - -func parseTags(tags string) (map[string]string, error) { - tagMap := make(map[string]string) - - if tags == "" { - return tagMap, nil - } - - kvArray := strings.Split(tags, ",") - for _, kv := range kvArray { - arr := strings.Split(kv, "=") - if len(arr) != 2 { - return nil, fmt.Errorf("%s is not a valid k=v format", kv) - } - key := strings.TrimSpace(arr[0]) - value := strings.TrimSpace(arr[1]) - if key == "" || value == "" { - return nil, fmt.Errorf("%s is not a valid k=v format", kv) - } - tagMap[key] = value - } - - return tagMap, nil + return *cfg, nil } diff --git a/log/log.go b/log/log.go new file mode 100644 index 0000000..dfb47b4 --- /dev/null +++ b/log/log.go @@ -0,0 +1,80 @@ +package log + +import ( + "context" + "github.com/docker/docker/api/types/filters" + "github.com/zeromicro/go-zero/core/logc" + "io/ioutil" + "os" + "os/signal" + "syscall" + "text/template" + "watchlog/controller" + "watchlog/pkg/ctx" + "watchlog/pkg/provider" +) + +// Run starts the log pilot. +func Run(tmplPath, logPrefix, baseDir string) error { + tmpl, err := loadTemplate(tmplPath) + if err != nil { + return err + } + + p := provider.NewFilebeatPointer(tmpl, baseDir) + c := ctx.NewContext(baseDir, logPrefix, p) + return startWorker(c) +} + +// loadTemplate reads and parses the template file. +func loadTemplate(path string) (*template.Template, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + return template.New("watchalert").Parse(string(data)) +} + +// startWorker initiates the worker process. +func startWorker(c *ctx.Context) error { + if err := c.FilebeatPointer.CleanConfigs(); err != nil { + return err + } + + if err := c.FilebeatPointer.Start(); err != nil { + return err + } + + if err := processContainers(c); err != nil { + return err + } + + waitForShutdown() + logc.Infof(context.Background(), "Program Stop Successful!!!") + return nil +} + +// processContainers handles container processing based on the runtime type. +func processContainers(c *ctx.Context) error { + switch os.Getenv("RUNTIME_TYPE") { + case "docker": + logc.Infof(context.Background(), "Processing Docker runtime") + filter := filters.NewArgs() + filter.Add("type", "container") + dockerController := controller.NewDockerInterface(c, filter) + return dockerController.ProcessContainers() + case "containerd": + logc.Infof(context.Background(), "Processing container runtime") + containerController := controller.NewContainerInterface(c) + return containerController.ProcessContainers() + default: + return nil + } +} + +// waitForShutdown listens for OS signals to gracefully shut down the program. +func waitForShutdown() { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig +} diff --git a/log/nodeInfo/n.go b/log/nodeInfo/n.go index 3df11a8..d66140a 100644 --- a/log/nodeInfo/n.go +++ b/log/nodeInfo/n.go @@ -1,15 +1,14 @@ package nodeInfo -import ( - "fmt" -) +import "fmt" -// LogInfoNode node info +// LogInfoNode 表示树结构中的一个节点 type LogInfoNode struct { Value string Children map[string]*LogInfoNode } +// NewLogInfoNode 创建一个新的节点 func NewLogInfoNode(value string) *LogInfoNode { return &LogInfoNode{ Value: value, @@ -17,36 +16,20 @@ func NewLogInfoNode(value string) *LogInfoNode { } } -func (node *LogInfoNode) Insert(keys []string, value string) error { - if len(keys) == 0 { - return nil - } - - key := keys[0] - if key == "" { - return nil - } - if len(keys) > 1 { - if child, ok := node.Children[key]; ok { - err := child.Insert(keys[1:], value) - if err != nil { - return err - } - } else { - return fmt.Errorf("%s has no parent node", key) - } - } else { - child := NewLogInfoNode(value) - node.Children[key] = child +// Insert 插入新值,如果必要会自动创建缺失的父节点 +func (node *LogInfoNode) Insert(key string, value string) error { + if len(key) == 0 { + return fmt.Errorf("键不能为空") } + node.Children[key] = NewLogInfoNode(value) return nil } +// Get 根据键获取对应的值 func (node *LogInfoNode) Get(key string) string { if child, ok := node.Children[key]; ok { return child.Value } - return "" } diff --git a/log/point.go b/log/point.go deleted file mode 100644 index a3f8d6a..0000000 --- a/log/point.go +++ /dev/null @@ -1,39 +0,0 @@ -package log - -import ( - "io/ioutil" - "text/template" - "watchlog/log/worker" - "watchlog/pkg/client/filebeat" - "watchlog/pkg/ctx" -) - -// Run start log pilot -func Run(tmplPath string, baseDir string) error { - b, err := ioutil.ReadFile(tmplPath) - if err != nil { - panic(err) - } - - c, err := New(string(b), baseDir) - if err != nil { - panic(err) - } - - return worker.NewWorker(c).Run() -} - -// New returns a log pilot instance -func New(tplStr string, baseDir string) (*ctx.Context, error) { - tmpl, err := template.New("pilot").Parse(tplStr) - if err != nil { - return nil, err - } - - p, err := filebeat.NewFilebeatPointer(tmpl, baseDir), nil - if err != nil { - return nil, err - } - - return ctx.NewContext(baseDir, p), nil -} diff --git a/log/worker/worker.go b/log/worker/worker.go deleted file mode 100644 index 923accb..0000000 --- a/log/worker/worker.go +++ /dev/null @@ -1,61 +0,0 @@ -package worker - -import ( - "github.com/docker/docker/api/types/filters" - log "github.com/sirupsen/logrus" - "os" - "time" - "watchlog/pkg/controller" - "watchlog/pkg/ctx" -) - -type Worker struct { - ctx *ctx.Context -} - -func NewWorker(c *ctx.Context) Worker { - return Worker{ - ctx: c, - } -} - -func (w Worker) Run() error { - // 清理旧配置 - if err := w.ctx.CleanConfigs(); err != nil { - return err - } - - // 启动 filebeat - err := w.ctx.Piloter.Start() - if err != nil { - return err - } - - w.ctx.LastReload = time.Now() - //go w.ctx.DoReload() - - filter := filters.NewArgs() - filter.Add("type", "container") - - switch os.Getenv("RUNTIME_TYPE") { - case "docker": - log.Info("Process docker runtime") - c := controller.NewDockerInterface(w.ctx, filter) - err := c.ProcessContainers(w.ctx) - if err != nil { - return err - } - case "containerd": - log.Info("Process container runtime") - c := controller.NewContainerInterface(w.ctx) - err := c.ProcessContainers() - if err != nil { - return err - } - } - - <-w.ctx.StopChan - close(w.ctx.ReloadChan) - close(w.ctx.StopChan) - return nil -} diff --git a/main.go b/main.go index f5d1dc5..956b693 100644 --- a/main.go +++ b/main.go @@ -1,52 +1,60 @@ package main import ( + "context" "flag" - log "github.com/sirupsen/logrus" + "github.com/zeromicro/go-zero/core/logc" "os" - "path/filepath" - log2 "watchlog/log" + "watchlog/log" ) func main() { - // -template /pilot/filebeat.tpl -base /host -log-level debug + // Command-line flags template := flag.String("template", "", "Template filepath for fluentd or filebeat.") - base := flag.String("base", "", "Directory which mount host root.") - level := flag.String("log-level", "INFO", "Log level") flag.Parse() - if os.Getenv("DOCKER_API_VERSION") == "" { - err := os.Setenv("DOCKER_API_VERSION", "1.23") - if err != nil { - log.Errorf(err.Error()) - return - } + // Set default Docker API version if not set + if err := setDefaultDockerAPIVersion(); err != nil { + logc.Errorf(context.Background(), err.Error()) + return } + // Validate runtime type if os.Getenv("RUNTIME_TYPE") == "" { - log.Errorf("Please set service type, (docker|containerd)") + panic("Please set service type, (docker|containerd)") } - baseDir, err := filepath.Abs(*base) - if err != nil { - panic(err) + // Validate template + if *template == "" { + panic("template file cannot be empty") } - if baseDir == "/" { - baseDir = "" + // Run log processing + if err := log.Run(*template, getLogPrefix(), getBaseDir()); err != nil { + logc.Errorf(context.Background(), err.Error()) } +} - if *template == "" { - panic("template file can not be empty") +// setDefaultDockerAPIVersion sets the default Docker API version if not already set. +func setDefaultDockerAPIVersion() error { + if os.Getenv("DOCKER_API_VERSION") == "" { + return os.Setenv("DOCKER_API_VERSION", "1.24") } + return nil +} - log.SetOutput(os.Stdout) - logLevel, err := log.ParseLevel(*level) - if err != nil { - log.Errorf(err.Error()) - panic(err) +// getLogPrefix retrieves the log prefix from the environment or defaults to "watchlog". +func getLogPrefix() string { + if lp := os.Getenv("LOG_PREFIX"); len(lp) > 0 { + return lp } - log.SetLevel(logLevel) + return "watchlog" +} - log.Fatal(log2.Run(*template, baseDir)) +// getBaseDir get log base store dir or defaults to "/host/var/log/pods" +func getBaseDir() string { + if lbd := os.Getenv("LOG_BASE_DIR"); len(lbd) > 0 { + return lbd + } + return "/host/var/log/pods" } diff --git a/pkg/client/filebeat/filebeat.go b/pkg/client/filebeat/filebeat.go deleted file mode 100644 index b7c0aa1..0000000 --- a/pkg/client/filebeat/filebeat.go +++ /dev/null @@ -1,315 +0,0 @@ -package filebeat - -import ( - "bytes" - "encoding/json" - "fmt" - "github.com/elastic/go-ucfg" - "github.com/elastic/go-ucfg/yaml" - log "github.com/sirupsen/logrus" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "regexp" - "strings" - "text/template" - "time" - logtypes "watchlog/log/config" -) - -var filebeat *exec.Cmd - -var configOpts = []ucfg.Option{ - ucfg.PathSep("."), - ucfg.ResolveEnv, - ucfg.VarExp, -} - -const ( - EnvLoggingOutput = "LOGGING_OUTPUT" -) - -type InterFilebeatPointer interface { - Name() string - - Start() error - Reload() error - Stop() error - - GetBaseConf() string - // GetConfHome [base]/prospectors.d - GetConfHome() string - GetConfPath(container string) string - - OnDestroyEvent(container string) error - - RenderLogConfig(containerId string, container map[string]string, configList []*logtypes.LogConfig) (string, error) -} - -// NewFilebeatPointer returns a FilebeatPointer instance -func NewFilebeatPointer(tmpl *template.Template, baseDir string) InterFilebeatPointer { - return &FilebeatPointer{ - name: "filebeat", - Tmpl: tmpl, - baseDir: baseDir, - watchDone: make(chan bool), - watchContainer: make(map[string]string, 0), - watchDuration: 60 * time.Second, - } -} - -// Start starting and watching filebeat process -func (p *FilebeatPointer) Start() error { - if filebeat != nil { - pid := filebeat.Process.Pid - log.Infof("filebeat started, pid: %v", pid) - return fmt.Errorf("filebeat process is exists") - } - - log.Info("starting filebeat") - filebeat = exec.Command(FilebeatExecCmd, "-c", FilebeatConfFile) - filebeat.Stderr = os.Stderr - filebeat.Stdout = os.Stdout - err := filebeat.Start() - if err != nil { - log.Errorf("filebeat start fail: %v", err) - } - - go func() { - log.Infof("filebeat started pid: %v", filebeat.Process.Pid) - err := filebeat.Wait() - if err != nil { - log.Errorf("filebeat exited: %v", err) - if exitError, ok := err.(*exec.ExitError); ok { - processState := exitError.ProcessState - log.Errorf("filebeat exited pid: %v", processState.Pid()) - } - } - - // try to restart filebeat - log.Warningf("filebeat exited and try to restart") - filebeat = nil - err = p.Start() - if err != nil { - return - } - }() - - go func() { - err := p.watch() - if err != nil { - panic(err) - } - }() - return err -} - -// Stop log collection -func (p *FilebeatPointer) Stop() error { - p.watchDone <- true - return nil -} - -// Reload reload configuration file -func (p *FilebeatPointer) Reload() error { - log.Debug("do not need to reload filebeat") - return nil -} - -// start filebeat watch process -func (p *FilebeatPointer) watch() error { - log.Infof("%s watcher start", p.Name()) - for { - select { - case <-p.watchDone: - log.Infof("%s watcher stop", p.Name()) - return nil - case <-time.After(p.watchDuration): - err := p.scan() - if err != nil { - log.Errorf("%s watcher scan error: %v", p.Name(), err) - } - } - } -} - -func (p *FilebeatPointer) scan() error { - // 获取注册表状态 - states, err := p.getRegistryState() - if err != nil { - return fmt.Errorf("failed to get registry state: %v", err) - } - - // 加载配置路径 - configPaths := p.loadConfigPaths() - - // 处理所有监控的容器 - for container := range p.watchContainer { - if err := p.processContainer(container, states, configPaths); err != nil { - log.Errorf("error processing container %s: %v", container, err) - } - } - - return nil -} - -func (p *FilebeatPointer) processContainer(container string, states map[string]RegistryState, configPaths map[string]string) error { - confPath := p.GetConfPath(container) - if _, err := os.Stat(confPath); err != nil { - if os.IsNotExist(err) { - log.Warnf("log config %s.yml has been removed and will be ignored", container) - delete(p.watchContainer, container) - return nil - } - return fmt.Errorf("failed to stat config path %s: %v", confPath, err) - } - - if p.canRemoveConf(container, states, configPaths) { - log.Warnf("attempting to remove log config for container %s", container) - if err := os.Remove(confPath); err != nil { - return fmt.Errorf("failed to remove log config %s.yml: %v", container, err) - } - log.Infof("successfully removed log config for container %s", container) - delete(p.watchContainer, container) - } - - return nil -} - -func (p *FilebeatPointer) canRemoveConf(container string, registry map[string]RegistryState, configPaths map[string]string) bool { - config, err := p.loadConfig(container) - if err != nil { - return false - } - - for _, path := range config.Paths { - pDir := filepath.Dir(path) - autoMount := p.isAutoMountPath(pDir) - logFiles, _ := filepath.Glob(path) - for _, logFile := range logFiles { - info, err := os.Stat(logFile) - if err != nil && os.IsNotExist(err) { - continue - } - if _, ok := registry[logFile]; !ok { - log.Warnf("%s->%s registry not exist", container, logFile) - continue - } - if registry[logFile].V.Offset < info.Size() { - if autoMount { // ephemeral logs - log.Infof("%s->%s does not finish to read", container, logFile) - return false - } else if _, ok := configPaths[path]; !ok { // host path bind - log.Infof("%s->%s does not finish to read and not exist in other config", - container, logFile) - return false - } - } - } - } - return true -} - -// 解析容器信息配置,获取path信息 -func (p *FilebeatPointer) loadConfig(container string) (*Config, error) { - // get config full path, /etc/filebeat/inputs.d/*.yml - confPath := p.GetConfPath(container) - c, err := yaml.NewConfigWithFile(confPath, configOpts...) - if err != nil { - log.Errorf("read %s.yml log config error: %v", container, err) - return nil, err - } - - var config Config - if err := c.Unpack(&config); err != nil { - log.Errorf("parse %s.yml log config error: %v", container, err) - return nil, err - } - return &config, nil -} - -// 加载容器config, path -func (p *FilebeatPointer) loadConfigPaths() map[string]string { - paths := make(map[string]string, 0) - // 读取 prospectors.d 目录下所有配置 - confs, _ := ioutil.ReadDir(p.GetConfHome()) - for _, conf := range confs { - // get file name - container := strings.TrimRight(conf.Name(), ".yml") - if _, ok := p.watchContainer[container]; ok { - continue // ignore removed container - } - - config, err := p.loadConfig(container) - if err != nil || config == nil { - continue - } - - for _, path := range config.Paths { - if _, ok := paths[path]; !ok { - paths[path] = container - } - } - } - return paths -} - -// 判断容器日志路径是否和挂载点匹配 -func (p *FilebeatPointer) isAutoMountPath(path string) bool { - dockerVolumePattern := fmt.Sprintf("^%s.*$", filepath.Join(p.baseDir, DockerSystemPath)) - if ok, _ := regexp.MatchString(dockerVolumePattern, path); ok { - return true - } - - kubeletVolumePattern := fmt.Sprintf("^%s.*$", filepath.Join(p.baseDir, KubeletSystemPath)) - ok, _ := regexp.MatchString(kubeletVolumePattern, path) - return ok -} - -// 获取 filebeat 仓库中容器日志的基本信息 -func (p *FilebeatPointer) getRegistryState() (map[string]RegistryState, error) { - f, err := os.Open(FilebeatRegistry) - if err != nil { - return nil, err - } - defer f.Close() - - decoder := json.NewDecoder(f) - var state RegistryState - err = decoder.Decode(&state) - if err != nil { - return nil, err - } - - statesMap := make(map[string]RegistryState, 0) - if _, ok := statesMap[state.V.Source]; !ok { - statesMap[state.V.Source] = state - } - - return statesMap, nil -} - -// RenderLogConfig 生成日志采集配置文件 -func (p *FilebeatPointer) RenderLogConfig(containerId string, container map[string]string, configList []*logtypes.LogConfig) (string, error) { - for _, config := range configList { - log.Infof("logs: %s = %v", containerId, config) - } - - output := os.Getenv(EnvFilebeatOutput) - if output == "" { - output = os.Getenv(EnvLoggingOutput) - } - - var buf bytes.Buffer - m := map[string]interface{}{ - "containerId": containerId, - "configList": configList, - "container": container, - "output": output, - } - if err := p.Tmpl.Execute(&buf, m); err != nil { - return "", err - } - return buf.String(), nil -} diff --git a/pkg/client/filebeat/types.go b/pkg/client/filebeat/types.go deleted file mode 100644 index 319e57a..0000000 --- a/pkg/client/filebeat/types.go +++ /dev/null @@ -1,91 +0,0 @@ -package filebeat - -import ( - "fmt" - log "github.com/sirupsen/logrus" - "text/template" - "time" -) - -// Global variables for Filebeat -const ( - FilebeatBaseConf = "/usr/share/filebeat" - FilebeatExecCmd = FilebeatBaseConf + "/filebeat" - FilebeatRegistry = FilebeatBaseConf + "/data/registry/filebeat/log.json" - FilebeatConfDir = FilebeatBaseConf + "/inputs.d" - FilebeatConfFile = FilebeatBaseConf + "/filebeat.yml" - - DockerSystemPath = "/var/lib/docker/" - KubeletSystemPath = "/var/lib/kubelet/" - - EnvFilebeatOutput = "FILEBEAT_OUTPUT" -) - -// FilebeatPointer for filebeat plugin -type FilebeatPointer struct { - name string - Tmpl *template.Template - baseDir string - watchDone chan bool - watchDuration time.Duration - watchContainer map[string]string -} - -// Config contains all log paths -type Config struct { - Paths []string `config:"paths"` -} - -// FileInode identify a unique log file -type FileInode struct { - Inode uint64 `json:"inode,"` - Device uint64 `json:"device,"` -} - -// RegistryState represents log offsets -type RegistryState struct { - K string `json:"k"` - V RegistryV `json:"v"` -} - -type RegistryV struct { - Source string `json:"source"` - Offset int64 `json:"offset"` - Timestamp []time.Time `json:"timestamp"` - TTL time.Duration `json:"ttl"` - Type string `json:"type"` - FileStateOS FileInode -} - -// GetConfPath returns log configuration path -func (p *FilebeatPointer) GetConfPath(container string) string { - return fmt.Sprintf("%s/%s.yml", FilebeatConfDir, container) -} - -// GetConfHome returns configuration directory -func (p *FilebeatPointer) GetConfHome() string { - return FilebeatConfDir -} - -// Name returns plugin name -func (p *FilebeatPointer) Name() string { - return p.name -} - -// OnDestroyEvent watching destroy event -func (p *FilebeatPointer) OnDestroyEvent(container string) error { - return p.feed(container) -} - -// GetBaseConf returns plugin root directory -func (p *FilebeatPointer) GetBaseConf() string { - return FilebeatBaseConf -} - -func (p *FilebeatPointer) feed(containerID string) error { - if _, ok := p.watchContainer[containerID]; !ok { - p.watchContainer[containerID] = containerID - log.Infof("begin to watch log config: %s.yml", containerID) - } - return nil -} diff --git a/pkg/controller/container.go b/pkg/controller/container.go deleted file mode 100644 index 14e4a5b..0000000 --- a/pkg/controller/container.go +++ /dev/null @@ -1,223 +0,0 @@ -package controller - -import ( - "context" - "encoding/json" - "fmt" - "github.com/containerd/containerd/containers" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/oci" - "github.com/opencontainers/runtime-spec/specs-go" - log "github.com/sirupsen/logrus" - "io" - "io/ioutil" - "os" - "path/filepath" - "regexp" - "strings" - "watchlog/log/config" - "watchlog/pkg/ctx" - "watchlog/pkg/runtime" - "watchlog/pkg/runtime/container" -) - -const ( - ContainerLogPath = "/var/log/pods" -) - -var ( - create = "containerd.events.ContainerCreate" - delete = "containerd.events.ContainerDelete" -) - -type RuntimeContainer struct { - ctx *ctx.Context -} - -type ContainerInterface interface { - ProcessContainers() error - NewContainer(meta containers.Container, process *specs.Process) error -} - -func NewContainerInterface(ctx *ctx.Context) ContainerInterface { - return &RuntimeContainer{ - ctx: ctx, - } -} - -func (c RuntimeContainer) ProcessContainers() error { - c.ctx.Mutex.Lock() - defer c.ctx.Mutex.Unlock() - - containerCtx := namespaces.WithNamespace(c.ctx.Context, "k8s.io") - c.ProcessEvent(c.ctx, containerCtx) - - containers, err := c.ctx.ContainerCli.Containers(containerCtx) - if err != nil { - log.Errorf("get containers failed, %s", err.Error()) - return err - } - - for _, v := range containers { - meta, err := v.Info(containerCtx) - if err != nil { - log.Errorf("get container meta info failed: %s", err.Error()) - } - - spec, err := v.Spec(containerCtx) - if err != nil { - log.Errorf("get container spec failed: %v", err) - continue - } - - var b bool - for _, envVar := range spec.Process.Env { - serviceLogs := fmt.Sprintf(c.ctx.LogPrefix) - if !strings.HasPrefix(envVar, serviceLogs) { - b = false - continue - } - - b = true - break - } - - if b { - err = c.NewContainer(meta, spec.Process) - if err != nil { - return err - } - } - - } - - return nil -} - -func (c RuntimeContainer) NewContainer(meta containers.Container, process *specs.Process) error { - id := meta.ID - env := process.Env - labels := meta.Labels - logFile := meta.Labels[runtime.KubernetesContainerNamespace] + "_" + meta.Labels[runtime.KubernetesPodName] + "_*" + "/" + meta.Labels[runtime.KubernetesContainerName] + "/" + "*.log" - ct := container.Container(meta) - - for _, envVar := range env { - // log_test 转换为 log.test - envLabel := strings.SplitN(envVar, "=", 2) - if len(envLabel) == 2 { - labelKey := strings.Replace(envLabel[0], "_", ".", -1) - labels[labelKey] = envLabel[1] - } - } - - logPath := filepath.Join(c.ctx.BaseDir, ContainerLogPath, logFile) - configs, err := config.GetLogConfigs(c.ctx.LogPrefix, logPath, labels) - if err != nil { - log.Errorf("%v", err.Error()) - return err - } - - if len(configs) == 0 { - log.Debugf("%s has not log config, skip", id) - return nil - } - // 生成 filebeat 采集配置 - logConfig, err := c.ctx.Piloter.RenderLogConfig(id, ct, configs) - if err != nil { - log.Errorf("%v", err.Error()) - return err - } - - if err = ioutil.WriteFile(c.ctx.Piloter.GetConfPath(id), []byte(logConfig), os.FileMode(0644)); err != nil { - log.Errorf("%v", err.Error()) - return err - } - - //c.TryReload() - return nil - -} - -func (c RuntimeContainer) ProcessEvent(ctx *ctx.Context, containerCtx context.Context) { - msgs, errs := ctx.ContainerCli.EventService().Subscribe(containerCtx, "") - - go func() { - defer func() { - log.Warn("finish to watch event") - ctx.StopChan <- true - }() - - log.Info("begin to watch event") - - for { - select { - case msg := <-msgs: - if err := c.processEvent(ctx, containerCtx, msg); err != nil { - log.Errorf("fail to process event: %v, %v", msg, err) - } - case err := <-errs: - log.Warnf("error: %v", err) - if err == io.EOF || err == io.ErrUnexpectedEOF { - return - } - msgs, errs = ctx.ContainerCli.EventService().Subscribe(containerCtx, "") - } - } - }() -} - -func (c RuntimeContainer) processEvent(ctx *ctx.Context, containerCtx context.Context, msg *events.Envelope) error { - v := string(msg.Event.GetValue()) - s := strings.TrimPrefix(v, "\n@") - containerId := removeSpecialChars(s) - containerId = strings.Split(containerId, "-")[0] - - t := msg.Event.GetTypeUrl() - switch t { - case create: - if Exists(ctx, containerId) { - log.Debugf("%s is already exists", containerId) - return nil - } - - _, err := ctx.ContainerCli.LoadContainer(containerCtx, containerId) - if err != nil { - if errdefs.IsNotFound(err) { - _, err = ctx.ContainerCli.LoadContainer(containerCtx, containerId) - } - - return err - } - - get, err := ctx.ContainerCli.ContainerService().Get(containerCtx, containerId) - if err != nil { - return err - } - - var spec oci.Spec - err = json.Unmarshal(get.Spec.GetValue(), &spec) - if err != nil { - return err - } - - return c.NewContainer(get, spec.Process) - - case delete: - log.Debugf("Process container destroy event: %s", containerId) - - err := DelContainerLogFile(ctx, containerId) - if err != nil { - log.Errorf("Process container destroy event error: %s, %s", containerId, err.Error()) - } - - } - - return nil -} - -func removeSpecialChars(str string) string { - re := regexp.MustCompile(`[^a-zA-Z0-9]+`) - return re.ReplaceAllString(str, "-") -} diff --git a/pkg/controller/docker.go b/pkg/controller/docker.go deleted file mode 100644 index caf704c..0000000 --- a/pkg/controller/docker.go +++ /dev/null @@ -1,184 +0,0 @@ -package controller - -import ( - "fmt" - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/events" - "github.com/docker/docker/api/types/filters" - log "github.com/sirupsen/logrus" - "io" - "io/ioutil" - "os" - "path/filepath" - "strings" - logtypes "watchlog/log/config" - "watchlog/pkg/ctx" - "watchlog/pkg/runtime/docker" -) - -const ( - EnvServiceLogsTmpl = "%s_" -) - -type RuntimeDocker struct { - ctx *ctx.Context - f filters.Args -} - -type DockerInterface interface { - ProcessContainers(ctx *ctx.Context) error - NewContainer(containerJSON *types.ContainerJSON) error -} - -func NewDockerInterface(ctx *ctx.Context, f filters.Args) DockerInterface { - return &RuntimeDocker{ - ctx: ctx, - f: f, - } -} - -func (d RuntimeDocker) ProcessContainers(ctx *ctx.Context) error { - log.Debug("process all container log config") - ctx.Mutex.Lock() - defer ctx.Mutex.Unlock() - - d.ProcessEvent(d.f) - - opts := types.ContainerListOptions{} - containers, err := ctx.Client.ContainerList(ctx.Context, opts) - if err != nil { - log.Errorf("fail to list container: %v", err) - return err - } - - for _, c := range containers { - if c.State == "removing" { - continue - } - - if Exists(ctx, c.ID) { - log.Debugf("%s is already exists", c.ID) - continue - } - - containerJSON, err := ctx.Client.ContainerInspect(ctx.Context, c.ID) - if err != nil { - log.Errorf("fail to inspect container %s: %v", c.ID, err) - continue - } - - if err = d.NewContainer(&containerJSON); err != nil { - log.Errorf("fail to process container %s: %v", containerJSON.Name, err) - } - } - - return nil -} - -func (d RuntimeDocker) NewContainer(containerJSON *types.ContainerJSON) error { - id := containerJSON.ID - env := containerJSON.Config.Env - labels := containerJSON.Config.Labels - jsonLogPath := containerJSON.LogPath - ct := docker.Container(containerJSON) - - for _, e := range env { - serviceLogs := fmt.Sprintf(EnvServiceLogsTmpl, d.ctx.LogPrefix) - if !strings.HasPrefix(e, serviceLogs) { - continue - } - - // log_test 转换为 log.test - envLabel := strings.SplitN(e, "=", 2) - if len(envLabel) == 2 { - labelKey := strings.Replace(envLabel[0], "_", ".", -1) - labels[labelKey] = envLabel[1] - } - } - - logPath := filepath.Join(d.ctx.BaseDir, jsonLogPath) - logConfigs, err := logtypes.GetLogConfigs(d.ctx.LogPrefix, logPath, labels) - if err != nil { - return err - } - - if len(logConfigs) == 0 { - log.Debugf("%s has not log config, skip", id) - return nil - } - - //生成 filebeat 采集配置 - logConfig, err := d.ctx.Piloter.RenderLogConfig(id, ct, logConfigs) - if err != nil { - return err - } - //TODO validate config before save - if err = ioutil.WriteFile(d.ctx.Piloter.GetConfPath(id), []byte(logConfig), os.FileMode(0644)); err != nil { - return err - } - - //c.TryReload() - return nil -} - -func (d RuntimeDocker) ProcessEvent(filter filters.Args) { - options := types.EventsOptions{ - Filters: filter, - } - - msgs, errs := d.ctx.Client.Events(d.ctx.Context, options) - - go func() { - defer func() { - log.Warn("finish to watch event") - d.ctx.StopChan <- true - }() - - log.Info("begin to watch event") - - for { - select { - case msg := <-msgs: - if err := d.processEvent(d.ctx, msg); err != nil { - log.Errorf("fail to process event: %v, %v", msg, err) - } - case err := <-errs: - log.Warnf("error: %v", err) - if err == io.EOF || err == io.ErrUnexpectedEOF { - return - } - msgs, errs = d.ctx.Client.Events(d.ctx.Context, options) - } - } - }() -} - -func (d RuntimeDocker) processEvent(ctx *ctx.Context, msg events.Message) error { - containerId := msg.Actor.ID - - switch msg.Action { - case "start", "restart": - log.Debugf("Process container start event: %s", containerId) - - if Exists(ctx, containerId) { - log.Debugf("%s is already exists", containerId) - return nil - } - - containerJSON, err := ctx.Client.ContainerInspect(ctx.Context, containerId) - if err != nil { - return err - } - - return d.NewContainer(&containerJSON) - case "destroy", "die": - log.Debugf("Process container destroy event: %s", containerId) - - err := DelContainerLogFile(ctx, containerId) - if err != nil { - log.Warnf("Process container destroy event error: %s, %s", containerId, err.Error()) - } - } - - return nil -} diff --git a/pkg/controller/process.go b/pkg/controller/process.go deleted file mode 100644 index 9dad4e9..0000000 --- a/pkg/controller/process.go +++ /dev/null @@ -1,27 +0,0 @@ -package controller - -import ( - log "github.com/sirupsen/logrus" - "os" - "watchlog/pkg/ctx" -) - -// Exists 判断采集容器日志的配置是否存在 -func Exists(ctx *ctx.Context, containId string) bool { - if _, err := os.Stat(ctx.Piloter.GetConfPath(containId)); os.IsNotExist(err) { - return false - } - return true -} - -// DelContainerLogFile 销毁采集容器日志文件 -func DelContainerLogFile(ctx *ctx.Context, id string) error { - log.Infof("Try removing log config %s", id) - if err := os.Remove(ctx.Piloter.GetConfPath(id)); err != nil { - log.Warnf("removing %s log config failure", id) - return err - } - - //ctx.TryReload() - return ctx.Piloter.OnDestroyEvent(id) -} diff --git a/pkg/ctx/ctx.go b/pkg/ctx/ctx.go index 0c22a03..4198d14 100644 --- a/pkg/ctx/ctx.go +++ b/pkg/ctx/ctx.go @@ -5,43 +5,40 @@ import ( "github.com/containerd/containerd" "github.com/docker/docker/client" "os" - "watchlog/pkg/client/filebeat" - "watchlog/pkg/runtime/container" - "watchlog/pkg/runtime/docker" - "watchlog/pkg/types" + "sync" + "watchlog/pkg/provider" + "watchlog/pkg/runtime" ) type Context struct { - types.Pilot + context.Context + // 采集器 + FilebeatPointer provider.FilebeatPointer + // 日志前缀 + LogPrefix string + BaseDir string + DockerCli *client.Client + ContainerdCli *containerd.Client + sync.Mutex } -func NewContext(baseDir string, p filebeat.InterFilebeatPointer) *Context { +func NewContext(baseDir, logPrefix string, f provider.FilebeatPointer) *Context { dockerCli := new(client.Client) containerCli := new(containerd.Client) switch os.Getenv("RUNTIME_TYPE") { case "docker": - dockerCli = docker.NewClient() + dockerCli = runtime.NewDockerClient() case "containerd": - containerCli = container.NewClient() - } - - logPrefix := "watchlog" - lp := os.Getenv("LOG_PREFIX") - if len(lp) > 1 { - logPrefix = lp + containerCli = runtime.NewContainerClient() } return &Context{ - types.Pilot{ - Context: context.Background(), - Client: dockerCli, - ContainerCli: containerCli, - BaseDir: baseDir, - ReloadChan: make(chan bool), - StopChan: make(chan bool), - Piloter: p, - LogPrefix: logPrefix, - }, + Context: context.Background(), + FilebeatPointer: f, + LogPrefix: logPrefix, + BaseDir: baseDir, + DockerCli: dockerCli, + ContainerdCli: containerCli, } } diff --git a/pkg/provider/filebeat.go b/pkg/provider/filebeat.go new file mode 100644 index 0000000..d483b34 --- /dev/null +++ b/pkg/provider/filebeat.go @@ -0,0 +1,196 @@ +package provider + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "github.com/elastic/go-ucfg" + "github.com/elastic/go-ucfg/yaml" + "github.com/zeromicro/go-zero/core/logc" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strings" + "text/template" + logtypes "watchlog/log/config" +) + +// FilebeatPointer Filebeat 插件 +type FilebeatPointer struct { + cmd *exec.Cmd + Name string + Tmpl *template.Template + BaseDir string +} + +func NewFilebeatPointer(Tmpl *template.Template, BaseDir string) FilebeatPointer { + return FilebeatPointer{ + Name: "Filebeat", + Tmpl: Tmpl, + BaseDir: BaseDir, + } +} + +// Start 启动采集器 +func (f FilebeatPointer) Start() error { + if f.cmd != nil { + pid := f.cmd.Process.Pid + return fmt.Errorf("Filebeat process is exists, PID: %d", pid) + } + + f.cmd = exec.Command(FilebeatExecCmd, "-c", FilebeatConfFile) + f.cmd.Stderr = os.Stderr + f.cmd.Stdout = os.Stdout + err := f.cmd.Start() + if err != nil { + logc.Errorf(context.Background(), "Filebeat start fail: %s", err) + } + + go func() { + logc.Infof(context.Background(), "Starting Filebeat pid: %v", f.cmd.Process.Pid) + err := f.cmd.Wait() + if err != nil { + logc.Errorf(context.Background(), "Filebeat exited: %v", err) + if exitError, ok := err.(*exec.ExitError); ok { + processState := exitError.ProcessState + logc.Errorf(context.Background(), "Filebeat exited pid: %v", processState.Pid()) + } + } + + // try to restart filebeat + logc.Debugf(context.Background(), "Filebeat exited and try to restart") + f.cmd = nil + err = f.cmd.Start() + if err != nil { + return + } + }() + + return err +} + +// GetRegistryState 获取 filebeat 仓库中容器日志的基本信息 +func (f FilebeatPointer) GetRegistryState() (map[string]RegistryState, error) { + file, err := os.Open(FilebeatRegistry) + if err != nil { + return nil, err + } + defer file.Close() + + decoder := json.NewDecoder(file) + var state RegistryState + err = decoder.Decode(&state) + if err != nil { + return nil, err + } + + statesMap := make(map[string]RegistryState, 0) + if _, ok := statesMap[state.V.Source]; !ok { + statesMap[state.V.Source] = state + } + + return statesMap, nil +} + +// LoadConfigPaths 加载容器config, path +func (f FilebeatPointer) LoadConfigPaths() map[string]string { + paths := make(map[string]string, 0) + // 读取 inputs.d 目录下所有配置 + confs, _ := ioutil.ReadDir(FilebeatConfDir) + for _, conf := range confs { + // get file name + container := strings.TrimRight(conf.Name(), ".yml") + config, err := f.ParseConfig(container) + if err != nil || config == nil { + continue + } + + for _, path := range config.Paths { + if _, ok := paths[path]; !ok { + paths[path] = container + } + } + } + return paths +} + +type Config struct { + Paths []string `config:"paths"` +} + +var configOpts = []ucfg.Option{ + ucfg.PathSep("."), + ucfg.ResolveEnv, + ucfg.VarExp, +} + +// ParseConfig 解析容器信息配置,获取path信息 +func (f FilebeatPointer) ParseConfig(container string) (*Config, error) { + // get config full path, /etc/filebeat/inputs.d/*.yml + confPath := f.GetConfPath(container) + c, err := yaml.NewConfigWithFile(confPath, configOpts...) + if err != nil { + logc.Errorf(context.Background(), "read %s.yml log config error: %v", container, err) + return nil, err + } + + var config Config + if err := c.Unpack(&config); err != nil { + logc.Errorf(context.Background(), "parse %s.yml log config error: %v", container, err) + return nil, err + } + return &config, nil +} + +// RenderLogConfig 生成日志采集配置文件 +func (f FilebeatPointer) RenderLogConfig(containerId string, container map[string]string, configList []logtypes.LogConfig) (string, error) { + for _, config := range configList { + logc.Infof(context.Background(), "logs: %s = %v", containerId, config) + } + + var buf bytes.Buffer + m := map[string]interface{}{ + "containerId": containerId, + "configList": configList, + "container": container, + "output": "FILEBEAT_OUTPUT", + } + if err := f.Tmpl.Execute(&buf, m); err != nil { + return "", err + } + + return buf.String(), nil +} + +// CleanConfigs 清理旧配置 +func (f FilebeatPointer) CleanConfigs() error { + confDir := f.GetConfHome() + d, err := os.Open(confDir) + if err != nil { + return err + } + defer d.Close() + + // 获取目录下所有数据, 包括目录和文件 + names, err := d.Readdirnames(-1) + if err != nil { + return err + } + + for _, name := range names { + conf := filepath.Join(confDir, name) + stat, err := os.Stat(filepath.Join(confDir, name)) + if err != nil { + return err + } + // 是否为普通文件 + if stat.Mode().IsRegular() { + if err := os.Remove(conf); err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/provider/filebeat_types.go b/pkg/provider/filebeat_types.go new file mode 100644 index 0000000..e99ca1d --- /dev/null +++ b/pkg/provider/filebeat_types.go @@ -0,0 +1,49 @@ +package provider + +import ( + "fmt" + "time" +) + +// RegistryState represents log offsets +type RegistryState struct { + K string `json:"k"` + V RegistryV `json:"v"` +} + +type RegistryV struct { + Source string `json:"source"` + Offset int64 `json:"offset"` + Timestamp []time.Time `json:"timestamp"` + TTL time.Duration `json:"ttl"` + Type string `json:"type"` + FileStateOS FileInode +} + +type FileInode struct { + Inode uint64 `json:"inode,"` + Device uint64 `json:"device,"` +} + +const ( + FilebeatBaseConf = "/usr/share/filebeat" + FilebeatExecCmd = FilebeatBaseConf + "/filebeat" + FilebeatConfFile = FilebeatBaseConf + "/filebeat.yml" + FilebeatConfDir = FilebeatBaseConf + "/inputs.d" + FilebeatRegistry = FilebeatBaseConf + "/data/registry/filebeat/log.json" +) + +// GetConfPath get configuration path FilebeatConfDir/${container}.yaml +func (f FilebeatPointer) GetConfPath(container string) string { + return fmt.Sprintf("%s/%s.yml", FilebeatConfDir, container) +} + +// GetBaseConf returns plugin root directory +func (f FilebeatPointer) GetBaseConf() string { + return FilebeatBaseConf +} + +// GetConfHome returns configuration directory +func (f FilebeatPointer) GetConfHome() string { + return FilebeatConfDir +} diff --git a/pkg/runtime/container.go b/pkg/runtime/container.go new file mode 100644 index 0000000..b326863 --- /dev/null +++ b/pkg/runtime/container.go @@ -0,0 +1,17 @@ +package runtime + +import ( + "fmt" + "github.com/containerd/containerd" +) + +const sock = "/run/containerd/containerd.sock" + +func NewContainerClient() *containerd.Client { + cli, err := containerd.New(sock) + if err != nil { + panic(fmt.Sprintf("Error: Create container client failed, %s", err.Error())) + } + + return cli +} diff --git a/pkg/runtime/container/c.go b/pkg/runtime/container/c.go deleted file mode 100644 index a8e1d57..0000000 --- a/pkg/runtime/container/c.go +++ /dev/null @@ -1,37 +0,0 @@ -package container - -import ( - "fmt" - "github.com/containerd/containerd" - "github.com/containerd/containerd/containers" - "os" - "watchlog/pkg/runtime" -) - -const sock = "/run/containerd/containerd.sock" - -func NewClient() *containerd.Client { - cli, err := containerd.New(sock) - if err != nil { - panic(fmt.Sprintf("Error: Create container client failed, %s", err.Error())) - } - - return cli -} - -func Container(meta containers.Container) map[string]string { - labels := meta.Labels - c := make(map[string]string) - putIfNotEmpty(c, "k8s_pod", labels[runtime.KubernetesPodName]) - putIfNotEmpty(c, "k8s_pod_namespace", labels[runtime.KubernetesContainerNamespace]) - putIfNotEmpty(c, "k8s_container_name", labels[runtime.KubernetesContainerName]) - putIfNotEmpty(c, "k8s_node_name", os.Getenv("NODE_NAME")) - return c -} - -func putIfNotEmpty(store map[string]string, key, value string) { - if key == "" || value == "" { - return - } - store[key] = value -} diff --git a/pkg/runtime/docker.go b/pkg/runtime/docker.go new file mode 100644 index 0000000..1fbeedd --- /dev/null +++ b/pkg/runtime/docker.go @@ -0,0 +1,15 @@ +package runtime + +import ( + "fmt" + docker "github.com/docker/docker/client" +) + +func NewDockerClient() *docker.Client { + cli, err := docker.NewEnvClient() + if err != nil { + panic(fmt.Sprintf("Error: Create docker client failed, %s", err.Error())) + } + + return cli +} diff --git a/pkg/runtime/docker/c.go b/pkg/runtime/docker/c.go deleted file mode 100644 index 137328e..0000000 --- a/pkg/runtime/docker/c.go +++ /dev/null @@ -1,37 +0,0 @@ -package docker - -import ( - "fmt" - "github.com/docker/docker/api/types" - docker "github.com/docker/docker/client" - "os" - "strings" - "watchlog/pkg/runtime" -) - -func NewClient() *docker.Client { - cli, err := docker.NewEnvClient() - if err != nil { - panic(fmt.Sprintf("Error: Create docker client failed, %s", err.Error())) - } - - return cli -} - -func Container(containerJSON *types.ContainerJSON) map[string]string { - labels := containerJSON.Config.Labels - c := make(map[string]string) - putIfNotEmpty(c, "k8s_pod", labels[runtime.KubernetesPodName]) - putIfNotEmpty(c, "k8s_pod_namespace", labels[runtime.KubernetesContainerNamespace]) - putIfNotEmpty(c, "k8s_container_name", labels[runtime.KubernetesContainerName]) - putIfNotEmpty(c, "k8s_node_name", os.Getenv("NODE_NAME")) - putIfNotEmpty(c, "docker_container", strings.TrimPrefix(containerJSON.Name, "/")) - return c -} - -func putIfNotEmpty(store map[string]string, key, value string) { - if key == "" || value == "" { - return - } - store[key] = value -} diff --git a/pkg/runtime/types.go b/pkg/runtime/types.go index 484a9ba..d8edfa0 100644 --- a/pkg/runtime/types.go +++ b/pkg/runtime/types.go @@ -1,7 +1,25 @@ package runtime +import "os" + const ( KubernetesPodName = "io.kubernetes.pod.name" KubernetesContainerName = "io.kubernetes.container.name" KubernetesContainerNamespace = "io.kubernetes.pod.namespace" ) + +func putIfNotEmpty(store map[string]string, key, value string) { + if key == "" || value == "" { + return + } + store[key] = value +} + +func BuildContainerLabels(labels map[string]string) map[string]string { + c := make(map[string]string) + putIfNotEmpty(c, "k8s_pod", labels[KubernetesPodName]) + putIfNotEmpty(c, "k8s_pod_namespace", labels[KubernetesContainerNamespace]) + putIfNotEmpty(c, "k8s_container_name", labels[KubernetesContainerName]) + putIfNotEmpty(c, "k8s_node_name", os.Getenv("NODE_NAME")) + return c +} diff --git a/pkg/util/fotmat.go b/pkg/tools/format.go similarity index 99% rename from pkg/util/fotmat.go rename to pkg/tools/format.go index d07da93..6950a63 100644 --- a/pkg/util/fotmat.go +++ b/pkg/tools/format.go @@ -1,4 +1,4 @@ -package util +package tools import ( "fmt" @@ -30,7 +30,6 @@ type SimpleConverter struct { } func init() { - simpleConverter := func(properties []string) FormatConverter { return func(info *nodeInfo.LogInfoNode) (map[string]string, error) { validProperties := make(map[string]bool) diff --git a/pkg/util/utils.go b/pkg/tools/utils.go similarity index 95% rename from pkg/util/utils.go rename to pkg/tools/utils.go index c6ec3e0..8c67a60 100644 --- a/pkg/util/utils.go +++ b/pkg/tools/utils.go @@ -1,4 +1,4 @@ -package util +package tools import ( "io/ioutil" diff --git a/pkg/types/types.go b/pkg/types/types.go deleted file mode 100644 index ca948d2..0000000 --- a/pkg/types/types.go +++ /dev/null @@ -1,95 +0,0 @@ -package types - -import ( - "context" - "fmt" - "github.com/containerd/containerd" - docker "github.com/docker/docker/client" - "os" - "path/filepath" - "sync" - "time" - "watchlog/pkg/client/filebeat" -) - -// Pilot entry point -type Pilot struct { - Context context.Context - Piloter filebeat.InterFilebeatPointer - Mutex sync.Mutex - Client *docker.Client - ContainerCli *containerd.Client - LastReload time.Time - ReloadChan chan bool - StopChan chan bool - BaseDir string - LogPrefix string - CreateSymlink bool -} - -//func (p *Pilot) DoReload() { -// log.Info("Reload goroutine is ready") -// for { -// <-p.ReloadChan -// err := p.Reload() -// if err != nil { -// log.Errorf(err.Error()) -// return -// } -// } -//} - -//func (p *Pilot) Reload() error { -// p.Mutex.Lock() -// defer p.Mutex.Unlock() -// -// log.Infof("Reload %s", p.Piloter.Name()) -// interval := time.Now().Sub(p.LastReload) -// time.Sleep(30*time.Second - interval) -// -// log.Info("Start reloading") -// err := p.Piloter.Reload() -// p.LastReload = time.Now() -// return err -//} - -//func (p *Pilot) TryReload() { -// select { -// case p.ReloadChan <- true: -// default: -// log.Info("Another load is pending") -// } -//} - -func (p *Pilot) CleanConfigs() error { - p.Mutex.Lock() - defer p.Mutex.Unlock() - - confDir := fmt.Sprintf(p.Piloter.GetConfHome()) - d, err := os.Open(confDir) - if err != nil { - return err - } - defer d.Close() - - // 获取目录下所有数据, 包括目录和文件 - names, err := d.Readdirnames(-1) - if err != nil { - return err - } - - for _, name := range names { - conf := filepath.Join(confDir, name) - stat, err := os.Stat(filepath.Join(confDir, name)) - if err != nil { - return err - } - // 是否为普通文件 - if stat.Mode().IsRegular() { - if err := os.Remove(conf); err != nil { - return err - } - } - } - return nil -}