From c729d75339c4af47ffec00eafe471fcec2a9a328 Mon Sep 17 00:00:00 2001
From: igorrendulic
Date: Wed, 16 Dec 2020 17:54:44 -0800
Subject: [PATCH] adding logging to track down the bug of created file
 timestamp in the future

---
 python/archive.py      |  23 ++++----
 python/disk_cleanup.py |   2 +-
 python/global_vars.py  |  13 ++++-
 python/rtsp_to_rtmp.py | 124 +++++++++++++++++++++++++++--------------
 4 files changed, 105 insertions(+), 57 deletions(-)

diff --git a/python/archive.py b/python/archive.py
index c66028b..907de20 100644
--- a/python/archive.py
+++ b/python/archive.py
@@ -17,6 +17,7 @@
 import threading, queue
 import os
 import datetime
+from random import randint
 
 
 class StoreMP4VideoChunks(threading.Thread):
@@ -70,14 +71,6 @@ def saveToMp4(self, packet_store, start_timestamp):
 
         segment_length = (maximum_dts - minimum_dts) * time_base
         segment_length = int(segment_length * 1000) # convert to milliseconds
-        # print(segment_length)
-
-        # from datetime import datetime
-        # value = datetime.fromtimestamp(int(start_timestamp/1000))
-        # print("current timestamp: ", datetime.now(), "segment_start: ", value)
-        # now = int(time.time() * 1000)
-        # diff = now - (start_timestamp + segment_length)
-        # print("filename, currentime diff: ", diff)
 
         output_file_name = self.path + "/" + str(start_timestamp) + "_" + str(segment_length) + ".mp4"
         output = av.open(output_file_name, format="mp4", mode='w')
@@ -94,6 +87,9 @@ def saveToMp4(self, packet_store, start_timestamp):
         for _,p in enumerate(packet_store):
             # print ("PRE ", p, p.dts, p.pts, p.stream.type)
             if (p.stream.type == "video"):
+                if p.dts is None:
+                    continue
+
                 p.stream = output_video_stream
                 try:
                     output.mux(p)
@@ -105,13 +101,18 @@ def saveToMp4(self, packet_store, start_timestamp):
             # output.mux(p)
             # print ("POST ", p, p.dts, p.pts, p.stream.type)
 
+
+        output.close()
 
         ### INternal test
         import pathlib
         fp = pathlib.Path(output_file_name)
         created_timestamp = int(fp.stat().st_ctime * 1000)
+        create_human_timestamp = datetime.datetime.fromtimestamp(created_timestamp//1000)
+
+        filename_human = datetime.datetime.fromtimestamp(start_timestamp//1000)
+        print("mp4 TS: ", start_timestamp, "created TS: ", created_timestamp, "mp4 human TS: ", filename_human.strftime('%Y-%m-%d %H:%M:%S'), "created human TS: ", create_human_timestamp.strftime('%Y-%m-%d %H:%M:%S'), "File Size: ", fp.stat().st_size)
 
 
-        print("timestamp: ", str(start_timestamp), "\t created at: ", created_timestamp, "\t diff: ", start_timestamp - created_timestamp)
-        if created_timestamp < start_timestamp:
-            print("BUG BUG BUG!")
\ No newline at end of file
+        if created_timestamp <= start_timestamp:
+            print("BUG BUG BUG! ------------> created timestamp should always be ahead")
\ No newline at end of file
diff --git a/python/disk_cleanup.py b/python/disk_cleanup.py
index 45072af..69979a1 100644
--- a/python/disk_cleanup.py
+++ b/python/disk_cleanup.py
@@ -28,7 +28,7 @@ def remove_mp4_files(self):
         now = int(time.time() * 1000)
         remove_older_than = now - (self.__delay_seconds * 1000)
 
-        print("removing older mp4 files", self.__folder + "/" + self.__device, datetime.utcfromtimestamp(remove_older_than/1000).strftime('%Y-%m-%d %H:%M:%S'))
+        # print("removing older mp4 files", self.__folder + "/" + self.__device, datetime.utcfromtimestamp(remove_older_than/1000).strftime('%Y-%m-%d %H:%M:%S'))
 
         files = os.listdir(self.__folder + "/" + self.__device)
         if len(files) > 0:
diff --git a/python/global_vars.py b/python/global_vars.py
index abeca2e..29f85e4 100644
--- a/python/global_vars.py
+++ b/python/global_vars.py
@@ -18,6 +18,15 @@
 
 class ArchivePacketGroup():
 
-    def __init__(self, packet_group, start_timestamp):
+    def __init__(self, start_timestamp):
+        self.packet_group = []
+        self.start_timestamp = start_timestamp
+
+    def addPacket(self, packet=None):
+        self.packet_group.append(packet)
+
+    def setPacketGroup(self, packet_group=[]):
         self.packet_group = packet_group
-        self.start_timestamp = start_timestamp
\ No newline at end of file
+
+    def setStartTimestamp(self, timestamp=None):
+        self.start_timestamp = timestamp
\ No newline at end of file
diff --git a/python/rtsp_to_rtmp.py b/python/rtsp_to_rtmp.py
index 3098d45..cf22cb7 100644
--- a/python/rtsp_to_rtmp.py
+++ b/python/rtsp_to_rtmp.py
@@ -27,6 +27,7 @@
 from archive import StoreMP4VideoChunks
 from disk_cleanup import CleanupScheduler
 from global_vars import query_timestamp, RedisIsKeyFrameOnlyPrefix, RedisLastAccessPrefix, ArchivePacketGroup
+import datetime
 
 
 class RTSPtoRTMP(threading.Thread):
@@ -57,8 +58,12 @@ def run(self):
 
         iframe_start_timestamp = 0
         packet_group_queue = queue.Queue()
 
+        apg: ArchivePacketGroup = None
+        should_mux = False
+        last_loop_run = int(time.time() * 1000)
+
         while True:
             try:
                 options = {'rtsp_transport': 'tcp', 'stimeout': '5000000', 'max_delay': '5000000', 'use_wallclock_as_timestamps':"1", "fflags":"+genpts", 'acodec':'aac'}
@@ -100,66 +105,99 @@ def run(self):
 
                     if packet.is_keyframe:
                         # if we already found a keyframe previously, archive what we have
-                        if len(current_packet_group) > 0:
-                            packet_group = current_packet_group.copy()
+                        if apg is not None:
+                            # packet_group = current_packet_group.copy()
                             # send to archiver! (packet_group, iframe_start_timestamp)
-                            if self._disk_path is not None:
-                                apg = ArchivePacketGroup(packet_group, iframe_start_timestamp)
-                                packet_group_queue.put(apg)
+                            if self._disk_path is not None and apg is not None:
+                                # apg = ArchivePacketGroup(packet_group, iframe_start_timestamp)
+                                # apg.setPacketGroup(packet_group)
+
+                                if len(apg.packet_group) > 0:
+                                    packet_group_queue.put(apg)
+
+
+                                if apg.start_timestamp != iframe_start_timestamp:
+                                    print("this was the problem with TS -------------->: ", apg.start_timestamp, iframe_start_timestamp)
+
+                                filename_human = datetime.datetime.fromtimestamp(apg.start_timestamp//1000)
+                                print("ArchivePacketGroup GOP size: ", len(apg.packet_group), "file TS: ", apg.start_timestamp, "file human TS: ", filename_human.strftime('%Y-%m-%d %H:%M:%S.%f'))
+                                print("Queue size: ", packet_group_queue.qsize(), "GOP size: ", len(apg.packet_group))
 
                         keyframe_found = True
                         current_packet_group = []
                         iframe_start_timestamp = int(round(time.time() * 1000))
+                        apg = ArchivePacketGroup(iframe_start_timestamp)
+
+                        # debugging
+                        filename_human = datetime.datetime.fromtimestamp(iframe_start_timestamp/1000)
+                        current_ts = int(time.time() * 1000)
+                        cts_human = datetime.datetime.fromtimestamp(current_ts/1000)
+                        print("next file TS: ", iframe_start_timestamp, "next file TS (human): ", filename_human.strftime('%Y-%m-%d %H:%M:%S.%f'), "current: ", current_ts, "current human: ", cts_human.strftime('%Y-%m-%d %H:%M:%S.%f'))
 
                     if keyframe_found == False:
                         print("skipping, since not a keyframe")
                         continue
+
+                    '''
+                    Live Redis Settings
+                    -------------------
+                    This should be invoked only every 500 ms. This check needs to be moved to its own method.
+                    '''
-                    # shouldn't be a problem for redis but maybe every 200ms to query for latest timestamp only
-                    settings_dict = self.redis_conn.hgetall(RedisLastAccessPrefix + device_id)
-
-                    if settings_dict is not None and len(settings_dict) > 0:
-                        settings_dict = { y.decode('utf-8'): settings_dict.get(y).decode('utf-8') for y in settings_dict.keys() }
-                        if "last_query" in settings_dict:
-                            ts = settings_dict['last_query']
-                        else:
-                            continue
-
-                        # check if stream should be forwarded to Chrysalis Cloud RTMP
-                        if "proxy_rtmp" in settings_dict:
-                            should_mux_string = settings_dict['proxy_rtmp']
-                            previous_should_mux = should_mux
-                            if should_mux_string == "1":
-                                should_mux = True
-                            else:
-                                should_mux = False
-
-                            # check if it's time for flushing of current_packet_group
-                            if should_mux != previous_should_mux and should_mux == True:
-                                flush_current_packet_group = True
+                    this_loop_run = int(time.time() * 1000)
+
+                    # query settings from Redis only every 500 ms
+                    if this_loop_run - last_loop_run > 500:
+
+                        settings_dict = self.redis_conn.hgetall(RedisLastAccessPrefix + device_id)
+
+                        if settings_dict is not None and len(settings_dict) > 0:
+
+                            settings_dict = { y.decode('utf-8'): settings_dict.get(y).decode('utf-8') for y in settings_dict.keys() }
+                            if "last_query" in settings_dict:
+                                ts = settings_dict['last_query']
                             else:
-                                flush_current_packet_group = False
-
-                        ts = int(ts)
-                        ts_now = int(round(time.time() * 1000))
-                        diff = ts_now - ts
-                        # if no request in 10 seconds, stop
-                        if diff < 10000:
-                            try:
-                                self.lock_condition.acquire()
-                                query_timestamp = ts
-                                self.lock_condition.notify_all()
-                            finally:
-                                self.lock_condition.release()
-
-                            self.is_decode_packets_event.set()
+                                continue
+
+                            # check if stream should be forwarded to Chrysalis Cloud RTMP
+                            if "proxy_rtmp" in settings_dict:
+                                should_mux_string = settings_dict['proxy_rtmp']
+                                previous_should_mux = should_mux
+                                if should_mux_string == "1":
+                                    should_mux = True
+                                else:
+                                    should_mux = False
+
+                                # check if it's time for flushing of current_packet_group
+                                if should_mux != previous_should_mux and should_mux == True:
+                                    flush_current_packet_group = True
+                                else:
+                                    flush_current_packet_group = False
+
+                            ts = int(ts)
+                            ts_now = int(round(time.time() * 1000))
+                            diff = ts_now - ts
+                            # if no request in 10 seconds, stop
+                            if diff < 10000:
+                                try:
+                                    self.lock_condition.acquire()
+                                    query_timestamp = ts
+                                    self.lock_condition.notify_all()
+                                finally:
+                                    self.lock_condition.release()
+
+                                self.is_decode_packets_event.set()
+
+                        last_loop_run = int(time.time() * 1000)
 
                     if packet.is_keyframe:
                         self.is_decode_packets_event.clear()
                         self._packet_queue.queue.clear()
-
+                    # adding packets to APG
+                    if apg is not None:
+                        apg.addPacket(packet)
                     self._packet_queue.put(packet)
 
                     try:
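For reference, a minimal standalone sketch of the check this patch instruments: a wall-clock timestamp captured before an MP4 segment is written is compared against the file's st_ctime, which is what the new logging in archive.py prints. The sketch is not part of the patch; the temporary file and the helper name below are illustrative assumptions, not code from the repository.

import datetime
import pathlib
import tempfile
import time


def check_created_timestamp(path: pathlib.Path, start_timestamp_ms: int) -> None:
    # Creation time reported by the filesystem, converted to milliseconds.
    created_ms = int(path.stat().st_ctime * 1000)
    start_human = datetime.datetime.fromtimestamp(start_timestamp_ms / 1000)
    created_human = datetime.datetime.fromtimestamp(created_ms / 1000)
    print("start TS:", start_timestamp_ms, start_human.strftime('%Y-%m-%d %H:%M:%S.%f'))
    print("created TS:", created_ms, created_human.strftime('%Y-%m-%d %H:%M:%S.%f'))
    if created_ms <= start_timestamp_ms:
        # This is the anomaly the patch logs; coarse filesystem timestamp
        # resolution or clock adjustments between the two readings can cause it.
        print("created timestamp is not ahead of the start timestamp")


if __name__ == "__main__":
    start_ms = int(time.time() * 1000)  # captured before the file exists, like iframe_start_timestamp
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
        f.write(b"\x00")  # stand-in for the muxed MP4 segment
    check_created_timestamp(pathlib.Path(f.name), start_ms)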