-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkaldi-starter.go
122 lines (115 loc) · 3.69 KB
/
kaldi-starter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"bbb-kaldi-connector/bbb"
"fmt"
"io"
"io/ioutil"
"log"
"os/exec"
"syscall"
"github.com/gomodule/redigo/redis"
)
// main is the program entry point; it hands control to listen.
func main() {
	listen()
}
// listen subscribes to Redis and dispatches every received message to
// processMessage. It never returns normally: a receive error terminates the
// program through log.Fatal, and a panic anywhere below is logged and followed
// by a fresh subscription.
//
// The restart is a plain loop instead of a recursive call from inside the
// deferred recover handler, so repeated recoveries do not pile up stack
// frames. The process map is created once, outside the retry loop, so stop
// channels of Kaldi processes that are already running survive a restart.
func listen() {
	kaldiProcessMap := make(map[string]chan bool)
	for {
		listenOnce(kaldiProcessMap)
	}
}

// listenOnce opens the Redis connections, pumps messages until something
// panics, and returns after logging the recovered panic. Both connections are
// closed (by the deferred calls) before the recover handler runs.
func listenOnce(kaldiProcessMap map[string]chan bool) {
	defer func() {
		if r := recover(); r != nil {
			log.Println("Recovered in f", r)
		}
	}()

	host := "134.100.15.197"
	pubSubConnection, pubSub := bbb.SetupRedisPubSub(host)
	defer pubSubConnection.Close()

	// NOTE(review): this second connection is only opened and closed here;
	// confirm whether it is still needed.
	redisConnection := bbb.NewRedisConnection(host)
	defer redisConnection.Close()

	for {
		switch v := pubSub.Receive().(type) {
		case redis.Message:
			processMessage(v, kaldiProcessMap)
		// case redis.Subscription:
		// 	fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
		case error:
			// log.Fatal exits the process, so the deferred Close calls do
			// not run on this path.
			log.Fatal(v)
		}
	}
}
// startKaldiForMeeting starts a Kaldi recognizer process for meetingID and
// stores a stop channel for it in kaldiProcessMap under meetingID. Sending a
// value on that channel makes the supervisor goroutine send SIGINT to the
// process and stop supervising it.
//
// If the process exits with an error it is started again by the same
// supervisor, reusing the same stop channel, so the map is written exactly
// once and only from the caller's goroutine. A process that was stopped on
// request is not restarted.
//
// Failure to create the stderr pipe or to start the process terminates the
// whole program via log.Fatal.
func startKaldiForMeeting(kaldiProcessMap map[string]chan bool, meetingID string) {
	// Buffered so a single stop request never blocks the sender, even when
	// the supervisor has already returned after a clean Kaldi exit.
	stopChannel := make(chan bool, 1)

	// Register before spawning anything: a nil map panics here, before a
	// child process exists that would otherwise be orphaned.
	kaldiProcessMap[meetingID] = stopChannel

	cmd, stderr := launchKaldi(meetingID)
	go superviseKaldi(cmd, stderr, stopChannel, meetingID)
	log.Println("Started for meeting: ", meetingID)
}

// launchKaldi builds and starts the Kaldi command for meetingID and returns
// the running command together with its stderr pipe.
func launchKaldi(meetingID string) (*exec.Cmd, io.ReadCloser) {
	cmd := exec.Command(
		"/home/3wille/pykaldi_env/bin/python3", "nnet3_model.py", "-m0", "-e", "-c1", "-t",
		"-asinc_fastest", "-r 48000", "-ylibrispeech.yaml",
		"--redis-audio=asr_audio_"+meetingID, "--redis-channel=asr_text_"+meetingID,
		// Alternative model configuration:
		// "/home/3wille/pykaldi_env/bin/python3", "nnet3_model.py", "-m0", "-e", "-c1", "-t",
		// "-asinc_fastest", "-r 48000", "-ykaldi_tuda_de_nnet3_chain2_new.yaml",
		// "--redis-audio=asr_audio_"+meetingID, "--redis-channel=asr_text_"+meetingID,
	)
	log.Println(cmd.Args)

	// NOTE(review): cmd.Env starts out nil, so after these appends the child
	// receives only these two variables and does not inherit the parent's
	// environment. Confirm that this is intended.
	cmd.Env = append(cmd.Env, "LD_PRELOAD=/opt/intel/mkl/lib/intel64/libmkl_def.so:/opt/intel/mkl/lib/intel64/libmkl_avx2.so:/opt/intel/mkl/lib/intel64/libmkl_core.so:/opt/intel/mkl/lib/intel64/libmkl_intel_lp64.so:/opt/intel/mkl/lib/intel64/libmkl_intel_thread.so:/opt/intel/lib/intel64_lin/libiomp5.so")
	cmd.Env = append(cmd.Env, "OMP_NUM_THREADS=1")
	cmd.Dir = "/home/3wille/kaldi-model-server"

	stderr, err := cmd.StderrPipe()
	if err != nil {
		log.Fatal(err)
	}
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	return cmd, stderr
}

