-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathzkwatcher.c
executable file
·184 lines (155 loc) · 4.86 KB
/
zkwatcher.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdbool.h>
#include <time.h>
#include "zookeeper/zookeeper.h"
/*
 * Register `watcher` as a child watch on `path`.
 *
 * The child list and stat returned by zoo_wget_children2() are not needed
 * here; the call is made only for its side effect of installing the watch.
 *
 * Returns the ZooKeeper return code of zoo_wget_children2() (0 on success).
 */
static int
add_children_watch_on(zhandle_t *zh, const char *path, watcher_fn watcher, void *watcherCtx)
{
    struct String_vector children = {0};
    struct Stat stat;

    printf("add_children_watch_on path: %s\n", path);

    int ret = zoo_wget_children2(zh, path, watcher, watcherCtx, &children, &stat);
    if (ret) {
        fprintf(stderr, "zoo_wget_children2 error [%d]\n", ret);
        return ret;
    }

    /* The client library allocates the child names; release them. */
    deallocate_String_vector(&children);
    return ret;
}
/*
 * qsort() comparator for an array of C strings.
 *
 * Each argument points at an array element, i.e. at a `char *`; the two
 * strings are ordered with strcmp().
 */
static int
cmpid(const void *p1, const void *p2)
{
    char *const *lhs = p1;
    char *const *rhs = p2;

    return strcmp(*lhs, *rhs);
}
/*
 * Print a one-line description of a watcher event type to stdout.
 *
 * The type is compared against the ZOO_*_EVENT values in a fixed order and
 * the first match is reported; a type matching none of them prints nothing.
 */
void zooEvent(int type) {
    if (type == ZOO_CREATED_EVENT) {
        printf("zoo create event, type[%d]\n", type);
        return;
    }
    if (type == ZOO_DELETED_EVENT) {
        printf("zoo deleted event, type[%d]\n", type);
        return;
    }
    if (type == ZOO_CHANGED_EVENT) {
        printf("zoo changed event, type[%d]\n", type);
        return;
    }
    if (type == ZOO_CHILD_EVENT) {
        printf("zoo child event, type[%d]\n", type);
        return;
    }
    if (type == ZOO_SESSION_EVENT) {
        printf("zoo session event, type[%d]\n", type);
        return;
    }
    if (type == ZOO_NOTWATCHING_EVENT) {
        printf("zoo notwatching event, type[%d]\n", type);
        return;
    }
}
/*
 * Watcher callback for the "/zkwatcher" subtree.
 *
 * On every event it:
 *   1. logs the event type and the watcher context,
 *   2. re-reads the children of "/zkwatcher", re-installing this function as
 *      the child watch,
 *   3. sorts the child names with cmpid(),
 *   4. reads each child's data with zoo_wget(), installing this function as
 *      the data watch on that child, and prints the value.
 *
 * Parameters:
 *   zh         - ZooKeeper handle the event was delivered on.
 *   type       - event type, reported through zooEvent().
 *   state      - connection state (unused).
 *   path       - path the event refers to (unused).
 *   watcherCtx - context supplied when the watch was registered; printed as
 *                a C string and forwarded unchanged to the watches set here.
 */
void
ccs_children_watcher(zhandle_t* zh, int type, int state,
                     const char* path, void* watcherCtx)
{
    (void)state;
    (void)path;

    printf("child event happened: type[%d], watcherCtx: %s\n", type,
           watcherCtx ? (const char *)watcherCtx : "(null)");
    zooEvent(type);

    struct String_vector children = {0};
    struct Stat stat;
    int ret = zoo_wget_children2(zh, "/zkwatcher", ccs_children_watcher, watcherCtx,
                                 &children, &stat);
    if (ret) {
        fprintf(stderr, "child: zoo_wget_children2 error [%d]\n", ret);
        return;
    }

    /* Sort the returned names in place; only the pointers are permuted, so
     * the vector can still be released as a whole afterwards. */
    if (children.count > 0) {
        qsort(children.data, (size_t)children.count, sizeof(children.data[0]), cmpid);
    }

    for (int i = 0; i < children.count; i++) {
        char childpath[128];
        char value[128] = {0};
        /* Leave the last byte untouched so `value` is always NUL-terminated
         * when printed with %s. */
        int value_len = (int)sizeof(value) - 1;
        struct Stat item_stat;

        int n = snprintf(childpath, sizeof(childpath), "/zkwatcher/%s", children.data[i]);
        if (n < 0 || (size_t)n >= sizeof(childpath)) {
            fprintf(stderr, "child: path too long for child %s\n", children.data[i]);
            continue;
        }

        /* Forward watcherCtx instead of `childpath`: the context must outlive
         * this call, and `childpath` is a stack buffer that is gone by the
         * time the data watch fires. */
        ret = zoo_wget(zh, childpath, ccs_children_watcher, watcherCtx,
                       value, &value_len, &item_stat);
        if (ret) {
            printf("zoo_wget %s error\n", childpath);
        } else {
            printf("zoo_wget %s:%s\n", childpath, value);
        }
    }

    /* The client library allocated the child names; release them. */
    deallocate_String_vector(&children);
}
/*
 * Ensure the node `node_name` exists, creating it with `data` as its
 * content when it is missing.
 *
 * Parameters:
 *   zkhandle  - ZooKeeper handle.
 *   node_name - absolute path of the node to check/create.
 *   data      - NUL-terminated initial content, or NULL for a node with no
 *               data.
 *
 * Returns ZOK when the node already exists or was created; otherwise the
 * ZooKeeper error code from zoo_exists() or zoo_create().
 */
int create_root(zhandle_t *zkhandle, const char *node_name, const char *data) {
    struct Stat stat;
    char node[512] = {0};
    /* Length of the payload itself. sizeof(data) would be the size of the
     * pointer, not of the string it points to. zoo_create() expects -1 when
     * the value is NULL. */
    int data_len = (data != NULL) ? (int)strlen(data) : -1;

    printf("zoo_create /zkwatcher node\n");

    int ret = zoo_exists(zkhandle, node_name, true, &stat);
    if (ret == ZOK) {
        printf("create_root %s already create\n", node_name);
    } else if (ret == ZNONODE) {
        ret = zoo_create(zkhandle,
                         node_name,
                         data,
                         data_len,
                         &ZOO_OPEN_ACL_UNSAFE, /* a completely open ACL */
                         0,
                         node,
                         sizeof(node) - 1);
        if (ret) {
            fprintf(stderr, "zoo_create error [%d]\n", ret);
        }
    } else {
        printf("create_root error\n");
    }
    return ret;
}
int
main(int argc, const char *argv[])
{
const char* host = "127.0.0.1:9001, 127.0.0.1:9002, 127.0.0.1:9003";
zhandle_t* zkhandle;
int timeout = 5000;
int ret = 0;
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
zkhandle = zookeeper_init(host, NULL, timeout,
0, "Zookeeper examples: config center services", 0);
if (zkhandle == NULL) {
fprintf(stderr, "Connecting to zookeeper servers error...\n");
exit(EXIT_FAILURE);
}
create_root(zkhandle, "/zkwatcher", "hello");
printf("Start watcher /zkwatcher children node\n");
ret = add_children_watch_on(zkhandle, "/zkwatcher", ccs_children_watcher, "/zkwatcher");
if (ret) {
fprintf(stderr, "add_children_watch_on error [%d]\n", ret);
exit(EXIT_FAILURE);
}
sleep(50000); // only for experiments
zookeeper_close(zkhandle);
}