From b09bcb77969c61c9519213f17aeb41d9464e5645 Mon Sep 17 00:00:00 2001 From: Jan Kryl Date: Tue, 1 May 2018 00:35:22 +0200 Subject: [PATCH] [cstor#125] Adding an option for zfs create command to take IP address as input Signed-off-by: Jan Kryl --- cmd/uzfs_test/zrepl_utest.c | 13 +- cmd/zrepl/Makefile.am | 6 +- cmd/zrepl/data_conn.c | 395 +++++++++++ cmd/zrepl/data_conn.h | 46 ++ cmd/zrepl/mgmt_conn.c | 928 ++++++++++++++++++++++++++ cmd/zrepl/mgmt_conn.h | 36 + cmd/zrepl/zrepl.c | 893 +------------------------ include/uzfs_io.h | 1 + include/zrepl_mgmt.h | 32 +- include/zrepl_prot.h | 5 +- lib/libzpool/uzfs_io.c | 2 - lib/libzpool/uzfs_mgmt.c | 6 +- lib/libzpool/zrepl_mgmt.c | 20 +- module/zfs/zfs_ioctl.c | 2 +- tests/cbtest/gtest/test_zrepl_prot.cc | 473 ++++++++++--- 15 files changed, 1858 insertions(+), 1000 deletions(-) create mode 100644 cmd/zrepl/data_conn.c create mode 100644 cmd/zrepl/data_conn.h create mode 100644 cmd/zrepl/mgmt_conn.c create mode 100644 cmd/zrepl/mgmt_conn.h diff --git a/cmd/uzfs_test/zrepl_utest.c b/cmd/uzfs_test/zrepl_utest.c index d3f94ffd635d..c727d8f8cee6 100644 --- a/cmd/uzfs_test/zrepl_utest.c +++ b/cmd/uzfs_test/zrepl_utest.c @@ -347,7 +347,7 @@ zrepl_utest(void *arg) reader_args.rebuild_test = B_FALSE; - sfd = create_and_bind(tgt_port, B_TRUE); + sfd = create_and_bind(tgt_port, B_TRUE, B_FALSE); if (sfd == -1) { return; } @@ -421,7 +421,7 @@ zrepl_utest(void *arg) replica_io_addr.sin_addr.s_addr = inet_addr(mgmt_ack.ip); replica_io_addr.sin_port = htons(mgmt_ack.port); retry: - io_sfd = create_and_bind("", B_FALSE); + io_sfd = create_and_bind("", B_FALSE, B_FALSE); if (io_sfd == -1) { printf("Socket creation failed with errno:%d\n", errno); goto start; @@ -432,6 +432,7 @@ zrepl_utest(void *arg) printf("Failed to connect to replica-IO port" " with errno:%d\n", errno); close(io_sfd); + sleep(1); goto retry; } printf("Connect to replica IO port is successfully\n"); @@ -520,7 +521,7 @@ zrepl_rebuild_test(void *arg) reader_args[1].max_iops = max_iops/2; reader_args[1].rebuild_test = B_TRUE; - sfd = create_and_bind(tgt_port, B_TRUE); + sfd = create_and_bind(tgt_port, B_TRUE, B_FALSE); if (sfd == -1) { return; } @@ -596,7 +597,7 @@ zrepl_rebuild_test(void *arg) replica_io_addr.sin_addr.s_addr = inet_addr(mgmt_ack->ip); replica_io_addr.sin_port = htons(mgmt_ack->port); retry: - io_sfd = create_and_bind("", B_FALSE); + io_sfd = create_and_bind("", B_FALSE, B_FALSE); if (io_sfd == -1) { printf("Socket creation failed with errno:%d\n", errno); goto start; @@ -607,13 +608,14 @@ zrepl_rebuild_test(void *arg) printf("Failed to connect to replica-IO port" " with errno:%d\n", errno); close(io_sfd); + sleep(1); goto retry; } printf("Connect to replica IO port is successfully\n"); writer_args.sfd[0] = reader_args[0].sfd[0] = io_sfd; - io_sfd1 = create_and_bind("", B_FALSE); + io_sfd1 = create_and_bind("", B_FALSE, B_FALSE); if (io_sfd1 == -1) { printf("Socket creation failed with errno:%d\n", errno); goto start; @@ -623,6 +625,7 @@ zrepl_rebuild_test(void *arg) if (rc == -1) { printf("Failed to connect to replica-IO port" " with errno:%d\n", errno); + sleep(1); close(io_sfd1); goto retry; } diff --git a/cmd/zrepl/Makefile.am b/cmd/zrepl/Makefile.am index 4e07c9c1a49d..bf704521036d 100644 --- a/cmd/zrepl/Makefile.am +++ b/cmd/zrepl/Makefile.am @@ -12,7 +12,9 @@ DEFAULT_INCLUDES += \ sbin_PROGRAMS = zrepl zrepl_SOURCES = \ - zrepl.c + zrepl.c \ + mgmt_conn.c \ + data_conn.c zrepl_LDADD = \ $(top_builddir)/lib/libnvpair/libnvpair.la \ @@ -20,3 +22,5 @@ zrepl_LDADD = \ $(top_builddir)/lib/libzpool/libzpool.la \ $(top_builddir)/lib/libzfs/libzfs.la \ $(top_builddir)/lib/libzfs_core/libzfs_core.la + +EXTRA_DIST = data_conn.h mgmt_conn.h diff --git a/cmd/zrepl/data_conn.c b/cmd/zrepl/data_conn.c new file mode 100644 index 000000000000..e78c4b211682 --- /dev/null +++ b/cmd/zrepl/data_conn.c @@ -0,0 +1,395 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ +/* + * Copyright (c) 2018 Cloudbyte. All rights reserved. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include "data_conn.h" + +#define ZVOL_REBUILD_STEP_SIZE (128 * 1024 * 1024) // 128MB + +/* + * Allocate zio command along with + * buffer needed for IO completion. + */ +zvol_io_cmd_t * +zio_cmd_alloc(zvol_io_hdr_t *hdr, int fd) +{ + zvol_io_cmd_t *zio_cmd = kmem_zalloc( + sizeof (zvol_io_cmd_t), KM_SLEEP); + + bcopy(hdr, &zio_cmd->hdr, sizeof (zio_cmd->hdr)); + if ((hdr->opcode == ZVOL_OPCODE_READ) || + (hdr->opcode == ZVOL_OPCODE_WRITE) || + (hdr->opcode == ZVOL_OPCODE_HANDSHAKE)) { + zio_cmd->buf = kmem_zalloc(sizeof (char) * hdr->len, KM_SLEEP); + } + + zio_cmd->conn = fd; + return (zio_cmd); +} + +/* + * Free zio command along with buffer. + */ +void +zio_cmd_free(zvol_io_cmd_t **cmd) +{ + zvol_io_cmd_t *zio_cmd = *cmd; + zvol_op_code_t opcode = zio_cmd->hdr.opcode; + switch (opcode) { + case ZVOL_OPCODE_READ: + case ZVOL_OPCODE_WRITE: + case ZVOL_OPCODE_HANDSHAKE: + if (zio_cmd->buf != NULL) { + kmem_free(zio_cmd->buf, zio_cmd->hdr.len); + } + break; + + case ZVOL_OPCODE_SYNC: + case ZVOL_OPCODE_REBUILD_STEP_DONE: + /* Nothing to do */ + break; + + default: + VERIFY(!"Should be a valid opcode"); + break; + } + + kmem_free(zio_cmd, sizeof (zvol_io_cmd_t)); + *cmd = NULL; +} + +int +uzfs_zvol_socket_read(int fd, char *buf, uint64_t nbytes) +{ + ssize_t count = 0; + char *p = buf; + while (nbytes) { + count = read(fd, (void *)p, nbytes); + if (count <= 0) { + ZREPL_ERRLOG("Read error:%d\n", errno); + return (-1); + } + p += count; + nbytes -= count; + } + return (0); +} + +int +uzfs_zvol_socket_write(int fd, char *buf, uint64_t nbytes) +{ + ssize_t count = 0; + char *p = buf; + while (nbytes) { + count = write(fd, (void *)p, nbytes); + if (count <= 0) { + ZREPL_ERRLOG("Write error:%d\n", errno); + return (-1); + } + p += count; + nbytes -= count; + } + return (0); +} + +/* + * We expect only one chunk of data with meta header in write request. + * Nevertheless the code is general to handle even more of them. + */ +static int +uzfs_submit_writes(zvol_info_t *zinfo, zvol_io_cmd_t *zio_cmd) +{ + blk_metadata_t metadata; + boolean_t is_rebuild = B_FALSE; + zvol_io_hdr_t *hdr = &zio_cmd->hdr; + struct zvol_io_rw_hdr *write_hdr; + char *datap = (char *)zio_cmd->buf; + size_t data_offset = hdr->offset; + size_t remain = hdr->len; + int rc = 0; + is_rebuild = hdr->flags & ZVOL_OP_FLAG_REBUILD; + + while (remain > 0) { + if (remain < sizeof (*write_hdr)) + return (-1); + + write_hdr = (struct zvol_io_rw_hdr *)datap; + metadata.io_num = write_hdr->io_num; + + datap += sizeof (*write_hdr); + remain -= sizeof (*write_hdr); + if (remain < write_hdr->len) + return (-1); + + rc = uzfs_write_data(zinfo->zv, datap, data_offset, + write_hdr->len, &metadata, is_rebuild); + if (rc != 0) + break; + + datap += write_hdr->len; + remain -= write_hdr->len; + data_offset += write_hdr->len; + } + + return (rc); +} + +/* + * zvol worker is responsible for actual work. + * It execute read/write/sync command to uzfs. + * It enqueue command to completion queue and + * send signal to ack-sender thread. + */ +void +uzfs_zvol_worker(void *arg) +{ + zvol_io_cmd_t *zio_cmd; + zvol_info_t *zinfo; + zvol_state_t *zvol_state; + zvol_io_hdr_t *hdr; + metadata_desc_t **metadata_desc; + int rc = 0; + int write = 0; + boolean_t rebuild_cmd_req; + + zio_cmd = (zvol_io_cmd_t *)arg; + hdr = &zio_cmd->hdr; + zinfo = zio_cmd->zv; + zvol_state = zinfo->zv; + rebuild_cmd_req = hdr->flags & ZVOL_OP_FLAG_REBUILD; + + /* + * If zvol hasn't passed rebuild phase or if read + * is meant for rebuild then we need the metadata + */ + if (!rebuild_cmd_req && ZVOL_IS_REBUILDED(zvol_state)) { + metadata_desc = NULL; + zio_cmd->metadata_desc = NULL; + } else { + metadata_desc = &zio_cmd->metadata_desc; + } + switch (hdr->opcode) { + case ZVOL_OPCODE_READ: + rc = uzfs_read_data(zinfo->zv, + (char *)zio_cmd->buf, + hdr->offset, hdr->len, + metadata_desc); + break; + + case ZVOL_OPCODE_WRITE: + write = 1; + rc = uzfs_submit_writes(zinfo, zio_cmd); + zinfo->checkpointed_io_seq = + zio_cmd->hdr.checkpointed_io_seq; + break; + + case ZVOL_OPCODE_SYNC: + uzfs_flush_data(zinfo->zv); + break; + case ZVOL_OPCODE_REBUILD_STEP_DONE: + break; + default: + VERIFY(!"Should be a valid opcode"); + break; + } + + if (rc < 0) { + ZREPL_ERRLOG("Zvol op_code :%d failed with " + "error: %d\n", hdr->opcode, errno); + hdr->status = ZVOL_OP_STATUS_FAILED; + } else { + hdr->status = ZVOL_OP_STATUS_OK; + } + + /* + * We are not sending ACK for writes meant for rebuild + */ + if (rebuild_cmd_req && (hdr->opcode == ZVOL_OPCODE_WRITE)) { + zio_cmd_free(&zio_cmd); + goto drop_refcount; + } + + (void) pthread_mutex_lock(&zinfo->complete_queue_mutex); + STAILQ_INSERT_TAIL(&zinfo->complete_queue, zio_cmd, cmd_link); + if (write) { + zinfo->write_req_received_cnt++; + } else { + zinfo->read_req_received_cnt++; + } + + if (zinfo->io_ack_waiting) { + rc = pthread_cond_signal(&zinfo->io_ack_cond); + } + + (void) pthread_mutex_unlock(&zinfo->complete_queue_mutex); + +drop_refcount: + uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); +} + +void +uzfs_zvol_rebuild_dw_replica(void *arg) +{ + rebuild_thread_arg_t *rebuild_args = arg; + struct sockaddr_in replica_ip; + + int rc, sfd = -1; + uint64_t offset = 0; + uint64_t checkpointed_io_seq; + zvol_info_t *zinfo = NULL; + zvol_state_t *zvol_state; + zvol_io_cmd_t *zio_cmd = NULL; + zvol_io_hdr_t hdr; + + sfd = rebuild_args->fd; + zinfo = rebuild_args->zinfo; + + bzero(&replica_ip, sizeof (replica_ip)); + replica_ip.sin_family = AF_INET; + replica_ip.sin_addr.s_addr = inet_addr(rebuild_args->ip); + replica_ip.sin_port = htons(rebuild_args->port); + + if (connect(sfd, (struct sockaddr *)&replica_ip, + sizeof (replica_ip)) != 0) { + perror("connect"); + goto exit; + } + + /* Set state in-progess state now */ + uzfs_zvol_set_rebuild_status(zinfo->zv, ZVOL_REBUILDING_IN_PROGRESS); + uzfs_zvol_get_last_committed_io_no(zinfo->zv, &checkpointed_io_seq); + zvol_state = zinfo->zv; + bzero(&hdr, sizeof (hdr)); + hdr.status = ZVOL_OP_STATUS_OK; + hdr.version = REPLICA_VERSION; + hdr.opcode = ZVOL_OPCODE_HANDSHAKE; + hdr.len = strlen(rebuild_args->zvol_name) + 1; + + rc = uzfs_zvol_socket_write(sfd, (char *)&hdr, sizeof (hdr)); + if (rc == -1) { + ZREPL_ERRLOG("Socket write failed, err: %d\n", errno); + goto exit; + } + + rc = uzfs_zvol_socket_write(sfd, (void *)rebuild_args->zvol_name, + hdr.len); + if (rc == -1) { + ZREPL_ERRLOG("Socket write failed, err: %d\n", errno); + goto exit; + } + +next_step: + if (offset >= ZVOL_VOLUME_SIZE(zvol_state)) { + hdr.opcode = ZVOL_OPCODE_REBUILD_COMPLETE; + rc = uzfs_zvol_socket_write(sfd, (char *)&hdr, sizeof (hdr)); + if (rc != 0) { + ZREPL_ERRLOG("Socket write failed, err: %d\n", errno); + goto exit; + } + atomic_dec_16(&zinfo->zv->rebuild_info.rebuild_cnt); + if (!zinfo->zv->rebuild_info.rebuild_cnt) { + /* Mark replica healthy now */ + uzfs_zvol_set_rebuild_status(zinfo->zv, + ZVOL_REBUILDING_DONE); + uzfs_zvol_set_status(zinfo->zv, ZVOL_STATUS_HEALTHY); + } + ZREPL_ERRLOG("Rebuilding on Replica:%s completed\n", + zinfo->name); + goto exit; + } else { + bzero(&hdr, sizeof (hdr)); + hdr.status = ZVOL_OP_STATUS_OK; + hdr.version = REPLICA_VERSION; + hdr.opcode = ZVOL_OPCODE_REBUILD_STEP; + hdr.checkpointed_io_seq = checkpointed_io_seq; + hdr.offset = offset; + hdr.len = ZVOL_REBUILD_STEP_SIZE; + rc = uzfs_zvol_socket_write(sfd, (char *)&hdr, sizeof (hdr)); + if (rc != 0) { + ZREPL_ERRLOG("Socket write failed, err: %d\n", errno); + goto exit; + } + } + + while (1) { + rc = uzfs_zvol_socket_read(sfd, (char *)&hdr, sizeof (hdr)); + if (rc != 0) { + ZREPL_ERRLOG("Socket read failed, err: %d\n", errno); + goto exit; + } + + if (hdr.opcode == ZVOL_OPCODE_REBUILD_STEP_DONE) { + offset += ZVOL_REBUILD_STEP_SIZE; + printf("ZVOL_OPCODE_REBUILD_STEP_DONE received\n"); + goto next_step; + } + + ASSERT((hdr.opcode == ZVOL_OPCODE_READ) && + (hdr.flags & ZVOL_OP_FLAG_REBUILD)); + hdr.opcode = ZVOL_OPCODE_WRITE; + + zio_cmd = zio_cmd_alloc(&hdr, sfd); + rc = uzfs_zvol_socket_read(sfd, zio_cmd->buf, hdr.len); + if (rc != 0) { + zio_cmd_free(&zio_cmd); + ZREPL_ERRLOG("Socket read failed with " + "error: %d\n", errno); + goto exit; + } + + /* + * Take refcount for uzfs_zvol_worker to work on it. + * Will dropped by uzfs_zvol_worker once cmd is executed. + */ + uzfs_zinfo_take_refcnt(zinfo, B_FALSE); + zio_cmd->zv = zinfo; + uzfs_zvol_worker(zio_cmd); + zio_cmd = NULL; + } + +exit: + kmem_free(arg, sizeof (rebuild_thread_arg_t)); + if (zio_cmd != NULL) + zio_cmd_free(&zio_cmd); + if (sfd != -1) + close(sfd); + + if (ZVOL_IS_DEGRADED(zinfo->zv)) + uzfs_zvol_set_rebuild_status(zinfo->zv, ZVOL_REBUILDING_INIT); + /* + * Parent thread have taken refcount, drop it now. + */ + uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); + + printf("uzfs_zvol_rebuild_dw_replica thread exiting\n"); + zk_thread_exit(); +} diff --git a/cmd/zrepl/data_conn.h b/cmd/zrepl/data_conn.h new file mode 100644 index 000000000000..3bb499a701a2 --- /dev/null +++ b/cmd/zrepl/data_conn.h @@ -0,0 +1,46 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ +/* + * Copyright (c) 2018 Cloudbyte. All rights reserved. + */ + +#ifndef _REBUILD_H +#define _REBUILD_H + +#include +#include + +typedef struct rebuild_thread_arg { + zvol_info_t *zinfo; + char zvol_name[MAX_NAME_LEN]; + int fd; + char ip[MAX_IP_LEN]; + uint16_t port; +} rebuild_thread_arg_t; + +zvol_io_cmd_t *zio_cmd_alloc(zvol_io_hdr_t *hdr, int fd); +void zio_cmd_free(zvol_io_cmd_t **cmd); +int uzfs_zvol_socket_read(int fd, char *buf, uint64_t nbytes); +int uzfs_zvol_socket_write(int fd, char *buf, uint64_t nbytes); +void uzfs_zvol_worker(void *arg); +void uzfs_zvol_rebuild_dw_replica(void *arg); + +#endif /* _REBUILD_H */ diff --git a/cmd/zrepl/mgmt_conn.c b/cmd/zrepl/mgmt_conn.c new file mode 100644 index 000000000000..1233f251eacf --- /dev/null +++ b/cmd/zrepl/mgmt_conn.c @@ -0,0 +1,928 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ +/* + * Copyright (c) 2018 Cloudbyte. All rights reserved. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "mgmt_conn.h" +#include "data_conn.h" + +/* + * This file contains implementation of event loop (uzfs_zvol_mgmt_thread). + * Event loop is run by a single thread and it has exclusive access to + * file descriptors which simplifies locking. The only synchronization + * problem which needs to be taken care of is adding new connections and + * removing/closing existing ones, which is done by other threads. + * For that purpose there is: + * + * list of connections + * eventfd file descriptor for signaling changes in connection list + * connection list mutex which protects both entities mentioned above + * + * zinfo_create_cb - uzfs callback which adds entry to connection list + * (connect is async - it does not block creation) + * zinfo_destroy_cb - uzfs callback which removes entry from connection list + * (it blocks until the connection FD is really closed + * to guarantee no activity related to zinfo after it + * is destroyed) + * event loop thread never adds or removes list entries but only updates + * their state. + */ + +/* To print verbose debug messages uncomment the following line */ +#define MGMT_CONN_DEBUG 1 + +#ifdef MGMT_CONN_DEBUG +static const char * +gettimestamp(void) +{ + struct timeval tv; + static char buf[20]; + struct tm *timeinfo; + unsigned int ms; + + gettimeofday(&tv, NULL); + timeinfo = localtime(&tv.tv_sec); + ms = tv.tv_usec / 1000; + + strftime(buf, sizeof (buf), "%H:%M:%S.", timeinfo); + snprintf(buf + 9, sizeof (buf) - 9, "%u", ms); + + return (buf); +} +#define DBGCONN(c, fmt, ...) fprintf(stderr, "%s [tgt %s:%u]: " fmt "\n", \ + gettimestamp(), \ + (c)->conn_host, (c)->conn_port, ##__VA_ARGS__) +#else +#define DBGCONN(c, fmt, ...) +#endif + +/* Max # of events from epoll processed at once */ +#define MAX_EVENTS 10 +#define MGMT_PORT "12000" +#define RECONNECT_DELAY 4 // 4 seconds + +/* + * Mgmt connection states. + */ +enum conn_state { + CS_CONNECT, // tcp connect is in progress + CS_INIT, // initial state or state after sending reply + CS_READ_VERSION, // reading request version + CS_READ_HEADER, // reading request header + CS_READ_PAYLOAD, // reading request payload + CS_CLOSE, // closing connection - final state +}; + +/* + * Structure representing mgmt connection and all its reading/writing state. + */ +typedef struct uzfs_mgmt_conn { + SLIST_ENTRY(uzfs_mgmt_conn) conn_next; + int conn_fd; // network socket FD + int conn_refcount; // should be 0 or 1 + char conn_host[MAX_IP_LEN]; + uint16_t conn_port; + enum conn_state conn_state; + void *conn_buf; // buffer to hold network data + int conn_bufsiz; // bytes to read/write in total + int conn_procn; // bytes already read/written + zvol_io_hdr_t *conn_hdr; // header of currently processed cmd + time_t conn_last_connect; // time of last attempted connect() +} uzfs_mgmt_conn_t; + +/* conn list can be traversed or changed only when holding the mutex */ +kmutex_t conn_list_mtx; +SLIST_HEAD(, uzfs_mgmt_conn) uzfs_mgmt_conns; + +/* event FD for waking up event loop thread blocked in epoll_wait */ +int mgmt_eventfd = -1; +int epollfd = -1; +/* default iSCSI target IP address */ +char *target_addr; + +static int move_to_next_state(uzfs_mgmt_conn_t *conn); + +/* + * Remove connection FD from poll set and close the FD. + */ +static int +close_conn(uzfs_mgmt_conn_t *conn) +{ + /* Release resources tight to the conn */ + if (conn->conn_buf != NULL) { + kmem_free(conn->conn_buf, conn->conn_bufsiz); + conn->conn_buf = NULL; + } + conn->conn_bufsiz = 0; + conn->conn_procn = 0; + if (conn->conn_hdr != NULL) { + kmem_free(conn->conn_hdr, sizeof (zvol_io_hdr_t)); + conn->conn_hdr = NULL; + } + + if (epoll_ctl(epollfd, EPOLL_CTL_DEL, conn->conn_fd, NULL) == -1) { + perror("epoll_ctl del"); + return (-1); + } + (void) close(conn->conn_fd); + conn->conn_fd = -1; + return (0); +} + +/* + * Create non-blocking socket and initiate connection to the target. + * Returns the new FD or -1. + */ +static int +connect_to_tgt(uzfs_mgmt_conn_t *conn) +{ + struct sockaddr_in istgt_addr; + int sfd, rc; + + conn->conn_last_connect = time(NULL); + + bzero((char *)&istgt_addr, sizeof (istgt_addr)); + istgt_addr.sin_family = AF_INET; + istgt_addr.sin_addr.s_addr = inet_addr(conn->conn_host); + istgt_addr.sin_port = htons(conn->conn_port); + + sfd = create_and_bind(MGMT_PORT, B_FALSE, B_TRUE); + if (sfd < 0) + return (-1); + + rc = connect(sfd, (struct sockaddr *)&istgt_addr, sizeof (istgt_addr)); + /* EINPROGRESS means that EPOLLOUT will tell us when connect is done */ + if (rc != 0 && errno != EINPROGRESS) { + close(sfd); + DBGCONN(conn, "Failed to connect"); + return (-1); + } + return (sfd); +} + +/* + * Scan mgmt connection list and create new connections or close unused ones + * as needed. + */ +static int +scan_conn_list(void) +{ + uzfs_mgmt_conn_t *conn; + struct epoll_event ev; + int rc = 0; + + mutex_enter(&conn_list_mtx); + SLIST_FOREACH(conn, &uzfs_mgmt_conns, conn_next) { + /* we need to create new connection */ + if (conn->conn_refcount > 0 && conn->conn_fd < 0 && + time(NULL) - conn->conn_last_connect >= RECONNECT_DELAY) { + conn->conn_fd = connect_to_tgt(conn); + if (conn->conn_fd >= 0) { + conn->conn_state = CS_CONNECT; + ev.events = EPOLLOUT; + ev.data.ptr = conn; + if (epoll_ctl(epollfd, EPOLL_CTL_ADD, + conn->conn_fd, &ev) == -1) { + perror("epoll_ctl add"); + close(conn->conn_fd); + conn->conn_fd = -1; + rc = -1; + break; + } + } + /* we need to close unused connection */ + } else if (conn->conn_refcount == 0 && conn->conn_fd >= 0) { + DBGCONN(conn, "Closing the connection"); + if (close_conn(conn) != 0) { + rc = -1; + break; + } + } + } + mutex_exit(&conn_list_mtx); + + return (rc); +} + +/* + * Try to obtain controller address from zfs property. + */ +static int +get_controller_ip(objset_t *os, char *buf, int len) +{ + nvlist_t *props, *propval; + char *ip; + int error; + dsl_pool_t *dp = spa_get_dsl(os->os_spa); + + dsl_pool_config_enter(dp, FTAG); + error = dsl_prop_get_all(os, &props); + dsl_pool_config_exit(dp, FTAG); + if (error != 0) + return (error); + if (nvlist_lookup_nvlist(props, ZFS_PROP_TARGET_IP, &propval) != 0) + return (ENOENT); + if (nvlist_lookup_string(propval, ZPROP_VALUE, &ip) != 0) + return (EINVAL); + + strncpy(buf, ip, len); + nvlist_free(props); + return (0); +} + +/* + * This gets called whenever a new zinfo is created. We might need to create + * a new mgmt connection to iscsi target in response to this event. + */ +void +zinfo_create_cb(zvol_info_t *zinfo, nvlist_t *create_props) +{ + char target_host[256]; + uint16_t target_port; + uzfs_mgmt_conn_t *mgmt_conn, *new_mgmt_conn; + zvol_state_t *zv = zinfo->zv; + char *delim, *ip; + uint64_t val = 1; + int rc; + + /* if zvol is being created the zvol property does not exist yet */ + if (create_props != NULL && + nvlist_lookup_string(create_props, ZFS_PROP_TARGET_IP, &ip) == 0) { + strncpy(target_host, ip, sizeof (target_host)); + } else { + /* get it from zvol properties */ + if (get_controller_ip(zv->zv_objset, target_host, + sizeof (target_host)) != 0) { + /* in case of missing property take the default IP */ + strncpy(target_host, target_addr, sizeof (target_host)); + target_port = TARGET_PORT; + } + } + + delim = strchr(target_host, ':'); + if (delim == NULL) { + target_port = TARGET_PORT; + } else { + *delim = '\0'; + target_port = atoi(++delim); + } + + /* + * It is allocated before we enter the mutex even if it might not be + * used because, because in 99% of cases it will be needed (normally + * each zvol has a different iSCSI target). + */ + new_mgmt_conn = kmem_zalloc(sizeof (*new_mgmt_conn), KM_SLEEP); + + mutex_enter(&conn_list_mtx); + SLIST_FOREACH(mgmt_conn, &uzfs_mgmt_conns, conn_next) { + if (strcmp(mgmt_conn->conn_host, target_host) == 0 && + mgmt_conn->conn_port == target_port) { + /* we already have conn for this target */ + mgmt_conn->conn_refcount++; + zinfo->mgmt_conn = mgmt_conn; + mutex_exit(&conn_list_mtx); + kmem_free(new_mgmt_conn, sizeof (*new_mgmt_conn)); + return; + } + } + + new_mgmt_conn->conn_fd = -1; + new_mgmt_conn->conn_refcount = 1; + new_mgmt_conn->conn_port = target_port; + strncpy(new_mgmt_conn->conn_host, target_host, + sizeof (new_mgmt_conn->conn_host)); + + zinfo->mgmt_conn = new_mgmt_conn; + SLIST_INSERT_HEAD(&uzfs_mgmt_conns, new_mgmt_conn, conn_next); + /* signal the event loop thread */ + if (mgmt_eventfd >= 0) { + rc = write(mgmt_eventfd, &val, sizeof (val)); + ASSERT3P(rc, ==, sizeof (val)); + } + mutex_exit(&conn_list_mtx); +} + +/* + * This gets called whenever a zinfo is destroyed. We might need to close + * the mgmt connection to iscsi target if this was the last zinfo using it. + */ +void +zinfo_destroy_cb(zvol_info_t *zinfo) +{ + uzfs_mgmt_conn_t *conn; + uint64_t val = 1; + int rc; + + mutex_enter(&conn_list_mtx); + SLIST_FOREACH(conn, &uzfs_mgmt_conns, conn_next) { + if (conn == (uzfs_mgmt_conn_t *)zinfo->mgmt_conn) + break; + } + ASSERT3P(conn, !=, NULL); + zinfo->mgmt_conn = NULL; + + if (--conn->conn_refcount == 0) { + /* signal the event loop thread to close FD */ + if (conn->conn_fd >= 0) { + ASSERT3P(mgmt_eventfd, >=, 0); + rc = write(mgmt_eventfd, &val, sizeof (val)); + ASSERT3P(rc, ==, sizeof (val)); + mutex_exit(&conn_list_mtx); + /* wait for event loop thread to close the FD */ + /* TODO: too rough algorithm with sleep */ + while (1) { + usleep(1000); + mutex_enter(&conn_list_mtx); + if (conn->conn_refcount > 0 || + conn->conn_fd < 0) + break; + mutex_exit(&conn_list_mtx); + } + /* someone else reused the conn while waiting - ok */ + if (conn->conn_refcount > 0) { + mutex_exit(&conn_list_mtx); + return; + } + } + SLIST_REMOVE(&uzfs_mgmt_conns, conn, uzfs_mgmt_conn, + conn_next); + kmem_free(conn, sizeof (*conn)); + } + mutex_exit(&conn_list_mtx); +} + +/* + * Send handshake reply with error status to the client. + */ +static int +reply_error(uzfs_mgmt_conn_t *conn, zvol_op_status_t status, + int opcode, uint64_t io_seq, enum conn_state next_state) +{ + zvol_io_hdr_t *hdrp; + struct epoll_event ev; + + DBGCONN(conn, "Error reply with status %d for OP %d", + status, opcode); + + hdrp = kmem_zalloc(sizeof (*hdrp), KM_SLEEP); + hdrp->version = REPLICA_VERSION; + hdrp->opcode = opcode; + hdrp->io_seq = io_seq; + hdrp->status = status; + conn->conn_buf = hdrp; + conn->conn_bufsiz = sizeof (*hdrp); + conn->conn_procn = 0; + conn->conn_state = next_state; + + ev.events = EPOLLOUT; + ev.data.ptr = conn; + return (epoll_ctl(epollfd, EPOLL_CTL_MOD, conn->conn_fd, &ev)); +} + +/* + * Send reply to client which consists of a header and opaque payload. + */ +static int +reply_data(uzfs_mgmt_conn_t *conn, zvol_io_hdr_t *hdrp, void *buf, int size) +{ + struct epoll_event ev; + + DBGCONN(conn, "Data reply"); + + conn->conn_bufsiz = sizeof (*hdrp) + size; + conn->conn_procn = 0; + conn->conn_state = CS_INIT; + conn->conn_buf = kmem_zalloc(conn->conn_bufsiz, KM_SLEEP); + memcpy(conn->conn_buf, hdrp, sizeof (*hdrp)); + memcpy((char *)conn->conn_buf + sizeof (*hdrp), buf, size); + + ev.events = EPOLLOUT; + ev.data.ptr = conn; + return (epoll_ctl(epollfd, EPOLL_CTL_MOD, conn->conn_fd, &ev)); +} + +/* + * Get IP address of first external network interface we encounter. + */ +static int +uzfs_zvol_get_ip(char *host) +{ + struct ifaddrs *ifaddr, *ifa; + int family, n; + int rc = -1; + + if (getifaddrs(&ifaddr) == -1) { + perror("getifaddrs"); + return (-1); + } + + /* + * Walk through linked list, maintaining head + * pointer so we can free list later + */ + for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) { + if (ifa->ifa_addr == NULL) + continue; + + family = ifa->ifa_addr->sa_family; + + if (family == AF_INET || family == AF_INET6) { + rc = getnameinfo(ifa->ifa_addr, (family == AF_INET) ? + sizeof (struct sockaddr_in) : + sizeof (struct sockaddr_in6), + host, NI_MAXHOST, + NULL, 0, NI_NUMERICHOST); + if (rc != 0) { + perror("getnameinfo"); + break; + } + + if (family == AF_INET) { + if (strcmp(host, "127.0.0.1") == 0) + continue; + break; + } + } + } + + freeifaddrs(ifaddr); + return (rc); +} + +/* + * This function suppose to lookup into zvol list to find if LUN presented for + * identification is available/online or not. This function also need to send + * back IP address of replica along with port so that ISTGT controller can open + * a connection for IOs. + */ +static int +uzfs_zvol_mgmt_do_handshake(uzfs_mgmt_conn_t *conn, zvol_io_hdr_t *hdrp, + const char *name, zvol_info_t *zinfo) +{ + zvol_state_t *zv = zinfo->zv; + mgmt_ack_t mgmt_ack; + zvol_io_hdr_t hdr; + + printf("Volume %s sent for enq\n", name); + + bzero(&mgmt_ack, sizeof (mgmt_ack)); + if (uzfs_zvol_get_ip(mgmt_ack.ip) == -1) { + fprintf(stderr, "Unable to get IP with err: %d\n", errno); + return (reply_error(conn, ZVOL_OP_STATUS_FAILED, hdrp->opcode, + hdrp->io_seq, CS_INIT)); + } + + strncpy(mgmt_ack.volname, name, sizeof (mgmt_ack.volname)); + mgmt_ack.port = atoi((hdrp->opcode == ZVOL_OPCODE_PREPARE_FOR_REBUILD) ? + REBUILD_IO_SERVER_PORT : IO_SERVER_PORT); + mgmt_ack.pool_guid = spa_guid(zv->zv_spa); + /* + * We don't use fsid_guid because that one is not guaranteed + * to stay the same (it is changed in case of conflicts). + */ + mgmt_ack.zvol_guid = dsl_dataset_phys( + zv->zv_objset->os_dsl_dataset)->ds_guid; + + bzero(&hdr, sizeof (hdr)); + hdr.version = REPLICA_VERSION; + hdr.opcode = hdrp->opcode; // HANDSHAKE or PREPARE_FOR_REBUILD + hdr.io_seq = hdrp->io_seq; + hdr.len = sizeof (mgmt_ack); + hdr.status = ZVOL_OP_STATUS_OK; + uzfs_zvol_get_last_committed_io_no(zv, &hdr.checkpointed_io_seq); + + return (reply_data(conn, &hdr, &mgmt_ack, sizeof (mgmt_ack))); +} + +static int +uzfs_zvol_rebuild_status(uzfs_mgmt_conn_t *conn, zvol_io_hdr_t *hdrp, + const char *name, zvol_info_t *zinfo) +{ + zrepl_status_ack_t status_ack; + zvol_io_hdr_t hdr; + + status_ack.state = uzfs_zvol_get_status(zinfo->zv); + status_ack.rebuild_status = uzfs_zvol_get_rebuild_status(zinfo->zv); + + bzero(&hdr, sizeof (hdr)); + hdr.version = REPLICA_VERSION; + hdr.opcode = hdrp->opcode; + hdr.io_seq = hdrp->io_seq; + hdr.len = sizeof (status_ack); + hdr.status = ZVOL_OP_STATUS_OK; + + return (reply_data(conn, &hdr, &status_ack, sizeof (status_ack))); +} + +static int +uzfs_zvol_rebuild_dw_replica_start(uzfs_mgmt_conn_t *conn, zvol_io_hdr_t *hdrp, + mgmt_ack_t *mack, int rebuild_op_cnt) +{ + int io_sfd = -1; + rebuild_thread_arg_t *thrd_arg; + kthread_t *thrd_info; + zvol_info_t *zinfo = NULL; + + for (; rebuild_op_cnt > 0; rebuild_op_cnt--, mack++) { + printf("Replica %s at %s:%u helping in rebuild\n", + mack->volname, mack->ip, mack->port); + if (zinfo == NULL) { + zinfo = uzfs_zinfo_lookup(mack->dw_volname); + if (zinfo == NULL) { + printf("Replica %s not found\n", + mack->dw_volname); + return (reply_error(conn, ZVOL_OP_STATUS_FAILED, + hdrp->opcode, hdrp->io_seq, CS_INIT)); + } + /* Track # of rebuilds we are initializing on replica */ + zinfo->zv->rebuild_info.rebuild_cnt = rebuild_op_cnt; + + /* + * Case where just one replica is being used by customer + */ + if ((strcmp(mack->volname, "")) == 0) { + zinfo->zv->rebuild_info.rebuild_cnt = 0; + /* Mark replica healthy now */ + uzfs_zvol_set_rebuild_status(zinfo->zv, + ZVOL_REBUILDING_DONE); + uzfs_zvol_set_status(zinfo->zv, + ZVOL_STATUS_HEALTHY); + printf("Rebuild of replica %s completed\n", + zinfo->name); + uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); + break; + } + } else { + uzfs_zinfo_take_refcnt(zinfo, B_FALSE); + } + + io_sfd = create_and_bind("", B_FALSE, B_FALSE); + if (io_sfd < 0) { + printf("Rebuild IO socket create and bind failed\n"); + uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); + continue; + } + + thrd_arg = kmem_alloc(sizeof (rebuild_thread_arg_t), KM_SLEEP); + thrd_arg->zinfo = zinfo; + thrd_arg->fd = io_sfd; + strlcpy(thrd_arg->zvol_name, mack->volname, MAXNAMELEN); + thrd_info = zk_thread_create(NULL, 0, + uzfs_zvol_rebuild_dw_replica, thrd_arg, 0, NULL, TS_RUN, 0, + PTHREAD_CREATE_DETACHED); + VERIFY3P(thrd_info, !=, NULL); + } + + conn->conn_state = CS_INIT; + return (move_to_next_state(conn)); +} + +/* + * Process the whole message consisting of message header and optional payload. + */ +static int +process_message(uzfs_mgmt_conn_t *conn) +{ + char zvol_name[MAX_NAME_LEN + 1]; + zvol_io_hdr_t *hdrp = conn->conn_hdr; + void *payload = conn->conn_buf; + size_t payload_size = conn->conn_bufsiz; + zvol_info_t *zinfo; + int rc = 0; + + conn->conn_hdr = NULL; + conn->conn_buf = NULL; + conn->conn_bufsiz = 0; + conn->conn_procn = 0; + + switch (hdrp->opcode) { + case ZVOL_OPCODE_HANDSHAKE: + case ZVOL_OPCODE_PREPARE_FOR_REBUILD: + case ZVOL_OPCODE_REPLICA_STATUS: + if (payload_size == 0 || payload_size > MAX_NAME_LEN) { + rc = reply_error(conn, ZVOL_OP_STATUS_FAILED, + hdrp->opcode, hdrp->io_seq, CS_INIT); + break; + } + strncpy(zvol_name, payload, payload_size); + zvol_name[payload_size] = '\0'; + + if ((zinfo = uzfs_zinfo_lookup(zvol_name)) == NULL) { + fprintf(stderr, "Unknown zvol: %s\n", zvol_name); + rc = reply_error(conn, ZVOL_OP_STATUS_FAILED, + hdrp->opcode, hdrp->io_seq, CS_INIT); + break; + } + + if (hdrp->opcode == ZVOL_OPCODE_HANDSHAKE) { + DBGCONN(conn, "Handshake command for zvol %s", + zvol_name); + rc = uzfs_zvol_mgmt_do_handshake(conn, hdrp, zvol_name, + zinfo); + } else if (hdrp->opcode == ZVOL_OPCODE_PREPARE_FOR_REBUILD) { + DBGCONN(conn, "Prepare for rebuild command for zvol %s", + zvol_name); + rc = uzfs_zvol_mgmt_do_handshake(conn, hdrp, zvol_name, + zinfo); + } else if (hdrp->opcode == ZVOL_OPCODE_REPLICA_STATUS) { + DBGCONN(conn, "Replica status command for zvol %s", + zvol_name); + rc = uzfs_zvol_rebuild_status(conn, hdrp, zvol_name, + zinfo); + } else { + ASSERT(0); + } + uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); + break; + + + case ZVOL_OPCODE_START_REBUILD: + /* iSCSI controller will send this msg to downgraded replica */ + if (payload_size < sizeof (mgmt_ack_t)) { + rc = reply_error(conn, ZVOL_OP_STATUS_FAILED, + hdrp->opcode, hdrp->io_seq, CS_INIT); + break; + } + DBGCONN(conn, "Rebuild start command"); + rc = uzfs_zvol_rebuild_dw_replica_start(conn, hdrp, payload, + payload_size / sizeof (mgmt_ack_t)); + break; + + default: + DBGCONN(conn, "Message with unknown OP code %d", hdrp->opcode); + rc = reply_error(conn, ZVOL_OP_STATUS_FAILED, hdrp->opcode, + hdrp->io_seq, CS_INIT); + break; + } + kmem_free(hdrp, sizeof (*hdrp)); + if (payload != NULL) + kmem_free(payload, payload_size); + + return (rc); +} + +/* + * Transition to the next state. This is called only if IO buffer was fully + * read or written. + */ +static int +move_to_next_state(uzfs_mgmt_conn_t *conn) +{ + struct epoll_event ev; + zvol_io_hdr_t *hdrp; + uint16_t vers; + int rc = 0; + + ASSERT3P(conn->conn_bufsiz, ==, conn->conn_procn); + + switch (conn->conn_state) { + case CS_CONNECT: + DBGCONN(conn, "Connected"); + /* Fall-through */ + case CS_INIT: + DBGCONN(conn, "Reading version.."); + if (conn->conn_buf != NULL) + kmem_free(conn->conn_buf, conn->conn_bufsiz); + conn->conn_buf = kmem_alloc(sizeof (uint16_t), KM_SLEEP); + conn->conn_bufsiz = sizeof (uint16_t); + conn->conn_procn = 0; + ev.events = EPOLLIN; + ev.data.ptr = conn; + rc = epoll_ctl(epollfd, EPOLL_CTL_MOD, conn->conn_fd, &ev); + conn->conn_state = CS_READ_VERSION; + break; + case CS_READ_VERSION: + vers = *((uint16_t *)conn->conn_buf); + kmem_free(conn->conn_buf, sizeof (uint16_t)); + if (vers != REPLICA_VERSION) { + fprintf(stderr, "invalid replica protocol version %d\n", + vers); + rc = reply_error(conn, ZVOL_OP_STATUS_VERSION_MISMATCH, + 0, 0, CS_CLOSE); + } else { + DBGCONN(conn, "Reading header.."); + hdrp = kmem_zalloc(sizeof (*hdrp), KM_SLEEP); + hdrp->version = vers; + conn->conn_buf = hdrp; + conn->conn_bufsiz = sizeof (*hdrp); + conn->conn_procn = sizeof (uint16_t); // skip version + conn->conn_state = CS_READ_HEADER; + } + break; + case CS_READ_HEADER: + hdrp = conn->conn_buf; + conn->conn_hdr = hdrp; + if (hdrp->len > 0) { + DBGCONN(conn, "Reading payload (%lu bytes)..", + hdrp->len); + conn->conn_buf = kmem_zalloc(hdrp->len, KM_SLEEP); + conn->conn_bufsiz = hdrp->len; + conn->conn_procn = 0; + conn->conn_state = CS_READ_PAYLOAD; + } else { + conn->conn_buf = NULL; + conn->conn_bufsiz = 0; + rc = process_message(conn); + } + break; + case CS_READ_PAYLOAD: + rc = process_message(conn); + break; + default: + ASSERT(0); + /* Fall-through */ + case CS_CLOSE: + rc = close_conn(conn); + break; + } + + return (rc); +} + +/* + * One thread to serve all management connections operating in non-blocking + * event driven style. + */ +void +uzfs_zvol_mgmt_thread(void *arg) +{ + char *buf; + uzfs_mgmt_conn_t *conn; + struct epoll_event ev, events[MAX_EVENTS]; + int nfds, i, rc; + boolean_t do_scan; + + mutex_init(&conn_list_mtx, NULL, MUTEX_DEFAULT, NULL); + + mgmt_eventfd = eventfd(0, EFD_NONBLOCK); + if (mgmt_eventfd < 0) { + perror("eventfd"); + zk_thread_exit(); + return; + } + epollfd = epoll_create1(0); + if (epollfd < 0) { + perror("epoll_create1"); + zk_thread_exit(); + return; + } + ev.events = EPOLLIN; + ev.data.ptr = NULL; + if (epoll_ctl(epollfd, EPOLL_CTL_ADD, mgmt_eventfd, &ev) == -1) { + perror("epoll_ctl"); + zk_thread_exit(); + } + + prctl(PR_SET_NAME, "mgmt_conn", 0, 0, 0); + + /* + * The only reason to break from this loop is a failure to update FDs + * in poll set. In that case we cannot guarantee consistent state. + * Any other failure should be handled gracefully. + */ + while (1) { + do_scan = B_FALSE; + nfds = epoll_wait(epollfd, events, MAX_EVENTS, + 1000 * RECONNECT_DELAY / 2); + if (nfds == -1) { + if (errno == EINTR) + continue; + perror("epoll_wait"); + goto exit; + } + + for (i = 0; i < nfds; i++) { + conn = events[i].data.ptr; + + /* + * data.ptr is null only for eventfd. In that case + * zinfo was created or deleted -> scan the list. + */ + if (conn == NULL) { + uint64_t value; + + do_scan = B_TRUE; + /* consume the event */ + rc = read(mgmt_eventfd, &value, sizeof (value)); + ASSERT3P(rc, ==, sizeof (value)); + continue; + } + + if (events[i].events & EPOLLERR) { + if (conn->conn_state == CS_CONNECT) { + DBGCONN(conn, "Failed to connect"); + } else { + fprintf(stderr, "Error on connection " + "to %s:%d\n", + conn->conn_host, conn->conn_port); + } + if (close_conn(conn) != 0) + goto exit; + /* tcp connected event */ + } else if ((events[i].events & EPOLLOUT) && + conn->conn_state == CS_CONNECT) { + move_to_next_state(conn); + /* data IO */ + } else if ((events[i].events & EPOLLIN) || + (events[i].events & EPOLLOUT)) { + ssize_t cnt; + int nbytes; + + /* restore reading/writing state */ + buf = (char *)conn->conn_buf + conn->conn_procn; + nbytes = conn->conn_bufsiz - conn->conn_procn; + + if (events[i].events & EPOLLIN) { + cnt = read(conn->conn_fd, buf, nbytes); + DBGCONN(conn, "Read %ld bytes", cnt); + } else { + cnt = write(conn->conn_fd, buf, nbytes); + DBGCONN(conn, "Written %ld bytes", cnt); + } + + if (cnt == 0) { + /* the other peer closed the conn */ + if (events[i].events & EPOLLIN) { + if (close_conn(conn) != 0) + goto exit; + } + } else if (cnt < 0) { + if (errno == EAGAIN || + errno == EWOULDBLOCK || + errno == EINTR) { + continue; + } + perror("read/write"); + if (close_conn(conn) != 0) + goto exit; + } else if (cnt <= nbytes) { + conn->conn_procn += cnt; + /* + * If we read/write the full buffer, + * move to the next state. + */ + if (cnt == nbytes && + move_to_next_state(conn) != 0) + goto exit; + } + } + } + /* + * Scan the list either if signalled or timed out waiting + * for event + */ + if (nfds == 0 || do_scan) { + if (scan_conn_list() != 0) + goto exit; + } + } + +exit: + (void) close(epollfd); + epollfd = -1; + (void) close(mgmt_eventfd); + mgmt_eventfd = -1; + mutex_enter(&conn_list_mtx); + SLIST_FOREACH(conn, &uzfs_mgmt_conns, conn_next) { + close_conn(conn); + } + mutex_exit(&conn_list_mtx); + mutex_destroy(&conn_list_mtx); + fprintf(stderr, "uzfs_zvol_mgmt_thread thread exiting\n"); + zk_thread_exit(); +} diff --git a/cmd/zrepl/mgmt_conn.h b/cmd/zrepl/mgmt_conn.h new file mode 100644 index 000000000000..07a1cc4e9095 --- /dev/null +++ b/cmd/zrepl/mgmt_conn.h @@ -0,0 +1,36 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ +/* + * Copyright (c) 2018 Cloudbyte. All rights reserved. + */ + +#ifndef _MGMT_CONN_H +#define _MGMT_CONN_H + +#include + +extern char *target_addr; + +void zinfo_create_cb(zvol_info_t *zinfo, nvlist_t *create_props); +void zinfo_destroy_cb(zvol_info_t *zinfo); +void uzfs_zvol_mgmt_thread(void *arg); + +#endif /* _MGMT_CONN_H */ diff --git a/cmd/zrepl/zrepl.c b/cmd/zrepl/zrepl.c index 08d755a88f2e..2afba60e2292 100644 --- a/cmd/zrepl/zrepl.c +++ b/cmd/zrepl/zrepl.c @@ -1,40 +1,33 @@ #include #include -#include #include #include -#include -#include +#include +#include #include #include #include #include #include -#include #include #include +#include "mgmt_conn.h" +#include "data_conn.h" + #define MAXEVENTS 64 #define ZAP_UPDATE_TIME_INTERVAL 2 -#define ZVOL_REBUILD_STEP_SIZE (128 * 1024 * 1024) // 128MB - -char *io_server_port = "3232"; -char *rebuild_io_server_port = "3233"; -char *mgmt_port = "12000"; extern unsigned long zfs_arc_max; extern unsigned long zfs_arc_min; extern int zfs_autoimport_disable; -__thread char tinfo[20] = {0}; static void uzfs_zvol_io_ack_sender(void *arg); -static int get_controller_ip_address(char *buf, int len); -kthread_t *conn_accpt_thrd; -kthread_t *uzfs_mgmt_thread; -kthread_t *uzfs_timer_thread; -char *target_addr = NULL; +kthread_t *conn_accpt_thread; +kthread_t *uzfs_timer_thread; +kthread_t *mgmt_conn_thread; char *pool_name = NULL; struct in_addr addr = {0}; int zrepl_import(int argc, char **argv); @@ -77,301 +70,6 @@ help(void) printf("zrepl command args ... \nwhere 'command' is one of:\n\n"); printf("\t import [-t ip address)]\n"); printf("\t start [-t ip address)]\n"); - - -} - - -static int -make_socket_non_blocking(int sfd) -{ - int flags, s; - - flags = fcntl(sfd, F_GETFL, 0); - if (flags == -1) { - ZREPL_ERRLOG("fcntl() failed errno:%d\n", errno); - return (-1); - } - - flags |= O_NONBLOCK; - s = fcntl(sfd, F_SETFL, flags); - if (s == -1) { - ZREPL_ERRLOG("fcntl() failed errno:%d\n", errno); - return (-1); - } - return (0); -} - -static int -uzfs_zvol_get_ip(char *host) -{ - struct ifaddrs *ifaddr, *ifa; - int family, s, n; - - if (getifaddrs(&ifaddr) == -1) { - ZREPL_ERRLOG("getifaddrs() failed errno:%d\n", errno); - return (-1); - } - - /* - * Walk through linked list, maintaining head - * pointer so we can free list later - */ - - for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) { - if (ifa->ifa_addr == NULL) { - continue; - } - - family = ifa->ifa_addr->sa_family; - - if (family == AF_INET || family == AF_INET6) { - s = getnameinfo(ifa->ifa_addr, (family == AF_INET) ? - sizeof (struct sockaddr_in) : - sizeof (struct sockaddr_in6), - host, NI_MAXHOST, - NULL, 0, NI_NUMERICHOST); - if (s != 0) { - ZREPL_ERRLOG("getnameinfo() failed: %d\n", - errno); - s = -1; - goto exit; - } - - if (family == AF_INET) { - if (strcmp(host, "127.0.0.1") == 0) { - continue; - } - ZREPL_LOG("IP address: %s\n", host); - break; - } - } - } -exit: - freeifaddrs(ifaddr); - return (s); -} -/* - * Allocate zio command along with - * buffer needed for IO completion. - */ -static zvol_io_cmd_t * -zio_cmd_alloc(zvol_io_hdr_t *hdr, int fd) -{ - zvol_io_cmd_t *zio_cmd = kmem_zalloc( - sizeof (zvol_io_cmd_t), KM_SLEEP); - - bcopy(hdr, &zio_cmd->hdr, sizeof (zio_cmd->hdr)); - if ((hdr->opcode == ZVOL_OPCODE_READ) || - (hdr->opcode == ZVOL_OPCODE_WRITE) || - (hdr->opcode == ZVOL_OPCODE_HANDSHAKE)) { - zio_cmd->buf = kmem_zalloc(sizeof (char) * hdr->len, KM_SLEEP); - } - - zio_cmd->conn = fd; - return (zio_cmd); -} - -/* - * Free zio command along with buffer. - */ -static void -zio_cmd_free(zvol_io_cmd_t **cmd) -{ - zvol_io_cmd_t *zio_cmd = *cmd; - zvol_op_code_t opcode = zio_cmd->hdr.opcode; - switch (opcode) { - case ZVOL_OPCODE_READ: - case ZVOL_OPCODE_WRITE: - case ZVOL_OPCODE_HANDSHAKE: - if (zio_cmd->buf != NULL) { - kmem_free(zio_cmd->buf, zio_cmd->hdr.len); - } - break; - - case ZVOL_OPCODE_SYNC: - case ZVOL_OPCODE_REBUILD_STEP_DONE: - /* Nothing to do */ - break; - - default: - VERIFY(!"Should be a valid opcode"); - break; - } - - kmem_free(zio_cmd, sizeof (zvol_io_cmd_t)); - *cmd = NULL; -} - - -static int -uzfs_zvol_socket_read(int fd, char *buf, uint64_t nbytes) -{ - ssize_t count = 0; - char *p = buf; - while (nbytes) { - count = read(fd, (void *)p, nbytes); - if (count <= 0) { - ZREPL_ERRLOG("Read error:%d\n", errno); - return (-1); - } - p += count; - nbytes -= count; - } - return (0); -} - - -static inline int -uzfs_zvol_socket_write(int fd, char *buf, uint64_t nbytes) -{ - ssize_t count = 0; - char *p = buf; - while (nbytes) { - count = write(fd, (void *)p, nbytes); - if (count <= 0) { - ZREPL_ERRLOG("Write error:%d\n", errno); - return (-1); - } - p += count; - nbytes -= count; - } - return (0); -} - -/* - * We expect only one chunk of data with meta header in write request. - * Nevertheless the code is general to handle even more of them. - */ -static int -uzfs_submit_writes(zvol_info_t *zinfo, zvol_io_cmd_t *zio_cmd) -{ - blk_metadata_t metadata; - boolean_t is_rebuild = B_FALSE; - zvol_io_hdr_t *hdr = &zio_cmd->hdr; - struct zvol_io_rw_hdr *write_hdr; - char *datap = (char *)zio_cmd->buf; - size_t data_offset = hdr->offset; - size_t remain = hdr->len; - int rc = 0; - is_rebuild = hdr->flags & ZVOL_OP_FLAG_REBUILD; - - while (remain > 0) { - if (remain < sizeof (*write_hdr)) - return (-1); - - write_hdr = (struct zvol_io_rw_hdr *)datap; - metadata.io_num = write_hdr->io_num; - - datap += sizeof (*write_hdr); - remain -= sizeof (*write_hdr); - if (remain < write_hdr->len) - return (-1); - - rc = uzfs_write_data(zinfo->zv, datap, data_offset, - write_hdr->len, &metadata, is_rebuild); - if (rc != 0) - break; - - datap += write_hdr->len; - remain -= write_hdr->len; - data_offset += write_hdr->len; - } - - return (rc); -} - -/* - * zvol worker is responsible for actual work. - * It execute read/write/sync command to uzfs. - * It enqueue command to completion queue and - * send signal to ack-sender thread. - */ -static void -uzfs_zvol_worker(void *arg) -{ - zvol_io_cmd_t *zio_cmd; - zvol_info_t *zinfo; - zvol_state_t *zvol_state; - zvol_io_hdr_t *hdr; - metadata_desc_t **metadata_desc; - int rc = 0; - int write = 0; - boolean_t rebuild_cmd_req; - - zio_cmd = (zvol_io_cmd_t *)arg; - hdr = &zio_cmd->hdr; - zinfo = zio_cmd->zv; - zvol_state = zinfo->zv; - rebuild_cmd_req = hdr->flags & ZVOL_OP_FLAG_REBUILD; - - /* - * If zvol hasn't passed rebuild phase or if read - * is meant for rebuild then we need the metadata - */ - if (!rebuild_cmd_req && ZVOL_IS_REBUILDED(zvol_state)) { - metadata_desc = NULL; - zio_cmd->metadata_desc = NULL; - } else { - metadata_desc = &zio_cmd->metadata_desc; - } - switch (hdr->opcode) { - case ZVOL_OPCODE_READ: - rc = uzfs_read_data(zinfo->zv, - (char *)zio_cmd->buf, - hdr->offset, hdr->len, - metadata_desc); - break; - - case ZVOL_OPCODE_WRITE: - write = 1; - rc = uzfs_submit_writes(zinfo, zio_cmd); - zinfo->checkpointed_io_seq = - zio_cmd->hdr.checkpointed_io_seq; - break; - - case ZVOL_OPCODE_SYNC: - uzfs_flush_data(zinfo->zv); - break; - case ZVOL_OPCODE_REBUILD_STEP_DONE: - break; - default: - VERIFY(!"Should be a valid opcode"); - break; - } - - if (rc < 0) { - ZREPL_ERRLOG("Zvol op_code :%d failed with " - "error: %d\n", hdr->opcode, errno); - hdr->status = ZVOL_OP_STATUS_FAILED; - } else { - hdr->status = ZVOL_OP_STATUS_OK; - } - - /* - * We are not sending ACK for writes meant for rebuild - */ - if (rebuild_cmd_req && (hdr->opcode == ZVOL_OPCODE_WRITE)) { - zio_cmd_free(&zio_cmd); - goto drop_refcount; - } - - (void) pthread_mutex_lock(&zinfo->complete_queue_mutex); - STAILQ_INSERT_TAIL(&zinfo->complete_queue, zio_cmd, cmd_link); - if (write) { - zinfo->write_req_received_cnt++; - } else { - zinfo->read_req_received_cnt++; - } - - if (zinfo->io_ack_waiting) { - rc = pthread_cond_signal(&zinfo->io_ack_cond); - } - - (void) pthread_mutex_unlock(&zinfo->complete_queue_mutex); - -drop_refcount: - uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); } /* @@ -425,6 +123,8 @@ uzfs_zvol_io_receiver(void *arg) fd = *(int *)arg; kmem_free(arg, sizeof (int)); + prctl(PR_SET_NAME, "io_receiver", 0, 0, 0); + while (1) { /* * if we don't know the version yet, be more careful when @@ -552,535 +252,6 @@ uzfs_zvol_io_receiver(void *arg) zk_thread_exit(); } -static int -uzfs_zvol_rebuild_status(zvol_io_hdr_t *hdr, int sfd, char *name) -{ - int rc = 0; - zvol_info_t *zinfo = NULL; - zrepl_status_ack_t status_ack; - - if ((zinfo = uzfs_zinfo_lookup(name)) == NULL) { - ZREPL_ERRLOG("Unknown zvol: %s\n", name); - hdr->status = ZVOL_OP_STATUS_FAILED; - } else { - hdr->status = ZVOL_OP_STATUS_OK; - hdr->len = sizeof (zrepl_status_ack_t); - } - - if (zinfo != NULL) { - status_ack.state = uzfs_zvol_get_status(zinfo->zv); - status_ack.rebuild_status = - uzfs_zvol_get_rebuild_status(zinfo->zv); - uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); - } - - rc = uzfs_zvol_socket_write(sfd, (char *)hdr, sizeof (*hdr)); - if (rc != 0) { - ZREPL_ERRLOG("Write to socket failed with err: %d\n", errno); - return (-1); - } - - if (hdr->status != ZVOL_OP_STATUS_OK) { - return (0); - } - - rc = uzfs_zvol_socket_write(sfd, (char *)&status_ack, hdr->len); - if (rc != 0) { - ZREPL_ERRLOG("Write to socket failed with err: %d\n", errno); - rc = -1; - } - return (rc); -} - -/* - * This function suppose to lookup into zvol list - * to find if LUN presented for identification is - * available/online or not. This function also need - * to return IP address of replica along with port - * so that ISTGT controller can open a connection - * for IOs. - */ -static int -uzfs_zvol_mgmt_do_handshake(zvol_io_hdr_t *hdr, int sfd, char *name) -{ - int rc; - zvol_info_t *zinfo = NULL; - mgmt_ack_t mgmt_ack; - - printf("Volume: %s sent for enq\n", name); - - hdr->len = 0; - hdr->version = REPLICA_VERSION; - - bzero(&mgmt_ack, sizeof (mgmt_ack)); - strncpy(mgmt_ack.volname, name, sizeof (mgmt_ack.volname)); - if (hdr->opcode == ZVOL_OPCODE_PREPARE_FOR_REBUILD) { - /* - * Send rebuild socket IP and port - */ - mgmt_ack.port = atoi(rebuild_io_server_port); - } else { - /* - * Send normal IO socket IP and port - */ - mgmt_ack.port = atoi(io_server_port); - } - rc = uzfs_zvol_get_ip(mgmt_ack.ip); - - if (rc == -1) { - ZREPL_ERRLOG("Unable to get IP with err: %d\n", errno); - hdr->status = ZVOL_OP_STATUS_FAILED; - } else if ((zinfo = uzfs_zinfo_lookup(name)) == NULL) { - ZREPL_ERRLOG("Unknown zvol: %s\n", name); - hdr->status = ZVOL_OP_STATUS_FAILED; - } else { - hdr->status = ZVOL_OP_STATUS_OK; - hdr->len = sizeof (mgmt_ack_t); - } - - /* - * Retrieve checkpointed io_seq from ZAP - * and share it with iSCSI controller. - */ - if (zinfo != NULL) { - zvol_state_t *zv = zinfo->zv; - uzfs_zvol_get_last_committed_io_no(zv, - &hdr->checkpointed_io_seq); - mgmt_ack.pool_guid = spa_guid(zv->zv_spa); - /* - * We don't use fsid_guid because that one is not guaranteed - * to stay the same (it is changed in case of conflicts). - */ - mgmt_ack.zvol_guid = dsl_dataset_phys( - zv->zv_objset->os_dsl_dataset)->ds_guid; - uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); - } - - rc = uzfs_zvol_socket_write(sfd, (char *)hdr, sizeof (*hdr)); - if (rc != 0) { - ZREPL_ERRLOG("Write to socket failed with err: %d\n", errno); - return (-1); - } - if (hdr->status != ZVOL_OP_STATUS_OK) { - return (-1); - } - - rc = uzfs_zvol_socket_write(sfd, (char *)&mgmt_ack, hdr->len); - if (rc != 0) { - ZREPL_ERRLOG("Write to socket failed with err: %d\n", errno); - rc = -1; - } - return (rc); -} - -static int -uzfs_zvol_mgmt_sync(zvol_io_hdr_t *hdr, int sfd, char *name) -{ - int rc = 0; - zvol_io_cmd_t *zio_cmd = NULL; - zvol_info_t *zinfo = NULL; - - ZREPL_LOG("Sync cmd received for Volume: %s\n", name); - if ((zinfo = uzfs_zinfo_lookup(name)) == NULL) { - ZREPL_ERRLOG("Unknown zvol: %s\n", name); - hdr->status = ZVOL_OP_STATUS_FAILED; - return (-1); - } - zio_cmd = zio_cmd_alloc(hdr, sfd); - zio_cmd->zv = zinfo; - taskq_dispatch(zinfo->uzfs_zvol_taskq, uzfs_zvol_worker, - zio_cmd, TQ_SLEEP); - return (rc); -} - -static int -uzfs_zvol_connect_to_tgt_controller(void *arg) -{ - char ip_buf[256]; - int sfd, rc; - struct sockaddr_in istgt_addr; - const char *target_addr = arg; - - if (target_addr == NULL) { - if (get_controller_ip_address(ip_buf, sizeof (ip_buf)) != 0) { - ZREPL_ERRLOG("parsing IP address did not work\n"); - return (-1); - } - target_addr = ip_buf; - } - - ZREPL_LOG("iSCSI controller IP address is: %s\n", target_addr); - bzero((char *)&istgt_addr, sizeof (istgt_addr)); - istgt_addr.sin_family = AF_INET; - istgt_addr.sin_addr.s_addr = inet_addr(target_addr); - istgt_addr.sin_port = htons(TARGET_PORT); -retry: - sfd = create_and_bind(mgmt_port, B_FALSE); - if (sfd == -1) { - return (-1); - } - - rc = connect(sfd, (struct sockaddr *)&istgt_addr, sizeof (istgt_addr)); - if (rc == -1) { - close(sfd); - sleep(2); - printf("Retrying ....\n"); - goto retry; - } else { - ZREPL_LOG("Connection to iSCSI controller is successful\n"); - } - return (sfd); -} - -/* - * TODO: This is throw away API. Side Car has to find - * a better way to pass iSCSI Controller IP address. - */ -static int -get_controller_ip_address(char *buf, int len) -{ - size_t nbytes; - - FILE *fp = fopen("/var/openebs/controllers.conf", "r"); - if (fp == NULL) { - printf("Error opening file\n"); - return (-1); - } - - nbytes = fread(buf, sizeof (char), len, fp); - - if (nbytes <= 0) { - printf("Read error\n"); - return (-1); - } - return (0); -} - -static void -uzfs_zvol_rebuild_dw_replica(void *arg) -{ - int rc, sfd = -1; - uint64_t offset = 0; - uint64_t checkpointed_io_seq; - thread_args_t *thrd_arg; - zvol_info_t *zinfo = NULL; - zvol_state_t *zvol_state; - zvol_io_cmd_t *zio_cmd = NULL; - zvol_io_hdr_t hdr; - - thrd_arg = (thread_args_t *)arg; - sfd = thrd_arg->fd; - zinfo = thrd_arg->zinfo; - - /* Set state in-progess state now */ - uzfs_zvol_set_rebuild_status(zinfo->zv, ZVOL_REBUILDING_IN_PROGRESS); - uzfs_zvol_get_last_committed_io_no(zinfo->zv, &checkpointed_io_seq); - zvol_state = zinfo->zv; - bzero(&hdr, sizeof (hdr)); - hdr.status = ZVOL_OP_STATUS_OK; - hdr.version = REPLICA_VERSION; - hdr.opcode = ZVOL_OPCODE_HANDSHAKE; - hdr.len = strlen(thrd_arg->zvol_name) + 1; - - rc = uzfs_zvol_socket_write(sfd, (char *)&hdr, sizeof (hdr)); - if (rc == -1) { - ZREPL_ERRLOG("Socket write failed, err: %d\n", errno); - goto exit; - } - - rc = uzfs_zvol_socket_write(sfd, (void *)thrd_arg->zvol_name, hdr.len); - if (rc == -1) { - ZREPL_ERRLOG("Socket write failed, err: %d\n", errno); - goto exit; - } - -next_step: - if (offset >= ZVOL_VOLUME_SIZE(zvol_state)) { - hdr.opcode = ZVOL_OPCODE_REBUILD_COMPLETE; - rc = uzfs_zvol_socket_write(sfd, (char *)&hdr, sizeof (hdr)); - if (rc != 0) { - ZREPL_ERRLOG("Socket write failed, err: %d\n", errno); - goto exit; - } - atomic_dec_16(&zinfo->zv->rebuild_info.rebuild_cnt); - if (!zinfo->zv->rebuild_info.rebuild_cnt) { - /* Mark replica healthy now */ - uzfs_zvol_set_rebuild_status(zinfo->zv, - ZVOL_REBUILDING_DONE); - uzfs_zvol_set_status(zinfo->zv, ZVOL_STATUS_HEALTHY); - } - ZREPL_ERRLOG("Rebuilding on Replica:%s completed\n", - zinfo->name); - goto exit; - } else { - bzero(&hdr, sizeof (hdr)); - hdr.status = ZVOL_OP_STATUS_OK; - hdr.version = REPLICA_VERSION; - hdr.opcode = ZVOL_OPCODE_REBUILD_STEP; - hdr.checkpointed_io_seq = checkpointed_io_seq; - hdr.offset = offset; - hdr.len = ZVOL_REBUILD_STEP_SIZE; - rc = uzfs_zvol_socket_write(sfd, (char *)&hdr, sizeof (hdr)); - if (rc != 0) { - ZREPL_ERRLOG("Socket write failed, err: %d\n", errno); - goto exit; - } - } - - while (1) { - rc = uzfs_zvol_socket_read(sfd, (char *)&hdr, sizeof (hdr)); - if (rc != 0) { - ZREPL_ERRLOG("Socket read failed, err: %d\n", errno); - goto exit; - } - - if (hdr.opcode == ZVOL_OPCODE_REBUILD_STEP_DONE) { - offset += ZVOL_REBUILD_STEP_SIZE; - printf("ZVOL_OPCODE_REBUILD_STEP_DONE received\n"); - goto next_step; - } - - ASSERT((hdr.opcode == ZVOL_OPCODE_READ) && - (hdr.flags & ZVOL_OP_FLAG_REBUILD)); - hdr.opcode = ZVOL_OPCODE_WRITE; - - zio_cmd = zio_cmd_alloc(&hdr, sfd); - rc = uzfs_zvol_socket_read(sfd, zio_cmd->buf, hdr.len); - if (rc != 0) { - zio_cmd_free(&zio_cmd); - ZREPL_ERRLOG("Socket read failed with " - "error: %d\n", errno); - goto exit; - } - - /* - * Take refcount for uzfs_zvol_worker to work on it. - * Will dropped by uzfs_zvol_worker once cmd is executed. - */ - uzfs_zinfo_take_refcnt(zinfo, B_FALSE); - zio_cmd->zv = zinfo; - uzfs_zvol_worker(zio_cmd); - zio_cmd = NULL; - } - -exit: - kmem_free(thrd_arg, sizeof (thread_args_t)); - if (zio_cmd != NULL) - zio_cmd_free(&zio_cmd); - if (sfd != -1) - close(sfd); - - if (ZVOL_IS_DEGRADED(zinfo->zv)) - uzfs_zvol_set_rebuild_status(zinfo->zv, ZVOL_REBUILDING_INIT); - /* - * Parent thread have taken refcount, drop it now. - */ - uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); - - printf("uzfs_zvol_rebuild_dw_replica thread exiting\n"); - zk_thread_exit(); -} - -static int -uzfs_zvol_rebuild_dw_replica_start(zvol_io_hdr_t *hdr, int fd, char *buf) -{ - int rc = 0; - int io_sfd = -1; - int rebuild_op_cnt; - thread_args_t *thrd_arg; - mgmt_ack_t *mgmt_ack; - kthread_t *thrd_info; - zvol_info_t *zinfo = NULL; - struct sockaddr_in replica_ip; - - mgmt_ack = (mgmt_ack_t *)buf; - rebuild_op_cnt = hdr->len / sizeof (mgmt_ack_t); - ZREPL_LOG("Replica being rebuild:%s and rebuild ops requested:%d\n", - mgmt_ack->dw_volname, rebuild_op_cnt); - - while (rebuild_op_cnt) { - ZREPL_LOG("Replica:%s helping in rebuild with IP:%s and Port%d", - mgmt_ack->volname, mgmt_ack->ip, mgmt_ack->port); - if (zinfo == NULL) { - zinfo = uzfs_zinfo_lookup(mgmt_ack->dw_volname); - if (zinfo == NULL) { - ZREPL_ERRLOG("Replica being rebuilt:%s " - "not found\n", mgmt_ack->dw_volname); - return (-1); - } - - /* - * Count how many rebuilds we are - * initializing on this replica - */ - zinfo->zv->rebuild_info.rebuild_cnt = rebuild_op_cnt; - } else { - uzfs_zinfo_take_refcnt(zinfo, B_FALSE); - } - - /* - * Case where just one replica is being used by customer. - */ - if ((strcmp(mgmt_ack->volname, "")) == 0) { - zinfo->zv->rebuild_info.rebuild_cnt = 0; - /* Mark replica healthy now */ - uzfs_zvol_set_rebuild_status(zinfo->zv, - ZVOL_REBUILDING_DONE); - uzfs_zvol_set_status(zinfo->zv, ZVOL_STATUS_HEALTHY); - ZREPL_ERRLOG("Rebuilding on Replica:%s completed\n", - zinfo->name); - uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); - goto exit; - } - - bzero((char *)&replica_ip, sizeof (replica_ip)); - replica_ip.sin_family = AF_INET; - replica_ip.sin_addr.s_addr = inet_addr(mgmt_ack->ip); - replica_ip.sin_port = htons(mgmt_ack->port); - io_sfd = create_and_bind("", B_FALSE); - if (io_sfd == -1) { - ZREPL_ERRLOG("Rebuild IO socket create " - "and bind failed\n"); - rc = -1; - goto exit; - } - - rc = connect(io_sfd, (struct sockaddr *)&replica_ip, - sizeof (replica_ip)); - if (rc == -1) { - printf("Failed to connect to port\n"); - rc = -1; - goto exit; - } - - thrd_arg = kmem_alloc(sizeof (thread_args_t), KM_SLEEP); - thrd_arg->zinfo = zinfo; - thrd_arg->fd = io_sfd; - strlcpy(thrd_arg->zvol_name, mgmt_ack->volname, MAXNAMELEN); - thrd_info = zk_thread_create(NULL, 0, - (thread_func_t)uzfs_zvol_rebuild_dw_replica, - (void *)thrd_arg, 0, NULL, TS_RUN, 0, - PTHREAD_CREATE_DETACHED); - VERIFY3P(thrd_info, !=, NULL); - rebuild_op_cnt--; - mgmt_ack++; - } -exit: - if (rc == -1) - uzfs_zinfo_drop_refcnt(zinfo, B_FALSE); - return (0); -} - -/* - * One thread per replica, which will be - * responsible for initial handshake and - * exchanging info like IP add, port etc. - */ -static void -uzfs_zvol_mgmt_thread(void *arg) -{ - int rc; - char *buf; - int sfd = -1; - zvol_io_hdr_t hdr = {0, }; - - sfd = uzfs_zvol_connect_to_tgt_controller(arg); - if (sfd == -1) { - goto exit; - } - - while (1) { - rc = uzfs_zvol_read_header(sfd, &hdr); - if (rc < 0) { - ZREPL_ERRLOG("Management connection " - "disconnected\n"); - /* - * Error has occurred on this socket - * close it and open a new socket after - * 5 sec of sleep. - */ -close_conn: - close(sfd); - sfd = uzfs_zvol_connect_to_tgt_controller(arg); - if (sfd == -1) { - goto exit; - } - continue; - } else if (rc > 0) { - /* Send to target the correct version */ - hdr.version = REPLICA_VERSION; - hdr.status = ZVOL_OP_STATUS_VERSION_MISMATCH; - hdr.opcode = ZVOL_OPCODE_HANDSHAKE; - hdr.len = 0; - (void) uzfs_zvol_socket_write(sfd, - (char *)&hdr, sizeof (hdr)); - goto close_conn; - } - - buf = kmem_alloc(hdr.len * sizeof (char), KM_SLEEP); - rc = uzfs_zvol_socket_read(sfd, buf, hdr.len); - if (rc != 0) { - kmem_free(buf, hdr.len); - goto close_conn; - } - - switch (hdr.opcode) { - case ZVOL_OPCODE_HANDSHAKE: - case ZVOL_OPCODE_PREPARE_FOR_REBUILD: - rc = uzfs_zvol_mgmt_do_handshake(&hdr, sfd, buf); - if (rc != 0) { - ZREPL_ERRLOG("Handshake failed\n"); - } - break; - - case ZVOL_OPCODE_START_REBUILD: - /* - * iSCSI controller will send this - * message to a downgraded replica - */ - rc = uzfs_zvol_rebuild_dw_replica_start(&hdr, sfd, buf); - if (rc == -1) { - ZREPL_ERRLOG("Rebuild start failed errno:%d\n", - errno); - } - break; - - case ZVOL_OPCODE_REPLICA_STATUS: - rc = uzfs_zvol_rebuild_status(&hdr, sfd, buf); - if (rc != 0) { - ZREPL_ERRLOG("Rebuild status enq failed\n"); - } - break; - - case ZVOL_OPCODE_SYNC: - uzfs_zvol_mgmt_sync(&hdr, sfd, buf); - if (rc == -1) { - ZREPL_ERRLOG("Sync failed errno:%d\n", - errno); - } - break; - - /* More management commands will come here in future */ - default: - kmem_free(buf, hdr.len); - /* Command yet to be implemented */ - hdr.status = ZVOL_OP_STATUS_FAILED; - hdr.len = 0; - (void) uzfs_zvol_socket_write(sfd, - (char *)&hdr, sizeof (hdr)); - goto close_conn; - break; /* Should not be reached */ - } - kmem_free(buf, hdr.len); - } -exit: - if (sfd < 0) - close(sfd); - ZREPL_LOG("uzfs_zvol_mgmt_thread thread exiting\n"); - zk_thread_exit(); -} - - static int uzfs_zvol_rebuild_scanner_callback(off_t offset, size_t len, blk_metadata_t *metadata, zvol_state_t *zv, void *args) @@ -1253,16 +424,11 @@ uzfs_zvol_io_conn_acceptor(void) io_sfd = rebuild_fd = efd = -1; flags = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP | EPOLLRDHUP; /* Create IO connection acceptor fd first */ - io_sfd = create_and_bind(io_server_port, B_TRUE); + io_sfd = create_and_bind(IO_SERVER_PORT, B_TRUE, B_TRUE); if (io_sfd == -1) { goto exit; } - rc = make_socket_non_blocking(io_sfd); - if (rc == -1) { - goto exit; - } - rc = listen(io_sfd, SOMAXCONN); if (rc == -1) { ZREPL_ERRLOG("listen() on IO_SFD failed with errno:%d\n", @@ -1270,16 +436,11 @@ uzfs_zvol_io_conn_acceptor(void) goto exit; } - rebuild_fd = create_and_bind(rebuild_io_server_port, B_TRUE); + rebuild_fd = create_and_bind(REBUILD_IO_SERVER_PORT, B_TRUE, B_TRUE); if (rebuild_fd == -1) { goto exit; } - rc = make_socket_non_blocking(rebuild_fd); - if (rc == -1) { - goto exit; - } - rc = listen(rebuild_fd, SOMAXCONN); if (rc == -1) { ZREPL_ERRLOG("listen() on REBUILD_FD failed with errno:%d\n", @@ -1314,6 +475,8 @@ uzfs_zvol_io_conn_acceptor(void) /* Buffer where events are returned */ events = calloc(MAXEVENTS, sizeof (event)); + prctl(PR_SET_NAME, "acceptor", 0, 0, 0); + /* The event loop */ while (1) { n = epoll_wait(efd, events, MAXEVENTS, -1); @@ -1410,6 +573,8 @@ uzfs_zvol_io_conn_acceptor(void) static void uzfs_zvol_timer_thread(void) { + prctl(PR_SET_NAME, "zvol_timer", 0, 0, 0); + while (1) { sleep(ZAP_UPDATE_TIME_INTERVAL); uzfs_zinfo_update_io_seq_for_all_volumes(); @@ -1492,6 +657,9 @@ uzfs_zvol_io_ack_sender(void *arg) fd = thrd_arg->fd; zinfo = uzfs_zinfo_lookup(thrd_arg->zvol_name); kmem_free(arg, sizeof (thread_args_t)); + + prctl(PR_SET_NAME, "ack_sender", 0, 0, 0); + while (1) { int rc = 0; (void) pthread_mutex_lock(&zinfo->complete_queue_mutex); @@ -1634,7 +802,7 @@ zrepl_import(int argc, char **argv) return (1); } - fprintf(stdout, "import pool %s target addr %s\n", pool_name, + fprintf(stdout, "import pool %s default target addr %s\n", pool_name, target_addr); libzfs_handle_t *hdl = libzfs_init(); @@ -1698,16 +866,15 @@ zrepl_start(int argc, char **argv) void zrepl_svc_run(void) { + mgmt_conn_thread = zk_thread_create(NULL, 0, + (thread_func_t)uzfs_zvol_mgmt_thread, target_addr, 0, NULL, + TS_RUN, 0, PTHREAD_CREATE_DETACHED); + VERIFY3P(mgmt_conn_thread, !=, NULL); - conn_accpt_thrd = zk_thread_create(NULL, 0, + conn_accpt_thread = zk_thread_create(NULL, 0, (thread_func_t)uzfs_zvol_io_conn_acceptor, NULL, 0, NULL, TS_RUN, 0, PTHREAD_CREATE_DETACHED); - VERIFY3P(conn_accpt_thrd, !=, NULL); - - uzfs_mgmt_thread = zk_thread_create(NULL, 0, - (thread_func_t)uzfs_zvol_mgmt_thread, target_addr, 0, NULL, - TS_RUN, 0, PTHREAD_CREATE_DETACHED); - VERIFY3P(uzfs_mgmt_thread, !=, NULL); + VERIFY3P(conn_accpt_thread, !=, NULL); uzfs_timer_thread = zk_thread_create(NULL, 0, (thread_func_t)uzfs_zvol_timer_thread, NULL, 0, NULL, TS_RUN, @@ -1724,7 +891,7 @@ main(int argc, char **argv) int rc; int i = 0; - const char *cmd_name = NULL; + const char *cmd_name = NULL; if (argc < 2) { help(); @@ -1738,10 +905,6 @@ main(int argc, char **argv) return (1); } - pthread_t slf = pthread_self(); - snprintf(tinfo, sizeof (tinfo), "m#%d.%d", - (int)(((uint64_t *)slf)[0]), getpid()); - if (getenv("CONFIG_LOAD_DISABLE") != NULL) { printf("disabled auto import (reading of zpool.cache)\n"); zfs_autoimport_disable = 1; @@ -1750,6 +913,8 @@ main(int argc, char **argv) zfs_autoimport_disable = 0; } + zinfo_create_hook = &zinfo_create_cb; + zinfo_destroy_hook = &zinfo_destroy_cb; rc = uzfs_init(); uzfs_zrepl_open_log(); if (rc != 0) { diff --git a/include/uzfs_io.h b/include/uzfs_io.h index 4a900c32c6c2..f8e0a42e09b7 100644 --- a/include/uzfs_io.h +++ b/include/uzfs_io.h @@ -23,6 +23,7 @@ #define _UZFS_IO_H #include +#include #ifdef __cplusplus extern "C" { diff --git a/include/zrepl_mgmt.h b/include/zrepl_mgmt.h index 17ad20c71fa0..0449caaec117 100644 --- a/include/zrepl_mgmt.h +++ b/include/zrepl_mgmt.h @@ -26,6 +26,7 @@ #define ZREPL_MGMT_H #include +#include #include #include #include "zrepl_prot.h" @@ -37,6 +38,10 @@ extern "C" { #define uZFS_ZVOL_WORKERS_MAX 128 #define uZFS_ZVOL_WORKERS_DEFAULT 6 +#define ZFS_PROP_TARGET_IP "com.cloudbyte:targetip" + +#define REBUILD_IO_SERVER_PORT "3233" +#define IO_SERVER_PORT "3232" extern kmutex_t zvol_list_mutex; struct zvol_io_cmd_s; @@ -77,6 +82,8 @@ typedef struct zvol_info_s { /* Will be used to singal ack-sender to exit */ uint8_t conn_closed; + /* Pointer to mgmt connection for this zinfo */ + void *mgmt_conn; /* Perfromance counter */ @@ -93,6 +100,9 @@ typedef struct thread_args_s { int fd; } thread_args_t; +extern void (*zinfo_create_hook)(zvol_info_t *, nvlist_t *); +extern void (*zinfo_destroy_hook)(zvol_info_t *); + typedef struct zvol_io_cmd_s { STAILQ_ENTRY(zvol_io_cmd_s) cmd_link; zvol_io_hdr_t hdr; @@ -107,7 +117,8 @@ typedef struct zvol_rebuild_s { int fd; } zvol_rebuild_t; -extern int uzfs_zinfo_init(void *zv, const char *ds_name); +extern int uzfs_zinfo_init(void *zv, const char *ds_name, + nvlist_t *create_props); extern zvol_info_t *uzfs_zinfo_lookup(const char *name); extern void uzfs_zinfo_drop_refcnt(zvol_info_t *zinfo, int locked); extern void uzfs_zinfo_take_refcnt(zvol_info_t *zinfo, int locked); @@ -118,28 +129,25 @@ void uzfs_zvol_get_last_committed_io_no(zvol_state_t *zv, uint64_t *io_seq); void uzfs_zvol_store_last_committed_io_no(zvol_state_t *zv, uint64_t io_seq); -extern int create_and_bind(const char *port, int bind_needed); +extern int create_and_bind(const char *port, int bind_needed, + boolean_t nonblocking); #define ZREPL_LOG(fmt, ...) syslog(LOG_NOTICE, \ - "%-18.18s:%4d: %-20.20s: " fmt, __func__, __LINE__, \ - tinfo, ##__VA_ARGS__) + "%-18.18s:%4d: " fmt, __func__, __LINE__, ##__VA_ARGS__) #define ZREPL_NOTICELOG(fmt, ...) syslog(LOG_NOTICE, \ - "%-18.18s:%4d: %-20.20s: " fmt, __func__, __LINE__, \ - tinfo, ##__VA_ARGS__) + "%-18.18s:%4d: " fmt, __func__, __LINE__, ##__VA_ARGS__) #define ZREPL_ERRLOG(fmt, ...) syslog(LOG_ERR, \ - "%-18.18s:%4d: %-20.20s: " fmt, __func__, __LINE__, \ - tinfo, ##__VA_ARGS__) + "%-18.18s:%4d: " fmt, __func__, __LINE__, ##__VA_ARGS__) #define ZREPL_WARNLOG(fmt, ...) syslog(LOG_ERR, \ - "%-18.18s:%4d: %-20.20s: " fmt, __func__, __LINE__, \ - tinfo, ##__VA_ARGS__) + "%-18.18s:%4d: " fmt, __func__, __LINE__, ##__VA_ARGS__) #define ZREPL_TRACELOG(FLAG, fmt, ...) \ do { \ - syslog(LOG_NOTICE, "%-18.18s:%4d: %-20.20s: " \ - fmt, __func__, __LINE__, tinfo, ##__VA_ARGS__); \ + syslog(LOG_NOTICE, "%-18.18s:%4d: " \ + fmt, __func__, __LINE__, ##__VA_ARGS__); \ } while (0) #ifdef __cplusplus diff --git a/include/zrepl_prot.h b/include/zrepl_prot.h index 756e4a61a316..189f49a04bdb 100644 --- a/include/zrepl_prot.h +++ b/include/zrepl_prot.h @@ -82,6 +82,9 @@ typedef enum zvol_op_status zvol_op_status_t; struct zvol_io_hdr { uint16_t version; zvol_op_code_t opcode; + zvol_op_status_t status; + uint8_t flags; + uint8_t padding[3]; uint64_t io_seq; /* only used for read/write */ uint64_t offset; @@ -91,8 +94,6 @@ struct zvol_io_hdr { */ uint64_t len; uint64_t checkpointed_io_seq; - uint8_t flags; - zvol_op_status_t status; } __attribute__((packed)); typedef struct zvol_io_hdr zvol_io_hdr_t; diff --git a/lib/libzpool/uzfs_io.c b/lib/libzpool/uzfs_io.c index c58b1f57705d..a13111543eae 100644 --- a/lib/libzpool/uzfs_io.c +++ b/lib/libzpool/uzfs_io.c @@ -297,8 +297,6 @@ uzfs_read_data(zvol_state_t *zv, char *buf, uint64_t offset, uint64_t len, md_ent = uzfs_metadata_append(zv, metadata, nmetas, md_head, md_ent); kmem_free(metadata, metablk.m_len); - if (error != 0) - goto exit; } offset += bytes; read += bytes; diff --git a/lib/libzpool/uzfs_mgmt.c b/lib/libzpool/uzfs_mgmt.c index 3ec5bba80c17..060ed9d72cbb 100644 --- a/lib/libzpool/uzfs_mgmt.c +++ b/lib/libzpool/uzfs_mgmt.c @@ -435,6 +435,7 @@ uzfs_zvol_create_cb(const char *ds_name, void *arg) zvol_state_t *zv = NULL; int error = -1; + nvlist_t *nvprops = arg; printf("ds_name %s\n", ds_name); @@ -444,7 +445,7 @@ uzfs_zvol_create_cb(const char *ds_name, void *arg) return (error); } - if (uzfs_zinfo_init(zv, ds_name) != 0) { + if (uzfs_zinfo_init(zv, ds_name, nvprops) != 0) { printf("Failed in uzfs_zinfo_init\n"); return (error); } @@ -498,10 +499,13 @@ uzfs_spa_init(spa_t *spa) cv_init(&us->cv, NULL, CV_DEFAULT, NULL); spa->spa_us = us; mutex_enter(&us->mtx); + spa->spa_us = us; us->update_txg_tid = zk_thread_create(NULL, 0, (thread_func_t)uzfs_update_txg_zap_thread, spa, 0, NULL, TS_RUN, 0, PTHREAD_CREATE_DETACHED); mutex_exit(&us->mtx); + } else { + spa->spa_us = us; } } diff --git a/lib/libzpool/zrepl_mgmt.c b/lib/libzpool/zrepl_mgmt.c index 2d0dc84dc7f8..8692ac1d2642 100644 --- a/lib/libzpool/zrepl_mgmt.c +++ b/lib/libzpool/zrepl_mgmt.c @@ -14,6 +14,8 @@ __thread char tinfo[20] = {0}; clockid_t clockid; +void (*zinfo_create_hook)(zvol_info_t *, nvlist_t *); +void (*zinfo_destroy_hook)(zvol_info_t *); SLIST_HEAD(, zvol_info_s) zvol_list; SLIST_HEAD(, zvol_info_s) stale_zv_list; @@ -26,7 +28,7 @@ SLIST_HEAD(, zvol_info_s) stale_zv_list; static int uzfs_zinfo_free(zvol_info_t *zinfo); int -create_and_bind(const char *port, int bind_needed) +create_and_bind(const char *port, int bind_needed, boolean_t nonblock) { int s, sfd; struct addrinfo hints = {0, }; @@ -38,12 +40,16 @@ create_and_bind(const char *port, int bind_needed) s = getaddrinfo(NULL, port, &hints, &result); if (s != 0) { - printf("getaddrinfo failed with error\n"); + perror("getaddrinfo"); return (-1); } for (rp = result; rp != NULL; rp = rp->ai_next) { - sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + int flags = rp->ai_socktype; + + if (nonblock) + flags |= SOCK_NONBLOCK; + sfd = socket(rp->ai_family, flags, rp->ai_protocol); if (sfd == -1) { continue; } @@ -232,7 +238,7 @@ uzfs_zinfo_destroy(const char *name, spa_t *spa) } int -uzfs_zinfo_init(void *zv, const char *ds_name) +uzfs_zinfo_init(void *zv, const char *ds_name, nvlist_t *create_props) { zvol_info_t *zinfo; @@ -253,6 +259,9 @@ uzfs_zinfo_init(void *zv, const char *ds_name) /* Update zvol list */ uzfs_insert_zinfo_list(zinfo); + if (zinfo_create_hook) + (*zinfo_create_hook)(zinfo, create_props); + printf("uzfs_zinfo_init in success path\n"); return (0); } @@ -260,6 +269,9 @@ uzfs_zinfo_init(void *zv, const char *ds_name) static int uzfs_zinfo_free(zvol_info_t *zinfo) { + if (zinfo_destroy_hook) + (*zinfo_destroy_hook)(zinfo); + taskq_destroy(zinfo->uzfs_zvol_taskq); (void) uzfs_zinfo_destroy_mutex(zinfo); ASSERT(STAILQ_EMPTY(&zinfo->complete_queue)); diff --git a/module/zfs/zfs_ioctl.c b/module/zfs/zfs_ioctl.c index 323318157c54..e76f01b37e31 100644 --- a/module/zfs/zfs_ioctl.c +++ b/module/zfs/zfs_ioctl.c @@ -3399,7 +3399,7 @@ zfs_ioc_create(const char *fsname, nvlist_t *innvl, nvlist_t *outnvl) nvlist_free(zct.zct_zplprops); #if !defined(_KERNEL) - (void) uzfs_zvol_create_cb((char *)fsname, NULL); + (void) uzfs_zvol_create_cb((char *)fsname, nvprops); #endif /* * It would be nice to do this atomically. diff --git a/tests/cbtest/gtest/test_zrepl_prot.cc b/tests/cbtest/gtest/test_zrepl_prot.cc index cc3f45f6a9ad..ade32a72ae49 100644 --- a/tests/cbtest/gtest/test_zrepl_prot.cc +++ b/tests/cbtest/gtest/test_zrepl_prot.cc @@ -40,93 +40,196 @@ using namespace GtestUtils; -pid_t start_zrepl() { - std::string zrepl_path = getCmdPath("zrepl"); - pid_t pid; - int i = 0; - - pid = fork(); - if (pid == 0) { - execl(zrepl_path.c_str(), zrepl_path.c_str(), - "start", "-t", "127.0.0.1", NULL); - } - /* wait for zrepl to come up - is there a better way? */ - while (i < 10) { - try { - execCmd("zpool", "list"); - return pid; - } catch (std::runtime_error &) { - sleep(1); - i++; - } +/* + * Return either when the socket is readable or when timeout expires. + */ +static int ready_for_read(int fd, int timeout) { + struct timeval tv = {.tv_sec = timeout, .tv_usec = 0}; + fd_set rfds; + int rc; + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + + rc = select(fd + 1, &rfds, NULL, NULL, (timeout >= 0) ? &tv : NULL); + if (rc == -1) { + perror("select"); + return (-1); } - throw std::runtime_error( - std::string("Timed out waiting for zrepl to come up")); + return ((rc > 0) ? 1 : 0); } /* - * Listen for incoming connection from replica and return new connection fd. + * zrepl program wrapper. + * + * The main benefits are: + * 1) when zrepl goes out of C++ scope it is automatically terminated, + * 2) special care is taken when starting and stopping the process to + * make sure it is fully operation respectively fully terminated + * to avoid various races. */ -int setup_control_connection() { - struct sockaddr_in addr; - int listenfd, fd; - int opt = 1; - int rc; +class Zrepl { +public: + Zrepl() { + m_pid = 0; + } - listenfd = socket(AF_INET, SOCK_STREAM, 0); - if (listenfd < 0) { - perror("socket"); - return (-1); + ~Zrepl() { + kill(); } - setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, - sizeof (opt)); - memset(&addr, 0, sizeof (addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); - addr.sin_port = htons(TARGET_PORT); - rc = bind(listenfd, (struct sockaddr *) &addr, sizeof (addr)); - if (rc != 0) { - perror("bind"); - close(listenfd); - return (-1); + + void start() { + std::string zrepl_path = getCmdPath("zrepl"); + int i = 0; + + if (m_pid != 0) { + throw std::runtime_error( + std::string("zrepl has been already started")); + } + m_pid = fork(); + if (m_pid == 0) { + execl(zrepl_path.c_str(), zrepl_path.c_str(), + "start", "-t", "127.0.0.1", NULL); + } + /* wait for zrepl to come up - is there a better way? */ + while (i < 10) { + try { + execCmd("zpool", "list"); + return; + } catch (std::runtime_error &) { + sleep(1); + i++; + } + } + throw std::runtime_error( + std::string("Timed out waiting for zrepl to come up")); } - rc = listen(listenfd, 1); - if (rc != 0) { - perror("listen"); - close(listenfd); - return (-1); + + void kill() { + int rc; + + if (m_pid != 0) { + rc = ::kill(m_pid, SIGTERM); + while (rc == 0) { + (void) waitpid(m_pid, NULL, 0); + rc = ::kill(m_pid, 0); + } + m_pid = 0; + } } - fd = accept(listenfd, NULL, NULL); - if (rc < 0) { - perror("accept"); - close(listenfd); + + pid_t m_pid; +}; + +/* + * Object simulating iSCSI target. It has listen and accept methods. + * Listening port is automatically closed when object goes out of scope. + */ +class Target { +public: + Target() { + m_listenfd = -1; + } + + ~Target() { + if (m_listenfd >= 0) { + close(m_listenfd); + m_listenfd = -1; + } + } + + /* + * Listen for incoming connection from replica. + */ + int listen(uint16_t port = TARGET_PORT) { + struct sockaddr_in addr; + int fd; + int opt = 1; + int rc; + + m_listenfd = socket(AF_INET, SOCK_STREAM, 0); + if (m_listenfd < 0) { + perror("socket"); + return (-1); + } + setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, + sizeof (opt)); + memset(&addr, 0, sizeof (addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(port); + rc = bind(m_listenfd, (struct sockaddr *) &addr, sizeof (addr)); + if (rc != 0) { + perror("bind"); + close(m_listenfd); + return (-1); + } + rc = ::listen(m_listenfd, 1); + if (rc != 0) { + perror("listen"); + close(m_listenfd); + return (-1); + } + return (m_listenfd); + } + + /* + * Accept new connection from replica and return its FD (timeout is in + * seconds). + */ + int accept(int timeout) { + fd_set rfds; + struct timeval tv = {.tv_sec = timeout, .tv_usec = 0}; + int fd; + int rc; + + FD_ZERO(&rfds); + FD_SET(m_listenfd, &rfds); + + rc = select(m_listenfd + 1, &rfds, NULL, NULL, + (timeout >= 0) ? &tv : NULL); + if (rc == -1) { + perror("select"); + return (-1); + } + if (rc > 0) { + fd = ::accept(m_listenfd, NULL, NULL); + if (rc < 0) { + perror("accept"); + return (-1); + } + return (fd); + } return (-1); } - close(listenfd); - return fd; -} -class TestZvol { + int m_listenfd; +}; + +/* + * Class simplifying test zfs pool creation and creation of zvols on it. + * Automatic pool destruction takes place when object goes out of scope. + */ +class TestPool { public: - TestZvol(std::string poolname) { - pool = poolname; - path = std::string("/tmp/") + pool; - name = pool + "/vol"; + TestPool(std::string poolname) { + m_name = poolname; + m_path = std::string("/tmp/") + m_name; } - ~TestZvol() { + ~TestPool() { try { - execCmd("zpool", std::string("destroy -f ") + pool); + execCmd("zpool", std::string("destroy -f ") + m_name); } catch (std::runtime_error re) { ; } - unlink(path.c_str()); + unlink(m_path.c_str()); } void create() { int fd, rc; - fd = open(path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666); + fd = open(m_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666); if (fd < 0) throw std::system_error(errno, std::system_category(), "Cannot create vdev file"); @@ -136,37 +239,51 @@ class TestZvol { if (rc != 0) throw std::system_error(errno, std::system_category(), "Cannot truncate vdev file"); - execCmd("zpool", std::string("create ") + pool + " " + path); - execCmd("zfs", std::string("create -sV 10m -o volblocksize=4k ") - + name); + execCmd("zpool", std::string("create ") + m_name + " " + + m_path); + } + + void createZvol(std::string name, std::string arg = "") { + execCmd("zfs", + std::string("create -sV 10m -o volblocksize=4k ") + + arg + " " + m_name + "/" + name); } - std::string name; - std::string pool; - std::string path; + void destroyZvol(std::string name) { + execCmd("zfs", std::string("destroy ") + m_name + "/" + name); + } + + std::string getZvolName(std::string name) { + return (m_name + "/" + name); + } + + std::string m_name; + std::string m_path; }; class ZreplHandshakeTest : public testing::Test { protected: /* Shared setup hook for all zrepl handshake tests - called just once */ static void SetUpTestCase() { - m_pid = start_zrepl(); - m_zvol = new TestZvol("handshake"); - m_zvol->create(); + m_zrepl = new Zrepl(); + m_pool = new TestPool("handshake"); + m_zrepl->start(); + m_pool->create(); + m_pool->createZvol("vol1"); + m_zvol_name = m_pool->getZvolName("vol1"); } static void TearDownTestCase() { - delete m_zvol; - if (m_pid > 0) - kill(m_pid, SIGTERM); - } - - ZreplHandshakeTest() { - m_control_fd = -1; + delete m_pool; + delete m_zrepl; } virtual void SetUp() override { - m_control_fd = setup_control_connection(); + int rc; + + rc = m_target.listen(); + ASSERT_GE(rc, 0); + m_control_fd = m_target.accept(-1); ASSERT_GE(m_control_fd, 0); } @@ -175,14 +292,17 @@ class ZreplHandshakeTest : public testing::Test { close(m_control_fd); } - static pid_t m_pid; - static TestZvol *m_zvol; + static Zrepl *m_zrepl; + static TestPool *m_pool; + static std::string m_zvol_name; int m_control_fd; + Target m_target; }; -pid_t ZreplHandshakeTest::m_pid = 0; -TestZvol *ZreplHandshakeTest::m_zvol = nullptr; +Zrepl *ZreplHandshakeTest::m_zrepl = nullptr; +TestPool *ZreplHandshakeTest::m_pool = nullptr; +std::string ZreplHandshakeTest::m_zvol_name = ""; class ZreplDataTest : public testing::Test { protected: @@ -190,25 +310,31 @@ class ZreplDataTest : public testing::Test { static void SetUpTestCase() { zvol_io_hdr_t hdr_out, hdr_in; mgmt_ack_t mgmt_ack; - m_zvol = new TestZvol("handshake"); + Target target; + m_pool = new TestPool("handshake"); + m_zrepl = new Zrepl(); int rc; - m_pid = start_zrepl(); - m_control_fd = setup_control_connection(); - ASSERT_GE(m_control_fd, 0); + m_zrepl->start(); + m_pool->create(); + m_pool->createZvol("vol1"); + m_zvol_name = m_pool->getZvolName("vol1"); - m_zvol->create(); + rc = target.listen(); + ASSERT_GE(rc, 0); + m_control_fd = target.accept(-1); + ASSERT_GE(m_control_fd, 0); hdr_out.version = REPLICA_VERSION; hdr_out.opcode = ZVOL_OPCODE_HANDSHAKE; hdr_out.status = ZVOL_OP_STATUS_OK; hdr_out.io_seq = 0; hdr_out.offset = 0; - hdr_out.len = m_zvol->name.length() + 1; + hdr_out.len = m_zvol_name.length() + 1; rc = write(m_control_fd, &hdr_out, sizeof (hdr_out)); ASSERT_EQ(rc, sizeof (hdr_out)); - rc = write(m_control_fd, m_zvol->name.c_str(), hdr_out.len); + rc = write(m_control_fd, m_zvol_name.c_str(), hdr_out.len); ASSERT_EQ(rc, hdr_out.len); rc = read(m_control_fd, &hdr_in, sizeof (hdr_in)); @@ -220,17 +346,16 @@ class ZreplDataTest : public testing::Test { ASSERT_EQ(hdr_in.len, sizeof (mgmt_ack)); rc = read(m_control_fd, &mgmt_ack, sizeof (mgmt_ack)); ASSERT_EQ(rc, sizeof (mgmt_ack)); - EXPECT_STREQ(mgmt_ack.volname, m_zvol->name.c_str()); + EXPECT_STREQ(mgmt_ack.volname, m_zvol_name.c_str()); m_host = std::string(mgmt_ack.ip); m_port = mgmt_ack.port; } static void TearDownTestCase() { - delete m_zvol; + delete m_pool; if (m_control_fd >= 0) close(m_control_fd); - if (m_pid > 0) - kill(m_pid, SIGTERM); + delete m_zrepl; } ZreplDataTest() { @@ -264,11 +389,11 @@ class ZreplDataTest : public testing::Test { hdr_out.status = ZVOL_OP_STATUS_OK; hdr_out.io_seq = 0; hdr_out.offset = 0; - hdr_out.len = m_zvol->name.length() + 1; + hdr_out.len = m_zvol_name.length() + 1; rc = write(m_data_fd, &hdr_out, sizeof (hdr_out)); ASSERT_EQ(rc, sizeof (hdr_out)); - rc = write(m_data_fd, m_zvol->name.c_str(), hdr_out.len); + rc = write(m_data_fd, m_zvol_name.c_str(), hdr_out.len); ASSERT_EQ(rc, hdr_out.len); } @@ -349,21 +474,23 @@ class ZreplDataTest : public testing::Test { ASSERT_EQ(hdr_inp->offset, offset); } - static pid_t m_pid; static int m_control_fd; static uint16_t m_port; static std::string m_host; - static TestZvol *m_zvol; + static Zrepl *m_zrepl; + static TestPool *m_pool; + static std::string m_zvol_name; int m_data_fd; int m_ioseq; }; -pid_t ZreplDataTest::m_pid = 0; int ZreplDataTest::m_control_fd = -1; uint16_t ZreplDataTest::m_port = 0; std::string ZreplDataTest::m_host = ""; -TestZvol *ZreplDataTest::m_zvol = nullptr; +std::string ZreplDataTest::m_zvol_name = ""; +TestPool *ZreplDataTest::m_pool = nullptr; +Zrepl *ZreplDataTest::m_zrepl = nullptr; TEST_F(ZreplHandshakeTest, HandshakeOk) { zvol_io_hdr_t hdr_out, hdr_in; @@ -376,11 +503,11 @@ TEST_F(ZreplHandshakeTest, HandshakeOk) { hdr_out.status = ZVOL_OP_STATUS_OK; hdr_out.io_seq = 0; hdr_out.offset = 0; - hdr_out.len = m_zvol->name.length() + 1; + hdr_out.len = m_zvol_name.length() + 1; rc = write(m_control_fd, &hdr_out, sizeof (hdr_out)); ASSERT_EQ(rc, sizeof (hdr_out)); - rc = write(m_control_fd, m_zvol->name.c_str(), hdr_out.len); + rc = write(m_control_fd, m_zvol_name.c_str(), hdr_out.len); ASSERT_EQ(rc, hdr_out.len); rc = read(m_control_fd, &hdr_in, sizeof (hdr_in)); @@ -393,12 +520,12 @@ TEST_F(ZreplHandshakeTest, HandshakeOk) { ASSERT_EQ(hdr_in.len, sizeof (mgmt_ack)); rc = read(m_control_fd, &mgmt_ack, sizeof (mgmt_ack)); ASSERT_EQ(rc, sizeof (mgmt_ack)); - EXPECT_STREQ(mgmt_ack.volname, m_zvol->name.c_str()); + EXPECT_STREQ(mgmt_ack.volname, m_zvol_name.c_str()); output = execCmd("zpool", std::string("get guid -Hpo value ") + - m_zvol->pool); + m_pool->m_name); EXPECT_EQ(mgmt_ack.pool_guid, std::stoul(output)); output = execCmd("zfs", std::string("get guid -Hpo value ") + - m_zvol->name); + m_zvol_name); EXPECT_EQ(mgmt_ack.zvol_guid, std::stoul(output)); } @@ -412,7 +539,7 @@ TEST_F(ZreplHandshakeTest, HandshakeWrongVersion) { hdr_out.status = ZVOL_OP_STATUS_OK; hdr_out.io_seq = 0; hdr_out.offset = 0; - hdr_out.len = m_zvol->name.length() + 1; + hdr_out.len = m_zvol_name.length() + 1; /* * It must be set in one chunk so that server does not close the @@ -420,7 +547,7 @@ TEST_F(ZreplHandshakeTest, HandshakeWrongVersion) { */ msg = (char *)malloc(sizeof (hdr_out) + hdr_out.len); memcpy(msg, &hdr_out, sizeof (hdr_out)); - memcpy(msg + sizeof (hdr_out), m_zvol->name.c_str(), hdr_out.len); + memcpy(msg + sizeof (hdr_out), m_zvol_name.c_str(), hdr_out.len); rc = write(m_control_fd, msg, sizeof (hdr_out) + hdr_out.len); ASSERT_EQ(rc, sizeof (hdr_out) + hdr_out.len); free(msg); @@ -571,3 +698,133 @@ TEST_F(ZreplDataTest, ReadBlockWithoutMeta) { offset += sizeof (buf); } } + +/* + * This test has many steps. If it proves to be too complicated, then split it + * into multiple smaller tests. It creates: + * + * 1 zvol with default target IP + * 1 zvol with explicit target IP + * - restart zrepl - + * 1 zvol with default target IP + * 1 zvol with explicit target IP + * destroy all zvols + * + * Verify that zrepl establishes and tears down connections as appropriate. + */ +TEST(TargetIPTest, CreateAndDestroy) { + Zrepl zrepl; + TestPool pool("handshake"); + Target targetImpl, targetExpl; + int fdImpl, fdExpl; + char buf[1]; + int rc; + + zrepl.start(); + pool.create(); + pool.createZvol("implicit1"); + pool.createZvol("explicit1", "-o com.cloudbyte:targetip=127.0.0.1:12345"); + zrepl.kill(); + + rc = targetImpl.listen(); + ASSERT_GE(rc, 0); + rc = targetExpl.listen(12345); + ASSERT_GE(rc, 0); + + zrepl.start(); + + // two new connections (one for each target) + fdImpl = targetImpl.accept(5); + ASSERT_GE(fdImpl, 0); + fdExpl = targetExpl.accept(5); + ASSERT_GE(fdExpl, 0); + + pool.createZvol("implicit2"); + pool.createZvol("explicit2", "-o com.cloudbyte:targetip=127.0.0.1:12345"); + + // no new connections + rc = targetImpl.accept(5); + ASSERT_EQ(rc, -1); + rc = targetExpl.accept(5); + ASSERT_EQ(rc, -1); + + // nothing should happen if we destroy only one of the two zvols + // using the control connection + pool.destroyZvol("implicit1"); + rc = ready_for_read(fdImpl, 5); + ASSERT_EQ(rc, 0); + pool.destroyZvol("explicit1"); + rc = ready_for_read(fdExpl, 5); + ASSERT_EQ(rc, 0); + + // should close the connection + pool.destroyZvol("implicit2"); + rc = ready_for_read(fdImpl, 5); + ASSERT_EQ(rc, 1); + rc = read(fdImpl, buf, sizeof (buf)); + ASSERT_EQ(rc, 0); + close(fdImpl); + + // should close the connection + pool.destroyZvol("explicit2"); + rc = ready_for_read(fdExpl, 5); + ASSERT_EQ(rc, 1); + rc = read(fdExpl, buf, sizeof (buf)); + ASSERT_EQ(rc, 0); + close(fdExpl); +} + +/* + * Test that zrepl will try to reconnect when target restarts. + */ +TEST(TargetIPTest, Reconnect) { + zvol_io_hdr_t hdr_out, hdr_in; + Zrepl zrepl; + TestPool pool("handshake"); + std::string zvolname = pool.getZvolName("reconnect"); + Target target; + int fd; + char buf[1]; + int rc; + + zrepl.start(); + pool.create(); + pool.createZvol("reconnect"); + + // First we test that zrepl connects even if it could not connect + // first couple of times after start + sleep(5); + rc = target.listen(); + ASSERT_GE(rc, 0); + fd = target.accept(5); + ASSERT_GE(fd, 0); + + // Send a simple request to zrepl without waiting for reply + hdr_out.version = REPLICA_VERSION; + hdr_out.opcode = ZVOL_OPCODE_HANDSHAKE; + hdr_out.status = ZVOL_OP_STATUS_OK; + hdr_out.io_seq = 0; + hdr_out.offset = 0; + hdr_out.len = zvolname.length() + 1; + rc = write(fd, &hdr_out, sizeof (hdr_out)); + ASSERT_EQ(rc, sizeof (hdr_out)); + rc = write(fd, zvolname.c_str(), hdr_out.len); + ASSERT_EQ(rc, hdr_out.len); + + // simulate the target restart + close(target.m_listenfd); + target.m_listenfd = -1; + close(fd); + rc = target.listen(); + ASSERT_GE(rc, 0); + fd = target.accept(5); + ASSERT_GE(fd, 0); + + // should close the connection + pool.destroyZvol("reconnect"); + rc = ready_for_read(fd, 5); + ASSERT_EQ(rc, 1); + rc = read(fd, buf, sizeof (buf)); + ASSERT_EQ(rc, 0); + close(fd); +}