Skip to content

Commit

Permalink
Track bytes (#79)
Browse files Browse the repository at this point in the history
* Make halfPipe log both up and down stats

* Add stats tracking and periodic reporting

* break up read/writes so we can track bytes as they are transferred, not just at the end of the connection

* track new instead of absolute local/api regs

* Correct detection for local registrations

Co-authored-by: Jack Wampler <jmwample@users.noreply.github.com>
  • Loading branch information
ewust and jmwample authored Mar 20, 2021
1 parent 7397f75 commit 55d86ce
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 26 deletions.
67 changes: 60 additions & 7 deletions application/lib/proxies.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,55 @@ func halfPipe(src, dst net.Conn,
// We could try to use io.CopyN in a loop or something that
// gives us occasional bytes. CopyN would not splice, though
// (uses a LimitedReader that only calls Read)
buf := bufferPool.Get().([]byte)
written, err := io.CopyBuffer(dst, src, buf)
//buf := bufferPool.Get().([]byte)
//written, err := io.CopyBuffer(dst, src, buf)

// On closer examination, it seems this code below seems about
// as performant. It's not using splice, but for CO comcast / curveball:
// io.CopyBuffer Read/Write
// curveball CPU ~2% ~2%
// DL 40MB time ~11.5s ~11.6s
// So while io.CopyBuffer is faster, it's not significantly faster

// If we run into perf problems, we can revert

written, err := func() (totWritten int64, err error) {
buf := make([]byte, 32*1024)
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
totWritten += int64(nw)
// Update stats:
if strings.HasPrefix(tag, "Up") {
Stat().AddBytesUp(int64(nw))
} else {
Stat().AddBytesDown(int64(nw))
}

if ew != nil {
if ew != io.EOF {
err = ew
}
break
}
if nw != nr {
err = io.ErrShortWrite
break
}
}
if er != nil {
if er != io.EOF {
err = er
}
break
}
}
return totWritten, err

}()

