Skip to content

Commit

Permalink
Adding signal when chunk.Client closes
Browse files Browse the repository at this point in the history
  • Loading branch information
John Doak authored and John Doak committed Feb 7, 2021
1 parent 710a8cd commit b7b2acb
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions ipc/uds/highlevel/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ type Client struct {
writeVarInt []byte
readVarInt []byte

maxSize int64
biggestReadMsgSize int
biggestWriteMsgSize int
maxSize int64

closeOnce sync.Once
closed chan struct{}

readMu, writeMu sync.Mutex
}
Expand Down Expand Up @@ -125,10 +126,12 @@ func New(rwc io.ReadWriteCloser, options ...Option) (*Client, error) {
client = &Client{
rwc: rwc,
writeVarInt: make([]byte, 8),
closed: make(chan struct{}),
}
default:
return nil, fmt.Errorf("rwc was not a *uds.Client or *uds.Server, was %T", rwc)
}

for _, o := range options {
o(client)
}
Expand All @@ -148,8 +151,18 @@ func New(rwc io.ReadWriteCloser, options ...Option) (*Client, error) {
return client, nil
}

// ClosedSignal returns a channel that will be closed when this Client becomes closed.
func (c *Client) ClosedSignal() chan struct{} {
return c.closed
}

// Close closes the underlying io.ReadWriteCloser.
func (c *Client) Close() error {
defer c.closeOnce.Do(
func() {
close(c.closed)
},
)
return c.rwc.Close()
}

Expand All @@ -164,20 +177,20 @@ const (
oneMiB = 1000 * 1024
)

// Read reads the next message from the socket.
// Read reads the next message from the socket. Any error closes the client.
func (c *Client) Read() (*[]byte, error) {
c.readMu.Lock()
defer c.readMu.Unlock()

size, err := binary.ReadVarint(c.rwc.(io.ByteReader))
if err != nil {
c.rwc.Close()
c.Close()
return nil, err
}

if c.maxSize > 0 {
if size > c.maxSize {
c.rwc.Close()
c.Close()
return nil, fmt.Errorf("message is larger than maximum size allowed")
}
}
Expand All @@ -193,7 +206,7 @@ func (c *Client) Read() (*[]byte, error) {
return b, nil
}

// Write writes b as a chunk into the socket.
// Write writes b as a chunk into the socket. Any error closes the client.
func (c *Client) Write(b []byte) error {
c.writeMu.Lock()
defer c.writeMu.Unlock()
Expand All @@ -211,7 +224,7 @@ func (c *Client) Write(b []byte) error {
n := binary.PutVarint(c.writeVarInt, int64(len(b)))
_, err := c.rwc.Write(c.writeVarInt[:n])
if err != nil {
c.rwc.Close()
c.Close()
return err
}
n, _ = c.rwc.Write(b)
Expand Down

0 comments on commit b7b2acb

Please sign in to comment.