Skip to content

Commit

Permalink
Multi-master only run single startMirror at a time (#3003)
Browse files Browse the repository at this point in the history
Also randomize the sleep appropriately based
on Unix() time source.
  • Loading branch information
harshavardhana authored Dec 11, 2019
1 parent 871b8da commit e109a53
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,13 @@ EXAMPLES:
const uaMirrorAppName = "mc-mirror"

type mirrorJob struct {

// the channel to trap SIGKILL signals
trapCh <-chan bool
stopCh chan struct{}

// mutex for shutdown, this prevents the shutdown
// to be initiated multiple times
m *sync.Mutex
m sync.Mutex

// the global watcher object, which receives notifications of created
// and deleted files
Expand All @@ -204,10 +203,11 @@ type mirrorJob struct {
sourceURL string
targetURL string

isFake, isRemove, isOverwrite, isWatch, isPreserve bool
olderThan, newerThan string
storageClass string
userMetadata map[string]string
isFake, isRemove, isOverwrite bool
isWatch, isPreserve bool
olderThan, newerThan string
storageClass string
userMetadata map[string]string

excludeOptions []string
encKeyDB map[string][]prefixSSEPair
Expand Down Expand Up @@ -561,6 +561,10 @@ func (mj *mirrorJob) watchURL(sourceClient Client) *probe.Error {

// Fetch urls that need to be mirrored
func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.CancelFunc, stopParallel func()) {
// Do not run multiple startMirror's
mj.m.Lock()
defer mj.m.Unlock()

isMetadata := len(mj.userMetadata) > 0 || mj.isPreserve
URLsCh := prepareMirrorURLs(mj.sourceURL, mj.targetURL, mj.isFake, mj.isOverwrite, mj.isRemove, isMetadata, mj.excludeOptions, mj.encKeyDB)

Expand Down Expand Up @@ -645,6 +649,7 @@ func (mj *mirrorJob) mirror(ctx context.Context, cancelMirror context.CancelFunc
close(mj.queueCh)
mj.parallel.wait()
}
// startMirror locks and blocks itself.
mj.startMirror(ctx, cancelMirror, stopParallel)
}()

Expand All @@ -669,7 +674,10 @@ func (mj *mirrorJob) mirror(ctx context.Context, cancelMirror context.CancelFunc
case <-ticker.C:
// Start with random sleep time, so as to avoid
// "synchronous checks" between servers
time.Sleep(time.Duration(rand.Float64() * float64(time.Second*5)))
r := rand.New(rand.NewSource(time.Now().Unix()))
time.Sleep(time.Duration(r.Float64() * float64(time.Second*5)))
// startMirror blocks if there is already
// another mirror running.
mj.startMirror(ctx, cancelMirror, nil)
}
}
Expand All @@ -692,7 +700,6 @@ func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch,
mj := mirrorJob{
trapCh: signalTrap(os.Interrupt, syscall.SIGTERM, syscall.SIGKILL),
stopCh: make(chan struct{}),
m: new(sync.Mutex),

sourceURL: srcURL,
targetURL: dstURL,
Expand Down

0 comments on commit e109a53

Please sign in to comment.