refactor: 1.0.0 API (#40)
* feat: remove http management

* refactor: refactor sdk api

* chore: remove dependency avro

* doc: update docs with new api

* chore: rename metric to table in RouteClient

* feat: support zstd batch decompress

* doc: update version to 1.0.0.alpha

* refactor: rename Value float64 to double

* fix: fix WriteOkTest assert

* chore: replace license header with a simple one

* doc: update changelog 1.0.0.alpha

* doc: translate doc/* to en

* fix: use simple license header

* feat: support PointBuilder

* feat: support database

* feat: add database & remove tenant

* feat: remove TablePointsBuilder

* fix: fix problems in pr conversation

* doc: rename metrics metric to table
mrrtree authored Feb 14, 2023
1 parent ba05e59 commit c0df97c
Showing 237 changed files with 3,732 additions and 9,368 deletions.
146 changes: 63 additions & 83 deletions README.md
CeresDB is a high-performance, distributed, schema-less, cloud native time-series database.
<dependency>
<groupId>io.ceresdb</groupId>
<artifactId>ceresdb-all</artifactId>
<version>0.1.0</version>
<version>1.0.0-alpha</version>
</dependency>
```

## Init CeresDB client
```java
// CeresDB options
final CeresDBOptions opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, DIRECT) // CeresDB default grpc port 8831, use DIRECT RouteMode
        .database("public") // use database for client, can be overridden by the RequestContext in request
        // maximum retry times when write fails
        // (only some error codes will be retried, such as the routing table failure)
        .writeMaxRetries(1)
        // maximum retry times when read fails
        // (only some error codes will be retried, such as the routing table failure)
        .readMaxRetries(1)
        .build();

final CeresDBClient client = new CeresDBClient();
if (!client.init(opts)) {
    throw new IllegalStateException("Fail to start CeresDBClient");
}
```
For more configuration options, see [configuration](docs/configuration.md)
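The `writeMaxRetries` / `readMaxRetries` options above retry only when the failure carries a retryable error code (such as a stale routing table). A minimal sketch of that policy in plain Java, with a made-up code `302` standing in for "route table outdated" (not the SDK's real error code), might look like:

```java
import java.util.Set;

public class RetryDemo {
    // A tiny stand-in for an SDK call result: success flag plus an error code.
    public record Outcome(boolean ok, int code) {}

    public interface Call { Outcome run(); }

    // Hypothetical retryable codes; the real SDK keeps its own list (e.g. routing table failures).
    static final Set<Integer> RETRYABLE = Set.of(302);

    public static Outcome callWithRetry(Call call, int maxRetries) {
        Outcome out = call.run();
        int retries = 0;
        // retry only while the failure code is known to be retryable
        while (!out.ok() && RETRYABLE.contains(out.code()) && retries < maxRetries) {
            retries++;
            out = call.run();
        }
        return out;
    }

    public static void main(String[] args) {
        final int[] attempts = {0};
        // first attempt fails with a retryable code, the single retry succeeds
        Outcome out = callWithRetry(
                () -> attempts[0]++ == 0 ? new Outcome(false, 302) : new Outcome(true, 0), 1);
        System.out.println(out.ok()); // true
    }
}
```

A non-retryable code (say `500`) would return immediately, which is why setting a high retry count does not amplify every failure.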

CeresDB is a schema-less time-series database, so creating table schema ahead of data ingestion is not required.
The following table creation statement (using the SQL API included in the SDK) shows how to create a table in CeresDB:

```java
// Create table manually; creating the table schema ahead of data ingestion is not required
String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" +
        "ts TIMESTAMP NOT NULL," +
        "city STRING TAG NOT NULL," +
        "ip STRING TAG NOT NULL," +
        "cpu DOUBLE NULL," +
        "mem DOUBLE NULL," +
        "TIMESTAMP KEY(ts)" + // timestamp column must be specified
        ") ENGINE=Analytic";

Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
    throw new IllegalStateException("Fail to create table");
}
```

## How to build write data
```java
final long t0 = System.currentTimeMillis();
final long t1 = t0 + 1000;
final long t2 = t1 + 1000;
// Build one point per timestamp and collect them into `pointList`.
// CeresDB encourages writing multiple points in one request: the SDK can
// compress them efficiently, reducing network traffic and server-side load.
final List<Point> pointList = new ArrayList<>();
for (final long ts : new long[] { t0, t1, t2 }) {
    final Point point = Point.newPointBuilder("machine_table")
            .setTimestamp(ts)
            .addTag("city", "Singapore")
            .addTag("ip", "10.0.0.1")
            .addField("cpu", Value.withDouble(0.23))
            .addField("mem", Value.withDouble(0.55))
            .build();
    pointList.add(point);
}
```

## Write data example
```java
final CompletableFuture<Result<WriteOk, Err>> wf = client.write(new WriteRequest(pointList));
// here the `future.get` is just for demonstration; a better async programming practice would be using the CompletableFuture API
final Result<WriteOk, Err> writeResult = wf.get();

Assert.assertTrue(writeResult.isOk());
// The `Result` class follows the Rust convention and provides rich functions (such as mapXXX, andThen) for transforming the result value. Refer to the API docs for detailed usage.
Assert.assertEquals(3, writeResult.getOk().getSuccess());
Assert.assertEquals(3, writeResult.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());
```
See [write](docs/write.md)
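The `Result` usage above (`isOk`, `getOk`, `mapOr`) follows Rust's `Result<T, E>` shape. A stripped-down illustration of that pattern (not the SDK's actual class, which is richer) is:

```java
import java.util.function.Function;

// Minimal Rust-style Result: either an Ok value or an Err value.
// Simplification: a null Ok value is treated as Err.
public final class Result<T, E> {
    private final T ok;
    private final E err;

    private Result(T ok, E err) { this.ok = ok; this.err = err; }

    public static <T, E> Result<T, E> ok(T v)  { return new Result<>(v, null); }
    public static <T, E> Result<T, E> err(E e) { return new Result<>(null, e); }

    public boolean isOk() { return ok != null; }
    public T getOk()      { return ok; }

    // Map the Ok value, or fall back to a default when this is an Err.
    public <U> U mapOr(U fallback, Function<T, U> f) {
        return isOk() ? f.apply(ok) : fallback;
    }

    // Chain another fallible step only when this is Ok.
    public <U> Result<U, E> andThen(Function<T, Result<U, E>> f) {
        return isOk() ? f.apply(ok) : Result.err(err);
    }
}
```

With this shape, `writeResult.mapOr(0, WriteOk::getSuccess)` reads as "the success count, or 0 on error", matching the assertions above.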

## Query data example
```java
final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
        .forTables("machine_table") // table name is optional. If not provided, the SQL parser will parse the `sql` to get the table name and do the routing automatically
        .sql("select * from machine_table where ts = %d", t0) //
        .build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);
// here the `future.get` is just for demonstration; a better async programming practice would be using the CompletableFuture API
final Result<SqlQueryOk, Err> queryResult = qf.get();

Assert.assertTrue(queryResult.isOk());

final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());

// get rows as list
final List<Row> rows = queryOk.getRowList();

// get rows as stream
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));
```
See [read](docs/read.md)
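Since `queryOk.stream()` hands back a plain `java.util.stream.Stream`, ordinary stream collectors work on the rows. A rough sketch, using simple maps as stand-in rows rather than the SDK's `Row` type:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RowStreamDemo {
    // Average the `cpu` field per `city` tag over plain map-based rows.
    public static Map<String, Double> avgCpuByCity(List<Map<String, Object>> rows) {
        return rows.stream().collect(
                Collectors.groupingBy(r -> (String) r.get("city"),
                        Collectors.averagingDouble(r -> (Double) r.get("cpu"))));
    }

    public static void main(String[] args) {
        // Stand-in rows, shaped like the result of the query above.
        List<Map<String, Object>> rows = List.of(
                Map.<String, Object>of("city", "Singapore", "cpu", 0.25),
                Map.<String, Object>of("city", "Singapore", "cpu", 0.75),
                Map.<String, Object>of("city", "Beijing", "cpu", 0.5));

        System.out.println(avgCpuByCity(rows).get("Singapore")); // 0.5
    }
}
```

The same `groupingBy`/`averagingDouble` pipeline applies unchanged once the classifier and extractor read from the SDK's `Row` accessors instead of a map.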

## Stream write/read example
CeresDB supports streaming writing and reading, suitable for large-scale data reading and writing.
```java
long timestamp = System.currentTimeMillis();

final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");
for (int i = 0; i < 1000; i++) {
    final Point point = Point.newPointBuilder("machine_table")
            .setTimestamp(timestamp)
            .addTag("city", "Beijing")
            .addTag("ip", "10.0.0.3")
            .addField("cpu", Value.withDouble(0.42))
            .addField("mem", Value.withDouble(0.67))
            .build();
    writeBuf.writeAndFlush(Arrays.asList(point));
    timestamp = timestamp + 1;
}

final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());
```
See [streaming](docs/streaming.md)
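The `StreamWriteBuf` above batches points into one long-lived stream and reports the total on `completed()`. A toy version of that contract, built only on `CompletableFuture` (illustration only; the real SDK sends each batch over a gRPC stream), could be:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy stand-in for StreamWriteBuf: it only counts flushed items and settles a
// future with the total when the caller signals completion.
public class ToyStreamWriteBuf<V> {
    private long success = 0;
    private final CompletableFuture<Long> done = new CompletableFuture<>();

    public ToyStreamWriteBuf<V> writeAndFlush(List<V> batch) {
        success += batch.size(); // pretend every item in the batch was written
        return this;
    }

    public CompletableFuture<Long> completed() {
        done.complete(success); // no more writes; settle with the total count
        return done;
    }
}
```

Flushing 1000 single-point batches and then calling `completed()` settles the future with 1000, mirroring the loop above.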

