From b500f2bfb675ff28be068bdeb23f87be279e9b56 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Thu, 2 Aug 2018 22:58:41 +0530 Subject: [PATCH 01/11] GetJoinedHosts from federation server db --- .../storage/joined_hosts_table.go | 15 ++++++++++++++- .../dendrite/federationsender/storage/storage.go | 11 ++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go index 487de9e61d..5d652a1a17 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go @@ -97,10 +97,22 @@ func (s *joinedHostsStatements) deleteJoinedHosts( return err } -func (s *joinedHostsStatements) selectJoinedHosts( +func (s *joinedHostsStatements) selectJoinedHostsWithTx( ctx context.Context, txn *sql.Tx, roomID string, ) ([]types.JoinedHost, error) { stmt := common.TxStmt(txn, s.selectJoinedHostsStmt) + return joinedHostsFromStmt(ctx, stmt, roomID) +} + +func (s *joinedHostsStatements) selectJoinedHosts( + ctx context.Context, roomID string, +) ([]types.JoinedHost, error) { + return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID) +} + +func joinedHostsFromStmt( + ctx context.Context, stmt *sql.Stmt, roomID string, +) ([]types.JoinedHost, error) { rows, err := stmt.QueryContext(ctx, roomID) if err != nil { return nil, err @@ -118,5 +130,6 @@ func (s *joinedHostsStatements) selectJoinedHosts( ServerName: gomatrixserverlib.ServerName(serverName), }) } + return result, nil } diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go index e84d639d0c..3a0f877521 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -92,7 +92,7 @@ func (d *Database) UpdateRoom( } } - joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID) + joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID) if err != nil { return err } @@ -110,3 +110,12 @@ func (d *Database) UpdateRoom( }) return } + +// GetJoinedHosts returns the currently joined hosts for room, +// as known to federationserver. +// Returns an error if something goes wrong. +func (d *Database) GetJoinedHosts( + ctx context.Context, roomID string, +) ([]types.JoinedHost, error) { + return d.selectJoinedHosts(ctx, roomID) +} From acbcf5c0c1c0aeac0b533964be072c7d022c8469 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Sat, 4 Aug 2018 18:58:05 +0530 Subject: [PATCH 02/11] Add dummy api.OutputTypingEvent --- .../dendrite/typingserver/dummy/api/api.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go diff --git a/src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go b/src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go new file mode 100644 index 0000000000..3533a3a26b --- /dev/null +++ b/src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go @@ -0,0 +1,23 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import "github.com/matrix-org/gomatrixserverlib" + +// TODO: Remove this package after, typingserver/api is updated to contain a gomatrixserverlib.Event + +// OutputTypingEvent is an entry in typing server output kafka log. +type OutputTypingEvent struct { + // The Event for the typing edu event. + Event gomatrixserverlib.Event `json:"event"` +} From 7f6764aac6e158ef8bd31f806434e962bb754aa0 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Sat, 4 Aug 2018 18:58:21 +0530 Subject: [PATCH 03/11] Add a typing server consumer to federation sender --- .../consumers/typingserver.go | 85 +++++++++++++++++++ .../federationsender/federationsender.go | 9 +- 2 files changed, 92 insertions(+), 2 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go new file mode 100644 index 0000000000..20ba350b2a --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go @@ -0,0 +1,85 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/typingserver/dummy/api" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" + "gopkg.in/Shopify/sarama.v1" +) + +// OutputTypingEventConsumer consumes events that originate in typing server. +type OutputTypingEventConsumer struct { + consumer *common.ContinualConsumer + db *storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName +} + +// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers. +func NewOutputTypingEventConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + queues *queue.OutgoingQueues, + store *storage.Database, +) *OutputTypingEventConsumer { + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputTypingEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + c := &OutputTypingEventConsumer{ + consumer: &consumer, + queues: queues, + db: store, + ServerName: cfg.Matrix.ServerName, + } + consumer.ProcessMessage = c.onMessage + + return c +} + +// Start consuming from typing servers +func (t *OutputTypingEventConsumer) Start() error { + return t.consumer.Start() +} + +func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Extract the typing event from msg. + var ote api.OutputTypingEvent + if err := json.Unmarshal(msg.Value, &ote); err != nil { + // Skip this msg but continue processing messages. + log.WithError(err).Errorf("typingserver output log: message parse failed") + return nil + } + + joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID()) + if err != nil { + return err + } + + names := make([]gomatrixserverlib.ServerName, len(joined)) + for i := range joined { + names[i] = joined[i].ServerName + } + + return t.queues.SendEvent(&ote.Event, t.ServerName, names) +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go index fa54a05c62..ed3a1a1f8d 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go +++ b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go @@ -38,11 +38,16 @@ func SetupFederationSenderComponent( queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation) - consumer := consumers.NewOutputRoomEventConsumer( + rsConsumer := consumers.NewOutputRoomEventConsumer( base.Cfg, base.KafkaConsumer, queues, federationSenderDB, queryAPI, ) - if err = consumer.Start(); err != nil { + if err = rsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start room server consumer") } + + tsConsumer := consumers.NewOutputTypingEventConsumer(cfg, kafkaConsumer, queues, store) + if err := tsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start typing server consumer") + } } From fe17eb5ed78a86813359add2fbc335b604adc3a5 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Sat, 4 Aug 2018 19:19:16 +0530 Subject: [PATCH 04/11] fix lint --- .../matrix-org/dendrite/federationsender/federationsender.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go index ed3a1a1f8d..9b732b3865 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go +++ b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go @@ -46,7 +46,9 @@ func SetupFederationSenderComponent( logrus.WithError(err).Panic("failed to start room server consumer") } - tsConsumer := consumers.NewOutputTypingEventConsumer(cfg, kafkaConsumer, queues, store) + tsConsumer := consumers.NewOutputTypingEventConsumer( + base.Cfg, base.KafkaConsumer, queues, federationSenderDB, + ) if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") } From 0bd298ad85387a9b4dabfb9f195734c2622d2b7e Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Tue, 7 Aug 2018 22:00:12 +0530 Subject: [PATCH 05/11] Update queue to support EDU events --- .../queue/destinationqueue.go | 28 ++++++++++- .../dendrite/federationsender/queue/queue.go | 46 +++++++++++++++++-- 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go index 2013a7a4be..026e3e51c3 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -33,12 +33,13 @@ type destinationQueue struct { origin gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName // The running mutex protects running, sentCounter, lastTransactionIDs and - // pendingEvents. + // pendingEvents, pendingEDUs. runningMutex sync.Mutex running bool sentCounter int lastTransactionIDs []gomatrixserverlib.TransactionID pendingEvents []*gomatrixserverlib.Event + pendingEDUs []*gomatrixserverlib.EDU } // Send event adds the event to the pending queue for the destination. @@ -54,6 +55,19 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { } } +// sendEDU adds the EDU event to the pending queue for the destination. +// If the queue is empty then it starts a background goroutine to +// start sending event to that destination. +func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + oq.pendingEDUs = append(oq.pendingEDUs, ev) + if !oq.running { + oq.running = true + go oq.backgroundSend() + } +} + func (oq *destinationQueue) backgroundSend() { for { t := oq.next() @@ -82,10 +96,12 @@ func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() - if len(oq.pendingEvents) == 0 { + + if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { oq.running = false return nil } + var t gomatrixserverlib.Transaction now := gomatrixserverlib.AsTimestamp(time.Now()) t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter)) @@ -96,11 +112,19 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { if t.PreviousIDs == nil { t.PreviousIDs = []gomatrixserverlib.TransactionID{} } + oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} + for _, pdu := range oq.pendingEvents { t.PDUs = append(t.PDUs, *pdu) } oq.pendingEvents = nil oq.sentCounter += len(t.PDUs) + + for _, edu := range oq.pendingEDUs { + t.EDUs = append(t.EDUs, *edu) + } + oq.pendingEDUs = nil + return &t } diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go index d31c12f998..8d910f6273 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -47,10 +47,7 @@ func (oqs *OutgoingQueues) SendEvent( destinations []gomatrixserverlib.ServerName, ) error { if origin != oqs.origin { - // TODO: Support virtual hosting by allowing us to send events using - // different origin server names. - // For now assume we are always asked to send as the single server configured - // in the dendrite config. + // TODO: Support virtual hosting; gh issue #577. return fmt.Errorf( "sendevent: unexpected server to send as: got %q expected %q", origin, oqs.origin, @@ -76,8 +73,49 @@ func (oqs *OutgoingQueues) SendEvent( } oqs.queues[destination] = oq } + oq.sendEvent(ev) } + + return nil +} + +// SendEDU sends an EDU event to the destinations +func (oqs *OutgoingQueues) SendEDU( + ev *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, + destinations []gomatrixserverlib.ServerName, +) error { + if origin != oqs.origin { + // TODO: Support virtual hosting; gh issue #577. + return fmt.Errorf( + "sendevent: unexpected server to send as: got %q expected %q", + origin, oqs.origin, + ) + } + + // Remove our own server from the list of destinations. + destinations = filterDestinations(oqs.origin, destinations) + + log.WithFields(log.Fields{ + "destinations": destinations, "edu_type": ev.Type, + }).Info("Sending EDU event") + + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + for _, destination := range destinations { + oq := oqs.queues[destination] + if oq == nil { + oq = &destinationQueue{ + origin: oqs.origin, + destination: destination, + client: oqs.client, + } + oqs.queues[destination] = oq + } + + oq.sendEDU(ev) + } + return nil } From 7ae50fffd46f544b860aa1614d7d8c1ed7080282 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Tue, 7 Aug 2018 22:33:18 +0530 Subject: [PATCH 06/11] Update OutputTypingEvent format --- .../dendrite/typingserver/api/output.go | 14 +++++------ .../dendrite/typingserver/input/input.go | 23 +++++++++++-------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/typingserver/api/output.go b/src/github.com/matrix-org/dendrite/typingserver/api/output.go index 08f834993b..39b4b7d1ec 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/typingserver/api/output.go @@ -16,16 +16,14 @@ package api type OutputTypingEvent struct { // The Event for the typing edu event. Event TypingEvent `json:"event"` + // Users typing in the room at the event. + TypingUsers []string `json:"typing_users"` } // TypingEvent represents a matrix edu event of type 'm.typing'. type TypingEvent struct { - Type string `json:"type"` - RoomID string `json:"room_id"` - Content TypingEventContent `json:"content"` -} - -// TypingEventContent for TypingEvent -type TypingEventContent struct { - UserIDs []string `json:"user_ids"` + Type string `json:"type"` + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` } diff --git a/src/github.com/matrix-org/dendrite/typingserver/input/input.go b/src/github.com/matrix-org/dendrite/typingserver/input/input.go index 735c4da65f..b9968ce4cb 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/typingserver/input/input.go @@ -53,24 +53,29 @@ func (t *TypingServerInputAPI) InputTypingEvent( t.Cache.RemoveUser(ite.UserID, ite.RoomID) } - return t.sendUpdateForRoom(ite.RoomID) + return t.sendEvent(ite) } -func (t *TypingServerInputAPI) sendUpdateForRoom(roomID string) error { - userIDs := t.Cache.GetTypingUsers(roomID) - event := &api.TypingEvent{ - Type: gomatrixserverlib.MTyping, - RoomID: roomID, - Content: api.TypingEventContent{UserIDs: userIDs}, +func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { + userIDs := t.Cache.GetTypingUsers(ite.RoomID) + ev := &api.TypingEvent{ + Type: gomatrixserverlib.MTyping, + RoomID: ite.RoomID, + UserID: ite.UserID, } - eventJSON, err := json.Marshal(api.OutputTypingEvent{Event: *event}) + ote := &api.OutputTypingEvent{ + Event: *ev, + TypingUsers: userIDs, + } + + eventJSON, err := json.Marshal(ote) if err != nil { return err } m := &sarama.ProducerMessage{ Topic: string(t.OutputTypingEventTopic), - Key: sarama.StringEncoder(roomID), + Key: sarama.StringEncoder(ite.RoomID), Value: sarama.ByteEncoder(eventJSON), } From 3d6a862433a7ce24f7e2de3b59db19dab5026382 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Wed, 8 Aug 2018 01:13:02 +0530 Subject: [PATCH 07/11] Use SendEDU in federation server, remove dummy/api --- .../consumers/typingserver.go | 18 ++++++++++++--- .../dendrite/typingserver/dummy/api/api.go | 23 ------------------- 2 files changed, 15 insertions(+), 26 deletions(-) delete mode 100644 src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go index 20ba350b2a..ddc926f0d9 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go @@ -20,7 +20,7 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/typingserver/dummy/api" + "github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" "gopkg.in/Shopify/sarama.v1" @@ -71,7 +71,7 @@ func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error return nil } - joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID()) + joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID) if err != nil { return err } @@ -81,5 +81,17 @@ func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error names[i] = joined[i].ServerName } - return t.queues.SendEvent(&ote.Event, t.ServerName, names) + edu := &gomatrixserverlib.EDU{ + Type: ote.Event.Type, + Origin: string(t.ServerName), + } + if edu.Content, err = json.Marshal(map[string]interface{}{ + "room_id": ote.Event.RoomID, + "user_id": ote.Event.UserID, + "typing": ote.Event.Typing, + }); err != nil { + return err + } + + return t.queues.SendEDU(edu, t.ServerName, names) } diff --git a/src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go b/src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go deleted file mode 100644 index 3533a3a26b..0000000000 --- a/src/github.com/matrix-org/dendrite/typingserver/dummy/api/api.go +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import "github.com/matrix-org/gomatrixserverlib" - -// TODO: Remove this package after, typingserver/api is updated to contain a gomatrixserverlib.Event - -// OutputTypingEvent is an entry in typing server output kafka log. -type OutputTypingEvent struct { - // The Event for the typing edu event. - Event gomatrixserverlib.Event `json:"event"` -} From 1de4050962bebee0ccd5aff7268b1f1e1c53dc00 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Wed, 8 Aug 2018 19:04:28 +0530 Subject: [PATCH 08/11] Add helpful comments --- .../dendrite/federationsender/consumers/typingserver.go | 2 ++ .../dendrite/federationsender/queue/destinationqueue.go | 4 ++-- .../matrix-org/dendrite/federationsender/queue/queue.go | 6 +++--- .../matrix-org/dendrite/typingserver/api/output.go | 4 +++- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go index ddc926f0d9..0cf9ea50d8 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go @@ -62,6 +62,8 @@ func (t *OutputTypingEventConsumer) Start() error { return t.consumer.Start() } +// onMessage is called for OutputTypingEvent recieved from the typing servers. +// Parses the msg, creates a matrix federation EDU and sends it to joined hosts. func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Extract the typing event from msg. var ote api.OutputTypingEvent diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go index 026e3e51c3..64d721d4a7 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -58,10 +58,10 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { // sendEDU adds the EDU event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to // start sending event to that destination. -func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { +func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() - oq.pendingEDUs = append(oq.pendingEDUs, ev) + oq.pendingEDUs = append(oq.pendingEDUs, e) if !oq.running { oq.running = true go oq.backgroundSend() diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go index 8d910f6273..4a38dc086d 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -82,7 +82,7 @@ func (oqs *OutgoingQueues) SendEvent( // SendEDU sends an EDU event to the destinations func (oqs *OutgoingQueues) SendEDU( - ev *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, + e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName, ) error { if origin != oqs.origin { @@ -97,7 +97,7 @@ func (oqs *OutgoingQueues) SendEDU( destinations = filterDestinations(oqs.origin, destinations) log.WithFields(log.Fields{ - "destinations": destinations, "edu_type": ev.Type, + "destinations": destinations, "edu_type": e.Type, }).Info("Sending EDU event") oqs.queuesMutex.Lock() @@ -113,7 +113,7 @@ func (oqs *OutgoingQueues) SendEDU( oqs.queues[destination] = oq } - oq.sendEDU(ev) + oq.sendEDU(e) } return nil diff --git a/src/github.com/matrix-org/dendrite/typingserver/api/output.go b/src/github.com/matrix-org/dendrite/typingserver/api/output.go index 39b4b7d1ec..813b9b7c7d 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/typingserver/api/output.go @@ -13,10 +13,12 @@ package api // OutputTypingEvent is an entry in typing server output kafka log. +// This contains the event with extra fields used to create 'm.typing' event +// in clientapi & federation. type OutputTypingEvent struct { // The Event for the typing edu event. Event TypingEvent `json:"event"` - // Users typing in the room at the event. + // Users typing in the room when the event was generated. TypingUsers []string `json:"typing_users"` } From 6c1412de0131045c731c055d650a0bf9abd1c99c Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Wed, 8 Aug 2018 19:17:26 +0530 Subject: [PATCH 09/11] fix typo --- .../dendrite/federationsender/consumers/typingserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go index 0cf9ea50d8..5611ce93c8 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go @@ -62,7 +62,7 @@ func (t *OutputTypingEventConsumer) Start() error { return t.consumer.Start() } -// onMessage is called for OutputTypingEvent recieved from the typing servers. +// onMessage is called for OutputTypingEvent received from the typing servers. // Parses the msg, creates a matrix federation EDU and sends it to joined hosts. func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Extract the typing event from msg. From 23cf26bbbb13888e3c7518cdd94da59ff6ffed5f Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Wed, 8 Aug 2018 19:55:14 +0530 Subject: [PATCH 10/11] remove origin field --- .../dendrite/federationsender/consumers/typingserver.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go index 5611ce93c8..c4cd0e5994 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go @@ -83,10 +83,7 @@ func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error names[i] = joined[i].ServerName } - edu := &gomatrixserverlib.EDU{ - Type: ote.Event.Type, - Origin: string(t.ServerName), - } + edu := &gomatrixserverlib.EDU{Type: ote.Event.Type} if edu.Content, err = json.Marshal(map[string]interface{}{ "room_id": ote.Event.RoomID, "user_id": ote.Event.UserID, From 58dceb1a08f2b21681c4e8177c8713b62088c0ff Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Fri, 10 Aug 2018 04:39:56 +0530 Subject: [PATCH 11/11] Count EDUs in sendCounter --- .../dendrite/federationsender/queue/destinationqueue.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go index 64d721d4a7..c0afe3be29 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -125,6 +125,7 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { t.EDUs = append(t.EDUs, *edu) } oq.pendingEDUs = nil + oq.sentCounter += len(t.EDUs) return &t }