-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreddit_stream.py
166 lines (126 loc) · 4.68 KB
/
reddit_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# Python
import _thread
import datetime
import logging
import math
import os
import threading
import time
from collections import deque
# 3rd Party
import praw
from retrying import retry
# Self
import util
import official_forum
from config import config_helper as config
import status
import logger
from praw_wrapper import praw_object_wrapper_t
from response import get_response
from response import reply_to_summon
class stream_thread_t(threading.Thread):
    """Daemon thread that pulls one praw object type ('comments' or
    'submissions') from the subreddit: first the backlog since the last
    recorded update, then the live stream, queueing each new object on the
    owning stream_manager_t for the main thread to process."""

    def __init__(self, manager, type):
        """
        manager -- the stream_manager_t that owns this thread
        type -- 'comments' or 'submissions'; must name a stream attribute of
            the subreddit, otherwise ValueError is raised
        """
        threading.Thread.__init__(self, name="{}Thread".format(type))

        if not hasattr(manager.subreddit.stream, type):
            raise ValueError("stream_thread_t was passed invalid type")

        self.manager = manager
        self.type = type
        # Bound stream generator, e.g. subreddit.stream.comments
        self.handler = getattr(manager.subreddit.stream, type)
        # ids already seen by this thread (id -> True), to avoid re-queueing
        self.processed = {}

        logging.debug("Created {} daemon thread.".format(self.type))

    def get_backlog_window(self):
        # Resume from the last recorded update, but never reach further back
        # than the configured backlog time limit.
        return max(status.get_last_update(), time.time() - config.backlog_time_limit)

    @retry(retry_on_exception=util.is_praw_error,
        wait_exponential_multiplier=config.praw_error_wait_time,
        wait_func=util.praw_error_retry)
    def check_and_queue(self, object):
        """Wrap `object` and append it to the manager's queue unless it has
        already been processed or replied to, then wake the main thread."""
        logging.log(logger.DEBUG_ALL, "{} daemon thread found {}.".format(self.type, object))

        if object.id in self.processed:
            logging.debug("{} has already been processed.".format(object))
            return

        if self.manager.bot.replied_to.contains(object):
            logging.debug("{} has already been replied to.".format(object))
            self.processed[object.id] = True
            return

        wrapped = praw_object_wrapper_t(self.manager.bot, object)
        self.manager.list.append(wrapped)
        self.processed[object.id] = True
        logging.debug("Added {} to stream queue (len={}).".format(wrapped, len(self.manager.list)))

        if not self.manager.bot.stream_event.is_set():
            # Notify the main thread to wake up so it can process the new entry
            self.manager.bot.stream_event.set()
            logging.debug("{} thread notified main thread".format(self.type))

    @retry(retry_on_exception=util.is_praw_error,
        wait_exponential_multiplier=config.praw_error_wait_time,
        wait_func=util.praw_error_retry)
    def do_backlog(self, since):
        """Queue every object created at or after `since` (newest first),
        then clear this type's backlog flag on the bot."""
        # Submissions are listed via subreddit.new, comments via subreddit.comments.
        backlogged = getattr(self.manager.subreddit, 'new' if self.type == 'submissions' else 'comments')
        count = 0

        # Docs say that no limit may be limited to 1000 objects anyway.
        # https://praw.readthedocs.io/en/latest/code_overview/other/listinggenerator.html#praw.models.ListingGenerator
        for object in backlogged(limit=None):
            if object.created_utc < since:
                break
            count += 1
            self.check_and_queue(object)

        # BUGFIX: the backlog flag used to be cleared only when an object
        # older than `since` was found; if the listing was exhausted first
        # (fewer than ~1000 objects newer than `since`), the flag stayed set
        # forever. Clear it unconditionally once the backlog pass is done.
        self.manager.bot.backlog[self.type] = False
        logging.info("Completed pulling {} backlog, checked {} {}.".format(self.type, count, self.type))

    @retry(retry_on_exception=util.is_praw_error,
        wait_exponential_multiplier=config.praw_error_wait_time,
        wait_func=util.praw_error_retry)
    def do_stream(self):
        # Blocks indefinitely, yielding new objects as they arrive.
        for object in self.handler():
            self.check_and_queue(object)

    def run(self):
        # Exception handler shell: if ANY unhandled exception occurs, catch
        # it, log it, interrupt the main thread, THEN hard-exit the process.
        try:
            self.main()
        except BaseException:
            logging.exception("Fatal error occurred in {} thread.".format(self.type))
            _thread.interrupt_main()
            os._exit(1)  # never returns; the original dead `raise` after it was removed

    def main(self):
        logging.debug("Started {} daemon thread.".format(self.type))

        since = self.get_backlog_window()
        logging.info("Pulling {} since [{}]...".format(
            self.type,
            datetime.datetime.fromtimestamp(math.floor(since))
        ))

        self.do_backlog(since)
        self.do_stream()
class stream_manager_t:
    """Owns the comment/submission stream threads and the FIFO queue of
    wrapped praw objects waiting for the main thread to handle."""

    def __init__(self, bot):
        self.bot = bot
        self.reddit = bot.reddit
        # Multireddit string, e.g. "sub1+sub2"
        self.subreddit_str = "+".join(config.subreddits)
        self.subreddit = self.reddit.subreddit(self.subreddit_str)
        self.reply_queue = bot.reply_queue
        # deque: stream threads append on the right, process() pops from the
        # left -- O(1) at both ends, where list.pop(0) was O(n).
        self.list = deque()
        self.threads = []
        self.__init_threads__()

    def __init_threads__(self):
        # One daemon thread per stream type; daemonized so they never block
        # interpreter shutdown.
        self.threads.append(stream_thread_t(self, 'comments'))
        self.threads.append(stream_thread_t(self, 'submissions'))

        for thread in self.threads:
            thread.daemon = True
            thread.start()

    def __len__(self):
        return len(self.list)

    def is_active(self):
        # True while there are queued objects left to process.
        return len(self) > 0

    def process(self):
        """Drain the queue: parse-and-reply to each queued object, skipping
        anything already queued for reply or authored by the bot itself."""
        if not self.is_active():
            return

        # Hoist the authenticated-user lookup (a potential network round
        # trip) out of the loop instead of repeating it per object.
        me = self.reddit.user.me()

        while self.is_active():
            # pop the oldest object first (FIFO)
            obj = self.list.popleft()

            if self.reply_queue.contains_id(obj.id):
                continue

            if obj.author == me:
                logging.debug("{} author is self, ignoring".format(obj))
                continue

            replied = obj.parse_and_reply(self.reply_queue)

            # No normal reply was made: if a comment explicitly mentions the
            # bot's username, treat it as a summon instead.
            if not replied and obj.is_comment() and ( "u/" + config.username ).lower() in obj.get_body().lower():
                reply_to_summon( self.bot, obj )