# refactor: 1.0.0 API #40

Merged 20 commits on Feb 14, 2023
CeresDB is a high-performance, distributed, schema-less, cloud-native time-series database.

## Init CeresDB client
```java
// CeresDB options
final CeresDBOptions opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, DIRECT) // CeresDB default gRPC port is 8831; use the DIRECT route mode
        .tenant("public", "sub_test", "test_token") // tenant info
        // maximum retry times when write fails
        // (only some error codes will be retried, such as the routing table failure)
        .writeMaxRetries(1)
        // maximum retry times when read fails
        // (only some error codes will be retried, such as the routing table failure)
        .readMaxRetries(1)
        .build();

final CeresDBClient client = new CeresDBClient();
if (!client.init(opts)) {
    throw new IllegalStateException("Fail to start CeresDBClient");
}
```
For more configuration options, see [configuration](docs/configuration.md)

## Create table example

CeresDB is a schema-less time-series database, so creating a table schema ahead of data ingestion is not required.

The following table creation statement (using the SQL API included in the SDK) shows how to create a table in CeresDB:

```java
// Create the table manually; creating the table schema ahead of data ingestion is not required
String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" +
        "ts TIMESTAMP NOT NULL," +
        "city STRING TAG NOT NULL," +
        "ip STRING TAG NOT NULL," +
        "cpu DOUBLE NULL," +
        "mem DOUBLE NULL," +
        "TIMESTAMP KEY(ts)" + // timestamp column must be specified
        ") ENGINE=Analytic";

Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
    throw new IllegalStateException("Fail to create table");
}
```
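Other DDL statements can be issued through the same `SqlQueryRequest` API. As an illustration only (assuming the server accepts standard `DROP TABLE` syntax), the table above could be removed like this:

```java
// Drop the table through the same SQL API (sketch; DROP TABLE support is assumed)
Result<SqlQueryOk, Err> dropResult = client.sqlQuery(new SqlQueryRequest("DROP TABLE IF EXISTS machine_table")).get();
if (!dropResult.isOk()) {
    throw new IllegalStateException("Fail to drop table");
}
```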

## Write data example
```java
final long t0 = System.currentTimeMillis();
final long t1 = t0 + 1000;
final long t2 = t1 + 1000;
// The code below organizes three points (three timestamps) for the `cpu` and `mem` fields,
// which are transported over the network only once. CeresDB encourages this practice,
// because the SDK can apply an efficient compression algorithm to reduce network traffic,
// and it is also friendly to the server side.
final List<Point> data = Point.newPointsBuilder("machine_table")
        .addPoint() // first point
        .setTimestamp(t0)
        .addTag("city", "Singapore")
        .addTag("ip", "10.0.0.1")
        .addField("cpu", Value.withDouble(0.23))
        .addField("mem", Value.withDouble(0.55))
        .build()
        .addPoint() // second point
        .setTimestamp(t1)
        .addTag("city", "Singapore")
        .addTag("ip", "10.0.0.1")
        .addField("cpu", Value.withDouble(0.25))
        .addField("mem", Value.withDouble(0.56))
        .build()
        .addPoint() // third point
        .setTimestamp(t2)
        .addTag("city", "Shanghai")
        .addTag("ip", "10.0.0.2")
        .addField("cpu", Value.withDouble(0.21))
        .addField("mem", Value.withDouble(0.52))
        .build()
        .build();

final CompletableFuture<Result<WriteOk, Err>> wf = client.write(new WriteRequest(data));
// `future.get()` here is only for demonstration; composing the CompletableFuture asynchronously is the better practice
final Result<WriteOk, Err> writeResult = wf.get();

Assert.assertTrue(writeResult.isOk());
// The `Result` class follows the Rust convention and provides rich combinators (such as mapOr and andThen)
// for transforming the result value; refer to the API docs for detailed usage.
Assert.assertEquals(3, writeResult.getOk().getSuccess());
Assert.assertEquals(3, writeResult.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, writeResult.getOk().getFailed());
Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());
```
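As the comments note, blocking on `future.get()` is only for demonstration. A non-blocking sketch of the same write, composed with the standard `CompletableFuture` API (the handler bodies are illustrative):

```java
// Asynchronous handling of the write result; no thread blocks on get()
client.write(new WriteRequest(data))
        .thenAccept(result -> {
            if (result.isOk()) {
                System.out.println("wrote " + result.getOk().getSuccess() + " points");
            } else {
                System.err.println("write failed");
            }
        })
        .exceptionally(err -> {
            err.printStackTrace(); // transport-level failure
            return null;
        });
```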
See [write](docs/write.md)

## Query data example
```java
final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
        .forTables("machine_table") // table name is optional; if not provided, the SQL parser will parse the `sql` to get the table name and do the routing automatically
        .sql("select * from machine_table where ts = %d", t0)
        .build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);
// `future.get()` here is only for demonstration; composing the CompletableFuture asynchronously is the better practice
final Result<SqlQueryOk, Err> queryResult = qf.get();

Assert.assertTrue(queryResult.isOk());

final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());

// get rows as a list
final List<Row> rows = queryOk.getRowList();
Assert.assertEquals(t0, rows.get(0).getColumnValue("ts").getTimestamp());
Assert.assertEquals("Singapore", rows.get(0).getColumnValue("city").getString());
Assert.assertEquals("10.0.0.1", rows.get(0).getColumnValue("ip").getString());
Assert.assertEquals(0.23, rows.get(0).getColumnValue("cpu").getDouble(), 0.0000001);
Assert.assertEquals(0.55, rows.get(0).getColumnValue("mem").getDouble(), 0.0000001);

// get rows as a stream
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));
```
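The row stream combines naturally with the `java.util.stream` API. A small sketch that collects one column from the result rows (field names as in the example above):

```java
// Collect the `cpu` values from the result rows using the standard Stream API
final List<Double> cpuValues = queryOk.stream()
        .map(row -> row.getColumnValue("cpu").getDouble())
        .collect(Collectors.toList());
```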
See [read](docs/read.md)

## stream write/read Example
CeresDB supports streaming writes and reads, which are suitable for large-scale data ingestion and scanning.
```java
long start = System.currentTimeMillis();
long t = start;
final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");

for (int i = 0; i < 1000; i++) {
    final List<Point> streamData = Point.newPointsBuilder("machine_table")
            .addPoint()
            .setTimestamp(t)
            .addTag("city", "Beijing")
            .addTag("ip", "10.0.0.3")
            .addField("cpu", Value.withDouble(0.42))
            .addField("mem", Value.withDouble(0.67))
            .build()
            .build();
    writeBuf.writeAndFlush(streamData);
    t = t + 1;
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());

final SqlQueryRequest streamQuerySql = SqlQueryRequest.newBuilder()
        .sql("select * from %s where city = '%s' and ts >= %d and ts < %d", "machine_table", "Beijing", start, t)
        .build();
final Result<SqlQueryOk, Err> streamQueryResult = client.sqlQuery(streamQuerySql).get();
Assert.assertTrue(streamQueryResult.isOk());
Assert.assertEquals(1000, streamQueryResult.getOk().getRowCount());
```
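`completed()` returns a plain `CompletableFuture`, so the end of a stream write can also be handled without blocking; a sketch (the handler bodies are illustrative):

```java
// Non-blocking completion handling for the stream write
writeBuf.completed().whenComplete((ok, err) -> {
    if (err != null) {
        err.printStackTrace(); // the stream failed; consider re-creating the StreamWriteBuf and retrying
    } else {
        System.out.println("stream wrote " + ok.getSuccess() + " points");
    }
});
```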
See [streaming](docs/streaming.md)
