From 31a6d57f052d3049b245a226aa6691a3b9230ae7 Mon Sep 17 00:00:00 2001 From: Will Banfield Date: Wed, 9 Aug 2017 17:31:14 -0400 Subject: [PATCH] minor: move change stream to own file --- changestreams.go | 63 ++++++++++++++++++++++++++++++++++++++++++++++++ session.go | 62 ----------------------------------------------- 2 files changed, 63 insertions(+), 62 deletions(-) create mode 100644 changestreams.go diff --git a/changestreams.go b/changestreams.go new file mode 100644 index 000000000..cb5dda2f5 --- /dev/null +++ b/changestreams.go @@ -0,0 +1,63 @@ +package mgo + +type ChangeStream struct { + iter *Iter + options ChangeStreamOptions + pipeline interface{} + readPreference *ReadPreference +} + +type ChangeStreamOptions struct { + + // FullDocument controls the amount of data that the server will return when + // returning a changes document. + FullDocument string + + // ResumeAfter specifies the logical starting point for the new change stream. + ResumeAfter *bson.Raw + + // MaxAwaitTimeMS specifies the maximum amount of time for the server to wait + // on new documents to satisfy a change stream query. + MaxAwaitTimeMS int64 + + // BatchSize specifies the number of documents to return per batch. + BatchSize int32 + + // Collation specifies the way the server should collate returned data. + Collation *Collation +} + +func constructChangeStreamPipeline(pipeline interface{}, + options ChangeStreamOptions) interface{} { + pipelinev := reflect.ValueOf(pipeline) + + // ensure that the pipeline passed in is a slice. + if pipelinev.Kind() != reflect.Slice { + panic("pipeline argument must be a slice") + } + + // construct the options to be used by the change notification + // pipeline stage. + changeNotificationStageOptions := bson.M{} + + if options.FullDocument != "" { + changeNotificationStageOptions["fullDocument"] = options.FullDocument + } + if options.ResumeAfter != nil { + changeNotificationStageOptions["resumeAfter"] = options.ResumeAfter + } + changeNotificationStage := bson.M{"$changeNotification": changeNotificationStageOptions} + + pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1) + + // insert the change notification pipeline stage at the beginning of the + // aggregation. + pipeOfInterfaces[0] = changeNotificationStage + + // convert the passed in slice to a slice of interfaces. + for i := 0; i < pipelinev.Len(); i++ { + pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface() + } + var pipelineAsInterface interface{} = pipeOfInterfaces + return pipelineAsInterface +} diff --git a/session.go b/session.go index 6b0b1dcf6..108cdae88 100644 --- a/session.go +++ b/session.go @@ -2247,68 +2247,6 @@ func (c *Collection) FindId(id interface{}) *Query { return c.Find(bson.D{{"_id", id}}) } -type ChangeStream struct { - iter *Iter - options ChangeStreamOptions - pipeline interface{} - readPreference *ReadPreference -} - -type ChangeStreamOptions struct { - - // FullDocument controls the amount of data that the server will return when - // returning a changes document. - FullDocument string - - // ResumeAfter specifies the logical starting point for the new change stream. - ResumeAfter *bson.Raw - - // MaxAwaitTimeMS specifies the maximum amount of time for the server to wait - // on new documents to satisfy a change stream query. - MaxAwaitTimeMS int64 - - // BatchSize specifies the number of documents to return per batch. - BatchSize int32 - - // Collation specifies the way the server should collate returned data. - Collation *Collation -} - -func constructChangeStreamPipeline(pipeline interface{}, - options ChangeStreamOptions) interface{} { - pipelinev := reflect.ValueOf(pipeline) - - // ensure that the pipeline passed in is a slice. - if pipelinev.Kind() != reflect.Slice { - panic("pipeline argument must be a slice") - } - - // construct the options to be used by the change notification - // pipeline stage. - changeNotificationStageOptions := bson.M{} - - if options.FullDocument != "" { - changeNotificationStageOptions["fullDocument"] = options.FullDocument - } - if options.ResumeAfter != nil { - changeNotificationStageOptions["resumeAfter"] = options.ResumeAfter - } - changeNotificationStage := bson.M{"$changeNotification": changeNotificationStageOptions} - - pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1) - - // insert the change notification pipeline stage at the beginning of the - // aggregation. - pipeOfInterfaces[0] = changeNotificationStage - - // convert the passed in slice to a slice of interfaces. - for i := 0; i < pipelinev.Len(); i++ { - pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface() - } - var pipelineAsInterface interface{} = pipeOfInterfaces - return pipelineAsInterface -} - type Pipe struct { session *Session collection *Collection