Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track bytes #79

Merged
merged 6 commits into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 60 additions & 7 deletions application/lib/proxies.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,55 @@ func halfPipe(src, dst net.Conn,
// We could try to use io.CopyN in a loop or something that
// gives us occasional bytes. CopyN would not splice, though
// (uses a LimitedReader that only calls Read)
buf := bufferPool.Get().([]byte)
written, err := io.CopyBuffer(dst, src, buf)
//buf := bufferPool.Get().([]byte)
//written, err := io.CopyBuffer(dst, src, buf)

// On closer examination, it seems this code below seems about
// as performant. It's not using splice, but for CO comcast / curveball:
// io.CopyBuffer Read/Write
// curveball CPU ~2% ~2%
// DL 40MB time ~11.5s ~11.6s
// So while io.CopyBuffer is faster, it's not significantly faster

// If we run into perf problems, we can revert

written, err := func() (totWritten int64, err error) {
buf := make([]byte, 32*1024)
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
totWritten += int64(nw)
// Update stats:
if strings.HasPrefix(tag, "Up") {
Stat().AddBytesUp(int64(nw))
} else {
Stat().AddBytesDown(int64(nw))
}

if ew != nil {
if ew != io.EOF {
err = ew
}
break
}
if nw != nr {
err = io.ErrShortWrite
break
}
}
if er != nil {
if er != io.EOF {
err = er
}
break
}
}
return totWritten, err

}()

// Close dst
if closeWriter, ok := dst.(interface {
CloseWrite() error
}); ok {
Expand All @@ -109,13 +156,17 @@ func halfPipe(src, dst net.Conn,
dst.Close()
}

// Close src
if closeReader, ok := src.(interface {
CloseRead() error
}); ok {
closeReader.CloseRead()
} else {
src.Close()
}

// Compute/log stats

proxyEndTime := time.Since(proxyStartTime)
stats := sessionStats{
Duration: int64(proxyEndTime / time.Millisecond),
Expand All @@ -127,11 +178,13 @@ func halfPipe(src, dst net.Conn,
}
stats_str, _ := json.Marshal(stats)
logger.Printf("stopping forwarding %s", stats_str)
if strings.HasPrefix(tag, "Up") {
Stat().AddBytesUp(written)
} else {
Stat().AddBytesDown(written)
}
/*
if strings.HasPrefix(tag, "Up") {
Stat().AddBytesUp(written)
} else {
Stat().AddBytesDown(written)
}*/

wg.Done()
}

Expand Down
42 changes: 23 additions & 19 deletions application/lib/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ type Stats struct {
newConns int64 // new connections since last stats.reset()
newErrConns int64 // new connections that had some sort of error since last reset()

activeRegistrations int64 // Current number of active registrations we have
activeLocalRegistrations int64 // Current registrations that were picked up from this detector (also included in newRegistrations)
activeApiRegistrations int64 // Current registrations that we heard about from the API (also included in newRegistrations)
newRegistrations int64 // Added registrations since last reset()
newMissedRegistrations int64 // number of "missed" registrations (as seen by a connection with no registration)
newErrRegistrations int64 // number of registrations that had some kinda error
newDupRegistrations int64 // number of duplicate registrations (doesn't uniquify, so might have some double counting)
activeRegistrations int64 // Current number of active registrations we have
newLocalRegistrations int64 // Current registrations that were picked up from this detector (also included in newRegistrations)
newApiRegistrations int64 // Current registrations that we heard about from the API (also included in newRegistrations)
newRegistrations int64 // Added registrations since last reset()
newMissedRegistrations int64 // number of "missed" registrations (as seen by a connection with no registration)
newErrRegistrations int64 // number of registrations that had some kinda error
newDupRegistrations int64 // number of duplicate registrations (doesn't uniquify, so might have some double counting)


newLivenessPass int64 // Liveness tests that passed (non-live phantom) since reset()
newLivenessFail int64 // Liveness tests that failed (live phantom) since reset()
Expand Down Expand Up @@ -67,6 +68,8 @@ func initStats() {
func (s *Stats) Reset() {
atomic.StoreInt64(&s.newConns, 0)
atomic.StoreInt64(&s.newRegistrations, 0)
atomic.StoreInt64(&s.newLocalRegistrations, 0)
atomic.StoreInt64(&s.newApiRegistrations, 0)
atomic.StoreInt64(&s.newMissedRegistrations, 0)
atomic.StoreInt64(&s.newErrRegistrations, 0)
atomic.StoreInt64(&s.newDupRegistrations, 0)
Expand All @@ -77,11 +80,11 @@ func (s *Stats) Reset() {
}

func (s *Stats) PrintStats() {
s.logger.Printf("Conns: %d cur %d new %d err Regs: %d cur (%d local %d API) %d new %d miss %d err %d dup LiveT: %d valid %d live Byte: %d up %d down",
s.logger.Printf("Conns: %d cur %d new %d err Regs: %d cur %d new (%d local %d API) %d miss %d err %d dup LiveT: %d valid %d live Byte: %d up %d down",
atomic.LoadInt64(&s.activeConns), atomic.LoadInt64(&s.newConns), atomic.LoadInt64(&s.newErrConns),
atomic.LoadInt64(&s.activeRegistrations),
atomic.LoadInt64(&s.activeLocalRegistrations), atomic.LoadInt64(&s.activeApiRegistrations),
atomic.LoadInt64(&s.newRegistrations),
atomic.LoadInt64(&s.newLocalRegistrations), atomic.LoadInt64(&s.newApiRegistrations),
atomic.LoadInt64(&s.newMissedRegistrations),
atomic.LoadInt64(&s.newErrRegistrations), atomic.LoadInt64(&s.newDupRegistrations),
atomic.LoadInt64(&s.newLivenessPass), atomic.LoadInt64(&s.newLivenessFail),
Expand All @@ -107,12 +110,12 @@ func (s *Stats) AddReg(generation uint32, source *pb.RegistrationSource) {
atomic.AddInt64(&s.activeRegistrations, 1)
atomic.AddInt64(&s.newRegistrations, 1)

if *source == pb.RegistrationSource_DetectorPrescan {
atomic.AddInt64(&s.activeLocalRegistrations, 1)
//atomic.AddInt64(&s.newLocalRegistrations, 1) // I don't think a delta of this is super useful...
if *source == pb.RegistrationSource_Detector {
//atomic.AddInt64(&s.activeLocalRegistrations, 1) // Actually an absolute is not super useful.
atomic.AddInt64(&s.newLocalRegistrations, 1)
} else {
atomic.AddInt64(&s.activeApiRegistrations, 1)
//atomic.AddInt64(&s.newApiRegistrations, 1)
//atomic.AddInt64(&s.activeApiRegistrations, 1)
atomic.AddInt64(&s.newApiRegistrations, 1)
}
s.genMutex.Lock()
s.generations[generation] += 1
Expand All @@ -130,11 +133,12 @@ func (s *Stats) AddErrReg() {
func (s *Stats) ExpireReg(generation uint32, source *pb.RegistrationSource) {
atomic.AddInt64(&s.activeRegistrations, -1)

if *source == pb.RegistrationSource_DetectorPrescan {
atomic.AddInt64(&s.activeLocalRegistrations, -1)
} else {
atomic.AddInt64(&s.activeApiRegistrations, -1)
}
/*
if *source == pb.RegistrationSource_Detector {
atomic.AddInt64(&s.activeLocalRegistrations, -1)
} else {
atomic.AddInt64(&s.activeApiRegistrations, -1)
}*/
s.genMutex.Lock()
s.generations[generation] -= 1
s.genMutex.Unlock()
Expand Down