From 5f2c008b8eccb491972270e67ea26ecd7e86936a Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Tue, 23 Feb 2021 19:13:26 +0100 Subject: [PATCH 1/3] add an option to disable concurrent reads Fixes #345 --- client.go | 32 +++++++++++++++++++++++++++----- client_integration_test.go | 33 ++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 20 deletions(-) diff --git a/client.go b/client.go index 11bb0bc2..87ce0cb9 100644 --- a/client.go +++ b/client.go @@ -112,6 +112,24 @@ func UseConcurrentWrites(value bool) ClientOption { } } +// UseConcurrentReads allows the Client to perform concurrent Reads. +// +// Concurrent reads are generally safe to use and not using them will degrade +// performance, so this option is enabled by default. +// +// When enabled, WriteTo will use Stat/Fstat to get the file size and determine +// how many concurrent workers to use. +// Some "read once" servers will delete the file if they receive a stat call on an +// open file and then the download will fail. +// By disabling concurrent reads, you will be able to download files from these servers. +// If concurrent reads are disabled, the UseFstat option is ignored. +func UseConcurrentReads(value bool) ClientOption { + return func(c *Client) error { + c.disableConcurrentReads = !value + return nil + } +} + // UseFstat sets whether to use Fstat or Stat when File.WriteTo is called // (usually when copying files). // Some servers limit the amount of open files and calling Stat after opening @@ -152,8 +170,9 @@ type Client struct { // write concurrency is… error prone. // Default behavior should be to not use it. - useConcurrentWrites bool - useFstat bool + useConcurrentWrites bool + useFstat bool + disableConcurrentReads bool } // NewClient creates a new SFTP client on conn, using zero or more option @@ -951,9 +970,8 @@ func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err erro // the number of bytes read and an error, if any. 
ReadAt follows io.ReaderAt semantics, // so the file offset is not altered during the read. func (f *File) ReadAt(b []byte, off int64) (int, error) { - if len(b) <= f.c.maxPacket { - // This should be able to be serviced with 1/2 requests. - // So, just do it directly. + if len(b) <= f.c.maxPacket || f.c.disableConcurrentReads { + // This should be able to be serviced with 1/2 requests or concurrent reads are disabled. return f.readChunkAt(nil, b, off) } @@ -1098,6 +1116,10 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) { f.mu.Lock() defer f.mu.Unlock() + if f.c.disableConcurrentReads { + return f.writeToSequential(w) + } + // For concurrency, we want to guess how many concurrent workers we should use. var fileSize uint64 if f.c.useFstat { diff --git a/client_integration_test.go b/client_integration_test.go index fcb41c3b..ac10959f 100644 --- a/client_integration_test.go +++ b/client_integration_test.go @@ -1219,21 +1219,24 @@ func TestClientRead(t *testing.T) { } defer os.RemoveAll(d) - for _, tt := range clientReadTests { - f, err := ioutil.TempFile(d, "read-test") - if err != nil { - t.Fatal(err) - } - defer f.Close() - hash := writeN(t, f, tt.n) - f2, err := sftp.Open(f.Name()) - if err != nil { - t.Fatal(err) - } - defer f2.Close() - hash2, n := readHash(t, f2) - if hash != hash2 || tt.n != n { - t.Errorf("Read: hash: want: %q, got %q, read: want: %v, got %v", hash, hash2, tt.n, n) + for _, disableConcurrentReads := range []bool{true, false} { + for _, tt := range clientReadTests { + f, err := ioutil.TempFile(d, "read-test") + if err != nil { + t.Fatal(err) + } + defer f.Close() + hash := writeN(t, f, tt.n) + sftp.disableConcurrentReads = disableConcurrentReads + f2, err := sftp.Open(f.Name()) + if err != nil { + t.Fatal(err) + } + defer f2.Close() + hash2, n := readHash(t, f2) + if hash != hash2 || tt.n != n { + t.Errorf("Read: hash: want: %q, got %q, read: want: %v, got %v", hash, hash2, tt.n, n) + } } } } From 
e1e59da6e369b9b0fa5108de9623616e444c483e Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Fri, 5 Mar 2021 15:04:17 +0100 Subject: [PATCH 2/3] add readAtSequential: used if concurrent reads are disabled ... ... and the requested buffer is bigger than maxPacket --- client.go | 33 +++++++++++++++++++-- client_integration_test.go | 61 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index 87ce0cb9..7656c50e 100644 --- a/client.go +++ b/client.go @@ -966,15 +966,44 @@ func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err erro return } +func (f *File) readAtSequential(b []byte, off int64) (int, error) { + readed := 0 + + for { + endRead := int64(math.Min(float64(readed+f.c.maxPacket), float64(len(b)))) + n, err := f.readChunkAt(nil, b[readed:endRead], off+int64(readed)) + if n < 0 { + panic("sftp.File: returned negative count from readChunkAt") + } + if n > 0 { + readed += n + } + if err != nil { + if errors.Is(err, io.EOF) { + return readed, nil // return nil explicitly. + } + return readed, err + } + if readed == len(b) { + return readed, nil + } + } +} + // ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns // the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics, // so the file offset is not altered during the read. func (f *File) ReadAt(b []byte, off int64) (int, error) { - if len(b) <= f.c.maxPacket || f.c.disableConcurrentReads { - // This should be able to be serviced with 1/2 requests or concurrent reads are disabled. + if len(b) <= f.c.maxPacket { + // This should be able to be serviced with 1/2 requests. + // So, just do it directly. return f.readChunkAt(nil, b, off) } + if f.c.disableConcurrentReads { + return f.readAtSequential(b, off) + } + // Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests. 
// This allows writes with a suitably large buffer to transfer data at a much faster rate // by overlapping round trip times. diff --git a/client_integration_test.go b/client_integration_test.go index ac10959f..e0401152 100644 --- a/client_integration_test.go +++ b/client_integration_test.go @@ -1115,6 +1115,67 @@ func TestClientReadSimple(t *testing.T) { } } +func TestClientReadSequential(t *testing.T) { + sftp, cmd := testClient(t, READONLY, NODELAY) + defer cmd.Wait() + defer sftp.Close() + + sftp.disableConcurrentReads = true + d, err := ioutil.TempDir("", "sftptest-readsequential") + require.NoError(t, err) + + defer os.RemoveAll(d) + + f, err := ioutil.TempFile(d, "read-sequential-test") + require.NoError(t, err) + fname := f.Name() + content := []byte("hello world") + f.Write(content) + f.Close() + + for _, maxPktSize := range []int{1, 2, 3, 4} { + sftp.maxPacket = maxPktSize + + sftpFile, err := sftp.Open(fname) + require.NoError(t, err) + + stuff := make([]byte, 32) + n, err := sftpFile.Read(stuff) + require.NoError(t, err) + require.Equal(t, len(content), n) + require.Equal(t, content, stuff[0:len(content)]) + + err = sftpFile.Close() + require.NoError(t, err) + + sftpFile, err = sftp.Open(fname) + require.NoError(t, err) + + stuff = make([]byte, 5) + n, err = sftpFile.Read(stuff) + require.NoError(t, err) + require.Equal(t, len(stuff), n) + require.Equal(t, content[:len(stuff)], stuff) + + err = sftpFile.Close() + require.NoError(t, err) + + // now read from a offset + off := int64(3) + sftpFile, err = sftp.Open(fname) + require.NoError(t, err) + + stuff = make([]byte, 5) + n, err = sftpFile.ReadAt(stuff, off) + require.NoError(t, err) + require.Equal(t, len(stuff), n) + require.Equal(t, content[off:off+int64(len(stuff))], stuff) + + err = sftpFile.Close() + require.NoError(t, err) + } +} + func TestClientReadDir(t *testing.T) { sftp1, cmd1 := testClient(t, READONLY, NODELAY) sftp2, cmd2 := testClientGoSvr(t, READONLY, NODELAY) From 
c539fdb9b40e5669834d67d8726389687cb83191 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Fri, 5 Mar 2021 17:10:39 +0100 Subject: [PATCH 3/3] improve readAtSequential as per review --- client.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/client.go b/client.go index 7656c50e..dc8a423e 100644 --- a/client.go +++ b/client.go @@ -966,28 +966,27 @@ func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err erro return } -func (f *File) readAtSequential(b []byte, off int64) (int, error) { - readed := 0 - - for { - endRead := int64(math.Min(float64(readed+f.c.maxPacket), float64(len(b)))) - n, err := f.readChunkAt(nil, b[readed:endRead], off+int64(readed)) +func (f *File) readAtSequential(b []byte, off int64) (read int, err error) { + for read < len(b) { + rb := b[read:] + if len(rb) > f.c.maxPacket { + rb = rb[:f.c.maxPacket] + } + n, err := f.readChunkAt(nil, rb, off+int64(read)) if n < 0 { panic("sftp.File: returned negative count from readChunkAt") } if n > 0 { - readed += n + read += n } if err != nil { if errors.Is(err, io.EOF) { - return readed, nil // return nil explicitly. + return read, nil // return nil explicitly. } - return readed, err - } - if readed == len(b) { - return readed, nil + return read, err } } + return read, nil } // ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns