Skip to content
This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

[Federation] Send typing events #572

Merged
merged 11 commits into from
Aug 10, 2018
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumers

import (
"context"
"encoding/json"

"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/typingserver/dummy/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
"gopkg.in/Shopify/sarama.v1"
)

// OutputTypingEventConsumer consumes events that originate in typing server.
type OutputTypingEventConsumer struct {
consumer *common.ContinualConsumer
db *storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
}

// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers.
func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
) *OutputTypingEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
c := &OutputTypingEventConsumer{
consumer: &consumer,
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
}
consumer.ProcessMessage = c.onMessage

return c
}

// Start consuming from typing servers
func (t *OutputTypingEventConsumer) Start() error {
return t.consumer.Start()
}

func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("typingserver output log: message parse failed")
return nil
}

joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID())
if err != nil {
return err
}

names := make([]gomatrixserverlib.ServerName, len(joined))
for i := range joined {
names[i] = joined[i].ServerName
}

return t.queues.SendEvent(&ote.Event, t.ServerName, names)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ func SetupFederationSenderComponent(

queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)

consumer := consumers.NewOutputRoomEventConsumer(
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues,
federationSenderDB, queryAPI,
)
if err = consumer.Start(); err != nil {
if err = rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start room server consumer")
}

tsConsumer := consumers.NewOutputTypingEventConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ type destinationQueue struct {
origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName
// The running mutex protects running, sentCounter, lastTransactionIDs and
// pendingEvents.
// pendingEvents, pendingEDUs.
runningMutex sync.Mutex
running bool
sentCounter int
lastTransactionIDs []gomatrixserverlib.TransactionID
pendingEvents []*gomatrixserverlib.Event
pendingEDUs []*gomatrixserverlib.EDU
}

// Send event adds the event to the pending queue for the destination.
Expand All @@ -54,6 +55,19 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
}
}

// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending event to that destination.
func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingEDUs = append(oq.pendingEDUs, ev)
if !oq.running {
oq.running = true
go oq.backgroundSend()
}
}

func (oq *destinationQueue) backgroundSend() {
for {
t := oq.next()
Expand Down Expand Up @@ -82,10 +96,12 @@ func (oq *destinationQueue) backgroundSend() {
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
if len(oq.pendingEvents) == 0 {

if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 {
oq.running = false
return nil
}

var t gomatrixserverlib.Transaction
now := gomatrixserverlib.AsTimestamp(time.Now())
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter))
Expand All @@ -96,11 +112,19 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
if t.PreviousIDs == nil {
t.PreviousIDs = []gomatrixserverlib.TransactionID{}
}

oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}

for _, pdu := range oq.pendingEvents {
t.PDUs = append(t.PDUs, *pdu)
}
oq.pendingEvents = nil
oq.sentCounter += len(t.PDUs)

for _, edu := range oq.pendingEDUs {
t.EDUs = append(t.EDUs, *edu)
}
oq.pendingEDUs = nil

return &t
}
46 changes: 42 additions & 4 deletions src/github.com/matrix-org/dendrite/federationsender/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ func (oqs *OutgoingQueues) SendEvent(
destinations []gomatrixserverlib.ServerName,
) error {
if origin != oqs.origin {
// TODO: Support virtual hosting by allowing us to send events using
// different origin server names.
// For now assume we are always asked to send as the single server configured
// in the dendrite config.
// TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf(
"sendevent: unexpected server to send as: got %q expected %q",
origin, oqs.origin,
Expand All @@ -76,8 +73,49 @@ func (oqs *OutgoingQueues) SendEvent(
}
oqs.queues[destination] = oq
}

oq.sendEvent(ev)
}

return nil
}

// SendEDU sends an EDU event to the destinations
func (oqs *OutgoingQueues) SendEDU(
ev *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
destinations []gomatrixserverlib.ServerName,
) error {
if origin != oqs.origin {
// TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf(
"sendevent: unexpected server to send as: got %q expected %q",
origin, oqs.origin,
)
}

// Remove our own server from the list of destinations.
destinations = filterDestinations(oqs.origin, destinations)

