Skip to content

Commit

Permalink
feat: support for reading csv files with several rows
Browse files Browse the repository at this point in the history
  • Loading branch information
0xff-dev committed Dec 13, 2023
1 parent 8465356 commit 340e454
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 0 deletions.
55 changes: 55 additions & 0 deletions graphql-server/go-server/pkg/common/read_csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2023 KubeAGI.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common

import (
"encoding/csv"
"io"
)

// ReadCSV function reads the data in lines from startLine and returns an error if there is an error,
// you can determine if there is still data by determining if err is io.
func ReadCSV(o io.Reader, startLine, lines int64) ([][]string, error) {
var (
line []string
err error
cur = int64(0)
recordLines = int64(0)
result [][]string
)

csvReader := csv.NewReader(o)

for {
line, err = csvReader.Read()
if err != nil {
if err != io.EOF {
return nil, err
}
break
}
cur++
if cur >= startLine {
if recordLines >= lines {
break
}
result = append(result, line)
recordLines++
}
}

return result, err
}
158 changes: 158 additions & 0 deletions graphql-server/go-server/pkg/common/read_csv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Copyright 2023 KubeAGI.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common

import (
"bytes"
"io"
"reflect"
"testing"
)

const csvData = `a,a,a
b,b,b
c,c,c
d,d,d
e,e,e
f,f,f
g,g,g
i,i,i
1,1,1
2,2,2
3,3,3
4,4,4
5,5,5
6,6,6`

func TestReadCSV(t *testing.T) {
type input struct {
startLine, size int64
exp [][]string
expErr error
}

reader := bytes.NewReader([]byte(csvData))
for _, tc := range []input{
{
1, 1, [][]string{{"a", "a", "a"}}, nil,
},
{
1, 2, [][]string{{"a", "a", "a"}, {"b", "b", "b"}}, nil,
},
{
2, 1, [][]string{{"b", "b", "b"}}, nil,
},
{
2, 2, [][]string{{"b", "b", "b"}, {"c", "c", "c"}}, nil,
},
{
9, 10, [][]string{{"1", "1", "1"}, {"2", "2", "2"}, {"3", "3", "3"}, {"4", "4", "4"}, {"5", "5", "5"}, {"6", "6", "6"}}, io.EOF,
},
{
14, 1, [][]string{{"6", "6", "6"}}, io.EOF,
},
{
14, 2, [][]string{{"6", "6", "6"}}, io.EOF,
},
{
8, 3, [][]string{{"i", "i", "i"}, {"1", "1", "1"}, {"2", "2", "2"}}, nil,
},
{
1, 15, [][]string{
{"a", "a", "a"},
{"b", "b", "b"},
{"c", "c", "c"},
{"d", "d", "d"},
{"e", "e", "e"},
{"f", "f", "f"},
{"g", "g", "g"},
{"i", "i", "i"},
{"1", "1", "1"},
{"2", "2", "2"},
{"3", "3", "3"},
{"4", "4", "4"},
{"5", "5", "5"},
{"6", "6", "6"},
}, io.EOF,
},
{
1, 14, [][]string{
{"a", "a", "a"},
{"b", "b", "b"},
{"c", "c", "c"},
{"d", "d", "d"},
{"e", "e", "e"},
{"f", "f", "f"},
{"g", "g", "g"},
{"i", "i", "i"},
{"1", "1", "1"},
{"2", "2", "2"},
{"3", "3", "3"},
{"4", "4", "4"},
{"5", "5", "5"},
{"6", "6", "6"},
}, io.EOF,
},
{
15, 2, nil, io.EOF,
},
{
// page=1, size=3
1, 3, [][]string{
{"a", "a", "a"},
{"b", "b", "b"},
{"c", "c", "c"},
}, nil,
},
{
// page=2,size=3
4, 3, [][]string{
{"d", "d", "d"},
{"e", "e", "e"},
{"f", "f", "f"},
}, nil,
},
{
// page=3,size=3
7, 3, [][]string{
{"g", "g", "g"},
{"i", "i", "i"},
{"1", "1", "1"},
}, nil,
},
{
// page=4,size=3
10, 3, [][]string{
{"2", "2", "2"},
{"3", "3", "3"},
{"4", "4", "4"},
}, nil,
},
{
// page=5,size=3
13, 3, [][]string{
{"5", "5", "5"},
{"6", "6", "6"},
}, io.EOF,
},
} {
r, err := ReadCSV(reader, tc.startLine, tc.size)
if err != tc.expErr || !reflect.DeepEqual(tc.exp, r) {
t.Fatalf("expect %v get %v, expect error %v get %v", tc.exp, r, tc.expErr, err)
}
_, _ = reader.Seek(0, io.SeekStart)
}
}
70 changes: 70 additions & 0 deletions graphql-server/go-server/service/minio_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,19 @@ type (
FileName string `json:"fileName"`
UploadID string `json:"uploadID"`
}

ReadCSVResp struct {
Rows [][]string `json:"rows"`
More bool `json:"more"`
}
)

const (
bucketQuery = "bucket"
bucketPathQuery = "bucketPath"
md5Query = "md5"

maxCSVLines = 100
)

/*
Expand Down Expand Up @@ -568,6 +575,68 @@ func (m *minioAPI) Download(ctx *gin.Context) {
_, _ = io.Copy(ctx.Writer, info)
}

func (m *minioAPI) ReadCSVLines(ctx *gin.Context) {
var (
page int64
lines int64

bucket, bucketPath string
fileName string
)
_, _ = fmt.Sscanf(ctx.Query("page"), "%d", &page)
_, _ = fmt.Sscanf(ctx.Query("size"), "%d", &lines)
if page <= 0 {
klog.Errorf("the minimum page should be 1")
ctx.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"message": "the minimum page should be 1",
})
return
}
if lines <= 0 || lines > maxCSVLines {
klog.Errorf("the number of lines read should be greater than zero and less than or equal to %d", maxCSVLines)
ctx.AbortWithStatusJSON(http.StatusBadGateway, gin.H{
"message": fmt.Sprintf("the number of lines read should be greater than zero and less than or equal to %d", maxCSVLines),
})
return
}
bucket = ctx.Query(bucketQuery)
bucketPath = ctx.Query(bucketPathQuery)
fileName = ctx.Query("fileName")

objectName := fmt.Sprintf("%s/%s", bucketPath, fileName)
source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}

object, err := source.Client.GetObject(context.TODO(), bucket, objectName, minio.GetObjectOptions{})
if err != nil {
klog.Errorf("failed to get data, error is %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
startLine := (page-1)*lines + 1
result, err := common.ReadCSV(object, startLine, lines)
if err != nil && err != io.EOF {
klog.Errorf("there is an error reading the csv file, the error is %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
resp := ReadCSVResp{
Rows: result,
More: err == nil,
}
ctx.JSON(http.StatusOK, resp)
}

func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) {
c, err := client.GetClient(nil)
if err != nil {
Expand Down Expand Up @@ -598,5 +667,6 @@ func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) {
group.DELETE("/versioneddataset/files", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "delete", "versioneddatasets"), api.DeleteFiles)
group.GET("/versioneddataset/files/stat", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "versioneddatasets"), api.StatFile)
group.GET("/versioneddataset/files/download", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "versioneddatasets"), api.Download)
group.GET("/versioneddataset/files/csv", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "versioneddatasets"), api.ReadCSVLines)
}
}

0 comments on commit 340e454

Please sign in to comment.