Skip to content

Commit

Permalink
Add info about tasks in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
sgaragagghu committed Dec 22, 2018
1 parent e1a923f commit 9818b49
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 4 deletions.
17 changes: 17 additions & 0 deletions client/commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void processMinerCommand(char *nextParam, FILE *out);
void processMinersCommand(char *nextParam, FILE *out);
void processMiningCommand(char *nextParam, FILE *out);
void processNetCommand(char *nextParam, FILE *out);
void processTransportCommand(char *nextParam, FILE *out);
void processPoolCommand(char *nextParam, FILE *out);
void processStatsCommand(FILE *out);
void processInternalStatsCommand(FILE *out);
Expand Down Expand Up @@ -86,6 +87,7 @@ int xdag_com_miner(char *, FILE*);
int xdag_com_miners(char *, FILE*);
int xdag_com_mining(char *, FILE*);
int xdag_com_net(char *, FILE*);
int xdag_com_transport(char *, FILE*);
int xdag_com_pool(char *, FILE*);
int xdag_com_stats(char *, FILE*);
int xdag_com_state(char *, FILE*);
Expand Down Expand Up @@ -115,6 +117,7 @@ XDAG_COMMAND commands[] = {
{ "miners" , 2, xdag_com_miners },
{ "mining" , 1, xdag_com_mining },
{ "net" , 0, xdag_com_net },
{ "transport" , 0, xdag_com_transport },
{ "pool" , 2, xdag_com_pool },
{ "run" , 0, xdag_com_run },
{ "state" , 0, xdag_com_state },
Expand Down Expand Up @@ -197,6 +200,12 @@ int xdag_com_net(char * args, FILE* out)
return 0;
}

int xdag_com_transport(char * args, FILE* out)
{
processTransportCommand(args, out);
return 0;
}

int xdag_com_pool(char * args, FILE* out)
{
processPoolCommand(args, out);
Expand Down Expand Up @@ -492,6 +501,14 @@ void processNetCommand(char *nextParam, FILE *out)
xdag_net_command(netcmd, out);
}

void processTransportCommand(char *nextParam, FILE *out)
{
char *cmd = strtok_r(nextParam, " \t\r\n", &nextParam);
if(cmd != NULL && !strcmp(cmd, "info")) {
xdag_print_transport_task_info(out);
}
}

void processRPCCommand(char *nextParam, FILE *out)
{
char *cmd = NULL;
Expand Down
48 changes: 46 additions & 2 deletions client/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <inttypes.h>
#include "transport.h"
#include "storage.h"
#include "block.h"
Expand Down Expand Up @@ -33,12 +34,24 @@ static int reply_rcvd;
static uint64_t reply_id_private;
static int64_t reply_result;
static void *xdag_update_rip_thread(void *);
static struct xdag_task_info *g_task_info;

struct xdag_send_data {
struct xdag_block b;
struct send_parameters send_parameters;
};

struct xdag_task_info {
uint8_t task_type;
uint32_t block_req_counter;
};

enum task_type {
TASK_REQBLOCKS = 0x01,
TASK_REQBLOCKS_THREAD = 0x02,
TASK_REQSUM = 0x04
};

#define add_main_timestamp(a) ((a)->main_time = xdag_get_frame())

static void *xdag_send_thread(void *arg)
Expand All @@ -56,8 +69,9 @@ static void *xdag_send_thread(void *arg)

dnet_send_xdag_packet(&d->b, &d->send_parameters);

g_task_info[dnet_get_nconnection(d->send_parameters.connection)].task_type &= ~(TASK_REQBLOCKS | TASK_REQBLOCKS_THREAD);
free(d);

return 0;
}