log.WithFields(log.Fields{
"destinations": destinations, "edu_type": ev.Type,
}).Info("Sending EDU event")

oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
for _, destination := range destinations {
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
origin: oqs.origin,
destination: destination,
client: oqs.client,
}
oqs.queues[destination] = oq
}

oq.sendEDU(ev)
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,22 @@ func (s *joinedHostsStatements) deleteJoinedHosts(
return err
}

func (s *joinedHostsStatements) selectJoinedHosts(
func (s *joinedHostsStatements) selectJoinedHostsWithTx(
ctx context.Context, txn *sql.Tx, roomID string,
) ([]types.JoinedHost, error) {
stmt := common.TxStmt(txn, s.selectJoinedHostsStmt)
return joinedHostsFromStmt(ctx, stmt, roomID)
}

func (s *joinedHostsStatements) selectJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
}

func joinedHostsFromStmt(
ctx context.Context, stmt *sql.Stmt, roomID string,
) ([]types.JoinedHost, error) {
rows, err := stmt.QueryContext(ctx, roomID)
if err != nil {
return nil, err
Expand All @@ -118,5 +130,6 @@ func (s *joinedHostsStatements) selectJoinedHosts(
ServerName: gomatrixserverlib.ServerName(serverName),
})
}

return result, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (d *Database) UpdateRoom(
}
}

joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID)
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
}
Expand All @@ -110,3 +110,12 @@ func (d *Database) UpdateRoom(
})
return
}

// GetJoinedHosts returns the currently joined hosts for room,
// as known to federationserver.
// Returns an error if something goes wrong.
func (d *Database) GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}
14 changes: 6 additions & 8 deletions src/github.com/matrix-org/dendrite/typingserver/api/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@ package api
type OutputTypingEvent struct {
// The Event for the typing edu event.
Event TypingEvent `json:"event"`
// Users typing in the room at the event.
TypingUsers []string `json:"typing_users"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand the OutputTypingEvent doc comment to explain why we're sending both a TypingEvent and list of currently typing users?

}

// TypingEvent represents a matrix edu event of type 'm.typing'.
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
Content TypingEventContent `json:"content"`
}

// TypingEventContent for TypingEvent
type TypingEventContent struct {
UserIDs []string `json:"user_ids"`
Type string `json:"type"`
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
23 changes: 23 additions & 0 deletions src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import "github.com/matrix-org/gomatrixserverlib"

// TODO: Remove this package after, typingserver/api is updated to contain a gomatrixserverlib.Event
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably move this TODO out of dummy package now that the plan is no longer to change to using Event


// OutputTypingEvent is an entry in typing server output kafka log.
type OutputTypingEvent struct {
// The Event for the typing edu event.
Event gomatrixserverlib.Event `json:"event"`
}
23 changes: 14 additions & 9 deletions src/github.com/matrix-org/dendrite/typingserver/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,29 @@ func (t *TypingServerInputAPI) InputTypingEvent(
t.Cache.RemoveUser(ite.UserID, ite.RoomID)
}

return t.sendUpdateForRoom(ite.RoomID)
return t.sendEvent(ite)
}

func (t *TypingServerInputAPI) sendUpdateForRoom(roomID string) error {
userIDs := t.Cache.GetTypingUsers(roomID)
event := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: roomID,
Content: api.TypingEventContent{UserIDs: userIDs},
func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
userIDs := t.Cache.GetTypingUsers(ite.RoomID)
ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: ite.RoomID,
UserID: ite.UserID,
}
eventJSON, err := json.Marshal(api.OutputTypingEvent{Event: *event})
ote := &api.OutputTypingEvent{
Event: *ev,
TypingUsers: userIDs,
}

eventJSON, err := json.Marshal(ote)
if err != nil {
return err
}

m := &sarama.ProducerMessage{
Topic: string(t.OutputTypingEventTopic),
Key: sarama.StringEncoder(roomID),
Key: sarama.StringEncoder(ite.RoomID),
Value: sarama.ByteEncoder(eventJSON),
}

Expand Down