From f383b08f925dd214e9cf686063a2bc514dc436d0 Mon Sep 17 00:00:00 2001 From: Ivan Tivonenko Date: Sat, 26 Jun 2021 13:27:33 +0300 Subject: [PATCH] add 'timeout' parameter to the OSSession.SaveData method --- core/capabilities_test.go | 11 +++++++---- core/livepeernode_test.go | 6 +++--- core/orchestrator.go | 2 +- core/playlistmanager.go | 2 +- core/playlistmanager_test.go | 2 +- drivers/drivers.go | 6 +++--- drivers/gs.go | 9 ++++++--- drivers/local.go | 3 ++- drivers/local_test.go | 8 ++++---- drivers/s3.go | 25 +++++++++++++++---------- drivers/session_mock.go | 3 ++- server/broadcast.go | 8 ++++---- server/broadcast_test.go | 6 +++--- server/mediaserver.go | 4 ++-- server/recordings_test.go | 16 ++++++++-------- server/segment_rpc.go | 2 +- server/segment_rpc_test.go | 6 +++--- verification/verify_test.go | 2 +- 18 files changed, 67 insertions(+), 54 deletions(-) diff --git a/core/capabilities_test.go b/core/capabilities_test.go index 4c636a92cb..030f498c9f 100644 --- a/core/capabilities_test.go +++ b/core/capabilities_test.go @@ -4,6 +4,7 @@ import ( "context" "sort" "testing" + "time" "github.com/livepeer/go-livepeer/drivers" "github.com/livepeer/go-livepeer/net" @@ -270,10 +271,12 @@ func (os *stubOS) GetInfo() *net.OSInfo { } return &net.OSInfo{StorageType: net.OSInfo_StorageType(os.storageType)} } -func (os *stubOS) EndSession() {} -func (os *stubOS) SaveData(string, []byte, map[string]string) (string, error) { return "", nil } -func (os *stubOS) IsExternal() bool { return false } -func (os *stubOS) IsOwn(url string) bool { return true } +func (os *stubOS) EndSession() {} +func (os *stubOS) SaveData(string, []byte, map[string]string, time.Duration) (string, error) { + return "", nil +} +func (os *stubOS) IsExternal() bool { return false } +func (os *stubOS) IsOwn(url string) bool { return true } func (os *stubOS) ListFiles(ctx context.Context, prefix, delim string) (drivers.PageInfo, error) { return nil, nil } diff --git a/core/livepeernode_test.go b/core/livepeernode_test.go index 83feba6f9f..23a854e143 100644 --- a/core/livepeernode_test.go +++ b/core/livepeernode_test.go @@ -130,7 +130,7 @@ func TestServiceURIChange(t *testing.T) { drivers.NodeStorage = drivers.NewMemoryDriver(n.GetServiceURI()) sesh := drivers.NodeStorage.NewSession("testpath") - savedUrl, err := sesh.SaveData("testdata1", []byte{0, 0, 0}, nil) + savedUrl, err := sesh.SaveData("testdata1", []byte{0, 0, 0}, nil, 0) require.Nil(err) assert.Equal("test://testurl.com/stream/testpath/testdata1", savedUrl) @@ -138,7 +138,7 @@ func TestServiceURIChange(t *testing.T) { newUrl, err := url.Parse("test://newurl.com") n.SetServiceURI(newUrl) require.Nil(err) - furl, err := sesh.SaveData("testdata2", []byte{0, 0, 0}, nil) + furl, err := sesh.SaveData("testdata2", []byte{0, 0, 0}, nil, 0) require.Nil(err) assert.Equal("test://newurl.com/stream/testpath/testdata2", furl) @@ -146,7 +146,7 @@ func TestServiceURIChange(t *testing.T) { secondUrl, err := url.Parse("test://secondurl.com") n.SetServiceURI(secondUrl) require.Nil(err) - surl, err := sesh.SaveData("testdata3", []byte{0, 0, 0}, nil) + surl, err := sesh.SaveData("testdata3", []byte{0, 0, 0}, nil, 0) require.Nil(err) assert.Equal("test://secondurl.com/stream/testpath/testdata3", surl) } diff --git a/core/orchestrator.go b/core/orchestrator.go index 9734c4a6d2..a249bf10ab 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -537,7 +537,7 @@ func (n *LivepeerNode) transcodeSeg(config transcodeConfig, seg *stream.HLSSegme // Need to store segment in our local OS var err error name := fmt.Sprintf("%d.tempfile", seg.SeqNo) - url, err = config.LocalOS.SaveData(name, seg.Data, nil) + url, err = config.LocalOS.SaveData(name, seg.Data, nil, 0) if err != nil { return terr(err) } diff --git a/core/playlistmanager.go b/core/playlistmanager.go index 77348d9fc8..11a12b5e20 100644 --- a/core/playlistmanager.go +++ b/core/playlistmanager.go @@ -257,7 +257,7 @@ func (mgr *BasicPlaylistManager) FlushRecord() { } go func(name string, data []byte) { now := time.Now() - _, err := mgr.recordSession.SaveData(name, b, nil) + _, err := mgr.recordSession.SaveData(name, b, nil, 0) took := time.Since(now) if err != nil { glog.Errorf("Error saving json playlist name=%s bytes=%d took=%s err=%v", name, diff --git a/core/playlistmanager_test.go b/core/playlistmanager_test.go index 150a551a89..c72bdf0b3b 100644 --- a/core/playlistmanager_test.go +++ b/core/playlistmanager_test.go @@ -281,7 +281,7 @@ func TestCleanup(t *testing.T) { testData := []byte{1, 2, 3, 4} c := NewBasicPlaylistManager(mid, osSession, nil) - uri, err := c.GetOSSession().SaveData("testName", testData, nil) + uri, err := c.GetOSSession().SaveData("testName", testData, nil, 0) if err != nil { t.Fatal(err) } diff --git a/drivers/drivers.go b/drivers/drivers.go index 0ced9e037f..d47bb305f4 100644 --- a/drivers/drivers.go +++ b/drivers/drivers.go @@ -62,7 +62,7 @@ type PageInfo interface { type OSSession interface { OS() OSDriver - SaveData(name string, data []byte, meta map[string]string) (string, error) + SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) EndSession() // Info in order to have this session used via RPC @@ -187,12 +187,12 @@ func ParseOSURL(input string, useFullAPI bool) (OSDriver, error) { // SaveRetried tries to SaveData specified number of times func SaveRetried(sess OSSession, name string, data []byte, meta map[string]string, retryCount int) (string, error) { if retryCount < 1 { - return "", fmt.Errorf("Invalid retry count %d", retryCount) + return "", fmt.Errorf("invalid retry count %d", retryCount) } var uri string var err error for i := 0; i < retryCount; i++ { - uri, err = sess.SaveData(name, data, meta) + uri, err = sess.SaveData(name, data, meta, 0) if err == nil { return uri, err } diff --git a/drivers/gs.go b/drivers/gs.go index 8893df2b53..4d847b8ea7 100644 --- a/drivers/gs.go +++ b/drivers/gs.go @@ -153,7 +153,7 @@ func (os *gsSession) createClient() error { return nil } -func (os *gsSession) SaveData(name string, data []byte, meta map[string]string) (string, error) { +func (os *gsSession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) { if os.useFullAPI { if os.client == nil { if err := os.createClient(); err != nil { @@ -163,7 +163,10 @@ func (os *gsSession) SaveData(name string, data []byte, meta map[string]string) keyname := os.key + "/" + name objh := os.client.Bucket(os.bucket).Object(keyname) glog.V(common.VERBOSE).Infof("Saving to GS %s/%s", os.bucket, keyname) - ctx, cancel := context.WithTimeout(context.Background(), saveTimeout) + if timeout == 0 { + timeout = saveTimeout + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() wr := objh.NewWriter(ctx) if len(meta) > 0 && wr.Metadata == nil { @@ -185,7 +188,7 @@ func (os *gsSession) SaveData(name string, data []byte, meta map[string]string) glog.V(common.VERBOSE).Infof("Saved to GS %s", uri) return uri, err } - return os.s3Session.SaveData(name, data, meta) + return os.s3Session.SaveData(name, data, meta, timeout) } type gsPageInfo struct { diff --git a/drivers/local.go b/drivers/local.go index 3de88bd4e7..4721e905c1 100644 --- a/drivers/local.go +++ b/drivers/local.go @@ -10,6 +10,7 @@ import ( "path" "strings" "sync" + "time" "github.com/livepeer/go-livepeer/net" ) @@ -194,7 +195,7 @@ func (ostore *MemorySession) GetInfo() *net.OSInfo { return nil } -func (ostore *MemorySession) SaveData(name string, data []byte, meta map[string]string) (string, error) { +func (ostore *MemorySession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) { path, file := path.Split(ostore.getAbsolutePath(name)) ostore.dLock.Lock() diff --git a/drivers/local_test.go b/drivers/local_test.go index a3fe91df4a..d1320dcb69 100644 --- a/drivers/local_test.go +++ b/drivers/local_test.go @@ -30,17 +30,17 @@ func TestLocalOS(t *testing.T) { assert.NoError((err)) os := NewMemoryDriver(u) sess := os.NewSession(("sesspath")).(*MemorySession) - path, err := sess.SaveData("name1/1.ts", copyBytes(tempData1), nil) + path, err := sess.SaveData("name1/1.ts", copyBytes(tempData1), nil, 0) glog.Info(path) fmt.Println(path) assert.Equal("fake.com/url/stream/sesspath/name1/1.ts", path) data := sess.GetData("sesspath/name1/1.ts") fmt.Printf("got Data: '%s'\n", data) assert.Equal(tempData1, string(data)) - path, err = sess.SaveData("name1/1.ts", copyBytes(tempData2), nil) + path, err = sess.SaveData("name1/1.ts", copyBytes(tempData2), nil, 0) data = sess.GetData("sesspath/name1/1.ts") assert.Equal(tempData2, string(data)) - path, err = sess.SaveData("name1/2.ts", copyBytes(tempData3), nil) + path, err = sess.SaveData("name1/2.ts", copyBytes(tempData3), nil, 0) data = sess.GetData("sesspath/name1/2.ts") assert.Equal(tempData3, string(data)) // Test trim prefix when baseURI != nil @@ -55,7 +55,7 @@ func TestLocalOS(t *testing.T) { // Test trim prefix when baseURI = nil os = NewMemoryDriver(nil) sess = os.NewSession("sesspath").(*MemorySession) - path, err = sess.SaveData("name1/1.ts", copyBytes(tempData1), nil) + path, err = sess.SaveData("name1/1.ts", copyBytes(tempData1), nil, 0) assert.Nil(err) assert.Equal("/stream/sesspath/name1/1.ts", path) diff --git a/drivers/s3.go b/drivers/s3.go index 41fd6165d9..9e7f6d2a45 100644 --- a/drivers/s3.go +++ b/drivers/s3.go @@ -273,7 +273,7 @@ func (os *s3Session) ReadData(ctx context.Context, name string) (*FileInfoReader return res, nil } -func (os *s3Session) saveDataPut(name string, data []byte, meta map[string]string) (string, error) { +func (os *s3Session) saveDataPut(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) { now := time.Now() bucket := aws.String(os.bucket) keyname := aws.String(os.key + "/" + name) @@ -294,7 +294,10 @@ func (os *s3Session) saveDataPut(name string, data []byte, meta map[string]strin ContentType: contentType, ContentLength: aws.Int64(int64(len(data))), } - ctx, cancel := context.WithTimeout(context.Background(), saveTimeout) + if timeout == 0 { + timeout = saveTimeout + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) resp, err := os.s3svc.PutObjectWithContext(ctx, params, request.WithLogLevel(aws.LogDebug)) cancel() if err != nil { @@ -306,14 +309,14 @@ func (os *s3Session) saveDataPut(name string, data []byte, meta map[string]strin return uri, err } -func (os *s3Session) SaveData(name string, data []byte, meta map[string]string) (string, error) { +func (os *s3Session) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) { if os.s3svc != nil { - return os.saveDataPut(name, data, meta) + return os.saveDataPut(name, data, meta, timeout) } // tentativeUrl just used for logging tentativeURL := path.Join(os.host, os.key, name) glog.V(common.VERBOSE).Infof("Saving to S3 %s", tentativeURL) - path, err := os.postData(name, data, meta) + path, err := os.postData(name, data, meta, timeout) if err != nil { // handle error glog.Errorf("Save S3 error: %v", err) @@ -358,7 +361,7 @@ func (os *s3Session) getContentType(fileName string, buffer []byte) string { } // if s3 storage is not our own, we are saving data into it using POST request -func (os *s3Session) postData(fileName string, buffer []byte, meta map[string]string) (string, error) { +func (os *s3Session) postData(fileName string, buffer []byte, meta map[string]string, timeout time.Duration) (string, error) { fileBytes := bytes.NewReader(buffer) fileType := os.getContentType(fileName, buffer) path, fileName := path.Split(path.Join(os.key, fileName)) @@ -375,7 +378,7 @@ func (os *s3Session) postData(fileName string, buffer []byte, meta map[string]st if !strings.Contains(postURL, os.bucket) { postURL += "/" + os.bucket } - req, cancel, err := newfileUploadRequest(postURL, fields, fileBytes, fileName) + req, cancel, err := newfileUploadRequest(postURL, fields, fileBytes, fileName, timeout) if err != nil { glog.Error(err) return "", err @@ -446,7 +449,7 @@ func createPolicy(key, bucket, region, secret, path string) (string, string, str return policy, signString(policy, region, xAmzDate, secret), xAmzCredential, xAmzDate + "T000000Z" } -func newfileUploadRequest(uri string, params map[string]string, fData io.Reader, fileName string) (*http.Request, context.CancelFunc, error) { +func newfileUploadRequest(uri string, params map[string]string, fData io.Reader, fileName string, timeout time.Duration) (*http.Request, context.CancelFunc, error) { glog.Infof("Posting data to %s (params %+v)", uri, params) body := &bytes.Buffer{} writer := multipart.NewWriter(body) @@ -466,8 +469,10 @@ func newfileUploadRequest(uri string, params map[string]string, fData io.Reader, if err != nil { return nil, nil, err } - - ctx, cancel := context.WithTimeout(context.Background(), saveTimeout) + if timeout == 0 { + timeout = saveTimeout + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) req, err := http.NewRequestWithContext(ctx, "POST", uri, body) if err != nil { cancel() diff --git a/drivers/session_mock.go b/drivers/session_mock.go index bf370bd7f6..49a089f554 100644 --- a/drivers/session_mock.go +++ b/drivers/session_mock.go @@ -2,6 +2,7 @@ package drivers import ( "context" + "time" "github.com/livepeer/go-livepeer/net" "github.com/stretchr/testify/mock" @@ -11,7 +12,7 @@ type MockOSSession struct { mock.Mock } -func (s *MockOSSession) SaveData(name string, data []byte, meta map[string]string) (string, error) { +func (s *MockOSSession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) { args := s.Called() return args.String(0), args.Error(1) } diff --git a/server/broadcast.go b/server/broadcast.go index 0c6fe912e6..ae0dd108cc 100644 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -418,7 +418,7 @@ func processSegment(cxn *rtmpConnection, seg *stream.HLSSegment) ([]string, erro } }() } - uri, err := cpl.GetOSSession().SaveData(name, seg.Data, nil) + uri, err := cpl.GetOSSession().SaveData(name, seg.Data, nil, 0) if err != nil { glog.Errorf("Error saving segment nonce=%d seqNo=%d: %v", nonce, seg.SeqNo, err) if monitor.Enabled { @@ -498,7 +498,7 @@ func transcodeSegment(cxn *rtmpConnection, seg *stream.HLSSegment, name string, // storage the orchestrator prefers if ios := sess.OrchestratorOS; ios != nil { // XXX handle case when orch expects direct upload - uri, err := ios.SaveData(name, seg.Data, nil) + uri, err := ios.SaveData(name, seg.Data, nil, 0) if err != nil { glog.Errorf("Error saving segment to OS nonce=%d seqNo=%d: %v", nonce, seg.SeqNo, err) if monitor.Enabled { @@ -686,7 +686,7 @@ func transcodeSegment(cxn *rtmpConnection, seg *stream.HLSSegment, name string, return } name := fmt.Sprintf("%s/%d%s", profile.Name, seg.SeqNo, ext) - newURL, err := bos.SaveData(name, data, nil) + newURL, err := bos.SaveData(name, data, nil, 0) if err != nil { switch err.Error() { case "Session ended": @@ -830,7 +830,7 @@ func verify(verifier *verification.SegmentVerifier, cxn *rtmpConnection, // Hence, trim the /stream/ prefix if it exists. pfx := fmt.Sprintf("/stream/%s/", sess.Params.ManifestID) uri := strings.TrimPrefix(accepted.URIs[i], pfx) - _, err := sess.BroadcasterOS.SaveData(uri, data, nil) + _, err := sess.BroadcasterOS.SaveData(uri, data, nil, 0) if err != nil { return err } diff --git a/server/broadcast_test.go b/server/broadcast_test.go index bc1b82d934..754fa0734d 100644 --- a/server/broadcast_test.go +++ b/server/broadcast_test.go @@ -114,7 +114,7 @@ type stubOSSession struct { err error } -func (s *stubOSSession) SaveData(name string, data []byte, meta map[string]string) (string, error) { +func (s *stubOSSession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) { s.saved = append(s.saved, name) return "saved_" + name, s.err } @@ -1437,7 +1437,7 @@ func TestVerifier_Verify(t *testing.T) { } mem, ok := drivers.NewMemoryDriver(nil).NewSession("streamName").(*drivers.MemorySession) assert.True(ok) - name, err := mem.SaveData("/rendition/seg/1", []byte("attempt1"), nil) + name, err := mem.SaveData("/rendition/seg/1", []byte("attempt1"), nil, 0) assert.Nil(err) assert.Equal([]byte("attempt1"), mem.GetData(name)) sess.BroadcasterOS = mem @@ -1449,7 +1449,7 @@ func TestVerifier_Verify(t *testing.T) { // Now "insert" 2nd attempt into OS // and ensure 1st attempt is what remains after verification - _, err = mem.SaveData("/rendition/seg/1", []byte("attempt2"), nil) + _, err = mem.SaveData("/rendition/seg/1", []byte("attempt2"), nil, 0) assert.Nil(err) assert.Equal([]byte("attempt2"), mem.GetData(name)) renditionData = [][]byte{[]byte("attempt2")} diff --git a/server/mediaserver.go b/server/mediaserver.go index 1f55193fd1..0ecb0ab802 100644 --- a/server/mediaserver.go +++ b/server/mediaserver.go @@ -1390,7 +1390,7 @@ func (s *LivepeerServer) HandleRecordings(w http.ResponseWriter, r *http.Request mainJspl.AddSegmentsToMPL(manifests, trackName, mpl, resp.RecordObjectStoreURL) fileName := trackName + ".m3u8" nows := time.Now() - _, err = sess.SaveData(fileName, mpl.Encode().Bytes(), nil) + _, err = sess.SaveData(fileName, mpl.Encode().Bytes(), nil, 0) glog.V(common.VERBOSE).Infof("Saving playlist fileName=%s for manifestID=%s took=%s", fileName, manifestID, time.Since(nows)) if err != nil { glog.Error(err) @@ -1399,7 +1399,7 @@ func (s *LivepeerServer) HandleRecordings(w http.ResponseWriter, r *http.Request } } nows := time.Now() - _, err = sess.SaveData("index.m3u8", masterPList.Encode().Bytes(), nil) + _, err = sess.SaveData("index.m3u8", masterPList.Encode().Bytes(), nil, 0) glog.V(common.VERBOSE).Infof("Saving playlist fileName=%s for manifestID=%s took=%s", "index.m3u8", manifestID, time.Since(nows)) if err != nil { glog.Error(err) diff --git a/server/recordings_test.go b/server/recordings_test.go index da2bab09e8..7d1c942ec3 100644 --- a/server/recordings_test.go +++ b/server/recordings_test.go @@ -61,15 +61,15 @@ func TestRecordingHandler(t *testing.T) { profile := ffmpeg.P144p25fps16x9 jpl.InsertHLSSegment(&profile, 1, "sess1/testNode/P144p25fps16x9/1.ts", 2100) bjpl, _ := json.Marshal(jpl) - msess1.SaveData("testNode/playlist_1.json", bjpl, nil) + msess1.SaveData("testNode/playlist_1.json", bjpl, nil, 0) jpl = core.NewJSONPlaylist() jpl.InsertHLSSegment(&profile, 2, "sess2/testNode/P144p25fps16x9/2.ts", 2100) bjpl, _ = json.Marshal(jpl) - msess2.SaveData("testNode/playlist_2.json", bjpl, nil) + msess2.SaveData("testNode/playlist_2.json", bjpl, nil, 0) jpl = core.NewJSONPlaylist() jpl.InsertHLSSegment(&profile, 1, "sess3/testNode/P144p25fps16x9/3.ts", 2100) bjpl, _ = json.Marshal(jpl) - msess3.SaveData("testNode/playlist_3.json", bjpl, nil) + msess3.SaveData("testNode/playlist_3.json", bjpl, nil, 0) resp = makeReq("GET", "/recordings/sess3/P144p25fps16x9.m3u8") body, _ := ioutil.ReadAll(resp.Body) @@ -121,7 +121,7 @@ func TestRecording(t *testing.T) { mos := drivers.TestMemoryStorages["recstore4"] msess := mos.NewSession("sess1") - msess.SaveData("testNode/source/1.ts", []byte("segmentdata"), nil) + msess.SaveData("testNode/source/1.ts", []byte("segmentdata"), nil, 0) resp = makeReq("GET", "/live/sess1/testNode/source/1.ts") body, _ := ioutil.ReadAll(resp.Body) @@ -134,12 +134,12 @@ func TestRecording(t *testing.T) { jpl.InsertHLSSegment(&profile, 1, "testNode/P144p25fps16x9/1.ts", 2100) bjpl, err := json.Marshal(jpl) assert.Nil(err) - msess.SaveData("testNode/playlist_1.json", bjpl, nil) + msess.SaveData("testNode/playlist_1.json", bjpl, nil, 0) jpl = core.NewJSONPlaylist() jpl.InsertHLSSegment(&profile, 2, "testNode/P144p25fps16x9/2.ts", 2100) bjpl, err = json.Marshal(jpl) assert.Nil(err) - msess.SaveData("testNode/playlist_2.json", bjpl, nil) + msess.SaveData("testNode/playlist_2.json", bjpl, nil, 0) resp = makeReq("GET", "/live/sess1/index.m3u8") body, _ = ioutil.ReadAll(resp.Body) @@ -170,12 +170,12 @@ func TestRecording(t *testing.T) { jpl.InsertHLSSegment(&profile, 3, "testNode/P144p25fps16x9/3.ts", 2100) bjpl, err = json.Marshal(jpl) assert.Nil(err) - msess.SaveData("testNode/playlist_1.json", bjpl, nil) + msess.SaveData("testNode/playlist_1.json", bjpl, nil, 0) jpl = core.NewJSONPlaylist() jpl.InsertHLSSegment(&profile, 4, "testNode/P144p25fps16x9/4.ts", 2450) bjpl, err = json.Marshal(jpl) assert.Nil(err) - msess.SaveData("testNode/playlist_2.json", bjpl, nil) + msess.SaveData("testNode/playlist_2.json", bjpl, nil, 0) resp = makeReq("GET", "/live/sess2/P144p25fps16x9.m3u8?finalize=false") body, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() diff --git a/server/segment_rpc.go b/server/segment_rpc.go index 6c09f66494..00f45a8a1a 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -182,7 +182,7 @@ func (h *lphttp) ServeSegment(w http.ResponseWriter, r *http.Request) { } name := fmt.Sprintf("%s/%d%s", segData.Profiles[i].Name, segData.Seq, ext) // The use of := here is probably a bug?!? - uri, err := res.OS.SaveData(name, res.TranscodeData.Segments[i].Data, nil) + uri, err := res.OS.SaveData(name, res.TranscodeData.Segments[i].Data, nil, 0) if err != nil { glog.Error("Could not upload segment ", segData.Seq) break diff --git a/server/segment_rpc_test.go b/server/segment_rpc_test.go index f251344ec6..e6d6ab7503 100644 --- a/server/segment_rpc_test.go +++ b/server/segment_rpc_test.go @@ -764,7 +764,7 @@ func TestServeSegment_OSSaveDataError(t *testing.T) { mos := &drivers.MockOSSession{} - mos.On("SaveData", mock.Anything, mock.Anything).Return("", errors.New("SaveData error")) + mos.On("SaveData", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", errors.New("SaveData error")) tData := &core.TranscodeData{Segments: []*core.TranscodedSegmentData{{Data: []byte("foo")}}} tRes := &core.TranscodeResult{ @@ -1561,8 +1561,8 @@ func TestServeSegment_DebitFees_OSSaveDataError_BreakLoop(t *testing.T) { } orch.On("TranscodeSeg", md, seg).Return(tRes, nil) - mos.On("SaveData", mock.Anything, mock.Anything).Return("720pdotcom", nil).Once() - mos.On("SaveData", mock.Anything, mock.Anything).Return("", errors.New("SaveData error")).Once() + mos.On("SaveData", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("720pdotcom", nil).Once() + mos.On("SaveData", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", errors.New("SaveData error")).Once() orch.On("DebitFees", mock.Anything, core.ManifestID(s.OrchestratorInfo.AuthToken.SessionId), mock.Anything, tData720.Pixels) diff --git a/verification/verify_test.go b/verification/verify_test.go index e7daf35d77..e320c1ad6d 100644 --- a/verification/verify_test.go +++ b/verification/verify_test.go @@ -333,7 +333,7 @@ func TestVerifyPixels(t *testing.T) { bos := drivers.NewMemoryDriver(nil).NewSession("foo") data, err := ioutil.ReadFile("../server/test.flv") require.Nil(err) - fname, err := bos.SaveData("test.ts", data, nil) + fname, err := bos.SaveData("test.ts", data, nil, 0) require.Nil(err) memOS, ok := bos.(*drivers.MemorySession) require.True(ok)