track->onMessage() is not triggered after adding WebSockets #756
-
Hello @paullouisageneau! I successfully edited the media-receiver example to access raw audio data, but now I'm stuck. I have added WebSockets, and the onMessage callback is not triggered. For the WebSockets I used the streamer example. Could you please help me with this problem? client.js const iceConnectionLog = document.getElementById('ice-connection-state'),
iceGatheringLog = document.getElementById('ice-gathering-state'),
signalingLog = document.getElementById('signaling-state'),
dataChannelLog = document.getElementById('data-channel');
// Random 10-character identifier for this browser client; it is used as the
// path component of the signaling URL below.
const clientId = randomId(10);
// Signaling WebSocket, opened against ws://127.0.0.1:8000/<clientId>.
const websocket = new WebSocket('ws://127.0.0.1:8000/' + clientId);
// Enable the element with id "start" only once the signaling socket is open.
websocket.onopen = () => {
document.getElementById('start').disabled = false;
}
// Current RTCPeerConnection: assigned in handleOffer(), cleared in stop().
let pc = null;
// Data channel handle: stop() closes and resets it; nothing in this script assigns it.
let dc = null;
// Creates the browser-side peer connection for a received offer.
//
// Applies `offer` as the remote description, attaches every track of a mono
// microphone capture (16 kHz requested), then creates an answer and installs it
// as the local description. Resolves to the configured RTCPeerConnection.
async function createPeerConnection(offer) {
    const connection = new RTCPeerConnection({ bundlePolicy: "max-bundle" });
    await connection.setRemoteDescription(offer);

    const constraints = { audio: { channelCount: 1, sampleRate: 16000 } };
    const media = await navigator.mediaDevices.getUserMedia(constraints);
    for (const track of media.getTracks()) {
        connection.addTrack(track, media);
    }

    const answer = await connection.createAnswer();
    await connection.setLocalDescription(answer);
    return connection;
}
// Signaling message handler: ignores non-text frames, parses the JSON payload
// and, for "offer" messages, displays the SDP and hands the offer to handleOffer().
websocket.onmessage = async (evt) => {
    const payload = evt.data;
    if (typeof payload !== 'string') {
        return;
    }

    const message = JSON.parse(payload);
    if (message.type != "offer") {
        return;
    }

    document.getElementById('offer-sdp').textContent = message.sdp;
    await handleOffer(message);
};
// Resolves once the global peer connection `pc` reports an ICE gathering state
// of 'complete'. Resolves immediately when gathering has already finished;
// otherwise it listens for 'icegatheringstatechange' events.
async function waitGatheringComplete() {
    const gatheringDone = () => pc.iceGatheringState === 'complete';

    return new Promise((resolve) => {
        if (gatheringDone()) {
            resolve();
            return;
        }
        pc.addEventListener('icegatheringstatechange', () => {
            if (gatheringDone()) {
                resolve();
            }
        });
    });
}
// Shows the local description's SDP in the 'answer-sdp' element and sends the
// description to the peer registered as "server" through the signaling socket.
async function sendAnswer(pc) {
    const { type, sdp } = pc.localDescription;
    document.getElementById('answer-sdp').textContent = sdp;

    const payload = { id: "server", type, sdp };
    websocket.send(JSON.stringify(payload));
    console.log('send answer');
}
// Handles an SDP offer received over the signaling socket.
//
// Creates the peer connection (which also sets the local answer), publishes it
// in the global `pc`, waits for ICE gathering to finish and only then sends the
// answer. This script never forwards individual ICE candidates over the
// WebSocket, so the answer has to be sent after gathering completes in order to
// carry the candidates inside its SDP.
async function handleOffer(offer) {
    console.log('handle offer');
    const connection = await createPeerConnection(offer);
    pc = connection;
    // waitGatheringComplete() observes the global `pc` assigned just above.
    await waitGatheringComplete();
    await sendAnswer(connection);
}
// Asks the peer registered as "server" to start a session by sending a
// "request" message over the signaling socket.
function sendRequest() {
    const request = { id: "server", type: "request" };
    websocket.send(JSON.stringify(request));
    console.log('send request');
}
// "Start" handler: hides the start control, shows the stop control and the
// media section, then sends the session request to the server.
function start() {
    const setDisplay = (elementId, value) => {
        document.getElementById(elementId).style.display = value;
    };

    setDisplay('start', 'none');
    setDisplay('stop', 'inline-block');
    setDisplay('media', 'block');
    sendRequest();
}
// "Stop" handler: restores the UI, closes the data channel (if any), stops all
// transceivers and local tracks, then closes the peer connection after 500 ms.
//
// Safe to call when no peer connection exists yet (e.g. stop pressed before an
// offer was handled): in that case only the UI and the data channel are reset.
function stop() {
    document.getElementById('stop').style.display = 'none';
    document.getElementById('media').style.display = 'none';
    document.getElementById('start').style.display = 'inline-block';

    // close data channel
    if (dc) {
        dc.close();
        dc = null;
    }

    // Work on a local reference so the delayed close below always targets the
    // connection that was active when stop() was called.
    const connection = pc;
    if (!connection) {
        return;
    }

    // close transceivers
    if (connection.getTransceivers) {
        connection.getTransceivers().forEach((transceiver) => {
            if (transceiver.stop) {
                transceiver.stop();
            }
        });
    }

    // close local audio/video
    connection.getSenders().forEach((sender) => {
        if (sender.track) {
            sender.track.stop();
        }
    });

    // close peer connection
    setTimeout(() => {
        connection.close();
        // Do not clobber a newer connection created during the delay.
        if (pc === connection) {
            pc = null;
        }
    }, 500);
}
// Helper function to generate a random ID: returns a string of `length`
// characters, each drawn independently from the digits and the upper- and
// lower-case ASCII letters.
function randomId(length) {
    const alphabet = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz';
    const randomChar = () => {
        const index = Math.floor(Math.random() * alphabet.length);
        return alphabet.charAt(index);
    };
    return Array(length).fill().map(() => randomChar()).join('');
}
// Helper function to generate a timestamp.
// The first call records the current time in startTime and returns 0; every
// later call returns the number of milliseconds elapsed since that first call.
let startTime = null;
function currentTimestamp() {
if (startTime === null) {
startTime = Date.now();
return 0;
} else {
return Date.now() - startTime;
}
} main.cpp /*
* libdatachannel media receiver example
* Copyright (c) 2020 Staz Modrzynski
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#include "rtc/rtc.hpp"
#include <iostream>
#include <memory>
#include <utility>
#include <nlohmann/json.hpp>
#include <arpa/inet.h>
#include <fstream>
#include <netinet/in.h>
#include <sys/socket.h>
#include "libopus_wrapper/include/opus_wrapper.h"
#include "opus_wrapper.h"
#include "../examples/streamer/helpers.hpp"
#include "../examples/streamer/dispatchqueue.hpp"
using nlohmann::json;
typedef int SOCKET;
using namespace rtc;
using namespace std;
using namespace std::chrono_literals;
// Address and port of the signaling WebSocket server; main() builds the URL
// ws://<ip_address>:<port>/<local_id> from them.
std::string ip_address = "127.0.0.1";
std::uint16_t port = 8000;
// Returns a weak_ptr observing the object managed by `ptr`, without extending
// its lifetime.
template <typename T>
std::weak_ptr<T> make_weak_ptr(std::shared_ptr<T> ptr)
{
    return std::weak_ptr<T>(ptr);
}
// Forward declaration; the definition follows main().
// Creates the PeerConnection used for the client identified by `id`. `wws` is
// the signaling WebSocket through which the local description is sent once ICE
// gathering completes.
shared_ptr<PeerConnection> create_peer_connection(const Configuration &config,
weak_ptr<WebSocket> wws,
std::string id);
// Peer connection created by the most recent "request" signaling message
// (see ws_on_message); it is replaced on every new request.
shared_ptr<PeerConnection> pc;
// Dispatches one signaling message received from the WebSocket server.
//
// The message must be a JSON object with string fields "id" (sender id) and
// "type"; messages lacking either field are ignored.
//   - "request": creates a new peer connection for that client and stores it
//                in the global `pc`.
//   - "answer":  applies the string field "sdp" as the remote description of
//                `pc`. Ignored (with a log line) when no peer connection exists
//                yet or when "sdp" is missing.
// Other message types are ignored.
void ws_on_message(json message, Configuration config, shared_ptr<WebSocket> ws)
{
    auto it = message.find("id");
    if (it == message.end() || !it->is_string())
        return;
    const std::string id = it->get<string>();

    it = message.find("type");
    if (it == message.end() || !it->is_string())
        return;
    const std::string type = it->get<string>();

    if (type == "request") {
        pc = create_peer_connection(config, make_weak_ptr(ws), id);
    } else if (type == "answer") {
        // An answer can only be applied to a connection created by a prior request.
        if (!pc) {
            std::cout << "LSS | Answer | ignored, no peer connection" << std::endl;
            return;
        }
        it = message.find("sdp");
        if (it == message.end() || !it->is_string()) {
            std::cout << "LSS | Answer | ignored, missing sdp" << std::endl;
            return;
        }
        const auto sdp = it->get<string>();
        std::cout << "LSS | Answer | sdp " << sdp << std::endl;
        pc->setRemoteDescription(Description(sdp, type));
    }
}
int main(int argc, char** argv)
{
rtc::InitLogger(rtc::LogLevel::Debug);
Configuration config;
std::string stun_server = "stun:stun.l.google.com:19302";
std::cout << "LSS | stun server: " << stun_server << std::endl;
// config.iceServers.emplace_back(stun_server);
// config.disableAutoNegotiation = true;
std::string local_id = "server";
std::cout << "LSS | local id: " << local_id << std::endl;
auto ws = make_shared<WebSocket>();
ws->onOpen([]() {
std::cout << "LSS | websocket connected, signaling ready" << std::endl;
});
ws->onClosed([]() {
std::cout << "LSS | websocket closed" << std::endl;
});
ws->onError([](const string &error) {
std::cout << "LSS | websocket failed: " << error << std::endl;
});
ws->onMessage([&](variant<binary, string> data) {
if (!holds_alternative<string>(data))
return;
json message = json::parse(get<string>(data));
ws_on_message(message, config, ws);
});
std::string url = "ws://" + ip_address + ":" + std::to_string(port) + "/" + local_id;
std::cout << "LSS | url is " << url << std::endl;
ws->open(url);
std::cout << "LSS | waiting for signaling to be connected..." << std::endl;
while (!ws->isOpen()) {
if (ws->isClosed())
return 1;
this_thread::sleep_for(100ms);
}
while (true) {
string id;
cout << "LSS | enter to exit" << endl;
cin >> id;
cin.ignore();
cout << "LSS | exiting" << endl;
break;
}
}
shared_ptr<PeerConnection> create_peer_connection(const Configuration &config,
weak_ptr<WebSocket> wws,
std::string id)
{
auto pc = make_shared<PeerConnection>(config);
pc->onStateChange([id](PeerConnection::State state) {
cout << "State: " << state << endl;
if (state == PeerConnection::State::Disconnected ||
state == PeerConnection::State::Failed ||
state == PeerConnection::State::Closed) {
}
});
pc->onGatheringStateChange(
[wpc = make_weak_ptr(pc), id, wws](PeerConnection::GatheringState state) {
cout << "Gathering State: " << state << endl;
if (state == PeerConnection::GatheringState::Complete) {
if(auto pc = wpc.lock()) {
auto description = pc->localDescription();
json message = {
{"id", id},
{"type", description->typeString()},
{"sdp", std::string(description.value())}
};
// Gathering complete, send answer
if (auto ws = wws.lock()) {
ws->send(message.dump());
}
}
}
});
opus::Decoder decoder(16000, 1);
rtc::Description::Audio media("audio", rtc::Description::Direction::RecvOnly);
media.addOpusCodec(96);
media.setBitrate(16000);
auto track = pc->addTrack(media);
auto session = std::make_shared<rtc::RtcpReceivingSession>();
track->setMediaHandler(session);
std::ofstream fs("audio.raw", std::ios::out | std::ios::binary);
int num_packets_to_write = 500;
track->onMessage([session, &decoder, &fs, &num_packets_to_write](rtc::binary message) {
// This is an RTP packet
std::cout << "LSS | receive packet" << std::endl;
auto rtp_packet = (const rtc::RTP*)message.data();
auto rtp_payload_size = message.size() - rtp_packet->getSize();
auto rtp_payload_start = reinterpret_cast<const char*>(rtp_packet->getBody());
const std::vector<unsigned char> packet(
rtp_payload_start, rtp_payload_start + rtp_payload_size);
auto decoded = decoder.Decode(packet, rtp_payload_size, false);
if (num_packets_to_write-- > 0) {
std::cout << "write packet to file" << std::endl;
fs.write(reinterpret_cast<const char *>(decoded.data()),
sizeof(std::int16_t) * decoded.size());
fs.flush();
} else {
std::cout << "complete write" << std::endl;
fs.close();
exit(2);
}
}, nullptr);
pc->setLocalDescription();
return pc;
}; Output ...
2022-11-24 22:37:28.163 INFO [4097392] [rtc::impl::PeerConnection::changeSignalingState@1172] Changed signaling state to stable
2022-11-24 22:37:28.164 INFO [4097582] [rtc::impl::IceTransport::LogCallback@352] juice: agent.c:1022: Changing state to connected
2022-11-24 22:37:28.164 INFO [4097582] [rtc::impl::PeerConnection::initDtlsTransport@254] This connection requires media support
2022-11-24 22:37:28.164 DEBUG [4097582] [rtc::impl::DtlsTransport::DtlsTransport@391] Initializing DTLS transport (OpenSSL)
2022-11-24 22:37:28.164 DEBUG [4097582] [rtc::impl::DtlsSrtpTransport::DtlsSrtpTransport@67] Initializing DTLS-SRTP transport
2022-11-24 22:37:28.164 DEBUG [4097582] [rtc::impl::DtlsTransport::start@480] Starting DTLS recv thread
2022-11-24 22:37:28.165 INFO [4097582] [rtc::impl::IceTransport::LogCallback@352] juice: agent.c:1022: Changing state to completed
2022-11-24 22:37:28.223 INFO [4097592] [rtc::impl::DtlsTransport::runRecvLoop@579] DTLS handshake finished
2022-11-24 22:37:28.223 INFO [4097592] [rtc::impl::DtlsSrtpTransport::postHandshake@266] Deriving SRTP keying material (OpenSSL)
2022-11-24 22:37:28.223 INFO [4097592] [rtc::impl::PeerConnection::changeState@1141] Changed state to connected
State: connected
2022-11-24 22:37:38.164 DEBUG [4097392] [rtc::impl::WsTransport::incoming@149] WebSocket sending ping
2022-11-24 22:37:38.164 DEBUG [4097392] [rtc::impl::WsTransport::sendFrame@348] WebSocket sending frame: opcode=9, length=4
2022-11-24 22:37:38.164 DEBUG [4097392] [rtc::impl::WsTransport::recvFrame@284] WebSocket received frame: opcode=10, length=4
2022-11-24 22:37:38.164 DEBUG [4097392] [rtc::impl::WsTransport::recvFrame@328] WebSocket received pong
2022-11-24 22:37:39.699 DEBUG [4097392] [rtc::impl::WsTransport::recvFrame@284] WebSocket received frame: opcode=9, length=4
2022-11-24 22:37:39.699 DEBUG [4097392] [rtc::impl::WsTransport::recvFrame@323] WebSocket received ping, sending pong
2022-11-24 22:37:39.699 DEBUG [4097392] [rtc::impl::WsTransport::sendFrame@348] WebSocket sending frame: opcode=10, length=4 |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 1 reply
-
The Also, there seems to be a confusion between bitrate and sampling rate, |
Beta Was this translation helpful? Give feedback.
The `track` shared ptr is local to the `create_peer_connection` function, therefore the track will be closed when returning. You need to store it somewhere, like you do for the peer connection.
Also, there seems to be a confusion between bitrate and sampling rate: `setBitrate` expects the target encoding bitrate in kbit/s, while `getUserMedia` and the Opus decoder expect the audio sampling rate in Hz. The bitrate should be way lower, as 64 kbit/s is fine for speech and 128 kbit/s is fine for music.