Skip to content

Commit

Permalink
adding logging to track down the bug of created file timestamp in the…
Browse files Browse the repository at this point in the history
… future
  • Loading branch information
igorrendulic committed Dec 17, 2020
1 parent 77ee40f commit c729d75
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 57 deletions.
23 changes: 12 additions & 11 deletions python/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import threading, queue
import os
import datetime
from random import randint

class StoreMP4VideoChunks(threading.Thread):

Expand Down Expand Up @@ -70,14 +71,6 @@ def saveToMp4(self, packet_store, start_timestamp):
segment_length = (maximum_dts - minimum_dts) * time_base

segment_length = int(segment_length * 1000) # convert to milliseconds
# print(segment_length)

# from datetime import datetime
# value = datetime.fromtimestamp(int(start_timestamp/1000))
# print("current timestamp: ", datetime.now(), "segment_start: ", value)
# now = int(time.time() * 1000)
# diff = now - (start_timestamp + segment_length)
# print("filename, currentime diff: ", diff)

output_file_name = self.path + "/" + str(start_timestamp) + "_" + str(segment_length) + ".mp4"
output = av.open(output_file_name, format="mp4", mode='w')
Expand All @@ -94,6 +87,9 @@ def saveToMp4(self, packet_store, start_timestamp):
for _,p in enumerate(packet_store):
# print ("PRE ", p, p.dts, p.pts, p.stream.type)
if (p.stream.type == "video"):
if p.dts is None:
continue

p.stream = output_video_stream
try:
output.mux(p)
Expand All @@ -105,13 +101,18 @@ def saveToMp4(self, packet_store, start_timestamp):
# output.mux(p)
# print ("POST ", p, p.dts, p.pts, p.stream.type)



output.close()

