Skip to content

Commit

Permalink
feat: add option authType for s3 storage
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc committed Feb 14, 2023
1 parent a6de948 commit a27c7cc
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 26 deletions.
4 changes: 2 additions & 2 deletions common/runtime/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package runtime

import (
"fmt"
"github.com/getsentry/sentry-go"

"github.com/getsentry/sentry-go"
"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/common/config"
"github.com/turt2live/matrix-media-repo/common/rcontext"
Expand Down Expand Up @@ -73,7 +73,7 @@ func LoadDatastores() {

err = s3.EnsureBucketExists()
if err != nil {
logrus.Warn("\t\tBucket does not exist!")
logrus.Warn("\t\tBucket does not exist!", err)
}

err = s3.EnsureTempPathExists()
Expand Down
4 changes: 3 additions & 1 deletion config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ datastores:
accessSecret: ""
ssl: true
bucketName: "your-media-bucket"
# Available auth types: static, env, iam, default to static
#authType: static
# An optional region for where this S3 endpoint is located. Typically not needed, though
# some providers will need this (like Scaleway). Uncomment to use.
#region: "sfo2"
Expand Down Expand Up @@ -597,4 +599,4 @@ sentry:
environment: ""

# Whether or not to turn on sentry's built in debugging. This will increase log output.
debug: false
debug: false
65 changes: 42 additions & 23 deletions storage/datastore/ds_s3/s3_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package ds_s3
import (
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"strings"

"github.com/minio/minio-go/v6"
"github.com/minio/minio-go/v6/pkg/credentials"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -39,35 +39,50 @@ func GetOrCreateS3Datastore(dsId string, conf config.DatastoreConfig) (*s3Datast

endpoint, epFound := conf.Options["endpoint"]
bucket, bucketFound := conf.Options["bucketName"]
authType, authTypeFound := conf.Options["authType"]
accessKeyId, keyFound := conf.Options["accessKeyId"]
accessSecret, secretFound := conf.Options["accessSecret"]
region, regionFound := conf.Options["region"]
region := conf.Options["region"]
tempPath, tempPathFound := conf.Options["tempPath"]
storageClass, storageClassFound := conf.Options["storageClass"]
if !epFound || !bucketFound || !keyFound || !secretFound {
return nil, errors.New("invalid configuration: missing s3 options")
if !epFound || !bucketFound {
return nil, errors.New("invalid configuration: missing s3 endpoint/bucket")
}
if !tempPathFound {
logrus.Warn("Datastore ", dsId, " (s3) does not have a tempPath set - this could lead to excessive memory usage by the media repo")
}
if !storageClassFound {
storageClass = "STANDARD"
}

useSsl := true
useSslStr, sslFound := conf.Options["ssl"]
if sslFound && useSslStr != "" {
useSsl, _ = strconv.ParseBool(useSslStr)
if !authTypeFound {
authType = "static"
}

var s3client *minio.Client
var err error
useSSL := true
useSSLStr, sslFound := conf.Options["ssl"]
if sslFound && useSSLStr != "" {
useSSL, _ = strconv.ParseBool(useSSLStr)
}

if regionFound {
s3client, err = minio.NewWithRegion(endpoint, accessKeyId, accessSecret, useSsl, region)
} else {
s3client, err = minio.New(endpoint, accessKeyId, accessSecret, useSsl)
var cred *credentials.Credentials
switch authType {
case "static":
if !keyFound || !secretFound {
return nil, errors.New("invalid configuration: missing s3 key/secret")
}
cred = credentials.NewStaticV4(accessKeyId, accessSecret, "")
case "env":
cred = credentials.NewEnvAWS()
case "iam":
cred = credentials.NewIAM("")
default:
return nil, errors.New("invalid configuration: unsupported s3 auth type")
}
s3client, err := minio.NewWithOptions(endpoint, &minio.Options{
Creds: cred,
Region: region,
Secure: useSSL,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -113,7 +128,7 @@ func ParseS3URL(s3url string) (string, string, string, error) {
func (s *s3Datastore) EnsureBucketExists() error {
found, err := s.client.BucketExists(s.bucket)
if err != nil {
return err
return errors.New("error checking if bucket exists: " + err.Error())
}
if !found {
return errors.New("bucket not found")
Expand Down Expand Up @@ -153,19 +168,20 @@ func (s *s3Datastore) UploadFile(file io.ReadCloser, expectedLength int64, ctx r
go func() {
defer ws3.Close()
ctx.Log.Info("Calculating hash of stream...")
hash, hashErr = util.GetSha256HashOfStream(ioutil.NopCloser(tr))
hash, hashErr = util.GetSha256HashOfStream(io.NopCloser(tr))
ctx.Log.Info("Hash of file is ", hash)
done <- true
}()

uploadOpts := minio.PutObjectOptions{StorageClass: s.storageClass}
go func() {
if expectedLength <= 0 {
if s.tempPath != "" {
ctx.Log.Info("Buffering file to temp path due to unknown file size")
var f *os.File
f, uploadErr = ioutil.TempFile(s.tempPath, "mr*")
f, uploadErr = os.CreateTemp(s.tempPath, "mr*")
if uploadErr != nil {
io.Copy(ioutil.Discard, rs3)
io.Copy(io.Discard, rs3)
done <- true
return
}
Expand All @@ -186,7 +202,7 @@ func (s *s3Datastore) UploadFile(file io.ReadCloser, expectedLength int64, ctx r
}
ctx.Log.Info("Uploading file...")
metrics.S3Operations.With(prometheus.Labels{"operation": "PutObject"}).Inc()
sizeBytes, uploadErr = s.client.PutObjectWithContext(ctx, s.bucket, objectName, rs3, expectedLength, minio.PutObjectOptions{StorageClass: s.storageClass})
sizeBytes, uploadErr = s.client.PutObject(s.bucket, objectName, rs3, expectedLength, uploadOpts)
ctx.Log.Info("Uploaded ", sizeBytes, " bytes to s3")
done <- true
}()
Expand Down Expand Up @@ -222,12 +238,14 @@ func (s *s3Datastore) DeleteObject(location string) error {
func (s *s3Datastore) DownloadObject(location string) (io.ReadCloser, error) {
logrus.Info("Downloading object from bucket ", s.bucket, ": ", location)
metrics.S3Operations.With(prometheus.Labels{"operation": "GetObject"}).Inc()
return s.client.GetObject(s.bucket, location, minio.GetObjectOptions{})
opts := minio.GetObjectOptions{}
return s.client.GetObject(s.bucket, location, opts)
}

func (s *s3Datastore) ObjectExists(location string) bool {
metrics.S3Operations.With(prometheus.Labels{"operation": "StatObject"}).Inc()
stat, err := s.client.StatObject(s.bucket, location, minio.StatObjectOptions{})
opts := minio.StatObjectOptions{}
stat, err := s.client.StatObject(s.bucket, location, opts)
if err != nil {
return false
}
Expand All @@ -237,7 +255,8 @@ func (s *s3Datastore) ObjectExists(location string) bool {
func (s *s3Datastore) OverwriteObject(location string, stream io.ReadCloser) error {
defer cleanup.DumpAndCloseStream(stream)
metrics.S3Operations.With(prometheus.Labels{"operation": "PutObject"}).Inc()
_, err := s.client.PutObject(s.bucket, location, stream, -1, minio.PutObjectOptions{StorageClass: s.storageClass})
opts := minio.PutObjectOptions{StorageClass: s.storageClass}
_, err := s.client.PutObject(s.bucket, location, stream, -1, opts)
return err
}

Expand Down

0 comments on commit a27c7cc

Please sign in to comment.