diff --git a/publish.py b/publish.py index d182d52..5fb83fd 100644 --- a/publish.py +++ b/publish.py @@ -13,6 +13,7 @@ import re import traceback import subprocess +import struct try: import hashlib from urllib.parse import urlparse @@ -311,6 +312,9 @@ def __init__(self, params): self.salt = "" self.aom = params.aom self.av1 = params.av1 + self.socketout = params.socketout + self.socketport = params.socketport + self.socket = None try: if self.password: @@ -448,7 +452,65 @@ async def sendMessageAsync(self, msg): # send message to wss except Exception as e: printwarn(get_exception_info(E)) - + def setup_socket(self): + import socket + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket.bind(('0.0.0.0', int(self.socketport))) + + def on_new_socket_sample(self, sink): + sample = sink.emit("pull-sample") + if sample: + buffer = sample.get_buffer() + caps = sample.get_caps() + height = caps.get_structure(0).get_value("height") + width = caps.get_structure(0).get_value("width") + + _, map_info = buffer.map(Gst.MapFlags.READ) + frame_data = map_info.data + + # Send frame size + self.socket.sendto(struct.pack('!III', width, height, len(frame_data)), ('127.0.0.1', int(self.socketport))) + print("Sending") + # Send frame data in chunks + chunk_size = 65507 # Maximum safe UDP packet size + for i in range(0, len(frame_data), chunk_size): + chunk = frame_data[i:i+chunk_size] + self.socket.sendto(chunk, ('127.0.0.1', int(self.socketport))) + + buffer.unmap(map_info) + return Gst.FlowReturn.OK + + def new_sample(self, sink): + if self.processing: + return False + self.processing = True + try : + sample = sink.emit("pull-sample") + if sample: + buffer = sample.get_buffer() + caps = sample.get_caps() + height = int(caps.get_structure(0).get_int("height").value) + width = int(caps.get_structure(0).get_int("width").value) + frame_data = buffer.extract_dup(0, buffer.get_size()) + np_frame_data = np.frombuffer(frame_data, dtype=np.uint8).reshape(height, width, 3) + print(np.shape(np_frame_data), np_frame_data[0,0,:]) + + frame_shape = (720 * 1280 * 3) + frame_buffer = np.ndarray(frame_shape+5, dtype=np.uint8, buffer=self.shared_memory.buf) + frame_buffer[5:5+width*height*3] = np_frame_data.flatten(order='K') # K means order as how ordered in memory + frame_buffer[0] = width/255 + frame_buffer[1] = width%255 + frame_buffer[2] = height/255 + frame_buffer[3] = height%255 + frame_buffer[4] = self.counter%255 + self.counter+=1 + self.trigger_socket.sendto(b"update", ("127.0.0.1", 12345)) + + except Exception as E: + printwarn(get_exception_info(E)) + + self.processing = False + return False def on_incoming_stream(self, _, pad): try: @@ -625,6 +687,17 @@ def on_incoming_stream(self, _, pad): if self.ndiout: # I'm handling this on elsewhere now pass + elif self.socketout: + print("SOCKET VIDEO OUT") + out = Gst.parse_bin_from_description( + "queue ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! video/x-raw,format=BGR ! appsink name=appsink emit-signals=true", True) + self.pipe.add(out) + out.sync_state_with_parent() + sink = out.get_static_pad('sink') + pad.link(sink) + + appsink = self.pipe.get_by_name('appsink') + appsink.connect("new-sample", self.on_new_socket_sample) elif self.view: print("DISPLAY OUTPUT MODE BEING SETUP") @@ -783,7 +856,7 @@ def on_incoming_stream(self, _, pad): print(self.shared_memory) appsink = self.pipe.get_by_name('appsink') appsink.set_property("emit-signals", True) - appsink.connect("new-sample", new_sample) + appsink.connect("new-sample", self.new_sample) elif "audio" in name: if self.noaudio: @@ -1228,43 +1301,6 @@ def on_stats(promise, abin, data): except Exception as E: printwarn(get_exception_info(E)) - def new_sample(sink): - if self.processing: - return False - self.processing = True - try : - sample = sink.emit("pull-sample") - if sample: - buffer = sample.get_buffer() - caps = sample.get_caps() - height = int(caps.get_structure(0).get_int("height").value) - width = int(caps.get_structure(0).get_int("width").value) - frame_data = buffer.extract_dup(0, buffer.get_size()) - np_frame_data = np.frombuffer(frame_data, dtype=np.uint8).reshape(height, width, 3) - print(np.shape(np_frame_data), np_frame_data[0,0,:]) - - frame_shape = (720 * 1280 * 3) - frame_buffer = np.ndarray(frame_shape+5, dtype=np.uint8, buffer=self.shared_memory.buf) - frame_buffer[5:5+width*height*3] = np_frame_data.flatten(order='K') # K means order as how ordered in memory - frame_buffer[0] = width/255 - frame_buffer[1] = width%255 - frame_buffer[2] = height/255 - frame_buffer[3] = height%255 - frame_buffer[4] = self.counter%255 - self.counter+=1 - self.trigger_socket.sendto(b"update", ("127.0.0.1", 12345)) - - except Exception as E: - printwarn(get_exception_info(E)) - - self.processing = False - return False - - def on_frame_probe(pad, info): - buf = info.get_buffer() - print(f'[{buf.pts / Gst.SECOND:6.2f}]') - return Gst.PadProbeReturn.OK - print("creating a new webrtc bin") started = True @@ -1806,7 +1842,9 @@ async def main(): parser.add_argument('--audio-pipeline', type=str, default=None, help='Custom GStreamer audio source pipeline') parser.add_argument('--timestamp', action='store_true', help='Add a timestamp to the video output, if possible') parser.add_argument('--clockstamp', action='store_true', help='Add a clock overlay to the video output, if possible') - + parser.add_argument('--socketport', type=str, default=12345, help='Output video frames to a socket; specify the port number') + parser.add_argument('--socketout', type=str, help='Output video frames to a socket; specify the stream ID') + args = parser.parse_args() Gst.init(None) @@ -1864,6 +1902,8 @@ async def main(): elif args.fdsink: args.streamin = args.fdsink + elif args.socketout: + args.streamin = args.socketout elif args.framebuffer: if not np: print("You must install Numpy for this to work.\npip3 install numpy") @@ -2494,6 +2534,10 @@ async def main(): args.pipeline = PIPELINE_DESC c = WebRTCClient(args) + + if args.socketout: + c.setup_socket() + while True: try: await c.connect()