Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Predict functionality #92

Open
wants to merge 17 commits into
base: develoment
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion netops_demo/Makefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
NETOPS_TAG := $(if $(NETOPS_TAG),$(NETOPS_TAG),latest)
NETOPS_REGISTRY_URL := $(if $(NETOPS_REGISTRY_URL),$(NETOPS_REGISTRY_URL),iguaziodocker)
MODEL_FILE := $(if $(MODEL_FILE), $(MODEL_FILE),/tmp/model/netops.model)
zilbermanor marked this conversation as resolved.
Show resolved Hide resolved
MODEL_LOCATION_URL := $(if $(MODEL_LOCATION_URL), $(MODEL_LOCATION_URL),http://192.168.224.49:8081/1/netops.model)

.PHONY: default
default: golang py

.PHONY: py
py:
cd py && docker build . -t netops-demo-py:$(NETOPS_TAG)
cd py && docker build \
--tag netops-demo-py:$(NETOPS_TAG) \
--build-arg MODEL_FILE=$(MODEL_FILE) \
--build-arg MODEL_LOCATION_URL=$(MODEL_LOCATION_URL) \
.

.PHONY: golang
golang:
Expand Down
34 changes: 29 additions & 5 deletions netops_demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This demo generates network data and detects, predicts and visualizes anomalies.
3. Nuclio functions that generate and ingest historical and real-time metric samples into the Iguazio TSDB and Anodot
4. Grafana for visualization of metrics

Setting up an Iguazio system including Grafana and Nuclio is out of the scope of this document.
Setting up an Iguazio system including Grafana and Nuclio is out of the scope of this document.

## Deploying and running the demo

Expand All @@ -20,7 +20,7 @@ helm install v3io-demo/netops \
--set ingest.tsdb.path <tsdb path>
```

> Note: To ingest into Anodot, add `--set ingest.anodot.token <anodot token>` to the above.
> Note: To ingest into Anodot, add `--set ingest.anodot.token <anodot token>` to the above.

The demo is configured with defaults, as can be found in the values.yaml (#REF). You can download and modify these settings and pass `--values <values-file-path>` rather than the `--set` arguments above. The generator is configured through a Kubernetes configmap, so it comes up configured. All we need to do is start the generation, including a day of historical data:

Expand Down Expand Up @@ -75,6 +75,15 @@ nuctl deploy --run-image iguaziodocker/netops-demo-py:0.0.5 \
--triggers '{"periodic": {"kind": "cron", "workerAllocatorName": "defaultHTTPWorkerAllocator", "attributes": {"interval": "1s"}}}' \
--platform-config '{"attributes": {"network":"netops"}}' \
netops-demo-generate

nuctl deploy --run-image iguaziodocker/netops-demo-py:0.0.5 \
--runtime python:3.6 \
--handler functions.predict.predict:predict \
--readiness-timeout 10 \
--platform local \
--triggers '{"periodic": {"kind": "cron", "workerAllocatorName": "defaultHTTPWorkerAllocator", "attributes": {"interval": "1m"}}}' \
--platform-config '{"attributes": {"network":"netops"}}' \
netops-demo-predict
```

You can choose to follow the logs by running `docker logs -f default-<function name>`, for example:
Expand Down Expand Up @@ -157,6 +166,22 @@ We can now start the generation:
http localhost:<function port>/start
```

By default, the predict function is idling - waiting for configuration. Let's configure it by POSTing the following configuration to `/configure`:
```
echo '
{
'metrics': <metrics array>,
'tsdb': <tsdb server url:proemtheus port>,
'state': 'predicting',
zilbermanor marked this conversation as resolved.
Show resolved Hide resolved
'target': "function:netops-demo-ingest"
}
' | http localhost:<function port>/configure
```
We can now start predicting:

```sh
http localhost:<function port>/start
```
## Developing

### Python (Pycharm)
Expand Down Expand Up @@ -184,11 +209,10 @@ Modify the source code and build the images:
NETOPS_TAG=latest make
```

This will output `netops-demo-golang:latest` and `netops-demo-py:latest` using Nuclio's ability to [build function images from Dockerfiles](https://github.com/nuclio/nuclio/blob/master/docs/tasks/deploy-functions-from-dockerfile.md).
> The `golang` image contains the `ingest` and `query` functions. The `py` image contains the `generate` and `train` functions. By bunching together a few functions inside a single image we allow for easily sharing code without worrying about versioning, reducing the number of moving parts, etc.
This will output `netops-demo-golang:latest` and `netops-demo-py:latest` using Nuclio's ability to [build function images from Dockerfiles](https://github.com/nuclio/nuclio/blob/master/docs/tasks/deploy-functions-from-dockerfile.md).
> The `golang` image contains the `ingest` and `query` functions. The `py` image contains the `generate` and `train` functions. By bunching together a few functions inside a single image we allow for easily sharing code without worrying about versioning, reducing the number of moving parts, etc.

Push the images to your favorite Docker registry:
```
NETOPS_TAG=latest NETOPS_REGISTRY_URL=mydockerhubaccount make push
```

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are tons of binary / submodule files added in this PR - please check the changed files and make sure only required files are in

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions netops_demo/golang/src/github.com/nuclio/logger
Submodule logger added at ccc5ab
1 change: 1 addition & 0 deletions netops_demo/golang/src/github.com/nuclio/nuclio-sdk-go
Submodule nuclio-sdk-go added at f750b9
1 change: 1 addition & 0 deletions netops_demo/golang/src/github.com/v3io/demos/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ HEALTHCHECK --interval=1s --timeout=3s CMD /usr/local/bin/uhttpc --url http://12

# Run processor with configuration and platform configuration
CMD [ "processor", "--config", "/etc/nuclio/config/processor/processor.yaml", "--platform-config", "/etc/nuclio/config/platform/platform.yaml" ]

Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
package anodot

import (
"net/http"
"time"
"fmt"
"encoding/json"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/nuclio/logger"
)

type Metric struct {
Properties map[string]interface{} `json:"properties,omitempty"`
Tags map[string]interface{} `json:"tags,omitempty"`
Timestamp uint64 `json:"timestamp,omitempty"`
Value float64 `json:"value,omitempty"`
Tags map[string]interface{} `json:"tags,omitempty"`
Timestamp uint64 `json:"timestamp,omitempty"`
Value float64 `json:"value,omitempty"`
}

type Appender struct {
logger logger.Logger
httpClient *http.Client
logger logger.Logger
httpClient *http.Client
appendEndpoint string
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package ingest

import (
"encoding/json"
"os"
"sync"
"fmt"
"sort"
"golang.org/x/sync/errgroup"
"os"
"sort"
"strconv"
"sync"

"github.com/v3io/demos/functions/ingest/anodot"

Expand All @@ -29,13 +30,14 @@ type metricSamples struct {
}

type emitter struct {
Labels map[string]interface{} `json:"labels,omitempty"`
Labels map[string]interface{} `json:"labels,omitempty"`
Metrics map[string]*metricSamples `json:"metrics,omitempty"`
}

type userData struct {
tsdbAppender tsdb.Appender
anodotAppender *anodot.Appender
kvAppender *v3io.Container
}

func Ingest(context *nuclio.Context, event nuclio.Event) (interface{}, error) {
Expand Down Expand Up @@ -63,7 +65,6 @@ func Ingest(context *nuclio.Context, event nuclio.Event) (interface{}, error) {
}
}


return nil, nil
}

Expand All @@ -74,11 +75,13 @@ func InitContext(context *nuclio.Context) error {

// get configuration from env
tsdbAppenderPath := os.Getenv("INGEST_V3IO_TSDB_PATH")
kvAppenderPath := os.Getenv("INGEST_V3IO_KV_PATH")
anodotAppenderURL := os.Getenv("INGEST_ANODOT_URL")
anodotAppenderToken := os.Getenv("INGEST_ANODOT_TOKEN")

context.Logger.InfoWith("Initializing",
"tsdbAppenderPath", tsdbAppenderPath,
"kvAppenderPath", kvAppenderPath,
"anodotAppenderURL", anodotAppenderURL,
"anodotAppenderToken", anodotAppenderToken)

Expand All @@ -100,6 +103,10 @@ func InitContext(context *nuclio.Context) error {
}
}

if kvAppenderPath != "" {
userData.kvAppender = context.DataBinding["db0"].(*v3io.Container)
}

// set user data into the context
context.UserData = &userData

Expand Down Expand Up @@ -176,6 +183,13 @@ func ingestMetricSamples(context *nuclio.Context,
})
}

