Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNF-13067: O-RAN V3 Rest Api: Status Notification #71

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.14.0
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 h1:qDOGSHOtHRszd8FnM0GZVUvbIvHhZrw5GeccXYPwT04=
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612 h1:TnnP33wqdtZ4GCp8WYHVFVywWxrcGonc0ijGCpfqTdU=
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestMain(m *testing.M) {
Subject: func(s string) *string { return &s }("topic"),
}.AsV1(),
}
_ = e.SetData(cloudevents.ApplicationJSON, cneEvent)
_ = e.SetData("", cneEvent)
func() {
defer func() {
if err := recover(); err != nil {
Expand Down
72 changes: 36 additions & 36 deletions v2/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
return
}
sub := pubsub.PubSub{}
sub.SetVersion(API_VERSION)
if err = json.Unmarshal(bodyBytes, &sub); err != nil {
respondWithStatusCode(w, http.StatusBadRequest, fmt.Sprintf("marshalling error %v", err))
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
Expand All @@ -67,26 +68,18 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
return
}
if subExists, ok := s.pubSubAPI.HasSubscription(sub.GetResource()); ok {
clientIDs := s.subscriberAPI.GetClientIDByResource(sub.GetResource())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Plural clientID's?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be one clientID for the resource

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returns clientIDs []uuid.UUID. I did not change the logic.
https://github.com/redhat-cne/sdk-go/blob/main/v1/subscriber/subscriber.go#L254

if len(clientIDs) != 0 {
respondWithStatusCode(w, http.StatusConflict,
fmt.Sprintf("subscription (id: %s) with same resource already exists, skipping creation",
subExists.GetID()))
fmt.Sprintf("subscription (clientID: %s) with same resource already exists, skipping creation",
clientIDs[0]))
return
}

id := uuid.New().String()
sub.SetID(id)
sub.SetVersion(API_VERSION)
sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck

// TODO: cleanup: local pubsub is no longer needed since we are using configMap
newSub, err := s.pubSubAPI.CreateSubscription(sub)
if err != nil {
respondWithStatusCode(w, http.StatusNotFound, fmt.Sprintf("error creating subscription %v", err))
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
return
}
addr := newSub.GetResource()
sub.SetURILocation(fmt.Sprintf("http://%s:%d%s%s/%s", s.apiHost, s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck
addr := sub.GetResource()

// this is placeholder not sending back to report
out := channel.DataChan{
Expand Down Expand Up @@ -119,7 +112,8 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
}

restClient := restclient.New()
out.Data.SetID(newSub.ID) // set ID to the subscriptionID
// make sure event ID is unique
out.Data.SetID(uuid.New().String())
status, err := restClient.PostCloudEvent(sub.EndPointURI, *out.Data)
if err != nil {
respondWithStatusCode(w, http.StatusBadRequest,
Expand All @@ -139,7 +133,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
subs := subscriber.New(s.getClientIDFromURI(endPointURI))
_ = subs.SetEndPointURI(endPointURI)

subs.AddSubscription(newSub)
subs.AddSubscription(sub)
subs.Action = channel.NEW
cevent, _ := subs.CreateCloudEvents()
cevent.SetSource(addr)
Expand All @@ -162,10 +156,10 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
} else {
out.Status = channel.SUCCESS
_ = out.Data.SetData(cloudevents.ApplicationJSON, updatedObj)
_ = out.Data.SetData("", updatedObj)
log.Infof("subscription created successfully.")
localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1)
respondWithJSON(w, http.StatusCreated, newSub)
respondWithJSON(w, http.StatusCreated, sub)
}

s.dataOut <- &out
Expand All @@ -188,6 +182,7 @@ func (s *Server) createPublisher(w http.ResponseWriter, r *http.Request) {
return
}
pub := pubsub.PubSub{}
pub.SetVersion(API_VERSION)
if err = json.Unmarshal(bodyBytes, &pub); err != nil {
localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1)
respondWithError(w, "marshalling error")
Expand Down Expand Up @@ -256,12 +251,15 @@ func (s *Server) getSubscriptionByID(w http.ResponseWriter, r *http.Request) {
respondWithStatusCode(w, http.StatusNotFound, "")
return
}
sub, err := s.pubSubAPI.GetSubscription(subscriptionID)
if err != nil {
respondWithStatusCode(w, http.StatusNotFound, "")
return

for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) {
sub, err := s.subscriberAPI.GetSubscription(c, subscriptionID)
if err == nil {
respondWithJSON(w, http.StatusOK, sub)
return
}
}
respondWithJSON(w, http.StatusOK, sub)
respondWithStatusCode(w, http.StatusNotFound, "")
}

func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) {
Expand All @@ -278,9 +276,11 @@ func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) {
}
respondWithJSON(w, http.StatusOK, pub)
}

func (s *Server) getSubscriptions(w http.ResponseWriter, _ *http.Request) {
b, err := s.pubSubAPI.GetSubscriptionsFromFile()
b, err := s.subscriberAPI.GetSubscriptions()
if err != nil {
log.Errorf("error loading subscriber data %v", err)
respondWithError(w, "error loading subscriber data")
return
}
Expand Down Expand Up @@ -322,21 +322,21 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) {
return
}

if err := s.pubSubAPI.DeleteSubscription(subscriptionID); err != nil {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1)
respondWithStatusCode(w, http.StatusNotFound, err.Error())
clientIDs := s.subscriberAPI.GetClientIDBySubID(subscriptionID)
if len(clientIDs) == 0 {
respondWithStatusCode(w, http.StatusNotFound, "")
return
}

// update configMap
for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) {
for _, c := range clientIDs {
if err := s.subscriberAPI.DeleteSubscription(c, subscriptionID); err != nil {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1)
respondWithStatusCode(w, http.StatusNotFound, err.Error())
return
}
}

