Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add get, create & update audience endpoint #874

Merged
merged 14 commits into from
Feb 27, 2025
Merged
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ TAG := $(shell git rev-list --tags --max-count=1)
VERSION := $(shell git describe --tags ${TAG})
.PHONY: build check fmt lint test test-race vet test-cover-html help install proto ui compose-up-dev
.DEFAULT_GOAL := build
PROTON_COMMIT := "698f57c206bb5377f5aead5c509393ff92bf58ce"
PROTON_COMMIT := "59bb9149c419709169891c9ba72c81de3550f512"

ui:
@echo " > generating ui build"
Expand Down
5 changes: 5 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"syscall"
"time"

"github.com/raystack/frontier/core/audience"
"github.com/raystack/frontier/core/kyc"

"golang.org/x/exp/slices"
Expand Down Expand Up @@ -362,6 +363,9 @@ func buildAPIDependencies(
userRepository := postgres.NewUserRepository(dbc)
userService := user.NewService(userRepository, relationService, policyService, roleService)

audienceRepository := postgres.NewAudienceRepository(dbc)
audienceService := audience.NewService(audienceRepository)

svUserRepo := postgres.NewServiceUserRepository(dbc)
scUserCredRepo := postgres.NewServiceUserCredentialRepository(dbc)
serviceUserService := serviceuser.NewService(svUserRepo, scUserCredRepo, relationService)
Expand Down Expand Up @@ -543,6 +547,7 @@ func buildAPIDependencies(
LogListener: logListener,
WebhookService: webhookService,
EventService: eventProcessor,
AudienceService: audienceService,
}
return dependencies, nil
}
Expand Down
55 changes: 55 additions & 0 deletions core/audience/audience.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package audience

import (
"context"
"time"

"github.com/raystack/frontier/pkg/metadata"
)

type Status string

const (
Unsubscribed Status = "unsubscribed"
Subscribed Status = "subscribed"
)

func (s Status) String() string {
return string(s)
}

func StringToStatus(s string) Status {
switch s {
case "status_unsubscribed":
return Unsubscribed
case "status_subscribed":
return Subscribed
default:
return Unsubscribed
}
}

func (s Status) ToDB() Status {
return s
}

type Audience struct {
ID string
Name string
Email string
Phone string
Activity string
Status Status // subscription status
ChangedAt time.Time
Source string
Verified bool
CreatedAt time.Time
UpdatedAt time.Time
Metadata metadata.Metadata
}

type Repository interface {
Create(ctx context.Context, audience Audience) (Audience, error)
List(ctx context.Context, filter Filter) ([]Audience, error)
Update(ctx context.Context, audience Audience) (Audience, error)
}
9 changes: 9 additions & 0 deletions core/audience/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package audience

import "errors"

var (
ErrInvalidEmail = errors.New("invalid email")
ErrEmailActivityAlreadyExists = errors.New("email and activity combination already exists")
ErrNotExist = errors.New("audience does not exist")
)
6 changes: 6 additions & 0 deletions core/audience/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package audience

type Filter struct {
Activity string
Email string
}
45 changes: 45 additions & 0 deletions core/audience/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package audience

import (
"context"
"strings"
)

type Service struct {
repository Repository
}

func NewService(repository Repository) *Service {
return &Service{repository: repository}
}

func (s *Service) Create(ctx context.Context, audience Audience) (Audience, error) {
return s.repository.Create(ctx, Audience{
Name: audience.Name,
Email: strings.ToLower(audience.Email),
Verified: audience.Verified,
Phone: audience.Phone,
Activity: audience.Activity,
Status: audience.Status,
Source: audience.Source,
Metadata: audience.Metadata,
})
}

func (s *Service) List(ctx context.Context, filters Filter) ([]Audience, error) {
return s.repository.List(ctx, filters)
}

func (s *Service) Update(ctx context.Context, audience Audience) (Audience, error) {
return s.repository.Update(ctx, Audience{
ID: audience.ID,
Name: audience.Name,
Email: strings.ToLower(audience.Email),
Phone: audience.Phone,
Activity: audience.Activity,
Status: audience.Status,
Source: audience.Source,
Verified: audience.Verified,
Metadata: audience.Metadata,
})
}
3 changes: 3 additions & 0 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/raystack/frontier/billing/product"
"github.com/raystack/frontier/billing/subscription"
"github.com/raystack/frontier/billing/usage"
"github.com/raystack/frontier/core/audience"
"github.com/raystack/frontier/core/audit"
"github.com/raystack/frontier/core/authenticate"
"github.com/raystack/frontier/core/authenticate/session"
Expand Down Expand Up @@ -71,4 +72,6 @@ type Deps struct {
EventService *event.Service

LogListener *event.ChanListener

AudienceService *audience.Service
}
193 changes: 193 additions & 0 deletions internal/api/v1beta1/audience.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package v1beta1