### INternal test
import pathlib
fp = pathlib.Path(output_file_name)
created_timestamp = int(fp.stat().st_ctime * 1000)
create_human_timestamp = datetime.datetime.fromtimestamp(created_timestamp//1000)

filename_human = datetime.datetime.fromtimestamp(start_timestamp//1000)
print("mp4 TS: ", start_timestamp,"created TS: ", created_timestamp, "mp4 human TS: ", filename_human.strftime('%Y-%m-%d %H:%M:%S'), "created human TS: ", create_human_timestamp.strftime('%Y-%m-%d %H:%M:%S'), "File Size: ", fp.stat().st_size)

print("timestamp: ", str(start_timestamp), "\t created at: ", created_timestamp, "\t diff: ", start_timestamp - created_timestamp)
if created_timestamp < start_timestamp:
print("BUG BUG BUG!")
if created_timestamp <= start_timestamp:
print("BUG BUG BUG! ------------> created timestamp should always be ahead")
2 changes: 1 addition & 1 deletion python/disk_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def remove_mp4_files(self):
now = int(time.time() * 1000)
remove_older_than = now - (self.__delay_seconds * 1000)

print("removing older mp4 files", self.__folder + "/" + self.__device, datetime.utcfromtimestamp(remove_older_than/1000).strftime('%Y-%m-%d %H:%M:%S'))
# print("removing older mp4 files", self.__folder + "/" + self.__device, datetime.utcfromtimestamp(remove_older_than/1000).strftime('%Y-%m-%d %H:%M:%S'))
files = os.listdir(self.__folder + "/" + self.__device)

if len(files) > 0:
Expand Down
13 changes: 11 additions & 2 deletions python/global_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@

class ArchivePacketGroup():

def __init__(self, packet_group, start_timestamp):
def __init__(self, start_timestamp):
self.packet_group = []
self.start_timestamp = start_timestamp

def addPacket(self, packet=None):
self.packet_group.append(packet)

def setPacketGroup(self, packet_group=[]):
self.packet_group = packet_group
self.start_timestamp = start_timestamp

def setStartTimestamp(self, timestamp=None):
self.start_timestamp = timestamp
124 changes: 81 additions & 43 deletions python/rtsp_to_rtmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from archive import StoreMP4VideoChunks
from disk_cleanup import CleanupScheduler
from global_vars import query_timestamp, RedisIsKeyFrameOnlyPrefix, RedisLastAccessPrefix, ArchivePacketGroup
import datetime


class RTSPtoRTMP(threading.Thread):
Expand Down Expand Up @@ -57,8 +58,12 @@ def run(self):
iframe_start_timestamp = 0
packet_group_queue = queue.Queue()

apg:ArchivePacketGroup = None

should_mux = False

last_loop_run = int(time.time() * 1000)

while True:
try:
options = {'rtsp_transport': 'tcp', 'stimeout': '5000000', 'max_delay': '5000000', 'use_wallclock_as_timestamps':"1", "fflags":"+genpts", 'acodec':'aac'}
Expand Down Expand Up @@ -100,66 +105,99 @@ def run(self):
if packet.is_keyframe:
# if we already found a keyframe previously, archive what we have

if len(current_packet_group) > 0:
packet_group = current_packet_group.copy()
if apg is not None:
# packet_group = current_packet_group.copy()

# send to archiver! (packet_group, iframe_start_timestamp)
if self._disk_path is not None:
apg = ArchivePacketGroup(packet_group, iframe_start_timestamp)
packet_group_queue.put(apg)
if self._disk_path is not None and apg is not None:
# apg = ArchivePacketGroup(packet_group, iframe_start_timestamp)
# apg.setPacketGroup(packet_group)

if len(apg.packet_group) > 0:
packet_group_queue.put(apg)


if apg.start_timestamp != iframe_start_timestamp:
print("this was the problem with TS -------------->: ", apg.start_timestamp, iframe_start_timestamp)

filename_human = datetime.datetime.fromtimestamp(apg.start_timestamp//1000)
print("ArchivePacketGroup GOP size: ", len(apg.packet_group), "file TS: ", apg.start_timestamp, "file human TS: ", filename_human.strftime('%Y-%m-%d %H:%M:%S.%f'))
print("Queue size: ", packet_group_queue.qsize(), "GOP size: ", len(apg.packet_group))

keyframe_found = True
current_packet_group = []
iframe_start_timestamp = int(round(time.time() * 1000))
apg = ArchivePacketGroup(iframe_start_timestamp)

# debugging
filename_human = datetime.datetime.fromtimestamp(iframe_start_timestamp/1000)
current_ts = int(time.time() * 1000)
cts_human = datetime.datetime.fromtimestamp(current_ts/1000)
print("next file TS: ", iframe_start_timestamp, "next file TS (human): ", filename_human.strftime('%Y-%m-%d %H:%M:%S.%f'), "current: ", current_ts, "current human: ", cts_human.strftime('%Y-%m-%d %H:%M:%S.%f'))

if keyframe_found == False:
print("skipping, since not a keyframe")
continue

'''
Live Redis Settings
-------------------
This should be invoked only every 500 ms, This If needs to moved to it's own method
'''

# shouldn't be a problem for redis but maybe every 200ms to query for latest timestamp only
settings_dict = self.redis_conn.hgetall(RedisLastAccessPrefix + device_id)

if settings_dict is not None and len(settings_dict) > 0:
settings_dict = { y.decode('utf-8'): settings_dict.get(y).decode('utf-8') for y in settings_dict.keys() }
if "last_query" in settings_dict:
ts = settings_dict['last_query']
else:
continue

# check if stream should be forwarded to Chrysalis Cloud RTMP
if "proxy_rtmp" in settings_dict:
should_mux_string = settings_dict['proxy_rtmp']
previous_should_mux = should_mux
if should_mux_string == "1":
should_mux = True
else:
should_mux = False

# check if it's time for flushing of current_packet_group
if should_mux != previous_should_mux and should_mux == True:
flush_current_packet_group = True
this_loop_run = int(time.time() * 1000)

# store settings to Redis only every 500ms
if this_loop_run - last_loop_run > 500:

settings_dict = self.redis_conn.hgetall(RedisLastAccessPrefix + device_id)

if settings_dict is not None and len(settings_dict) > 0:

settings_dict = { y.decode('utf-8'): settings_dict.get(y).decode('utf-8') for y in settings_dict.keys() }
if "last_query" in settings_dict:
ts = settings_dict['last_query']
else:
flush_current_packet_group = False

ts = int(ts)
ts_now = int(round(time.time() * 1000))
diff = ts_now - ts
# if no request in 10 seconds, stop
if diff < 10000:
try:
self.lock_condition.acquire()
query_timestamp = ts
self.lock_condition.notify_all()
finally:
self.lock_condition.release()

self.is_decode_packets_event.set()
continue

# check if stream should be forwarded to Chrysalis Cloud RTMP
if "proxy_rtmp" in settings_dict:
should_mux_string = settings_dict['proxy_rtmp']
previous_should_mux = should_mux
if should_mux_string == "1":
should_mux = True
else:
should_mux = False

# check if it's time for flushing of current_packet_group
if should_mux != previous_should_mux and should_mux == True:
flush_current_packet_group = True
else:
flush_current_packet_group = False

ts = int(ts)
ts_now = int(round(time.time() * 1000))
diff = ts_now - ts
# if no request in 10 seconds, stop
if diff < 10000:
try:
self.lock_condition.acquire()
query_timestamp = ts
self.lock_condition.notify_all()
finally:
self.lock_condition.release()

self.is_decode_packets_event.set()

last_loop_run = int(time.time() * 1000)

if packet.is_keyframe:
self.is_decode_packets_event.clear()
self._packet_queue.queue.clear()


# adding packets to APG
if apg is not None:
apg.addPacket(packet)
self._packet_queue.put(packet)

try:
Expand Down

0 comments on commit c729d75

Please sign in to comment.