-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathdataSimulate.dos
108 lines (95 loc) · 4.17 KB
/
dataSimulate.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
Step 1: create the databases and tables
*/
// Sensor state storage: one row per (ts, deviceId) holding the temp, amp and hydra readings.
// An existing dfs://test_state database is dropped first, so every run of this script starts from an empty database.
if(existsDatabase("dfs://test_state")) {dropDatabase("dfs://test_state")}
// Two-level partitioning on the TSDB engine: VALUE over the dates 2024.01.01..2024.01.31, then HASH on a SYMBOL key into 6 buckets.
create database "dfs://test_state" partitioned by VALUE(2024.01.01..2024.01.31),HASH([SYMBOL,6]),engine = "TSDB"
// ts feeds the VALUE level and deviceId the HASH level; ts is delta-compressed and rows are sorted by (deviceId, ts).
create table "dfs://test_state"."state"(
ts TIMESTAMP [compress = "delta"],
deviceId SYMBOL,
temp DOUBLE,
amp DOUBLE,
hydra DOUBLE
)
partitioned by ts,deviceId,
sortColumns = ["deviceId","ts"]
// Operation log storage: one row per (ts, deviceId) with the action type that was recorded.
// An existing dfs://test_log database is dropped first, so every run of this script starts from an empty database.
if(existsDatabase("dfs://test_log")) {dropDatabase("dfs://test_log")}
// RANGE partitioning on the TSDB engine; the boundaries are the distinct, sorted year-begin dates derived from 2000.01.01..2035.01.01.
create database "dfs://test_log" partitioned by RANGE(yearBegin(2000.01.01..2035.01.01).distinct().sort()),engine="TSDB"
// Partitioned on ts only; ts is delta-compressed and rows are sorted by (deviceId, ts).
create table "dfs://test_log"."log"(
ts TIMESTAMP [compress="delta"],
deviceId SYMBOL,
actionType SYMBOL
)
partitioned by ts,
sortColumns = ["deviceId","ts"]
// Anomaly information storage: one row per (deviceId, ts) with a persistTime value.
// An existing dfs://test_anomaly database is dropped first, so every run of this script starts from an empty database.
if(existsDatabase("dfs://test_anomaly")) {dropDatabase("dfs://test_anomaly")}
// RANGE partitioning on the TSDB engine; the boundaries are the distinct, sorted year-begin dates derived from 2000.01.01..2035.01.01.
create database "dfs://test_anomaly" partitioned by RANGE(yearBegin(2000.01.01..2035.01.01).distinct().sort()),engine = "TSDB"
// Partitioned on ts only; ts is delta-compressed and rows are sorted by (deviceId, ts).
create table "dfs://test_anomaly"."anomaly"(
deviceId SYMBOL,
ts TIMESTAMP [compress="delta"],
persistTime FLOAT
)
partitioned by ts,
sortColumns = ["deviceId","ts"]
/*
Step 2: simulate data
*/
//simulate sensor state
def writeState(){
	// Simulate one day (2024.01.01) of per-second sensor readings for 1000 devices
	// (device0001..device1000) and append them to dfs://test_state / state.
	// Every device receives the same temp/amp/hydra series; only deviceId differs.
	// Takes no arguments and returns nothing; its only effect is the appended rows.

	// 86400 timestamps, one per second of the day.
	ts = 2024.01.01T00:00:00.000+(0..86399)*1000

	// One-hour templates (3600 samples each): a sine baseline interrupted by flat
	// out-of-band segments. Sample offsets of the interruptions inside the hour:
	//   temp1 : 1200-1499 (values 80..82), 2100-2279 (values 106..107)
	//   amp1  : 1200-1499 (values 60..62), 2100-2279 (values 94..95), 2880-3059 (values 55..56)
	//   hydra1: 1200-1499 (values 116..117), 2880-3059 (values 81..82)
	temp1 = ((sin(1..1200\4)+2)*4+85) join (rand(80..82,60*5)) join ((sin(1..600\4)+2)*4+85) join (rand(106..107,60*3)) join ((sin(1..1320\4)+2)*4+85)
	amp1 = ((sin(1..1200\6)+2)*6+65) join (rand(60..62,60*5)) join ((sin(1..600\6)+2)*6+65) join (rand(94..95,60*3)) join ((sin(1..600\6)+2)*6+65) join (rand(55..56,60*3)) join ((sin(1..540\6)+2)*6+65)
	hydra1 = ((sin(1..1200\4.5)+2)*4.5+93) join (rand(116..117,60*5)) join ((sin(1..1380\4.5)+2)*4.5+93) join (rand(81..82,60*3)) join ((sin(1..540\4.5)+2)*4.5+93)

	// Full-day series (86400 samples): the template occupies hours 0, 2 and 5;
	// the remaining 1 + 2 + 18 hours are pure sine baseline.
	temp = temp1 join ((sin(1..3600\4)+2)*4+85) join temp1 join ((sin(1..7200\4)+2)*4+85) join temp1 join ((sin(1..64800\4)+2)*4+85)
	amp = amp1 join ((sin(1..3600\6)+2)*6+65) join amp1 join ((sin(1..7200\6)+2)*6+65) join amp1 join ((sin(1..64800\6)+2)*6+65)
	hydra = hydra1 join ((sin(1..3600\4.5)+2)*4.5+93) join hydra1 join ((sin(1..7200\4.5)+2)*4.5+93) join hydra1 join ((sin(1..64800\4.5)+2)*4.5+93)

	// The table handle does not depend on the device, so resolve it once
	// instead of once per loop iteration.
	pt = loadTable("dfs://test_state","state")
	for(i in 1..1000){
		// Zero-padded device name repeated for every timestamp of the day.
		deviceId = take(`device+lpad(string(i),4,"0"),ts.size())
		// One append per device keeps each in-memory batch at a single day of rows.
		pt.append!(table(ts,deviceId,temp,amp,hydra))
	}
}
//simulate operation log
def writelog(){
	// Simulate one day (2024.01.01) of per-minute operation-log entries for 1000
	// devices (device0001..device1000) and append them to dfs://test_log / log.
	// Every device receives the same thinned-out log; only deviceId differs.
	// Takes no arguments and returns nothing; its only effect is the appended rows.

	// 1440 timestamps, one per minute of the day.
	time = 2024.01.01T00:00:00.000+(0..1439)*1000*60

	// One-hour template (60 entries): random F4..F10 actions with fixed runs of
	// F1 at minutes 20-24, F2 at minutes 35-37 and F3 at minutes 48-50.
	actionType1 = rand(`F + string(4..10),20) join take(`F1,5) join rand(`F + string(4..10),10) join take(`F2,3) join rand(`F + string(4..10),10) join take(`F3,3) join rand(`F + string(4..10),9)
	// Full-day series (1440 entries): the template occupies hours 0, 2 and 5;
	// the remaining 1 + 2 + 18 hours are random F4..F10 actions.
	actionType = actionType1 join rand(`F+string(4..10),60) join actionType1 join rand(`F+string(4..10),120) join actionType1 join rand(`F+string(4..10),60*18)
	// Minute indexes of the F1/F2/F3 runs in the full-day series; these rows are never removed.
	index1 = 20..24 join 35..37 join 48..50 join 140..144 join 155..157 join 168..170 join 320..324 join 335..337 join 348..350
	t1 = table(time as ts,actionType)

	// Pick 1200 distinct removal candidates among minute indexes 0..1438
	// (index 1439, the last minute, is never a candidate). A shuffled prefix
	// is distinct by construction, so no retry loop is needed.
	shuffled = shuffle(0..1438)
	needDeleteIndex = shuffled[0:1200]
	// Drop every candidate that is not one of the protected F1/F2/F3 minutes,
	// using a single set-based delete rather than one delete per row.
	dropTs = time[needDeleteIndex[!(needDeleteIndex in index1)]]
	delete from t1 where ts in dropTs

	// Replicate the thinned log for each device into an in-memory buffer.
	t = table(1:0,`ts`deviceId`actionType,`TIMESTAMP`SYMBOL`SYMBOL)
	for(i in 1..1000){
		deviceId = take(`device+lpad(string(i),4,"0"),t1.size())
		// deviceId is not a column of t1; the select picks up the local vector of matching length.
		t.tableInsert(select ts,deviceId,actionType from t1)
	}
	// Write the whole buffer to the DFS table in one append.
	pt = loadTable("dfs://test_log","log")
	pt.append!(t)
}
//simulate anomaly information
def writeAnomaly(){
	// Build the anomaly records for devices device0001..device1000 and append
	// them to dfs://test_anomaly / anomaly. Every device receives the same nine
	// (ts, persistTime) pairs. Takes no arguments and returns nothing.

	// Minute grid of 2024.01.01 (1440 timestamps).
	minuteGrid = 2024.01.01T00:00:00.000+(0..1439)*60000
	// Minute indexes at which an anomaly record is written, and the
	// persistTime value stored with each of them (position by position).
	startSlots = 20 35 48 140 155 168 320 335 348
	anomalyTs = minuteGrid[startSlots]
	persistValues = 5 3 3 5 3 3 5 3 3
	rowsPerDevice = anomalyTs.size()

	// In-memory buffer with the same column layout as the target table.
	buffer = table(1:0,`deviceId`ts`persistTime,[SYMBOL,TIMESTAMP,FLOAT])
	for(n in 1..1000){
		deviceName = `device+lpad(string(n),4,"0")
		buffer.tableInsert(table(take(deviceName,rowsPerDevice) as deviceId,anomalyTs as ts,persistValues as persistTime))
	}
	// Write the whole buffer to the DFS table in one append.
	loadTable("dfs://test_anomaly","anomaly").append!(buffer)
}
// Run the three generators. Each one appends to its own database
// (dfs://test_anomaly, dfs://test_log, dfs://test_state), which the DDL above has just recreated.
writeAnomaly();
writelog();
// Writes 1000 devices x 86400 rows, one append per device.
writeState();