Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Currentstatus rest api implementation #34

Merged
merged 2 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.11.0
github.com/redhat-cne/sdk-go v0.1.1-0.20220907191325-02b031866855
github.com/redhat-cne/sdk-go v0.1.1-0.20220930204039-4f4a81946028
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/redhat-cne/sdk-go v0.1.1-0.20220907191325-02b031866855 h1:xdRtqjKmyuUmZ+gYRy5rhmC65xEa6LlWY606swuPquY=
github.com/redhat-cne/sdk-go v0.1.1-0.20220907191325-02b031866855/go.mod h1:oiSLb2ub22f7hLZshVwzER4VgY8Q0gkdtD6j5bAbWVc=
github.com/redhat-cne/sdk-go v0.1.1-0.20220930204039-4f4a81946028 h1:VsY/6A7fMi5AfvluWmzHsUVDjgDZf1JTAlvUKITnNmU=
github.com/redhat-cne/sdk-go v0.1.1-0.20220930204039-4f4a81946028/go.mod h1:oiSLb2ub22f7hLZshVwzER4VgY8Q0gkdtD6j5bAbWVc=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
100 changes: 92 additions & 8 deletions routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package restapi
import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/redhat-cne/sdk-go/pkg/types"
Expand All @@ -41,6 +42,11 @@ import (
"net/http"
)

const (
AMQ = "AMQ"
HTTP = "HTTP"
)

// createSubscription create subscription and send it to a channel that is shared by middleware to process
// Creates a new subscription .
// If subscription exists with same resource then existing subscription is returned .
Expand Down Expand Up @@ -306,7 +312,80 @@ func (s *Server) publishEvent(w http.ResponseWriter, r *http.Request) {
}
}

// pingForSubscribedEventStatus sends ping to the a listening address in the producer to fire all status as events
// getCurrentState get current status of the events that are subscribed to
func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
queries := mux.Vars(r)
resourceAddress, ok := queries["resourceAddress"]
if !ok {
respondWithError(w, "resourceAddress parameter not found")
return
}

//identify publisher or subscriber is asking for status
var sub *pubsub.PubSub
if len(s.pubSubAPI.GetSubscriptions()) > 0 {
for _, subscriptions := range s.pubSubAPI.GetSubscriptions() {
if strings.Contains(subscriptions.GetResource(), resourceAddress) {
sub = subscriptions
break
}
}
} else if len(s.pubSubAPI.GetPublishers()) > 0 {
for _, publishers := range s.pubSubAPI.GetPublishers() {
if strings.Contains(publishers.GetResource(), resourceAddress) {
sub = publishers
break
}
}
} else {
respondWithError(w, "subscription not found")
return
}

if sub == nil {
respondWithError(w, "subscription not found")
return
}
cneEvent := event.CloudNativeEvent()
cneEvent.SetID(sub.ID)
cneEvent.Type = channel.STATUS.String()
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(cloudevents.ApplicationJSON)
cneEvent.SetData(cne.Data{
Version: "v1",
})
ceEvent, err := cneEvent.NewCloudEvent(sub)

if err != nil {
respondWithError(w, err.Error())
} else {
// for http you send to the protocol address
statusChannel := make(chan *channel.StatusChan, 1)
if s.transportType == HTTP {
s.dataOut <- &channel.DataChan{
Type: channel.STATUS,
Data: ceEvent,
Address: sub.GetResource(),
StatusChan: statusChannel,
}
select {
case d := <-statusChannel:
if d.Data == nil {
respondWithError(w, "event not found")
} else {
respondWithJSON(w, http.StatusOK, *d.Data)
}
case <-time.After(5 * time.Second):
close(statusChannel)
respondWithError(w, "time Out waiting for status")
}
} else {
respondWithError(w, "Non HTTP protocol not supported")
}
}
}

// pingForSubscribedEventStatus sends ping to the listening address in the producer to fire all status as events
func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Request) {
queries := mux.Vars(r)
subscriptionID, ok := queries["subscriptionid"]
Expand All @@ -333,9 +412,10 @@ func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Req
respondWithError(w, err.Error())
} else {
s.dataOut <- &channel.DataChan{
Type: channel.STATUS,
Data: ceEvent,
Address: fmt.Sprintf("%s/%s", sub.GetResource(), "status"),
Type: channel.STATUS,
StatusChan: nil,
Data: ceEvent,
Address: fmt.Sprintf("%s/%s", sub.GetResource(), "status"),
}
respondWithMessage(w, http.StatusAccepted, "ping sent")
}
Expand Down Expand Up @@ -367,11 +447,15 @@ func respondWithError(w http.ResponseWriter, message string) {
}

func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
response, _ := json.Marshal(payload)
w.Header().Set("Content-Type", cloudevents.ApplicationJSON)
w.WriteHeader(code)
w.Write(response) //nolint:errcheck
if response, err := json.Marshal(payload); err == nil {
w.Header().Set("Content-Type", cloudevents.ApplicationJSON)
w.WriteHeader(code)
w.Write(response) //nolint:errcheck
} else {
respondWithMessage(w, http.StatusBadRequest, "error parsing event data")
}
}

