Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AliECS core Kafka producer #520

Merged
merged 43 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f963751
[core] Refactor serverutil in preparation for eventstream
teo Sep 1, 2022
25fdadd
[core] Events protofile
teo Feb 15, 2024
ca1c759
[core] Additional events
teo Feb 15, 2024
0f92acb
[core] Update events.proto+o2control.proto with NewEnvironmentAsync
teo Feb 16, 2023
cf40d75
[common] Use events.proto in o2control.proto
teo Feb 20, 2024
8c5f41f
[coconut] Fix Protobuf generator call
teo Feb 21, 2024
34b7a0d
[core] Kafka wrapper
teo Feb 23, 2024
0a7c61b
[core] Emit environment events
teo Feb 29, 2024
e983b2c
[coconut] Add asynchronous mode (-y) to coconut env create command
teo Feb 16, 2023
fabece2
[coconut] Implement coconut env create -y
teo Feb 29, 2024
d13d12b
[core] Add task traits and CallEvent to events.proto
teo Feb 16, 2023
231d3d9
[common] Add CallEvent
teo Feb 29, 2024
aa4ac3f
[core] Emit call events to inform on plugin calls
teo Feb 16, 2023
95c41fe
[core] Send EnvId with TaskEvents
teo Feb 29, 2024
225a5f9
[core] Rename busEvent in task.go
teo Mar 8, 2024
48499fb
[core] Add IntegratedServiceEvent and rename Envid field
teo Mar 8, 2024
fe2dc90
[core] Push env vars on workflow load
teo Mar 13, 2024
7530576
[common] Allow event creation with specific timestamp
teo Mar 14, 2024
28c3e68
[common] Various additions to events in events.proto
teo Mar 14, 2024
6b7c8fd
[build] Bump dependencies
teo Mar 14, 2024
92d513f
[core] Include parent role path in task events
teo Mar 14, 2024
e82a40e
[core] Improve call information in CallEvents
teo Mar 14, 2024
dc190db
[core] Emit IntegratedServiceEvents from DCS
teo Mar 14, 2024
8c57a45
[core] Make sure we always output ECS detector codes, not DCS ones
teo Mar 14, 2024
274e73f
[core] Don't forget to include error in DCS ERROR events
teo Mar 14, 2024
b3e3461
[core] Better DCS event descriptions
teo Mar 15, 2024
27eec71
[core] Emit ddscheduler events
teo Mar 15, 2024
68b2c46
[core] Remove legacy ODC handlers
teo Mar 15, 2024
16ce849
[core] Emit ODC events
teo Mar 15, 2024
db62a9f
[core] Emit TRG events
teo Mar 18, 2024
ae22193
[common] Enable AllowAutoTopicCreation in Kafka client
teo Mar 19, 2024
831178d
[core] Correct Kafka topic
teo Mar 19, 2024
9c62431
[build] Generate fdset file for decoding Kafka messages with pq
teo Mar 19, 2024
d17a4a1
[core] Emit call events to aliecs.call topic and include envId
teo Mar 19, 2024
8695b2a
[core] Enable IntegratedServiceEvents
teo Mar 20, 2024
31eb36d
[core] Pass IntegratedServiceEvents by ref
teo Mar 20, 2024
e733190
[core] Write to Kafka asynchronously
teo Mar 21, 2024
6ffe6b6
[core] Nullify odc Devices list before emitting events
teo Mar 21, 2024
d8785f0
[core] Trim down ODC events some more
teo Mar 21, 2024
e3a392f
[core] Publish ODC partition state changes
teo Mar 21, 2024
e3d59c1
[core] Document events.proto and change currentRunNumber field
teo Mar 22, 2024
62dcce8
[core] Document currently unused topics
teo Mar 22, 2024
f7c2786
[docs] Document Kafka producer functionality
teo Mar 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[core] Refactor serverutil in preparation for eventstream
  • Loading branch information