// Close dst
if closeWriter, ok := dst.(interface {
CloseWrite() error
}); ok {
Expand All @@ -109,13 +156,17 @@ func halfPipe(src, dst net.Conn,
dst.Close()
}

// Close src
if closeReader, ok := src.(interface {
CloseRead() error
}); ok {
closeReader.CloseRead()
} else {
src.Close()
}

// Compute/log stats

proxyEndTime := time.Since(proxyStartTime)
stats := sessionStats{
Duration: int64(proxyEndTime / time.Millisecond),
Expand All @@ -127,11 +178,13 @@ func halfPipe(src, dst net.Conn,
}
stats_str, _ := json.Marshal(stats)
logger.Printf("stopping forwarding %s", stats_str)
if strings.HasPrefix(tag, "Up") {
Stat().AddBytesUp(written)
} else {
Stat().AddBytesDown(written)
}
/*
if strings.HasPrefix(tag, "Up") {
Stat().AddBytesUp(written)
} else {
Stat().AddBytesDown(written)
}*/

wg.Done()
}

Expand Down
42 changes: 23 additions & 19 deletions application/lib/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ type Stats struct {
newConns int64 // new connections since last stats.reset()
newErrConns int64 // new connections that had some sort of error since last reset()

activeRegistrations int64 // Current number of active registrations we have
activeLocalRegistrations int64 // Current registrations that were picked up from this detector (also included in newRegistrations)
activeApiRegistrations int64 // Current registrations that we heard about from the API (also included in newRegistrations)
newRegistrations int64 // Added registrations since last reset()
newMissedRegistrations int64 // number of "missed" registrations (as seen by a connection with no registration)
newErrRegistrations int64 // number of registrations that had some kinda error
newDupRegistrations int64 // number of duplicate registrations (doesn't uniquify, so might have some double counting)
activeRegistrations int64 // Current number of active registrations we have
newLocalRegistrations int64 // Current registrations that were picked up from this detector (also included in newRegistrations)
newApiRegistrations int64 // Current registrations that we heard about from the API (also included in newRegistrations)
newRegistrations int64 // Added registrations since last reset()
newMissedRegistrations int64 // number of "missed" registrations (as seen by a connection with no registration)
newErrRegistrations int64 // number of registrations that had some kinda error
newDupRegistrations int64 // number of duplicate registrations (doesn't uniquify, so might have some double counting)


newLivenessPass int64 // Liveness tests that passed (non-live phantom) since reset()
newLivenessFail int64 // Liveness tests that failed (live phantom) since reset()
Expand Down Expand Up @@ -67,6 +68,8 @@ func initStats() {
func (s *Stats) Reset() {
atomic.StoreInt64(&s.newConns, 0)
atomic.StoreInt64(&s.newRegistrations, 0)
atomic.StoreInt64(&s.newLocalRegistrations, 0)
atomic.StoreInt64(&s.newApiRegistrations, 0)
atomic.StoreInt64(&s.newMissedRegistrations, 0)
atomic.StoreInt64(&s.newErrRegistrations, 0)
atomic.StoreInt64(&s.newDupRegistrations, 0)
Expand All @@ -77,11 +80,11 @@ func (s *Stats) Reset() {
}

func (s *Stats) PrintStats() {
s.logger.Printf("Conns: %d cur %d new %d err Regs: %d cur (%d local %d API) %d new %d miss %d err %d dup LiveT: %d valid %d live Byte: %d up %d down",
s.logger.Printf("Conns: %d cur %d new %d err Regs: %d cur %d new (%d local %d API) %d miss %d err %d dup LiveT: %d valid %d live Byte: %d up %d down",
atomic.LoadInt64(&s.activeConns), atomic.LoadInt64(&s.newConns), atomic.LoadInt64(&s.newErrConns),
atomic.LoadInt64(&s.activeRegistrations),
atomic.LoadInt64(&s.activeLocalRegistrations), atomic.LoadInt64(&s.activeApiRegistrations),
atomic.LoadInt64(&s.newRegistrations),
atomic.LoadInt64(&s.newLocalRegistrations), atomic.LoadInt64(&s.newApiRegistrations),
atomic.LoadInt64(&s.newMissedRegistrations),
atomic.LoadInt64(&s.newErrRegistrations), atomic.LoadInt64(&s.newDupRegistrations),
atomic.LoadInt64(&s.newLivenessPass), atomic.LoadInt64(&s.newLivenessFail),
Expand All @@ -107,12 +110,12 @@ func (s *Stats) AddReg(generation uint32, source *pb.RegistrationSource) {
atomic.AddInt64(&s.activeRegistrations, 1)
atomic.AddInt64(&s.newRegistrations, 1)

if *source == pb.RegistrationSource_DetectorPrescan {
atomic.AddInt64(&s.activeLocalRegistrations, 1)
//atomic.AddInt64(&s.newLocalRegistrations, 1) // I don't think a delta of this is super useful...
if *source == pb.RegistrationSource_Detector {
//atomic.AddInt64(&s.activeLocalRegistrations, 1) // Actually an absolute is not super useful.
atomic.AddInt64(&s.newLocalRegistrations, 1)
} else {
atomic.AddInt64(&s.activeApiRegistrations, 1)
//atomic.AddInt64(&s.newApiRegistrations, 1)
//atomic.AddInt64(&s.activeApiRegistrations, 1)
atomic.AddInt64(&s.newApiRegistrations, 1)
}
s.genMutex.Lock()
s.generations[generation] += 1
Expand All @@ -130,11 +133,12 @@ func (s *Stats) AddErrReg() {
func (s *Stats) ExpireReg(generation uint32, source *pb.RegistrationSource) {
atomic.AddInt64(&s.activeRegistrations, -1)

if *source == pb.RegistrationSource_DetectorPrescan {
atomic.AddInt64(&s.activeLocalRegistrations, -1)
} else {
atomic.AddInt64(&s.activeApiRegistrations, -1)
}
/*
if *source == pb.RegistrationSource_Detector {
atomic.AddInt64(&s.activeLocalRegistrations, -1)
} else {
atomic.AddInt64(&s.activeApiRegistrations, -1)
}*/
s.genMutex.Lock()
s.generations[generation] -= 1
s.genMutex.Unlock()
Expand Down

0 comments on commit 55d86ce

Please sign in to comment.