Skip to content

Commit

Permalink
Merge pull request #481 from sgaragagghu/time_limit
Browse files Browse the repository at this point in the history
Add expire time to packets in queue to be sent
  • Loading branch information
jonano614 authored Dec 14, 2018
2 parents c96c92d + 403c5e6 commit 8f91015
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 63 deletions.
29 changes: 15 additions & 14 deletions client/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ static void *xdag_update_rip_thread(void *);

struct xdag_send_data {
struct xdag_block b;
void *connection;
struct send_parameters send_parameters;
};

#define add_main_timestamp(a) ((a)->main_time = xdag_get_frame())
Expand All @@ -45,7 +45,7 @@ static void *xdag_send_thread(void *arg)
{
struct xdag_send_data *d = (struct xdag_send_data *)arg;

d->b.field[0].time = xdag_load_blocks(d->b.field[0].time, d->b.field[0].end_time, d->connection, &dnet_send_xdag_packet);
d->b.field[0].time = xdag_load_blocks(d->b.field[0].time, d->b.field[0].end_time, &d->send_parameters, &dnet_send_xdag_packet);
d->b.field[0].type = XDAG_FIELD_NONCE | XDAG_MESSAGE_BLOCKS_REPLY << 4;

memcpy(&d->b.field[2], &g_xdag_stats, sizeof(g_xdag_stats));
Expand All @@ -54,7 +54,7 @@ static void *xdag_send_thread(void *arg)
xdag_netdb_send((uint8_t*)&d->b.field[2] + sizeof(struct xdag_stats),
14 * sizeof(struct xdag_field) - sizeof(struct xdag_stats));

dnet_send_xdag_packet(&d->b, d->connection);
dnet_send_xdag_packet(&d->b, &d->send_parameters);

free(d);

Expand Down Expand Up @@ -98,8 +98,7 @@ static int process_transport_block(struct xdag_block *received_block, void *conn
if(!send_data) return -1;

memcpy(&send_data->b, received_block, sizeof(struct xdag_block));

send_data->connection = connection;
send_data->send_parameters = (struct send_parameters){connection, time(NULL) + REQUEST_WAIT};

if(received_block->field[0].end_time - received_block->field[0].time <= REQUEST_BLOCKS_MAX_TIME) {
xdag_send_thread(send_data);
Expand Down Expand Up @@ -150,8 +149,8 @@ static int process_transport_block(struct xdag_block *received_block, void *conn

xdag_netdb_send((uint8_t*)&received_block->field[2] + sizeof(struct xdag_stats),
6 * sizeof(struct xdag_field) - sizeof(struct xdag_stats));

dnet_send_xdag_packet(received_block, connection);
struct send_parameters send_parameters = {connection, time(NULL) + REQUEST_WAIT};
dnet_send_xdag_packet(received_block, &send_parameters);

break;
}
Expand Down Expand Up @@ -182,10 +181,12 @@ static int process_transport_block(struct xdag_block *received_block, void *conn
xtime_t t;
int64_t pos = xdag_get_block_pos(received_block->field[1].hash, &t, &buf);

struct send_parameters send_parameters = {connection, time(NULL) + REQUEST_WAIT};

if (pos == -2l) {
dnet_send_xdag_packet(&buf, connection);
dnet_send_xdag_packet(&buf, &send_parameters);
} else if (pos >= 0 && (blk = xdag_storage_load(received_block->field[1].hash, t, pos, &buf))) {
dnet_send_xdag_packet(blk, connection);
dnet_send_xdag_packet(blk, &send_parameters);
}

break;
Expand Down Expand Up @@ -342,13 +343,13 @@ static int do_request(int type, xtime_t start_time, xtime_t end_time, void *data
reply_callback = callback;

if (type == XDAG_MESSAGE_SUMS_REQUEST) {
reply_connection = dnet_send_xdag_packet(&b, 0);
reply_connection = dnet_send_xdag_packet(&b, &(struct send_parameters){0});
if (!reply_connection) {
pthread_mutex_unlock(&g_process_mutex);
return 0;
}
} else {
dnet_send_xdag_packet(&b, reply_connection);
dnet_send_xdag_packet(&b, &(struct send_parameters){reply_connection, 0});
}

time(&actual_time);
Expand Down Expand Up @@ -395,7 +396,7 @@ int xdag_request_sums(xtime_t start_time, xtime_t end_time, struct xdag_storage_
int xdag_send_new_block(struct xdag_block *b)
{
if(is_pool()) {
dnet_send_xdag_packet(b, (void*)(uintptr_t)NEW_BLOCK_TTL);
dnet_send_xdag_packet(b, &(struct send_parameters){(void*)(uintptr_t)NEW_BLOCK_TTL, 0});
} else {
xdag_send_block_via_pool(b);
}
Expand All @@ -415,7 +416,7 @@ int xdag_send_packet(struct xdag_block *b, void *conn)
conn = (void*)(uintptr_t)1l;
}

dnet_send_xdag_packet(b, conn);
dnet_send_xdag_packet(b, &(struct send_parameters){conn, 0});

return 0;
}
Expand All @@ -439,7 +440,7 @@ int xdag_request_block(xdag_hash_t hash, void *conn)
conn = (void*)(uintptr_t)1l;
}

dnet_send_xdag_packet(&b, conn);
dnet_send_xdag_packet(&b, &(struct send_parameters){conn, 0});

return 0;
}
Expand Down
5 changes: 5 additions & 0 deletions dnet/dnet_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
#ifndef DNET_MAIN_H_INCLUDED
#define DNET_MAIN_H_INCLUDED

struct send_parameters {
void *connection;
time_t time_limit;
};

#ifdef __cplusplus
extern "C" {
#endif
Expand Down
121 changes: 72 additions & 49 deletions dnet/dnet_xdag.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,22 @@

extern int g_xdag_sync_on;

#define XSECTOR \
union { \
uint8_t byte[SECTOR_SIZE]; \
uint32_t word[SECTOR_SIZE / sizeof(uint32_t)]; \
struct xsector_extended *next; \
struct dnet_packet_header head; \
};


struct xsector {
union {
uint8_t byte[SECTOR_SIZE];
uint32_t word[SECTOR_SIZE / sizeof(uint32_t)];
struct xsector *next;
struct dnet_packet_header head;
};
XSECTOR
};

struct xsector_extended {
XSECTOR
time_t time_limit;
};

struct xdnet_keys {
Expand All @@ -83,7 +92,7 @@ asm(
#endif

static struct dfslib_crypt *g_crypt;
static struct xsector *g_common_queue;
static struct xsector_extended *g_common_queue;
static uint64_t g_common_queue_pos, g_common_queue_reserve;
static int(*g_arrive_callback)(void *block, void *connection_from) = 0;
int(*dnet_connection_open_check)(uint32_t ip, uint16_t port) = 0;
Expand All @@ -98,15 +107,15 @@ struct xcrypt {

struct xpartbuf {
struct xsector read;
struct xsector write;
struct xsector_extended write;
int readlen;
int writelen;
};

struct xconnection {
pthread_mutex_t mutex;
struct xcrypt *crypt;
struct xsector *first_outbox, *last_outbox;
struct xsector_extended *first_outbox, *last_outbox;
struct xpartbuf *part;
uint64_t packets_in;
uint64_t packets_out;
Expand Down Expand Up @@ -237,7 +246,7 @@ int dnet_test_connection(void *connection)
static int close_connection(struct xconnection *conn, int error, const char *mess)
{
int nconn = conn - g_connections, nthread = nconn / MAX_CONNECTIONS_PER_THREAD, nfd, fd;
struct xsector *xs, *xs1;
struct xsector_extended *xse, *xse_tmp;
struct xthread *t = g_threads + nthread;
if(nconn < 0 || nconn >= g_nthreads * MAX_CONNECTIONS_PER_THREAD || conn != g_connections + nconn) {
return -1;
Expand Down Expand Up @@ -270,14 +279,14 @@ static int close_connection(struct xconnection *conn, int error, const char *mes
}
pthread_mutex_unlock(&t->mutex);
pthread_mutex_lock(&conn->mutex);
xs = conn->first_outbox;
xse = conn->first_outbox;
conn->first_outbox = conn->last_outbox = 0;
conn->out_queue_size = 0;
pthread_mutex_unlock(&conn->mutex);
while(xs) {
xs1 = xs->next;
free(xs);
xs = xs1;
while(xse) {
xse_tmp = xse->next;
free(xse);
xse = xse_tmp;
}
dnet_info("DNET : closed with %d.%d.%d.%d:%d, nthread=%d, nfd=%d, fd=%d, conn=%p, "
"in/out/drop=%ld/%ld/%ld, time=%ld, last_time=%d, err=%x, mess=%s",
Expand All @@ -296,10 +305,16 @@ static int close_connection(struct xconnection *conn, int error, const char *mes
return 0;
}

void *dnet_send_xdag_packet(void *block, void *connection_to)
void *dnet_send_xdag_packet(void *block, void *data)
{
struct xconnection *conn = (struct xconnection *)connection_to;
struct xsector *buf;
struct send_parameters *sp = data;
struct xconnection *conn = sp->connection;
struct xsector_extended *buf;

if (sp->time_limit && time(NULL) > sp->time_limit) {
return (void*)(intptr_t)-1;
}

if(!conn) {
int i, n, sum = 0, steps = 2 * g_nthreads;
for(i = 0; i < g_nthreads; ++i) {
Expand Down Expand Up @@ -333,6 +348,7 @@ void *dnet_send_xdag_packet(void *block, void *connection_to)
return 0;
}
buf = g_common_queue + ((ldus_atomic64_inc_return(&g_common_queue_reserve) - 1) & (COMMON_QUEUE_SIZE - 1));
buf->time_limit = sp->time_limit;
memcpy(buf->word, block, SECTOR_SIZE);
buf->head.type = DNET_PKT_XDAG;
if((uintptr_t)conn < 256) {
Expand All @@ -351,11 +367,12 @@ void *dnet_send_xdag_packet(void *block, void *connection_to)
ldus_atomic64_inc_return(&g_common_queue_pos);
return 0;
}
buf = malloc(SECTOR_SIZE);
buf = malloc(sizeof(struct xsector_extended));
if(!buf) {
dnet_err("malloc failed. size : %ld", SECTOR_SIZE);
return 0;
}
buf->time_limit = sp->time_limit;
memcpy(buf->word, block, SECTOR_SIZE);
buf->next = 0;
pthread_mutex_lock(&conn->mutex);
Expand All @@ -367,7 +384,7 @@ void *dnet_send_xdag_packet(void *block, void *connection_to)
conn->last_outbox = buf;
conn->out_queue_size++;
pthread_mutex_unlock(&conn->mutex);
return connection_to ? 0 : conn;
return sp->connection ? 0 : conn;
}

int dnet_set_xdag_callback(int(*callback)(void *block, void *connection_from))
Expand All @@ -381,6 +398,7 @@ static void *xthread_main(void *arg)
long nthread = (long)arg;
struct xthread *t = g_threads + nthread;
struct xsector buf, *xs;
struct xsector_extended bufe, *xse;
int res, ttl, size, err;
while(t->nconnections == 0) {
sleep(1);
Expand Down Expand Up @@ -475,7 +493,7 @@ static void *xthread_main(void *arg)
}
if(res > 0 && ttl > 2) {
xs->head.ttl = ttl;
dnet_send_xdag_packet(xs->word, (void *)((uintptr_t)conn | 1));
dnet_send_xdag_packet(xs->word, &(struct send_parameters){(void *)((uintptr_t)conn | 1), 0});
}
} else {
if(!conn->crypt) {
Expand Down Expand Up @@ -508,63 +526,68 @@ static void *xthread_main(void *arg)
if(t->fds[n].revents & POLLOUT) {
size = SECTOR_SIZE;
if(conn->part && conn->part->writelen) {
xs = &conn->part->write;
xse = &conn->part->write;
size = conn->part->writelen;
} else if(conn->packets_out >= FIRST_NSECTORS) {
if(conn->out_queue_size && (conn->packets_out & 1
|| conn->common_queue_pos == g_common_queue_pos)) {
pthread_mutex_lock(&conn->mutex);
if(conn->out_queue_size) {
xs = conn->first_outbox;
conn->first_outbox = xs->next;
xse = conn->first_outbox;
conn->first_outbox = xse->next;
if(!conn->first_outbox) {
conn->last_outbox = 0;
}
conn->out_queue_size--;
} else {
xs = 0;
xse = 0;
}
pthread_mutex_unlock(&conn->mutex);
if(!xs) {
if(!xse) {
continue;
}
xs->head.type = DNET_PKT_XDAG;
xs->head.ttl = 1;
xs->head.length = SECTOR_SIZE;
xs->head.crc32 = 0;
xs->head.crc32 = crc_of_array(xs->byte, SECTOR_SIZE);
if (xse->time_limit && time(NULL) > xse->time_limit) {
free(xse);
continue;
}
xse->head.type = DNET_PKT_XDAG;
xse->head.ttl = 1;
xse->head.length = SECTOR_SIZE;
xse->head.crc32 = 0;
xse->head.crc32 = crc_of_array(xse->byte, SECTOR_SIZE);
} else if(conn->common_queue_pos < g_common_queue_pos) {
if(g_common_queue_pos - conn->common_queue_pos > COMMON_QUEUE_SIZE) {
conn->common_queue_pos = g_common_queue_pos - COMMON_QUEUE_SIZE;
}
memcpy(buf.word, &g_common_queue[conn->common_queue_pos & (COMMON_QUEUE_SIZE - 1)],
SECTOR_SIZE);
xs = &buf;
memcpy(&bufe, &g_common_queue[conn->common_queue_pos & (COMMON_QUEUE_SIZE - 1)],
sizeof(struct xsector_extended));
xse = &bufe;
conn->common_queue_pos++;
if(conn - g_connections == (xs->head.length | xs->head.type << 16)) {
if(conn - g_connections == (xse->head.length | xse->head.type << 16) ||
(xse->time_limit && time(NULL) > xse->time_limit)) {
continue;
}
xs->head.type = DNET_PKT_XDAG;
xs->head.length = SECTOR_SIZE;
xse->head.type = DNET_PKT_XDAG;
xse->head.length = SECTOR_SIZE;
} else if(conn->crypt && time(0) >= conn->crypt->last_sent + OLD_DNET_TIMEOUT_SEC) {
memset(&buf, 0, SECTOR_SIZE);
xs = &buf;
memset(&bufe, 0, SECTOR_SIZE);
xse = &bufe;
} else {
t->fds[n].events &= ~POLLOUT;
continue;
}
dfslib_encrypt_sector(g_crypt, xs->word, conn->packets_out - FIRST_NSECTORS + 1);
dfslib_encrypt_sector(g_crypt, xse->word, conn->packets_out - FIRST_NSECTORS + 1);
} else if(conn->packets_out < FIRST_NSECTORS - 1 || (!conn->crypt && conn->packets_in)) {
xs = ((struct xsector *)&g_xkeys.pub) + conn->packets_out;
xse = (struct xsector_extended*)(((struct xsector *)&g_xkeys.pub) + conn->packets_out);
} else if(conn->crypt && conn->packets_in >= FIRST_NSECTORS - 1) {
memcpy(buf.word, g_xkeys.sect0.word, SECTOR_SIZE);
xs = &buf;
dfsrsa_crypt(xs->word, SECTOR_SIZE / sizeof(dfsrsa_t), conn->crypt->pub.key, DNET_KEYLEN);
memcpy(bufe.word, g_xkeys.sect0.word, SECTOR_SIZE);
xse = &bufe;
dfsrsa_crypt(xse->word, SECTOR_SIZE / sizeof(dfsrsa_t), conn->crypt->pub.key, DNET_KEYLEN);
} else {
continue;
}

res = write(t->fds[n].fd, xs->byte + SECTOR_SIZE - size, size);
res = write(t->fds[n].fd, xse->byte + SECTOR_SIZE - size, size);
if(conn->part && conn->part->writelen) {
if(res <= 0) {
err = res << 4 | 7;
Expand All @@ -584,11 +607,11 @@ static void *xthread_main(void *arg)
}
if(conn->part) {
conn->part->writelen = size - res;
memcpy(&conn->part->write, xs->byte, SECTOR_SIZE);
memcpy(&conn->part->write, xse->byte, SECTOR_SIZE);
}
}
if(conn->packets_out >= FIRST_NSECTORS && xs != &buf) {
free(xs);
if(conn->packets_out >= FIRST_NSECTORS && xse != &bufe) {
free(xse);
}
if(res != size) {
if(res > 0 && conn->part) {
Expand Down Expand Up @@ -770,7 +793,7 @@ int dnet_init(int argc, char **argv)
g_nthreads = nthreads;
g_threads = calloc(sizeof(struct xthread), nthreads);
g_connections = calloc(sizeof(struct xconnection), nthreads * MAX_CONNECTIONS_PER_THREAD);
g_common_queue = malloc(COMMON_QUEUE_SIZE * SECTOR_SIZE);
g_common_queue = malloc(COMMON_QUEUE_SIZE * sizeof(struct xsector_extended));
g_crypt = malloc(sizeof(struct dfslib_crypt));

if(!g_threads || !g_connections || !g_common_queue || !g_crypt) {
Expand Down

0 comments on commit 8f91015

Please sign in to comment.