Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ublk #1245

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

Ublk #1245

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions app/cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func ControllerCmd() cli.Command {
Name: "snapshot-max-size",
Usage: "Maximum total snapshot size in bytes or human readable 42kb, 42mb, 42gb",
},
cli.IntFlag{
Name: "frontend-queues",
Required: false,
Value: 1,
Usage: "Number of frontend queues , only available in ublk frontend",
},
},
Action: func(c *cli.Context) {
if err := startController(c); err != nil {
Expand Down Expand Up @@ -123,6 +129,7 @@ func startController(c *cli.Context) error {
dataServerProtocol := c.String("data-server-protocol")
fileSyncHTTPClientTimeout := c.Int("file-sync-http-client-timeout")
engineInstanceName := c.GlobalString("engine-instance-name")
frontendQueues := c.Int("frontend-queues")

size := c.String("size")
if size == "" {
Expand Down Expand Up @@ -175,7 +182,7 @@ func startController(c *cli.Context) error {

var frontend types.Frontend
if frontendName != "" {
f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout)
f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout, frontendQueues)
if err != nil {
return errors.Wrapf(err, "failed to find frontend: %s", frontendName)
}
Expand All @@ -195,7 +202,7 @@ func startController(c *cli.Context) error {
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter,
salvageRequested, unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeoutShort,
engineReplicaTimeoutLong, types.DataServerProtocol(dataServerProtocol), fileSyncHTTPClientTimeout,
snapshotMaxCount, snapshotMaxSize)
snapshotMaxCount, snapshotMaxSize, frontendQueues)

// need to wait for Shutdown() completion
control.ShutdownWG.Add(1)
Expand Down
18 changes: 18 additions & 0 deletions package/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
zypper -n addrepo --refresh https://download.opensuse.org/repositories/network:/utilities/SLE_15_SP5/network:utilities.repo && \
zypper --gpg-auto-import-keys ref

RUN zypper -n install autoconf automake libtool gcc-c++

Check warning on line 12 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L12

Specify version with `zypper install -y <package>=<version>`. (DL3037)

Check warning on line 12 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L12

`zypper clean` missing after zypper use. (DL3036)

RUN zypper -n install cmake curl git gcc wget xsltproc docbook-xsl-stylesheets && \
rm -rf /var/cache/zypp/*

Expand All @@ -30,6 +32,22 @@
make; \
make install

# Build ubdsrv
ENV UBD_COMMIT_ID 19d3b2133baf1af8ae3a5fe300c962567fb7b0ce
RUN git clone --depth 1 --branch liburing-2.5 https://github.com/axboe/liburing.git /usr/src/liburing && \
cd /usr/src/liburing && \
./configure --cc=gcc --cxx=g++ && \
make -j$(nproc) && \
make installgit clone https://github.com/Kampadais/ubdsrv.git && \
cd ubdsrv && \
git checkout ${UBD_COMMIT_ID} && \
export LIBURING_CFLAGS="-I/usr/include/liburing" && \
export LIBURING_LIBS="-Lusr/lib/pkgconfig -luring"cd /usr/src && \
ls;autoreconf -i && \
./configure && \
make; \
make install
Comment on lines +41 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix syntax errors and ordering in ubdsrv build steps

There are syntax errors and misordered commands in the ubdsrv build steps. For example, missing && \ at the end of lines, and misplaced commands. This will cause the Docker build to fail.

Apply this diff to correct the build steps:

RUN git clone --depth 1 --branch liburing-2.5 https://github.com/axboe/liburing.git /usr/src/liburing && \
    cd /usr/src/liburing && \
    ./configure --cc=gcc --cxx=g++ && \
    make -j$(nproc) && \
    make install
- git clone https://github.com/Kampadais/ubdsrv.git && \
-     cd ubdsrv && \
+ && git clone https://github.com/Kampadais/ubdsrv.git /usr/src/ubdsrv && \
+ cd /usr/src/ubdsrv && \
    git checkout ${UBD_COMMIT_ID} && \
    export LIBURING_CFLAGS="-I/usr/include/liburing" && \
    export LIBURING_LIBS="-L/usr/lib/pkgconfig -luring" && \
-     cd /usr/src && \
-     ls;autoreconf -i && \
+ autoreconf -i && \
    ./configure && \
    make && \
    make install

Committable suggestion skipped: line range outside the PR's diff.


# Install grpc_health_probe
RUN wget https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.28/grpc_health_probe-linux-${ARCH} -O /usr/local/bin/grpc_health_probe && \
chmod +x /usr/local/bin/grpc_health_probe
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type Controller struct {
lastExpansionError string

fileSyncHTTPClientTimeout int

frontendQueues int
}

const (
Expand All @@ -75,7 +77,7 @@ const (
func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter,
salvageRequested, unmapMarkSnapChainRemoved bool, iscsiTargetRequestTimeout, engineReplicaTimeoutShort,
engineReplicaTimeoutLong time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout,
snapshotMaxCount int, snapshotMaxSize int64) *Controller {
snapshotMaxCount int, snapshotMaxSize int64, frontendQueues int) *Controller {
c := &Controller{
factory: factory,
VolumeName: name,
Expand All @@ -95,6 +97,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro
DataServerProtocol: dataServerProtocol,

fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout,
frontendQueues: frontendQueues,
}
c.reset()
c.metricsStart()
Expand Down Expand Up @@ -578,7 +581,7 @@ func (c *Controller) StartFrontend(frontend string) error {
}
}

f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout)
f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout, c.frontendQueues)
if err != nil {
return errors.Wrapf(err, "failed to find frontend: %s", frontend)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/init_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/longhorn/longhorn-engine/pkg/frontend/rest"
"github.com/longhorn/longhorn-engine/pkg/frontend/socket"
"github.com/longhorn/longhorn-engine/pkg/frontend/tgt"
"github.com/longhorn/longhorn-engine/pkg/frontend/ublk"
"github.com/longhorn/longhorn-engine/pkg/types"
"github.com/sirupsen/logrus"
)
Expand All @@ -21,7 +22,7 @@ const (
maxEngineReplicaTimeout = 30 * time.Second
)

func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration) (types.Frontend, error) {
func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration, frontendQueues int) (types.Frontend, error) {
switch frontendType {
case "rest":
return rest.New(), nil
Expand All @@ -31,6 +32,8 @@ func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration) (
return tgt.New(devtypes.FrontendTGTBlockDev, defaultScsiTimeout, defaultIscsiAbortTimeout, iscsiTargetRequestTimeout), nil
case devtypes.FrontendTGTISCSI:
return tgt.New(devtypes.FrontendTGTISCSI, defaultScsiTimeout, defaultIscsiAbortTimeout, iscsiTargetRequestTimeout), nil
case "ublk":
return ublk.New(frontendQueues), nil
default:
return nil, fmt.Errorf("unsupported frontend type: %v", frontendType)
}
Expand Down
250 changes: 250 additions & 0 deletions pkg/frontend/ublk/frontend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package ublk

import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"net"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"

"github.com/longhorn/longhorn-engine/pkg/dataconn"
"github.com/longhorn/longhorn-engine/pkg/types"
)

const (
frontendName = "ublk"

SocketDirectory = "/var/run"
DevPath = "/dev/longhorn/"
)

func New(frontendQueues int) *Ublk {
return &Ublk{Queues: frontendQueues}
}

type Ublk struct {
Volume string
Size int64
UblkID int
Queues int
QueueDepth int
BlockSize int
DaemonPId int

isUp bool
socketPath string
socketServer *dataconn.Server
}

func (u *Ublk) FrontendName() string {
return frontendName
}

func (u *Ublk) Init(name string, size, sectorSize int64) error {
u.Volume = name
u.Size = size

return nil
}

func (u *Ublk) StartUblk() error {

command := "add"
args := []string{"-t", "longhorn", "-f", u.socketPath, "-s", strconv.FormatInt(u.Size, 10), "-d", "32", "-q", strconv.Itoa(u.Queues)}

cmd := exec.Command("ublk", append([]string{command}, args...)...)

output, err := cmd.CombinedOutput()
if err != nil {
logrus.Error("Error starting ublk:", err)
return nil
}
Comment on lines +64 to +67
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect error handling in StartUblk

When cmd.CombinedOutput() returns an error, the function logs the error but returns nil, which may mislead the caller into thinking the operation succeeded. It's important to return the actual error to allow proper error handling by the caller.

Apply this diff to fix the error handling:

if err != nil {
    logrus.Error("Error starting ublk:", err)
-   return nil
+   return err
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if err != nil {
logrus.Error("Error starting ublk:", err)
return nil
}
if err != nil {
logrus.Error("Error starting ublk:", err)
return err
}


logrus.Info("ublk started successfully")

var jsonOutput map[string]interface{}
err = json.Unmarshal(output, &jsonOutput)

if err != nil {
return err
}

u.UblkID = int(jsonOutput["dev_id"].(float64))
u.DaemonPId = int(jsonOutput["daemon_pid"].(float64))
u.Queues = int(jsonOutput["nr_hw_queues"].(float64))
u.QueueDepth = int(jsonOutput["queue_depth"].(float64))
u.BlockSize = int(jsonOutput["block_size"].(float64))

Comment on lines +71 to +83
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential panic due to unchecked type assertions

The code uses unchecked type assertions like jsonOutput["dev_id"].(float64) without verifying if the key exists or if the type assertion succeeds. This can lead to a runtime panic if the expected keys are missing or of an unexpected type. It's safer to use the two-value form of type assertions and handle the case where the assertion fails.

Apply this diff to safely handle type assertions:

- u.UblkID = int(jsonOutput["dev_id"].(float64))
+ devIDValue, ok := jsonOutput["dev_id"]
+ if !ok {
+     return fmt.Errorf("'dev_id' not found in ublk output")
+ }
+ devID, ok := devIDValue.(float64)
+ if !ok {
+     return fmt.Errorf("unexpected type for 'dev_id' in ublk output")
+ }
+ u.UblkID = int(devID)

- u.DaemonPId = int(jsonOutput["daemon_pid"].(float64))
+ daemonPIDValue, ok := jsonOutput["daemon_pid"]
+ if !ok {
+     return fmt.Errorf("'daemon_pid' not found in ublk output")
+ }
+ daemonPID, ok := daemonPIDValue.(float64)
+ if !ok {
+     return fmt.Errorf("unexpected type for 'daemon_pid' in ublk output")
+ }
+ u.DaemonPId = int(daemonPID)

# Repeat the pattern for 'nr_hw_queues', 'queue_depth', and 'block_size'

Committable suggestion skipped: line range outside the PR's diff.

u.isUp = true
return nil
}

func (u *Ublk) Startup(rwu types.ReaderWriterUnmapperAt) error {
if err := u.startSocketServer(rwu); err != nil {
return err
}
go func() {
err := u.StartUblk()
if err != nil {
logrus.Errorf("Failed to start ublk: %v", err)
}
}()

return nil
}
func (u *Ublk) ShutdownUblk() {
comm := "ublk"
args := []string{"del", strconv.Itoa(u.UblkID)}

cmd := exec.Command(comm, args...)
logrus.Infof("Running command: %v", cmd.Args)
output, err := cmd.CombinedOutput()
if err != nil {
logrus.Errorf("Error stopping ublk: %v", err)
return
}
logrus.Infof("ublk stopped successfully: %v", string(output))
}

func (u *Ublk) Shutdown() error {
_, file, no, ok := runtime.Caller(1)
if ok {
logrus.Infof("\ncalled from %s#%d\n\n", file, no)
}
if u.Volume != "" {
if u.socketServer != nil {
logrus.Infof("Shutting down TGT socket server for %v", u.Volume)
u.socketServer.Stop()
u.socketServer = nil
}
}
u.isUp = false

go func() {
u.ShutdownUblk()
}()
Comment on lines +129 to +131
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Possible race condition in Shutdown method

Spawning a goroutine to call ShutdownUblk() without synchronization might lead to unexpected behavior, especially if the program exits before the goroutine completes. Consider calling ShutdownUblk() directly or synchronizing the goroutine to ensure proper shutdown.

Apply this diff to call ShutdownUblk() directly:

- go func() {
-     u.ShutdownUblk()
- }()
+ u.ShutdownUblk()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
go func() {
u.ShutdownUblk()
}()
u.ShutdownUblk()


return nil
}

func (u *Ublk) State() types.State {
if u.isUp {
return types.StateUp
}
return types.StateDown
}

func (u *Ublk) Endpoint() string {
if u.isUp {
return u.GetSocketPath()
}
return ""
}

func (u *Ublk) GetSocketPath() string {
if u.Volume == "" {
panic("Invalid volume name")
}
Comment on lines +151 to +153
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid using panic for control flow

The GetSocketPath method calls panic("Invalid volume name") if u.Volume is empty. Using panic for control flow is discouraged as it can crash the program. Instead, return an error to be handled appropriately.

Apply this diff to return an error instead of panicking:

func (u *Ublk) GetSocketPath() (string, error) {
    if u.Volume == "" {
-       panic("Invalid volume name")
+       return "", fmt.Errorf("invalid volume name")
    }
    return filepath.Join(SocketDirectory, "longhorn-"+u.Volume+".sock"), nil
}

Update callers to handle the error:

- socketPath := u.GetSocketPath()
+ socketPath, err := u.GetSocketPath()
+ if err != nil {
+     return err
+ }

Committable suggestion skipped: line range outside the PR's diff.

return filepath.Join(SocketDirectory, "longhorn-"+u.Volume+".sock")
}

func (u *Ublk) startSocketServer(rwu types.ReaderWriterUnmapperAt) error {
socketPath := u.GetSocketPath()
if err := os.MkdirAll(filepath.Dir(socketPath), 0700); err != nil {
return errors.Wrapf(err, "cannot create directory %v", filepath.Dir(socketPath))
}

if st, err := os.Stat(socketPath); err == nil && !st.IsDir() {
if err := os.Remove(socketPath); err != nil {
return err
}
}

u.socketPath = socketPath
go func() {
err := u.startSocketServerListen(rwu)
if err != nil {
logrus.Errorf("Failed to start socket server: %v", err)
}
}()
return nil
}

func (u *Ublk) startSocketServerListen(rwu types.ReaderWriterUnmapperAt) error {
ln, err := net.Listen("unix", u.socketPath)
if err != nil {
return err
}
defer func(ln net.Listener) {
err := ln.Close()
if err != nil {
logrus.WithError(err).Error("Failed to close socket listener")
}
}(ln)

for {
conn, err := ln.Accept()
if err != nil {
logrus.WithError(err).Error("Failed to accept socket connection")
continue
}
go u.handleServerConnection(conn, rwu)
}
}

func (u *Ublk) handleServerConnection(c net.Conn, rwu types.ReaderWriterUnmapperAt) {
defer func(c net.Conn) {
err := c.Close()
if err != nil {
logrus.WithError(err).Error("Failed to close socket server connection")
}
}(c)

server := dataconn.NewServer(c, NewDataProcessorWrapper(rwu))
logrus.Info("New data socket connection established")
if err := server.Handle(); err != nil && err != io.EOF {
logrus.WithError(err).Errorf("Failed to handle socket server connection")
} else if err == io.EOF {
logrus.Warn("Socket server connection closed")
}
}

type DataProcessorWrapper struct {
rwu types.ReaderWriterUnmapperAt
}

func NewDataProcessorWrapper(rwu types.ReaderWriterUnmapperAt) DataProcessorWrapper {
return DataProcessorWrapper{
rwu: rwu,
}
}

func (d DataProcessorWrapper) ReadAt(p []byte, off int64) (n int, err error) {
return d.rwu.ReadAt(p, off)
}

func (d DataProcessorWrapper) WriteAt(p []byte, off int64) (n int, err error) {
return d.rwu.WriteAt(p, off)
}

func (d DataProcessorWrapper) UnmapAt(length uint32, off int64) (n int, err error) {
return d.rwu.UnmapAt(length, off)
}

func (d DataProcessorWrapper) PingResponse() error {
return nil
}

func (u *Ublk) Upgrade(name string, size, sectorSize int64, rwu types.ReaderWriterUnmapperAt) error {
return fmt.Errorf("upgrade is not supported")
}

func (u *Ublk) Expand(size int64) error {
return fmt.Errorf("expand is not supported")
}
Loading