From 3655c1cdd40fcf0b7f3c0522e6c29d5659bffcb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 10 Aug 2018 17:32:43 -0700 Subject: [PATCH] add a streaming CID set used in https://github.com/ipfs/go-ipfs/pull/4804 --- set.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/set.go b/set.go index b801ade..1096649 100644 --- a/set.go +++ b/set.go @@ -1,5 +1,9 @@ package cid +import ( + "context" +) + // Set is a implementation of a set of Cids, that is, a structure // to which holds a single copy of every Cids that is added to it. type Set struct { @@ -65,3 +69,34 @@ func (s *Set) ForEach(f func(c *Cid) error) error { } return nil } + +// StreamingSet is an extension of Set which allows to implement back-pressure +// for the Visit function +type StreamingSet struct { + Set *Set + New chan *Cid +} + +// NewStreamingSet initializes and returns new Set. +func NewStreamingSet() *StreamingSet { + return &StreamingSet{ + Set: NewSet(), + New: make(chan *Cid), + } +} + +// Visitor creates new visitor which adds a Cids to the set and emits them to +// the set.New channel +func (s *StreamingSet) Visitor(ctx context.Context) func(c *Cid) bool { + return func(c *Cid) bool { + if s.Set.Visit(c) { + select { + case s.New <- c: + case <-ctx.Done(): + } + return true + } + + return false + } +}