func respondWithMessage(w http.ResponseWriter, code int, message string) {
w.Header().Set("Content-Type", cloudevents.ApplicationJSON)
respondWithJSON(w, code, map[string]string{"status": message})
Expand Down
31 changes: 19 additions & 12 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ type Server struct {
port int
apiPath string
//data out is amqp in channel
dataOut chan<- *channel.DataChan
closeCh <-chan struct{}
HTTPClient *http.Client
httpServer *http.Server
pubSubAPI *pubsubv1.API
status serverStatus
dataOut chan<- *channel.DataChan
closeCh <-chan struct{}
HTTPClient *http.Client
httpServer *http.Server
pubSubAPI *pubsubv1.API
status serverStatus
transportType string // AMQ or HTTP
transportAddress string // for AMQ this is base address to return events
}

// publisher/subscription data model
Expand Down Expand Up @@ -116,14 +118,16 @@ type swaggReqAccepted struct { //nolint:deadcode,unused
}

// InitServer is used to supply configurations for rest routes server
func InitServer(port int, apiPath, storePath string, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) *Server {
func InitServer(port int, apiPath, storePath string, transportType string, transportAddress string, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) *Server {
once.Do(func() {
ServerInstance = &Server{
port: port,
apiPath: apiPath,
dataOut: dataOut,
closeCh: closeCh,
status: notReady,
port: port,
apiPath: apiPath,
dataOut: dataOut,
closeCh: closeCh,
status: notReady,
transportType: transportType,
transportAddress: transportAddress,
HTTPClient: &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 20,
Expand Down Expand Up @@ -193,6 +197,7 @@ func (s *Server) Start() {
}
s.status = starting
r := mux.NewRouter()

api := r.PathPrefix(s.apiPath).Subrouter()

// createSubscription create subscription and send it to a channel that is shared by middleware to process
Expand Down Expand Up @@ -263,6 +268,8 @@ func (s *Server) Start() {
// "$ref": "#/responses/badReq"
api.HandleFunc("/subscriptions/status/{subscriptionid}", s.pingForSubscribedEventStatus).Methods(http.MethodPut)

api.HandleFunc("/{resourceAddress:.*}/CurrentState", s.getCurrentState).Methods(http.MethodGet)

api.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "OK") //nolint:errcheck
}).Methods(http.MethodGet)
Expand Down
86 changes: 76 additions & 10 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
types2 "github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"

restapi "github.com/redhat-cne/rest-api"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/event"
Expand All @@ -42,15 +46,17 @@ import (
var (
server *restapi.Server

eventOutCh chan *channel.DataChan
closeCh chan struct{}
wg sync.WaitGroup
port int = 8989
apPath string = "/routes/cne/v1/"
resource string = "test/test"
storePath string = "."
ObjSub pubsub.PubSub
ObjPub pubsub.PubSub
eventOutCh chan *channel.DataChan
closeCh chan struct{}
wg sync.WaitGroup
port int = 8989
apPath string = "/routes/cne/v1/"
resource string = "test/test"
storePath string = "."
ObjSub pubsub.PubSub
ObjPub pubsub.PubSub
transportType = "HTTP"
transportAddress = "localhost"
)

func init() {
Expand All @@ -59,13 +65,57 @@ func init() {
}

func TestMain(m *testing.M) {
server = restapi.InitServer(port, apPath, storePath, eventOutCh, closeCh)
server = restapi.InitServer(port, apPath, storePath, transportType, transportAddress, eventOutCh, closeCh)
//start http server
server.Start()

wg.Add(1)
go func() {
for d := range eventOutCh {
if d.Type == channel.STATUS && d.StatusChan != nil {
log.Info("WHY HERE")
clientID, _ := uuid.Parse("123e4567-e89b-12d3-a456-426614174001")
cneEvent := v1event.CloudNativeEvent()
cneEvent.SetID(ObjPub.ID)
cneEvent.Type = string(ptp.PtpStateChange)
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(event.ApplicationJSON)
data := event.Data{
Version: "event",
Values: []event.DataValue{{
Resource: "test",
DataType: event.NOTIFICATION,
ValueType: event.ENUMERATION,
Value: ptp.ACQUIRING_SYNC,
},
},
}
data.SetVersion("v1") //nolint:errcheck
cneEvent.SetData(data)
e := cloudevents.Event{
Context: cloudevents.EventContextV1{
Type: string(ptp.PtpStateChange),
Source: cloudevents.URIRef{URL: url.URL{Scheme: "http", Host: "example.com", Path: "/source"}},
ID: "status event",
Time: &cloudevents.Timestamp{Time: time.Date(2020, 03, 21, 12, 34, 56, 780000000, time.UTC)},
DataSchema: &types2.URI{URL: url.URL{Scheme: "http", Host: "example.com", Path: "/schema"}},
Subject: func(s string) *string { return &s }("topic"),
}.AsV1(),
}
_ = e.SetData(cloudevents.ApplicationJSON, cneEvent)
func() {
defer func() {
if err := recover(); err != nil {
log.Errorf("error on clsoe channel")
}
}()
d.StatusChan <- &channel.StatusChan{
ID: "123",
ClientID: clientID,
Data: &e,
}
}()
}
log.Infof("incoming data %#v", d)
}
}()
Expand Down Expand Up @@ -231,6 +281,22 @@ func TestServer_ListPublishers(t *testing.T) {
assert.Greater(t, len(pubList), 0)
}

func TestServer_GetCurrentState(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
time.Sleep(2 * time.Second)
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, resource, "CurrentState"), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
_, err = ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
}

func TestServer_TestPingStatusStatusCode(t *testing.T) {
req, err := http.NewRequest("PUT", fmt.Sprintf("http://localhost:%d%s%s%s", port, apPath, "subscriptions/status/", ObjSub.ID), nil)
assert.Nil(t, err)
Expand Down
10 changes: 10 additions & 0 deletions vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ github.com/prometheus/common/model
github.com/prometheus/procfs
github.com/prometheus/procfs/internal/fs
github.com/prometheus/procfs/internal/util
# github.com/redhat-cne/sdk-go v0.1.1-0.20220907191325-02b031866855
# github.com/redhat-cne/sdk-go v0.1.1-0.20220930204039-4f4a81946028
## explicit; go 1.17
github.com/redhat-cne/sdk-go/pkg/channel
github.com/redhat-cne/sdk-go/pkg/event
Expand Down