import (
"context"
"errors"
"strings"

"github.com/raystack/frontier/core/audience"
"github.com/raystack/frontier/internal/bootstrap/schema"
"github.com/raystack/frontier/pkg/metadata"
frontierv1beta1 "github.com/raystack/frontier/proto/v1beta1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
grpcUserTypeNotSupportedErr = status.Errorf(codes.InvalidArgument, "user type not supported")
grpcActivityRequiredErr = status.Errorf(codes.InvalidArgument, "activity is required")
grpcSourceRequiredErr = status.Errorf(codes.InvalidArgument, "source is required")
grpcAudienceIdRequiredErr = status.Errorf(codes.InvalidArgument, "audience ID is required")
grpcAudienceNotFoundErr = status.Errorf(codes.NotFound, "record not found for the given input")
)

type AudienceService interface {
Create(ctx context.Context, audience audience.Audience) (audience.Audience, error)
List(ctx context.Context, filter audience.Filter) ([]audience.Audience, error)
Update(ctx context.Context, audience audience.Audience) (audience.Audience, error)
}

func (h Handler) CreateEnrollmentForCurrentUser(ctx context.Context, request *frontierv1beta1.CreateEnrollmentForCurrentUserRequest) (*frontierv1beta1.CreateEnrollmentForCurrentUserResponse, error) {
principal, err := h.GetLoggedInPrincipal(ctx)
if err != nil {
return nil, err
}
if principal.Type != schema.UserPrincipal {
return nil, grpcUserTypeNotSupportedErr
}

activity := strings.TrimSpace(request.GetActivity())
if activity == "" {
return nil, grpcActivityRequiredErr
}
source := request.GetSource()
if source == "" {
return nil, grpcSourceRequiredErr
}

email := principal.User.Email
name := principal.User.Title
subsStatus := frontierv1beta1.Audience_Status_name[int32(request.GetStatus())] // convert using proto methods
metaDataMap := metadata.Build(request.GetMetadata().AsMap())

newAudience, err := h.audienceService.Create(ctx, audience.Audience{
Name: name,
Email: email,
Activity: activity,
Status: audience.StringToStatus(strings.ToLower(subsStatus)),
Verified: true, // if user is logged in on platform them we already would have already verified the email
Source: source,
Metadata: metaDataMap,
})
if err != nil {
switch {
case errors.Is(err, audience.ErrEmailActivityAlreadyExists):
return &frontierv1beta1.CreateEnrollmentForCurrentUserResponse{}, grpcConflictError
default:
return &frontierv1beta1.CreateEnrollmentForCurrentUserResponse{}, grpcInternalServerError
}
}

transformedAudience, err := transformAudienceToPB(newAudience)
if err != nil {
return nil, err
}
return &frontierv1beta1.CreateEnrollmentForCurrentUserResponse{Audience: transformedAudience}, nil
}

func (h Handler) ListEnrollmentsForCurrentUser(ctx context.Context, request *frontierv1beta1.ListEnrollmentsForCurrentUserRequest) (*frontierv1beta1.ListEnrollmentsForCurrentUserResponse, error) {
principal, err := h.GetLoggedInPrincipal(ctx)
if err != nil {
return nil, err
}
email := principal.User.Email
activity := request.GetActivity()

filters := audience.Filter{Activity: activity, Email: email}

enrollments, err := h.audienceService.List(ctx, filters)
if err != nil {
switch {
case errors.Is(err, audience.ErrNotExist):
return nil, grpcAudienceNotFoundErr
default:
return nil, grpcInternalServerError
}
}
var transformedAudiences []*frontierv1beta1.Audience
for _, enrollment := range enrollments {
transformedAudience, err := transformAudienceToPB(enrollment)
if err != nil {
return nil, err
}
transformedAudiences = append(transformedAudiences, transformedAudience)
}
return &frontierv1beta1.ListEnrollmentsForCurrentUserResponse{Audience: transformedAudiences}, nil
}