// TODO: Ingest into KV
zilbermanor marked this conversation as resolved.
Show resolved Hide resolved
if userData.kvAppender != nil {
ingestErrGroup.Go(func() error {
return ingestMetricSamplesToKV(context, userData.kvAppender, emitterLabels, metricName, samples)
})
}

// wait and return composite error
return ingestErrGroup.Wait()
}
Expand All @@ -196,7 +210,7 @@ func ingestMetricSamplesToTSDB(context *nuclio.Context,
// iterate over label source and copy over
for labelKey, labelValue := range labelSource {
labels = append(labels, utils.Label{
Name: labelKey,
Name: labelKey,
Value: fmt.Sprintf("%v", labelValue),
})
}
Expand Down Expand Up @@ -227,6 +241,55 @@ func ingestMetricSamplesToTSDB(context *nuclio.Context,
return nil
}

func ingestMetricSamplesToKV(context *nuclio.Context,
kvContainer *v3io.Container,
emitterLabels map[string]interface{},
metricName string,
samples *metricSamples) error {

baseKVPath := os.Getenv("INGEST_V3IO_KV_PATH")
metricPath := baseKVPath + "/" + metricName
responseChannel := make(chan *v3io.Response, 1) // Only one response per PutItems

items := make(map[string]map[string]interface{}, len(samples.Timestamps))
for index, timestamp := range samples.Timestamps {
key := strconv.FormatInt(timestamp, 10) + "_" + emitterLabels["device"].(string)
items [key] = map[string]interface{}{
string("timestamp"): string(timestamp),
string("alert"): string(samples.Alerts[index]),
string("IsError"): int(samples.IsError[index]),
}
for k, v := range emitterLabels {
items [key][k] = v
}
}

// TODO: Why Emitter Labels doesn't hold device ID? Only company name / site
_, PutItemErr := kvContainer.PutItems(&v3io.PutItemsInput{
zilbermanor marked this conversation as resolved.
Show resolved Hide resolved
Path: metricPath,
Items: items},
context,
responseChannel)

if PutItemErr != nil {
context.Logger.ErrorWith("Failed to put kv alert", "err", PutItemErr)
}

resp := <-responseChannel
if resp.Error != nil {
context.Logger.ErrorWith("Failed to put kv alert", "err", resp.Error)
}

output := resp.Output.(*v3io.PutItemsOutput)
if !output.Success {
for key, error := range output.Errors {
context.Logger.ErrorWith("Failed to put kv alert: Key:", key, "Error:", error)
}
}

return nil
}

func ingestMetricSamplesToAnodot(context *nuclio.Context,
anodotAppender *anodot.Appender,
emitterLabels map[string]interface{},
Expand All @@ -241,8 +304,8 @@ func ingestMetricSamplesToAnodot(context *nuclio.Context,
for sampleIndex := 0; sampleIndex < len(samples.Timestamps); sampleIndex++ {
metrics = append(metrics, &anodot.Metric{
Properties: samples.Labels,
Timestamp: uint64(samples.Timestamps[sampleIndex]),
Value: samples.Values[sampleIndex],
Timestamp: uint64(samples.Timestamps[sampleIndex]),
Value: samples.Values[sampleIndex],
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.4990232516124,
"latitute": -0.16853921733979035,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/0"
"site_name": "Wong Ltd/0",
"device": "Wong Ltd/0/0"
zilbermanor marked this conversation as resolved.
Show resolved Hide resolved
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -87,7 +88,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.4990232516124,
"latitute": -0.16853921733979035,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/0"
"site_name": "Wong Ltd/0",
"device": "Wong Ltd/0/1"
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -135,7 +137,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.4990232516124,
"latitute": -0.16853921733979035,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/0"
"site_name": "Wong Ltd/0",
"device": "Wong Ltd/0/2"
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -183,7 +186,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.4990232516124,
"latitute": -0.16853921733979035,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/0"
"site_name": "Wong Ltd/0",
"device": "Wong Ltd/0/3"
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -231,7 +235,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.4990232516124,
"latitute": -0.16853921733979035,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/0"
"site_name": "Wong Ltd/0",
"device": "Wong Ltd/0/4"
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -279,7 +284,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.518019501496426,
"latitute": -0.09269391832986805,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/1"
"site_name": "Wong Ltd/1",
"device": "Wong Ltd/1/0"
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -327,7 +333,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.518019501496426,
"latitute": -0.09269391832986805,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/1"
"site_name": "Wong Ltd/1",
"device": "Wong Ltd/1/1"
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -375,7 +382,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.518019501496426,
"latitute": -0.09269391832986805,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/1"
"site_name": "Wong Ltd/1",
"device": "Wong Ltd/1/2"
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -423,7 +431,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.518019501496426,
"latitute": -0.09269391832986805,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/1"
"site_name": "Wong Ltd/1",
"device": "Wong Ltd/1/3"
},
"metrics": {
"cpu_utilization": {
Expand Down Expand Up @@ -471,7 +480,8 @@ func (suite *ingestSuite) TestIngestValid() {
"longitude": 51.518019501496426,
"latitute": -0.09269391832986805,
"company_name": "Wong Ltd",
"site_name": "Wong Ltd/1"
"site_name": "Wong Ltd/1",
"device": "Wong Ltd/1/4"
},
"metrics": {
"cpu_utilization": {
Expand Down
Loading