teo committed Mar 19, 2024
commit f96375104baac3cbe287257f254309e9fc32e6d1
14 changes: 7 additions & 7 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func NewServer(state *globalState) *grpc.Server {
s := grpc.NewServer()
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
pb.RegisterControlServer(s, &RpcServer{
state: state,
streams: newSafeStreamsMap(),
state: state,
envStreams: newSafeStreamsMap(),
})
// Register reflection service on gRPC server.
reflection.Register(s)
Expand Down Expand Up @@ -112,8 +112,8 @@ func (m *RpcServer) logMethodHandled() {

// Implements interface pb.ControlServer
type RpcServer struct {
state *globalState
streams SafeStreamsMap
state *globalState
envStreams SafeStreamsMap
}

func (m *RpcServer) GetIntegratedServices(ctx context.Context, empty *pb.Empty) (*pb.ListIntegratedServicesReply, error) {
Expand Down Expand Up @@ -1064,14 +1064,14 @@ func (m *RpcServer) Subscribe(req *pb.SubscribeRequest, srv pb.Control_Subscribe
defer m.logMethodHandled()

for {
ch, ok := m.streams.GetChannel(req.GetId())
ch, ok := m.envStreams.GetChannel(req.GetId())
if !ok {
continue
}
select {
case event, ok := <-ch:
if !ok {
m.streams.delete(req.GetId())
m.envStreams.delete(req.GetId())
return nil
}
err := srv.Send(event)
Expand All @@ -1090,7 +1090,7 @@ func (m *RpcServer) NewAutoEnvironment(cxt context.Context, request *pb.NewAutoE
defer m.logMethodHandled()

ch := make(chan *pb.Event)
m.streams.add(request.GetId(), ch)
m.envStreams.add(request.GetId(), ch)
sub := environment.SubscribeToStream(ch)
go m.state.environments.CreateAutoEnvironment(request.GetWorkflowTemplate(), request.GetVars(), sub)
r := &pb.NewAutoEnvironmentReply{}
Expand Down
35 changes: 0 additions & 35 deletions core/serverutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package core

import (
"fmt"
"sync"
"unicode/utf8"

"github.com/AliceO2Group/Control/core/repos"
Expand Down Expand Up @@ -190,40 +189,6 @@ func workflowToRoleTree(root workflow.Role) (ri *pb.RoleInfo) {
return
}

// SafeStreamsMap is a safe map where the key is usually a
// subscriptionID received from the grpc call and as a value
// a channel where get events from the environment
// and we stream them to the grpc client.
type SafeStreamsMap struct {
mu sync.RWMutex
streams map[string]chan *pb.Event
}

func (s *SafeStreamsMap) add(id string, ch chan *pb.Event) {
s.mu.Lock()
s.streams[id] = ch
s.mu.Unlock()
}

func (s *SafeStreamsMap) delete(id string) {
s.mu.Lock()
delete(s.streams, id)
s.mu.Unlock()
}

func (s *SafeStreamsMap) GetChannel(id string) (ch chan *pb.Event, ok bool) {
s.mu.RLock()
defer s.mu.RUnlock()
ch, ok = s.streams[id]
return
}

func newSafeStreamsMap() SafeStreamsMap {
return SafeStreamsMap{
streams: make(map[string]chan *pb.Event),
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood correctly, you are just moving this somewhere else... is it connected to kafka producer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, streamutil.go. It's part of a general cleanup at the very start of this effort.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

func VarSpecMapToPbVarSpecMap(varSpecMap map[string]repos.VarSpec) map[string]*pb.VarSpecMessage {
ret := make(map[string]*pb.VarSpecMessage)
var vsm *pb.VarSpecMessage
Expand Down
66 changes: 66 additions & 0 deletions core/streamutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2020-2022 CERN and copyright holders of ALICE O².
* Author: Miltiadis Alexis <miltiadis.alexis@cern.ch>
* Teo Mrnjavac <teo.mrnjavac@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package core

import (
"sync"

pb "github.com/AliceO2Group/Control/core/protos"
)

// SafeStreamsMap is a safe map where the key is usually a
// subscriptionID received from the grpc call and as a value
// a channel where get events from the environment
// and we stream them to the grpc client.
type SafeStreamsMap struct {
mu sync.RWMutex
streams map[string]chan *pb.Event
}

func (s *SafeStreamsMap) add(id string, ch chan *pb.Event) {
s.mu.Lock()
s.streams[id] = ch
s.mu.Unlock()
}

func (s *SafeStreamsMap) delete(id string) {
s.mu.Lock()
delete(s.streams, id)
s.mu.Unlock()
}

func (s *SafeStreamsMap) GetChannel(id string) (ch chan *pb.Event, ok bool) {
s.mu.RLock()
defer s.mu.RUnlock()
ch, ok = s.streams[id]
return
}

func newSafeStreamsMap() SafeStreamsMap {
return SafeStreamsMap{
streams: make(map[string]chan *pb.Event),
}
}