From 88546e033c5e77d371a1475dbbefba43d71a0cf8 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 26 Mar 2020 21:58:20 +0700 Subject: [PATCH] export qlogs when the QLOGDIR env variable is set --- .gitignore | 2 ++ buffered_write_closer.go | 25 ++++++++++++++ buffered_write_closer_test.go | 26 +++++++++++++++ conn_test.go | 4 +-- listener.go | 2 +- transport.go | 62 +++++++++++++++++++++++++++-------- 6 files changed, 105 insertions(+), 16 deletions(-) create mode 100644 .gitignore create mode 100644 buffered_write_closer.go create mode 100644 buffered_write_closer_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..45e3d2a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.qlog +*.qlog.gz diff --git a/buffered_write_closer.go b/buffered_write_closer.go new file mode 100644 index 0000000..aeeef00 --- /dev/null +++ b/buffered_write_closer.go @@ -0,0 +1,25 @@ +package libp2pquic + +import ( + "bufio" + "io" +) + +type bufferedWriteCloser struct { + *bufio.Writer + io.Closer +} + +func newBufferedWriteCloser(writer *bufio.Writer, closer io.Closer) io.WriteCloser { + return &bufferedWriteCloser{ + Writer: writer, + Closer: closer, + } +} + +func (h bufferedWriteCloser) Close() error { + if err := h.Writer.Flush(); err != nil { + return err + } + return h.Closer.Close() +} diff --git a/buffered_write_closer_test.go b/buffered_write_closer_test.go new file mode 100644 index 0000000..949e211 --- /dev/null +++ b/buffered_write_closer_test.go @@ -0,0 +1,26 @@ +package libp2pquic + +import ( + "bufio" + "bytes" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +type nopCloser struct{} + +func (nopCloser) Close() error { return nil } + +var _ = Describe("buffered io.WriteCloser", func() { + It("flushes before closing", func() { + buf := &bytes.Buffer{} + + w := bufio.NewWriter(buf) + wc := newBufferedWriteCloser(w, &nopCloser{}) + wc.Write([]byte("foobar")) + Expect(buf.Len()).To(BeZero()) + Expect(wc.Close()).To(Succeed()) + Expect(buf.String()).To(Equal("foobar")) + }) +}) diff --git a/conn_test.go b/conn_test.go index d70af9f..6132034 100644 --- a/conn_test.go +++ b/conn_test.go @@ -185,13 +185,13 @@ var _ = Describe("Connection", func() { Expect(err).ToNot(HaveOccurred()) // make sure that connection attempts fails - clientTransport.(*transport).config.HandshakeTimeout = 250 * time.Millisecond + clientTransport.(*transport).clientConfig.HandshakeTimeout = 250 * time.Millisecond _, err = clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) Expect(err).To(HaveOccurred()) Expect(err.(net.Error).Timeout()).To(BeTrue()) // now allow the address and make sure the connection goes through - clientTransport.(*transport).config.HandshakeTimeout = 2 * time.Second + clientTransport.(*transport).clientConfig.HandshakeTimeout = 2 * time.Second filters.AddFilter(ipNet, filter.ActionAccept) conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) Expect(err).ToNot(HaveOccurred()) diff --git a/listener.go b/listener.go index 422f7be..9c77d24 100644 --- a/listener.go +++ b/listener.go @@ -36,7 +36,7 @@ func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivK conf, _ := identity.ConfigForAny() return conf, nil } - ln, err := quic.Listen(rconn, &tlsConf, t.config) + ln, err := quic.Listen(rconn, &tlsConf, t.serverConfig) if err != nil { return nil, err } diff --git a/transport.go b/transport.go index fa54bd8..2ef059b 100644 --- a/transport.go +++ b/transport.go @@ -1,10 +1,15 @@ package libp2pquic import ( + "bufio" + "compress/gzip" "context" "errors" + "fmt" "io" "net" + "os" + "time" "github.com/minio/sha256-simd" "golang.org/x/crypto/hkdf" @@ -87,11 +92,12 @@ func (c *connManager) Dial(network string, raddr *net.UDPAddr) (*reuseConn, erro // The Transport implements the tpt.Transport interface for QUIC connections. type transport struct { - privKey ic.PrivKey - localPeer peer.ID - identity *p2ptls.Identity - connManager *connManager - config *quic.Config + privKey ic.PrivKey + localPeer peer.ID + identity *p2ptls.Identity + connManager *connManager + serverConfig *quic.Config + clientConfig *quic.Config } var _ tpt.Transport = &transport{} @@ -125,13 +131,17 @@ func NewTransport(key ic.PrivKey, psk pnet.PSK, filters *filter.Filters) (tpt.Tr return nil, err } - return &transport{ - privKey: key, - localPeer: localPeer, - identity: identity, - connManager: connManager, - config: config, - }, nil + t := &transport{ + privKey: key, + localPeer: localPeer, + identity: identity, + connManager: connManager, + serverConfig: config, + clientConfig: config.Clone(), + } + t.serverConfig.GetLogWriter = t.GetLogWriterFor("server") + t.clientConfig.GetLogWriter = t.GetLogWriterFor("client") + return t, nil } // Dial dials a new QUIC connection @@ -153,7 +163,7 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp if err != nil { return nil, err } - sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, t.config) + sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, t.clientConfig) if err != nil { pconn.DecreaseCount() return nil, err @@ -190,6 +200,32 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp }, nil } +func (t *transport) GetLogWriterFor(role string) func([]byte) io.WriteCloser { + qlogDir := os.Getenv("QLOGDIR") + if len(qlogDir) == 0 { + return nil + } + return func(connID []byte) io.WriteCloser { + // create the QLOGDIR, if it doesn't exist + if _, err := os.Stat(qlogDir); os.IsNotExist(err) { + if err := os.MkdirAll(qlogDir, 0777); err != nil { + log.Errorf("creating the QLOGDIR failed: %s", err) + return nil + } + } + now := time.Now() + t := fmt.Sprintf("%d-%02d-%02dT%02d-%02d-%02d-%06d", now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), now.Nanosecond()/1000) + filename := fmt.Sprintf("%s/log_%s_%s_%x.qlog.gz", qlogDir, t, role, connID) + f, err := os.Create(filename) + if err != nil { + log.Errorf("unable to create qlog file %s: %s", filename, err) + return nil + } + gz := gzip.NewWriter(f) + return newBufferedWriteCloser(bufio.NewWriter(gz), gz) + } +} + // CanDial determines if we can dial to an address func (t *transport) CanDial(addr ma.Multiaddr) bool { return mafmt.QUIC.Matches(addr)