Expand Down Expand Up @@ -101,9 +115,11 @@ static int process_transport_block(struct xdag_block *received_block, void *conn
send_data->send_parameters = (struct send_parameters){connection, time(NULL) + REQUEST_WAIT, 0, 0};

if(received_block->field[0].end_time - received_block->field[0].time <= REQUEST_BLOCKS_MAX_TIME) {
g_task_info[dnet_get_nconnection(connection)].task_type |= TASK_REQBLOCKS;
xdag_send_thread(send_data);
}
else {
g_task_info[dnet_get_nconnection(connection)].task_type |= TASK_REQBLOCKS_THREAD;
pthread_t t;
int err = pthread_create(&t, 0, xdag_send_thread, send_data);
if(err != 0) {
Expand Down Expand Up @@ -141,6 +157,7 @@ static int process_transport_block(struct xdag_block *received_block, void *conn

case XDAG_MESSAGE_SUMS_REQUEST:
{
g_task_info[dnet_get_nconnection(connection)].task_type |= TASK_REQSUM;
received_block->field[0].type = XDAG_FIELD_NONCE | XDAG_MESSAGE_SUMS_REPLY << 4;
received_block->field[0].time = xdag_load_sums(received_block->field[0].time, received_block->field[0].end_time,
(struct xdag_storage_sum *)&received_block->field[8]);
Expand All @@ -151,7 +168,7 @@ static int process_transport_block(struct xdag_block *received_block, void *conn
6 * sizeof(struct xdag_field) - sizeof(struct xdag_stats));
struct send_parameters send_parameters = {connection, time(NULL) + REQUEST_WAIT, 0, 0};
dnet_send_xdag_packet(received_block, &send_parameters);

g_task_info[dnet_get_nconnection(connection)].task_type &= ~TASK_REQSUM;
break;
}

Expand Down Expand Up @@ -189,6 +206,8 @@ static int process_transport_block(struct xdag_block *received_block, void *conn
dnet_send_xdag_packet(blk, &send_parameters);
}

++g_task_info[dnet_get_nconnection(connection)].block_req_counter;

break;
}

Expand Down Expand Up @@ -236,6 +255,8 @@ static int conn_open_check(uint32_t ip, uint16_t port)

static void conn_close_notify(void *conn)
{
g_task_info[dnet_get_nconnection(conn)] = (struct xdag_task_info){0};

if (reply_connection == conn)
reply_connection = 0;
}
Expand Down Expand Up @@ -300,6 +321,11 @@ int xdag_transport_start(int flags, int nthreads, const char *bindto, int npairs
return -1;
}

g_task_info = calloc(sizeof(struct xdag_task_info), dnet_get_maxconnections());
if (g_task_info == NULL) {
return -1;
}

return res;
}

Expand Down Expand Up @@ -464,3 +490,21 @@ static void *xdag_update_rip_thread(void *arg)
return 0;
}

static void *print_callback(void *file, void* conn) {
char buf[32];
dnet_stringify_conn_info(buf, sizeof(buf), conn);
size_t len = strlen(buf);
fprintf((FILE*)file, "%s %*s %33s %31s %21" PRIu32 "\n",
buf, (int)(sizeof(buf) + 4 - len), ((g_task_info[dnet_get_nconnection(conn)].task_type & TASK_REQBLOCKS) ? "yes" : "no"),
((g_task_info[dnet_get_nconnection(conn)].task_type & TASK_REQBLOCKS_THREAD) ? "yes" : "no"),
((g_task_info[dnet_get_nconnection(conn)].task_type & TASK_REQSUM) ? "yes" : "no"), g_task_info[dnet_get_nconnection(conn)].block_req_counter);
return NULL;
}

void xdag_print_transport_task_info(FILE *f) {
fprintf(f, "---------------------------------------------------------------------------------------------------------------------------------------\n"
" [ip:port] [Processing blocks request] [Processing blocks request w/ thread] [Processing sum request] [Blocks requested]\n");
dnet_for_each_conn(&print_callback, f);
fprintf(f, "---------------------------------------------------------------------------------------------------------------------------------------\n");
}

3 changes: 3 additions & 0 deletions client/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ extern int xdag_send_packet(struct xdag_block *b, void *conn, int broadcast);
/* see dnet_user_crypt_action */
extern int xdag_user_crypt_action(unsigned *data, unsigned long long data_id, unsigned size, int action);

// print tasks info for each connection
extern void xdag_print_transport_task_info(FILE *f);

extern pthread_mutex_t g_transport_mutex;
extern atomic_uint_least64_t g_xdag_last_received;

Expand Down
13 changes: 13 additions & 0 deletions dnet/dnet_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ struct send_parameters {
uint8_t time_to_live;
};

struct xconnection;

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -51,6 +53,17 @@ extern void (*dnet_connection_close_notify)(void *conn);
*/
extern int dnet_user_crypt_action(unsigned *data, unsigned long long data_id, unsigned size, int action);

// get number of maximum nuber of connections
uint64_t dnet_get_maxconnections(void);

// get the representative "number" of the connection
long dnet_get_nconnection(struct xconnection*);

// get a string that contains connection info
void dnet_stringify_conn_info(char *buf, size_t size, struct xconnection *conn);

// executes callback for each connection
void dnet_for_each_conn(void *(*callback)(void*, void*), void* data);
#ifdef __cplusplus
};
#endif
Expand Down
35 changes: 33 additions & 2 deletions dnet/dnet_xdag.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ void *dnet_send_xdag_packet(void *block, void *data)
buf->head.crc32 = 0;
buf->head.crc32 = crc_of_array(buf->byte, SECTOR_SIZE);
if(conn != NULL) {
long nconn = (struct xconnection *)((uintptr_t)conn & ~(uintptr_t)1) - g_connections;
long nconn = dnet_get_nconnection(conn);
buf->head.length = (uint16_t)nconn;
buf->head.type = nconn >> 16;
}
Expand Down Expand Up @@ -860,7 +860,7 @@ int dnet_execute_command(const char *cmd, void *fileout)
"Connection list:\n");
for(i = 0; i < g_nthreads; ++i) for(j = 0; j < g_threads[i].nconnections; ++j) {
conn = g_threads[i].conn[j];
sprintf(buf, "%d.%d.%d.%d:%d", conn->ip & 0xFF, conn->ip >> 8 & 0xFF, conn->ip >> 16 & 0xFF, conn->ip >> 24 & 0xFF, conn->port);
dnet_stringify_conn_info(buf, sizeof(buf), conn);
len = strlen(buf);
fprintf(f, " %2d. %s%*s%d sec, [in/out] - %lld/%lld bytes, %lld/%lld packets, %lld/%lld dropped\n",
count++, buf, 24 - len, "", (int)(time(0) - conn->created),
Expand Down Expand Up @@ -909,3 +909,34 @@ static void dnet_help(FILE *fileout)
" connect ip:port - connect to this host\n"
" help - print this help\n");
}

inline uint64_t dnet_get_maxconnections(void)
{
return MAX_CONNECTIONS_PER_THREAD * g_nthreads;
}

inline long dnet_get_nconnection(struct xconnection* connection)
{
long nconn = connection - g_connections;
if (nconn >= dnet_get_maxconnections()) {
dnet_err("out of bounds nconnection requested [function: dnet_get_nconnection]");
nconn = 0;
}
return nconn;
}

inline void dnet_stringify_conn_info(char *buf, size_t size, struct xconnection *conn)
{
snprintf(buf, size, "%d.%d.%d.%d:%d", conn->ip & 0xFF, conn->ip >> 8 & 0xFF,
conn->ip >> 16 & 0xFF, conn->ip >> 24 & 0xFF, conn->port);
}

void dnet_for_each_conn(void *(*callback)(void*, void*), void* data)
{
if (callback != NULL) {
for (int i = 0; i < g_nthreads; ++i) for(int j = 0; j < g_threads[i].nconnections; ++j) {
callback(data, g_threads[i].conn[j]);
}
}
}

0 comments on commit 9818b49

Please sign in to comment.