func (h Handler) UpdateEnrollmentForCurrentUser(ctx context.Context, request *frontierv1beta1.UpdateEnrollmentForCurrentUserRequest) (*frontierv1beta1.UpdateEnrollmentForCurrentUserResponse, error) {
principal, err := h.GetLoggedInPrincipal(ctx)
if err != nil {
return nil, err
}
if principal.Type != schema.UserPrincipal {
return nil, grpcUserTypeNotSupportedErr
}

audienceId := request.GetId()
if audienceId == "" {
return nil, grpcAudienceIdRequiredErr
}
activity := strings.TrimSpace(request.GetActivity())
if activity == "" {
return nil, grpcActivityRequiredErr
}
source := request.GetSource()
if source == "" {
return nil, grpcSourceRequiredErr
}

email := principal.User.Email
name := principal.User.Title
subsStatus := frontierv1beta1.Audience_Status_name[int32(request.GetStatus())] // convert using proto methods
metaDataMap := metadata.Build(request.GetMetadata().AsMap())

updatedAudience, err := h.audienceService.Update(ctx, audience.Audience{
ID: audienceId,
Name: name,
Email: email,
Verified: true,
Activity: activity,
Status: audience.StringToStatus(strings.ToLower(subsStatus)),
Source: source,
Metadata: metaDataMap,
})
if err != nil {
switch {
case errors.Is(err, audience.ErrNotExist):
return &frontierv1beta1.UpdateEnrollmentForCurrentUserResponse{}, grpcAudienceNotFoundErr
case errors.Is(err, audience.ErrEmailActivityAlreadyExists):
return &frontierv1beta1.UpdateEnrollmentForCurrentUserResponse{}, grpcConflictError
default:
return &frontierv1beta1.UpdateEnrollmentForCurrentUserResponse{}, grpcInternalServerError
}
}
transformedAudience, err := transformAudienceToPB(updatedAudience)
if err != nil {
return nil, err
}
return &frontierv1beta1.UpdateEnrollmentForCurrentUserResponse{Audience: transformedAudience}, nil
}

func convertStatusToPBFormat(status audience.Status) frontierv1beta1.Audience_Status {
switch status {
case audience.Unsubscribed:
return frontierv1beta1.Audience_STATUS_UNSUBSCRIBED
case audience.Subscribed:
return frontierv1beta1.Audience_STATUS_SUBSCRIBED
default:
return frontierv1beta1.Audience_STATUS_UNSUBSCRIBED
}
}

func transformAudienceToPB(audience audience.Audience) (*frontierv1beta1.Audience, error) {
metaData, err := audience.Metadata.ToStructPB()
if err != nil {
return &frontierv1beta1.Audience{}, err
}
return &frontierv1beta1.Audience{
Id: audience.ID,
Name: audience.Name,
Email: audience.Email,
Phone: audience.Phone,
Activity: audience.Activity,
Status: convertStatusToPBFormat(audience.Status),
ChangedAt: timestamppb.New(audience.ChangedAt),
Source: audience.Source,
Verified: audience.Verified,
CreatedAt: timestamppb.New(audience.CreatedAt),
UpdatedAt: timestamppb.New(audience.UpdatedAt),
Metadata: metaData,
}, nil
}
2 changes: 2 additions & 0 deletions internal/api/v1beta1/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ var (
ErrEmptyEmailID = errors.New("email id is empty")
ErrEmailConflict = errors.New("user email can't be updated")
ErrOperationUnsupported = errors.New("operation not supported")
ErrInternalServerError = errors.New("internal server error")

grpcConflictError = status.Errorf(codes.AlreadyExists, ErrConflictRequest.Error())
grpcBadBodyError = status.Error(codes.InvalidArgument, ErrBadRequest.Error())
grpcBadBodyMetaSchemaError = status.Error(codes.InvalidArgument, ErrBadRequest.Error()+" : "+ErrInvalidMetadata.Error())
grpcUnauthenticated = status.Error(codes.Unauthenticated, errors.ErrUnauthenticated.Error())
grpcPermissionDenied = status.Error(codes.PermissionDenied, errors.ErrForbidden.Error())
grpcOperationUnsupported = status.Error(codes.Unavailable, ErrOperationUnsupported.Error()) //nolint:unused
grpcInternalServerError = status.Errorf(codes.Internal, ErrInternalServerError.Error())
)

func ErrInvalidInput(err string) error {
Expand Down
2 changes: 2 additions & 0 deletions internal/api/v1beta1/v1beta1.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Handler struct {
invoiceService InvoiceService
webhookService WebhookService
eventService EventService
audienceService AudienceService
}

func Register(s *grpc.Server, deps api.Deps, authConf authenticate.Config) {
Expand Down Expand Up @@ -81,6 +82,7 @@ func Register(s *grpc.Server, deps api.Deps, authConf authenticate.Config) {
invoiceService: deps.InvoiceService,
webhookService: deps.WebhookService,
eventService: deps.EventService,
audienceService: deps.AudienceService,
}
s.RegisterService(&frontierv1beta1.FrontierService_ServiceDesc, handler)
s.RegisterService(&frontierv1beta1.AdminService_ServiceDesc, handler)
Expand Down
Loading
Loading