// update configMap
for _, subs := range s.subscriberAPI.SubscriberStore.Store {
cevent, _ := subs.CreateCloudEvents()
out := channel.DataChan{
Expand Down Expand Up @@ -400,7 +400,7 @@ func (s *Server) publishEvent(w http.ResponseWriter, r *http.Request) {
respondWithError(w, fmt.Sprintf("no publisher data for id %s found to publish event for", cneEvent.ID))
return
}
ceEvent, err := cneEvent.NewCloudEvent(&pub)
ceEvent, err := cneEvent.NewCloudEventV2(&pub)
if err != nil {
localmetrics.UpdateEventPublishedCount(pub.Resource, localmetrics.FAIL, 1)
respondWithError(w, err.Error())
Expand Down Expand Up @@ -441,12 +441,12 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
}
}
} else {
respondWithError(w, "subscription not found")
respondWithStatusCode(w, http.StatusNotFound, "subscription not found")
return
}

if sub == nil {
respondWithError(w, "subscription not found")
respondWithStatusCode(w, http.StatusNotFound, "subscription not found")
return
}

Expand All @@ -471,14 +471,14 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
// statusReceiveOverrideFn must return value for
if s.statusReceiveOverrideFn != nil {
if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil {
respondWithError(w, statusErr.Error())
respondWithStatusCode(w, http.StatusNotFound, statusErr.Error())
} else if out.Data != nil {
respondWithJSON(w, http.StatusOK, *out.Data)
} else {
respondWithError(w, "event not found")
respondWithStatusCode(w, http.StatusNotFound, "event not found")
}
} else {
respondWithError(w, "onReceive function not defined")
respondWithStatusCode(w, http.StatusNotFound, "onReceive function not defined")
}
}

Expand All @@ -504,7 +504,7 @@ func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Req

Version: "v1",
})
ceEvent, err := cneEvent.NewCloudEvent(&sub)
ceEvent, err := cneEvent.NewCloudEventV2(&sub)

if err != nil {
respondWithError(w, err.Error())
Expand Down
4 changes: 3 additions & 1 deletion v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
// Server defines rest routes server object
type Server struct {
port int
apiHost string
apiPath string
//use dataOut chanel to write to configMap
dataOut chan<- *channel.DataChan
Expand Down Expand Up @@ -138,12 +139,13 @@ type swaggReqAccepted struct { //nolint:deadcode,unused
}

// InitServer is used to supply configurations for rest routes server
func InitServer(port int, apiPath, storePath string,
func InitServer(port int, apiHost, apiPath, storePath string,
dataOut chan<- *channel.DataChan, closeCh <-chan struct{},
onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error) *Server {
once.Do(func() {
ServerInstance = &Server{
port: port,
apiHost: apiHost,
apiPath: apiPath,
dataOut: dataOut,
closeCh: closeCh,
Expand Down
11 changes: 6 additions & 5 deletions v2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
types2 "github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"

restapi "github.com/redhat-cne/rest-api"
restapi "github.com/redhat-cne/rest-api/v2"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/event"
"github.com/redhat-cne/sdk-go/pkg/event/ptp"
Expand All @@ -50,6 +50,7 @@ var (
closeCh chan struct{}
wg sync.WaitGroup
port = 8989
apHost = "localhost"
apPath = "/routes/cne/v1/"
resource = "test/test"
resourceNoneSubscribed = "test/nonesubscribed"
Expand All @@ -64,7 +65,7 @@ func init() {
}

func TestMain(m *testing.M) {
server = restapi.InitServer(port, apPath, storePath, eventOutCh, closeCh)
server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, nil)
//start http server
server.Start()

Expand Down Expand Up @@ -100,7 +101,7 @@ func TestMain(m *testing.M) {
Subject: func(s string) *string { return &s }("topic"),
}.AsV1(),
}
_ = e.SetData(cloudevents.ApplicationJSON, cneEvent)
_ = e.SetData("", cneEvent)
func() {
defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -362,7 +363,7 @@ func TestServer_GetCurrentState_withoutSubscription(t *testing.T) {
s, err2 := io.ReadAll(resp.Body)
assert.Nil(t, err2)
log.Infof("tedt %s ", string(s))
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestServer_TestPingStatusStatusCode(t *testing.T) {
Expand Down Expand Up @@ -424,7 +425,7 @@ func TestServer_GetNonExistingSubscription(t *testing.T) {
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestServer_TestDummyStatusCode(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions vendor/github.com/redhat-cne/sdk-go/pkg/common/common.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions vendor/github.com/redhat-cne/sdk-go/pkg/event/event.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading