diff --git a/server/mqtt.go b/server/mqtt.go index 5263158f284..1b62bbcbc44 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -113,6 +113,10 @@ const ( mqttRetainedMsgsStreamName = mqttStreamNamePrefix + "rmsgs" mqttRetainedMsgsStreamSubject = "$MQTT.rmsgs" + // Stream name for MQTT sessions on a given account + mqttSessStreamName = mqttStreamNamePrefix + "sess" + mqttSessStreamSubjectPrefix = "$MQTT.sess." + // Stream name prefix for MQTT sessions on a given account mqttSessionsStreamNamePrefix = mqttStreamNamePrefix + "sess_" @@ -148,6 +152,7 @@ const ( mqttJSAMsgLoad = "ML" mqttJSASessPersist = "SP" mqttJSARetainedMsgDel = "RD" + mqttJSAStreamNames = "SN" // Name of the header key added to NATS message to carry mqtt PUBLISH information mqttNatsHeader = "Nmqtt-Pub" @@ -159,6 +164,9 @@ const ( // This is how frequently the timer to cleanup the sessions flappers map is firing. mqttSessFlappingCleanupInterval = 5 * time.Second + + // Default retry delay if transfer of old session streams to new one fails + mqttDefaultTransferRetry = 5 * time.Second ) var ( @@ -217,6 +225,20 @@ type mqttAccountSessionManager struct { replicas int rrmLastSeq uint64 // Restore retained messages expected last sequence rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded + sp sessPersist // Used for cluster-wide processing of session records being persisted + domainTk string // Domain (with trailing "."), or possibly empty. This is added to session subject. +} + +type sessPersist struct { + mu sync.Mutex + ch chan struct{} + head *sessPersistRecord + tail *sessPersistRecord +} + +type sessPersistRecord struct { + seq uint64 + next *sessPersistRecord } type mqttJSA struct { @@ -257,6 +279,7 @@ type mqttSession struct { maxp uint16 tmaxack int clean bool + domainTk string } type mqttPersistedSession struct { @@ -778,6 +801,7 @@ func (s *Server) mqttHandleClosedClient(c *client) { sess.mu.Lock() sess.c = nil doClean := sess.clean + seq := sess.seq sess.mu.Unlock() // If it was a clean session, then we remove from the account manager, // and we will call clear() outside of any lock. @@ -790,7 +814,7 @@ func (s *Server) mqttHandleClosedClient(c *client) { // This needs to be done outside of any lock. if doClean { - sess.clear(true) + sess.clear(true, seq) } // Now handle the "will". This function will be a no-op if there is no "will" to send. @@ -893,6 +917,13 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc nuid: nuid.New(), quitCh: quitCh, }, + sp: sessPersist{ + ch: make(chan struct{}, 1), + }, + } + // We need to include the domain in the subject prefix used to store sessions in the $MQTT_sess stream. + if d := s.getOpts().JetStreamDomain; d != _EMPTY_ { + as.domainTk = d + "." } var subs []*subscription @@ -954,8 +985,29 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc as.sendJSAPIrequests(s, c, accName, closeCh) }) - // Create the stream for the messages. + // Start the go routine that will handle network updates regarding sessions + s.startGoRoutine(func() { + defer s.grWG.Done() + as.sessPersistProcessing(closeCh) + }) + + // Create the stream for the sessions. cfg := &StreamConfig{ + Name: mqttSessStreamName, + Subjects: []string{mqttSessStreamSubjectPrefix + as.domainTk + ">"}, + Storage: FileStorage, + Retention: LimitsPolicy, + Replicas: as.replicas, + MaxMsgsPer: 1, + } + if _, err := jsa.createStream(cfg); err == nil { + as.transferUniqueSessStreamsToMuxed(s) + } else if isErrorOtherThan(err, JSStreamNameExistErr) { + return nil, fmt.Errorf("create sessions stream for account %q: %v", acc.GetName(), err) + } + + // Create the stream for the messages. + cfg = &StreamConfig{ Name: mqttStreamName, Subjects: []string{mqttStreamSubjectPrefix + ">"}, Storage: FileStorage, @@ -1080,7 +1132,12 @@ func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, time // Either we use nuid.Next() which uses a global lock, or our own nuid object, but // then it needs to be "write" protected. This approach will reduce across account // contention since we won't use the global nuid's lock. - reply := jsa.rplyr + kind + "." + jsa.nuid.Next() + var sb strings.Builder + sb.WriteString(jsa.rplyr) + sb.WriteString(kind) + sb.WriteByte(btsep) + sb.WriteString(jsa.nuid.Next()) + reply := sb.String() jsa.mu.Unlock() ch := make(chan interface{}, 1) @@ -1170,6 +1227,20 @@ func (jsa *mqttJSA) deleteStream(name string) (bool, error) { return sdr.Success, sdr.ToError() } +func (jsa *mqttJSA) loadLastMsgFor(streamName string, subject string) (*StoredMsg, error) { + mreq := &JSApiMsgGetRequest{LastFor: subject} + req, err := json.Marshal(mreq) + if err != nil { + return nil, err + } + lmri, err := jsa.newRequest(mqttJSAMsgLoad, fmt.Sprintf(JSApiMsgGetT, streamName), 0, req) + if err != nil { + return nil, err + } + lmr := lmri.(*JSApiMsgGetResponse) + return lmr.Message, lmr.ToError() +} + func (jsa *mqttJSA) loadMsg(streamName string, seq uint64) (*StoredMsg, error) { mreq := &JSApiMsgGetRequest{Seq: seq} req, err := json.Marshal(mreq) @@ -1275,6 +1346,12 @@ func (as *mqttAccountSessionManager) processJSAPIReplies(_ *subscription, pc *cl resp.Error = NewJSInvalidJSONError() } ch <- resp + case mqttJSAStreamNames: + var resp = &JSApiStreamNamesResponse{} + if err := json.Unmarshal(msg, resp); err != nil { + resp.Error = NewJSInvalidJSONError() + } + ch <- resp default: pc.Warnf("Unknown reply code %q", token) } @@ -1350,7 +1427,34 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc * if err := par.Error; err != nil { return } - cIDHash := strings.TrimPrefix(par.Stream, mqttSessionsStreamNamePrefix) + // We would need to lookup the message that that is a request/reply + // that we can do in place here. So move that to a long-running routine + // that will process the session persist record. + as.mu.RLock() + sp := &as.sp + as.mu.RUnlock() + + spr := &sessPersistRecord{seq: par.Sequence} + sp.mu.Lock() + if sp.tail != nil { + sp.tail.next = spr + } else { + sp.head = spr + select { + case sp.ch <- struct{}{}: + default: + } + } + sp.tail = spr + sp.mu.Unlock() +} + +func (as *mqttAccountSessionManager) processSessPersistRecord(spr *sessPersistRecord) { + smsg, err := as.jsa.loadMsg(mqttSessStreamName, spr.seq) + if err != nil { + return + } + cIDHash := strings.TrimPrefix(smsg.Subject, mqttSessStreamSubjectPrefix+as.domainTk) as.mu.Lock() defer as.mu.Unlock() @@ -1360,7 +1464,10 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc * } // If our current session's stream sequence is higher, it means that this // update is stale, so we don't do anything here. - if par.Sequence < sess.seq { + sess.mu.Lock() + ignore := spr.seq < sess.seq + sess.mu.Unlock() + if ignore { return } as.removeSession(sess, false) @@ -1378,6 +1485,32 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc * sess.mu.Unlock() } +func (as *mqttAccountSessionManager) sessPersistProcessing(closeCh chan struct{}) { + as.mu.RLock() + sp := &as.sp + quitCh := as.jsa.quitCh + as.mu.RUnlock() + + for { + select { + case <-sp.ch: + sp.mu.Lock() + l := sp.head + sp.head, sp.tail = nil, nil + sp.mu.Unlock() + + for spr := l; spr != nil; spr = l.next { + l = spr + as.processSessPersistRecord(spr) + } + case <-closeCh: + return + case <-quitCh: + return + } + } +} + // Adds this client ID to the flappers map, and if needed start the timer // for map cleanup. // @@ -1855,69 +1988,33 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms // Runs from the client's readLoop. // Lock not held on entry, but session is in the locked map. func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opts *Options) (*mqttSession, bool, error) { - // Add the JS domain (possibly empty) to the client ID, which will make - // session stream/filter subject be unique per domain. So if an application - // with the same client ID moves to the other domain, then there won't be - // conflict of session message in one domain updating the session's stream - // in others. - hash := string(getHash(opts.JetStreamDomain + clientID)) - sname := mqttSessionsStreamNamePrefix + hash - cfg := &StreamConfig{ - Name: sname, - Subjects: []string{sname}, - Storage: FileStorage, - Retention: LimitsPolicy, - MaxMsgs: 1, - Replicas: as.replicas, - } jsa := &as.jsa formatError := func(errTxt string, err error) (*mqttSession, bool, error) { accName := jsa.c.acc.GetName() return nil, false, fmt.Errorf("%s for account %q, session %q: %v", errTxt, accName, clientID, err) } -CREATE_STREAM: - // Send a request to create the stream for this session. - si, err := jsa.createStream(cfg) - if err != nil { - // Check for insufficient resources. If that is the case, and if possible, try - // again with a lower replicas value. - if cfg.Replicas > 1 && IsNatsErr(err, JSInsufficientResourcesErr) { - cfg.Replicas-- - goto CREATE_STREAM - } - // If there is an error and not simply "already used" (which means that the - // stream already exists) then we fail. - if isErrorOtherThan(err, JSStreamNameExistErr) { - return formatError("create session stream", err) - } - } + + hash := string(getHash(clientID)) + subject := mqttSessStreamSubjectPrefix + as.domainTk + hash + smsg, err := jsa.loadLastMsgFor(mqttSessStreamName, subject) if err != nil { - // Since we have returned if error is not "stream already exist", then - // it means that the stream already exists and so we now need to recover - // the existing record. - si, err = jsa.lookupStream(sname) - if err != nil { - return formatError("lookup session stream", err) + if isErrorOtherThan(err, JSNoMessageFoundErr) { + return formatError("loading session record", err) } - } - // The stream is supposed to have at most 1 record, if it is empty, it means - // that we just created it. - if si.State.Msgs == 0 { + // Message not found, so reate the session... // Create a session and indicate that this session did not exist. sess := mqttSessionCreate(jsa, clientID, hash, 0, opts) + sess.domainTk = as.domainTk return sess, false, nil } // We need to recover the existing record now. - smsg, err := jsa.loadMsg(sname, si.State.LastSeq) - if err != nil { - return formatError("loading session record", err) - } ps := &mqttPersistedSession{} if err := json.Unmarshal(smsg.Data, ps); err != nil { return formatError(fmt.Sprintf("unmarshal of session record at sequence %v", smsg.Sequence), err) } // Restore this session (even if we don't own it), the caller will do the right thing. sess := mqttSessionCreate(jsa, clientID, hash, smsg.Sequence, opts) + sess.domainTk = as.domainTk sess.clean = ps.Clean sess.subs = ps.Subs sess.cons = ps.Cons @@ -1947,6 +2044,66 @@ func (as *mqttAccountSessionManager) notifyRetainedMsgDeleted(subject string, se } } +func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Server) { + // Set retry to true, will be set to false on success. + retry := true + defer func() { + if retry { + next := mqttDefaultTransferRetry + log.Warnf("Failed to transfer all MQTT session streams, will try again in %v", next) + time.AfterFunc(next, func() { as.transferUniqueSessStreamsToMuxed(log) }) + } + }() + + jsa := &as.jsa + sni, err := jsa.newRequestEx(mqttJSAStreamNames, JSApiStreams, 0, nil, 5*time.Second) + if err != nil { + log.Errorf("Unable to transfer MQTT session streams: %v", err) + return + } + snames := sni.(*JSApiStreamNamesResponse) + if snames.Error != nil { + log.Errorf("Unable to transfer MQTT session streams: %v", snames.ToError()) + return + } + var oldMQTTSessStreams []string + for _, sn := range snames.Streams { + if strings.HasPrefix(sn, mqttSessionsStreamNamePrefix) { + oldMQTTSessStreams = append(oldMQTTSessStreams, sn) + } + } + ns := len(oldMQTTSessStreams) + if ns == 0 { + // Nothing to do + retry = false + return + } + log.Noticef("Transferring %v MQTT session streams...", ns) + for _, sn := range oldMQTTSessStreams { + log.Noticef(" Transferring stream %q to %q", sn, mqttSessStreamName) + smsg, err := jsa.loadLastMsgFor(sn, sn) + if err != nil { + log.Errorf(" Unable to load session record: %v", err) + return + } + ps := &mqttPersistedSession{} + if err := json.Unmarshal(smsg.Data, ps); err != nil { + log.Warnf(" Unable to unmarshal the content of this stream, may not be a legitimate MQTT session stream, skipping") + continue + } + // Compute subject where the session is being stored + subject := mqttSessStreamSubjectPrefix + as.domainTk + string(getHash(ps.ID)) + // Store record to MQTT session stream + if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil { + log.Errorf(" Unable to transfer the session record: %v", err) + return + } + jsa.deleteStream(sn) + } + log.Noticef("Transfer of %v MQTT session streams done!", ns) + retry = false +} + ////////////////////////////////////////////////////////////////////////////// // // MQTT session related functions @@ -1978,13 +2135,13 @@ func (sess *mqttSession) save() error { } b, _ := json.Marshal(&ps) - sname := mqttSessionsStreamNamePrefix + sess.idHash + subject := mqttSessStreamSubjectPrefix + sess.domainTk + sess.idHash seq := sess.seq sess.mu.Unlock() bb := bytes.Buffer{} bb.WriteString(hdrLine) - bb.WriteString(JSExpectedLastSeq) + bb.WriteString(JSExpectedLastSubjSeq) bb.WriteString(":") bb.WriteString(strconv.FormatInt(int64(seq), 10)) bb.WriteString(CR_LF) @@ -1992,7 +2149,7 @@ func (sess *mqttSession) save() error { hdr := bb.Len() bb.Write(b) - resp, err := sess.jsa.storeMsgWithKind(mqttJSASessPersist, sname, hdr, bb.Bytes()) + resp, err := sess.jsa.storeMsgWithKind(mqttJSASessPersist, subject, hdr, bb.Bytes()) if err != nil { return err } @@ -2007,13 +2164,13 @@ func (sess *mqttSession) save() error { // // Runs from the client's readLoop. // Lock not held on entry, but session is in the locked map. -func (sess *mqttSession) clear(deleteStream bool) { +func (sess *mqttSession) clear(deleteSess bool, seq uint64) { for sid, cc := range sess.cons { delete(sess.cons, sid) sess.deleteConsumer(cc) } - if deleteStream { - sess.jsa.deleteStream(mqttSessionsStreamNamePrefix + sess.idHash) + if deleteSess { + sess.jsa.deleteMsg(mqttSessStreamName, seq) } sess.mu.Lock() sess.subs, sess.pending, sess.cpending, sess.seq, sess.tmaxack = nil, nil, nil, 0, 0 @@ -2476,7 +2633,7 @@ CHECK: // This Session lasts as long as the Network Connection. State data // associated with this Session MUST NOT be reused in any subsequent // Session. - es.clear(false) + es.clear(false, 0) } else { // Report to the client that the session was present sessp = true diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 1dbbec51074..d0369101339 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -25,6 +25,7 @@ import ( "net/http" "net/url" "os" + "reflect" "strings" "sync" "testing" @@ -2700,7 +2701,7 @@ func TestMQTTCluster(t *testing.T) { clientID := nuid.Next() o := cl.opts[0] - mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: clientID, cleanSess: false}, o.MQTT.Host, o.MQTT.Port) + mc, r := testMQTTConnectRetry(t, &mqttConnInfo{clientID: clientID, cleanSess: false}, o.MQTT.Host, o.MQTT.Port, 5) defer mc.Close() testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false) @@ -2775,7 +2776,7 @@ func TestMQTTCluster(t *testing.T) { cl.stopAll() cl.restartAll() - streams := []string{mqttStreamName, mqttRetainedMsgsStreamName} + streams := []string{mqttStreamName, mqttRetainedMsgsStreamName, mqttSessStreamName} for _, sn := range streams { cl.waitOnStreamLeader(globalAccountName, sn) } @@ -2794,7 +2795,7 @@ func TestMQTTClusterRetainedMsg(t *testing.T) { srv2Opts := cl.opts[1] // Connect subscription on server 1. - mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, srv1Opts.MQTT.Host, srv1Opts.MQTT.Port) + mc, rc := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, srv1Opts.MQTT.Host, srv1Opts.MQTT.Port, 5) defer mc.Close() testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) @@ -3014,7 +3015,7 @@ func TestMQTTClusterReplicasCount(t *testing.T) { for _, sname := range []string{ mqttStreamName, mqttRetainedMsgsStreamName, - mqttSessionsStreamNamePrefix + string(getHash("sub")), + mqttSessStreamName, } { t.Run(sname, func(t *testing.T) { si, err := js.StreamInfo(sname) @@ -3030,25 +3031,27 @@ func TestMQTTClusterReplicasCount(t *testing.T) { } } -func TestMQTTClusterSessionReplicasAdjustment(t *testing.T) { +func TestMQTTClusterCanCreateSessionWithOnServerDown(t *testing.T) { cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 3) defer cl.shutdown() o := cl.opts[0] - mc, rc := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + mc, rc := testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5) defer mc.Close() testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) mc.Close() // Shutdown one of the server. + sd := cl.servers[1].StoreDir() + defer os.RemoveAll(strings.TrimSuffix(sd, JetStreamStoreDir)) cl.servers[1].Shutdown() // Make sure there is a meta leader cl.waitOnPeerCount(2) cl.waitOnLeader() - // Now try to create a new session. With R(3) this would fail, but now server will - // adjust it down to R(2). + // Now try to create a new session. Since we use a single stream now for all sessions, + // this should succeed. o = cl.opts[2] // We may still get failures because of some JS APIs may timeout while things // settle, so try again for a certain amount of times. @@ -3069,7 +3072,7 @@ func TestMQTTClusterPlacement(t *testing.T) { sc.waitOnLeader() for i := 0; i < 10; i++ { - mc, rc := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, lnc.opts[i%3].MQTT.Host, lnc.opts[i%3].MQTT.Port) + mc, rc := testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, lnc.opts[i%3].MQTT.Host, lnc.opts[i%3].MQTT.Port, 5) defer mc.Close() testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) } @@ -3258,7 +3261,7 @@ func TestMQTTSessionMovingDomains(t *testing.T) { connectSubAndDisconnect := func(host string, port int, present bool) { t.Helper() - mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, host, port) + mc, rc := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, host, port, 5) defer mc.Close() testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, present) testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) @@ -4962,6 +4965,108 @@ func TestMQTTWebsocketNotSupported(t *testing.T) { } } +func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) { + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 3) + defer cl.shutdown() + + nc, js := jsClientConnect(t, cl.randomServer()) + defer nc.Close() + + // Create 2 streams that start with "$MQTT_sess_" to check for transfer to new + // mux'ed unique "$MQTT_sess" stream. One of this stream will not contain a + // proper session record, and we will check that the stream does not get deleted. + sessStreamName1 := mqttSessionsStreamNamePrefix + string(getHash("sub")) + if _, err := js.AddStream(&nats.StreamConfig{ + Name: sessStreamName1, + Subjects: []string{sessStreamName1}, + Replicas: 3, + MaxMsgs: 1, + }); err != nil { + t.Fatalf("Unable to add stream: %v", err) + } + // Then add the session record + ps := mqttPersistedSession{ + ID: "sub", + Subs: map[string]byte{"foo": 1}, + Cons: map[string]*ConsumerConfig{"foo": { + Durable: "d6INCtp3_cK39H5WHEtOSU7sLy2oQv3", + DeliverSubject: "$MQTT.sub.cK39H5WHEtOSU7sLy2oQrR", + DeliverPolicy: DeliverNew, + AckPolicy: AckExplicit, + FilterSubject: "$MQTT.msgs.foo", + MaxAckPending: 1024, + }}, + } + b, _ := json.Marshal(&ps) + if _, err := js.Publish(sessStreamName1, b); err != nil { + t.Fatalf("Error on publish: %v", err) + } + + // Create the stream that has "$MQTT_sess_" prefix, but that is not really a MQTT session stream + sessStreamName2 := mqttSessionsStreamNamePrefix + "ivan" + if _, err := js.AddStream(&nats.StreamConfig{ + Name: sessStreamName2, + Subjects: []string{sessStreamName2}, + Replicas: 3, + MaxMsgs: 1, + }); err != nil { + t.Fatalf("Unable to add stream: %v", err) + } + if _, err := js.Publish(sessStreamName2, []byte("some content")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + + nc.Close() + cl.stopAll() + cl.restartAll() + + cl.waitOnStreamLeader(globalAccountName, sessStreamName1) + cl.waitOnStreamLeader(globalAccountName, sessStreamName2) + + // Now create a real MQTT connection + o := cl.opts[0] + sc, sr := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "sub"}, o.MQTT.Host, o.MQTT.Port, 10) + defer sc.Close() + testMQTTCheckConnAck(t, sr, mqttConnAckRCConnectionAccepted, true) + + nc, js = jsClientConnect(t, cl.randomServer()) + defer nc.Close() + + // Check that old session stream is gone, but the non session stream is still present. + var gotIt = false + for info := range js.StreamsInfo() { + if strings.HasPrefix(info.Config.Name, mqttSessionsStreamNamePrefix) { + if strings.HasSuffix(info.Config.Name, "_ivan") { + gotIt = true + } else { + t.Fatalf("The stream %q should have been deleted", info.Config.Name) + } + } + } + if !gotIt { + t.Fatalf("The stream %q should not have been deleted", mqttSessionsStreamNamePrefix+"ivan") + } + + // We want to check that the record was properly transferred. + rmsg, err := js.GetMsg(mqttSessStreamName, 2) + if err != nil { + t.Fatalf("Unable to get session message: %v", err) + } + ps2 := &mqttPersistedSession{} + if err := json.Unmarshal(rmsg.Data, ps2); err != nil { + t.Fatalf("Error unpacking session record: %v", err) + } + if ps2.ID != "sub" { + t.Fatalf("Unexpected session record, %+v vs %+v", ps2, ps) + } + if qos, ok := ps2.Subs["foo"]; !ok || qos != 1 { + t.Fatalf("Unexpected session record, %+v vs %+v", ps2, ps) + } + if cons, ok := ps2.Cons["foo"]; !ok || !reflect.DeepEqual(cons, ps.Cons["foo"]) { + t.Fatalf("Unexpected session record, %+v vs %+v", ps2, ps) + } +} + ////////////////////////////////////////////////////////////////////////// // // Benchmarks diff --git a/server/test_test.go b/server/test_test.go index 6bdaf9056a6..116079c35ef 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -18,6 +18,7 @@ import ( "math/rand" "net/url" "os" + "strings" "testing" "time" ) @@ -200,6 +201,7 @@ func (c *cluster) shutdown() { os.Remove(cf) } if sd != _EMPTY_ { + sd = strings.TrimSuffix(sd, JetStreamStoreDir) os.RemoveAll(sd) } }