// superviseKaldi watches one meeting's Kaldi process. It restarts the process
// whenever it exits with an error and returns once a stop request arrives on
// stopChannel or the process exits without an error.
func superviseKaldi(cmd *exec.Cmd, stderr io.ReadCloser, stopChannel <-chan bool, meetingID string) {
	for {
		// Buffered so the waiter can deliver its result and finish even
		// after the supervisor has returned because of a stop request.
		exited := make(chan error, 1)
		go func(cmd *exec.Cmd, stderr io.ReadCloser) {
			// Read stderr to EOF before calling Wait; Wait closes the pipe.
			// A read error only truncates the diagnostic dump, so it is
			// deliberately ignored.
			slurp, _ := ioutil.ReadAll(stderr)
			fmt.Printf("%s\n", slurp)
			exited <- cmd.Wait()
		}(cmd, stderr)

		select {
		case <-stopChannel:
			log.Println("Killing meeting")
			log.Println(cmd.Process.Pid)
			err := cmd.Process.Signal(syscall.SIGINT)
			// err := cmd.Process.Kill()
			if err != nil {
				log.Println("failed to send signal: ", err)
			}
			// The waiter goroutine still prints stderr and reaps the
			// process; no restart happens because nobody reads exited.
			return
		case err := <-exited:
			if err == nil {
				// Clean exit: nothing left to supervise.
				return
			}
			log.Println("Kaldi exited, restarting: ", meetingID)
			cmd, stderr = launchKaldi(meetingID)
		}
	}
}
// processMessage parses one Redis message and reacts to meeting lifecycle
// events:
//
//   - "CreateMeetingReqMsg" starts Kaldi for the meeting via
//     startKaldiForMeeting, unless a stop channel is already registered for
//     that meeting ID (a duplicate would overwrite the channel and leave the
//     first process without a way to be stopped).
//   - "DestroyMeetingSysCmdMsg" removes the meeting's stop channel from
//     kaldiProcessMap and sends a stop request on it. Removing the entry means
//     a repeated destroy message is reported as "not found" instead of
//     blocking on a channel that nobody reads any more.
//
// Messages with any other header name are ignored.
func processMessage(redisMessage redis.Message, kaldiProcessMap map[string]chan bool) {
	message := bbb.ParseMessage(redisMessage)

	switch message.Core.Header.Name {
	case "CreateMeetingReqMsg":
		meetingID := message.Core.Body.Props.MeetingProp.IntID
		log.Println(meetingID)
		if _, running := kaldiProcessMap[meetingID]; running {
			log.Println("kaldi process already registered for meeting: ", meetingID)
			return
		}
		startKaldiForMeeting(kaldiProcessMap, meetingID)

	case "DestroyMeetingSysCmdMsg":
		meetingID := message.Core.Body.MeetingID
		stopChannel, ok := kaldiProcessMap[meetingID]
		if !ok {
			log.Println("no kaldi process found for ending meeting: ", meetingID)
			return
		}
		// Drop the entry first so the meeting is unregistered even if the
		// send below blocks.
		delete(kaldiProcessMap, meetingID)
		stopChannel <- true
	}
}