-
Notifications
You must be signed in to change notification settings - Fork 717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scrape images, video, and post forwarding information for Telegram #413
base: master
Are you sure you want to change the base?
Changes from 29 commits
72b26f2
de4ebed
b8efce2
ed82916
fb8d73a
d32c9ad
a7eb54d
4e59638
2ce014a
f978954
babcddd
1e4e0c2
b276c3c
97d38e5
9b3faec
21f7b62
5648e95
c18ca0f
0a4bd39
f385135
b13e62e
e2d9223
0822a9c
07a5f6f
65723f1
56e4232
056cd62
73f10a4
cbdfeed
cacd783
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,10 +10,9 @@ | |
import typing | ||
import urllib.parse | ||
|
||
|
||
_logger = logging.getLogger(__name__) | ||
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') | ||
|
||
_STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)') | ||
|
||
@dataclasses.dataclass | ||
class LinkPreview: | ||
|
@@ -24,26 +23,12 @@ class LinkPreview: | |
image: typing.Optional[str] = None | ||
|
||
|
||
@dataclasses.dataclass | ||
class TelegramPost(snscrape.base.Item): | ||
url: str | ||
date: datetime.datetime | ||
content: str | ||
outlinks: list | ||
linkPreview: typing.Optional[LinkPreview] = None | ||
|
||
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') | ||
|
||
def __str__(self): | ||
return self.url | ||
|
||
|
||
@dataclasses.dataclass | ||
class Channel(snscrape.base.Entity): | ||
username: str | ||
title: str | ||
verified: bool | ||
photo: str | ||
title: typing.Optional[str] = None | ||
verified: typing.Optional[bool] = None | ||
photo: typing.Optional[str] = None | ||
description: typing.Optional[str] = None | ||
members: typing.Optional[int] = None | ||
photos: typing.Optional[snscrape.base.IntWithGranularity] = None | ||
|
@@ -60,6 +45,55 @@ def __str__(self): | |
return f'https://t.me/s/{self.username}' | ||
|
||
|
||
@dataclasses.dataclass | ||
class TelegramPost(snscrape.base.Item): | ||
url: str | ||
date: datetime.datetime | ||
content: str | ||
outlinks: typing.List[str] = None | ||
mentions: typing.List[str] = None | ||
hashtags: typing.List[str] = None | ||
forwarded: typing.Optional['Channel'] = None | ||
forwardedUrl: typing.Optional[str] = None | ||
media: typing.Optional[typing.List['Medium']] = None | ||
views: typing.Optional[snscrape.base.IntWithGranularity] = None | ||
linkPreview: typing.Optional[LinkPreview] = None | ||
|
||
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') | ||
|
||
def __str__(self): | ||
return self.url | ||
|
||
|
||
class Medium: | ||
pass | ||
|
||
|
||
@dataclasses.dataclass | ||
class Photo(Medium): | ||
url: str | ||
|
||
|
||
@dataclasses.dataclass | ||
class Video(Medium): | ||
thumbnailUrl: str | ||
duration: float | ||
url: typing.Optional[str] = None | ||
|
||
|
||
@dataclasses.dataclass | ||
class VoiceMessage(Medium): | ||
url: str | ||
duration: str | ||
bars:typing.List[float] | ||
|
||
|
||
@dataclasses.dataclass | ||
class Gif(Medium): | ||
thumbnailUrl: str | ||
url: typing.Optional[str] = None | ||
|
||
|
||
class TelegramChannelScraper(snscrape.base.Scraper): | ||
name = 'telegram-channel' | ||
|
||
|
@@ -90,25 +124,85 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False): | |
_logger.warning(f'Possibly incorrect URL: {rawUrl!r}') | ||
url = rawUrl.replace('//t.me/', '//t.me/s/') | ||
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') | ||
media = [] | ||
outlinks = [] | ||
mentions = [] | ||
hashtags = [] | ||
forwarded = None | ||
forwardedUrl = None | ||
|
||
if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): | ||
forwardedUrl = forwardTag['href'] | ||
forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0] | ||
forwarded = Channel(username = forwardedName) | ||
|
||
if (message := post.find('div', class_ = 'tgme_widget_message_text')): | ||
content = message.text | ||
outlinks = [] | ||
for link in post.find_all('a'): | ||
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): | ||
# Author links at the top (avatar and name) | ||
continue | ||
if link['href'] == rawUrl or link['href'] == url: | ||
# Generic filter of links to the post itself, catches videos, photos, and the date link | ||
continue | ||
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): | ||
# Individual photo or video link | ||
continue | ||
href = urllib.parse.urljoin(pageUrl, link['href']) | ||
if href not in outlinks: | ||
outlinks.append(href) | ||
content = message.get_text(separator="\n") | ||
else: | ||
content = None | ||
outlinks = [] | ||
|
||
for link in post.find_all('a'): | ||
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): | ||
# Author links at the top (avatar and name) | ||
continue | ||
if link['href'] == rawUrl or link['href'] == url: | ||
style = link.attrs.get('style', '') | ||
# Generic filter of links to the post itself, catches videos, photos, and the date link | ||
if style != '': | ||
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) | ||
if len(imageUrls) == 1: | ||
media.append(Photo(url = imageUrls[0])) | ||
continue | ||
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): | ||
style = link.attrs.get('style', '') | ||
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) | ||
if len(imageUrls) == 1: | ||
media.append(Photo(url = imageUrls[0])) | ||
# resp = self._get(image[0]) | ||
# encoded_string = base64.b64encode(resp.content) | ||
# Individual photo or video link | ||
continue | ||
if link.text.startswith('@'): | ||
mentions.append(link.text.strip('@')) | ||
continue | ||
if link.text.startswith('#'): | ||
hashtags.append(link.text.strip('#')) | ||
continue | ||
href = urllib.parse.urljoin(pageUrl, link['href']) | ||
if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl): | ||
outlinks.append(href) | ||
|
||
for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): | ||
audioUrl = voicePlayer.find('audio')['src'] | ||
durationStr = voicePlayer.find('time').text | ||
duration = _durationStrToSeconds(durationStr) | ||
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')] | ||
|
||
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)) | ||
|
||
for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the extraction of images and videos is done separately, the order is not preserved. For example, https://t.me/s/nexta_live/43102 has video 1 (without URL), image, video 2 (with URL), but the image gets listed first. I think that can be fixed by simply merging this loop (and also the one for the voice player extraction) into the general link loop above, since they're all |
||
iTag = videoPlayer.find('i') | ||
if iTag is None: | ||
videoUrl = None | ||
videoThumbnailUrl = None | ||
else: | ||
style = iTag['style'] | ||
videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0] | ||
videoTag = videoPlayer.find('video') | ||
videoUrl = None if videoTag is None else videoTag['src'] | ||
mKwargs = { | ||
'thumbnailUrl': videoThumbnailUrl, | ||
'url': videoUrl, | ||
} | ||
timeTag = videoPlayer.find('time') | ||
if timeTag is None: | ||
cls = Gif | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you have some examples? I don't remember seeing fake-GIFs on Telegram before. (Also for the future test suite.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Telegram doesn't have a policy on whether or not they're allowed, right? I don't think a real-GIF would ever inaccurately go down this path, so isn't it just making the logic more robust against change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer erroring out on things the code doesn't actually understand and implement. It might be 'more robust' in some sense, but it can easily result in misparsing the data as well. But if 'videos' without a time tag already exist similar to how it is on Twitter, this is totally fine. Hence why I'm asking for examples. :-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not an active Telegram user, so I don't think I'll be able to quickly come up with an example myself. @loganwilliams , do you remember running into a problem which required adding this line back when you implemented this? On the other hand, what data misparsing are you imagining from this, @JustAnotherArchivist ? Especially if Twitter already has examples which require this behavior, what's the error mode that we'd want to call out by throwing here? I'm hoping that merging this will get everyone off the fork, but am concerned that if we introduce new exceptions, it'll require more significant updates to existing workflows. Edit: As a compromise, I'm adding a warning log to this in my PR. It won't stop execution, but will let the user know in case there's something actually wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @john-osullivan. Thanks for your work pushing this forward. You can see an example of a GIF here: https://t.me/thisisatestchannel19451923/3 It sits in the same |
||
else: | ||
cls = Video | ||
durationStr = videoPlayer.find('time').text | ||
mKwargs['duration'] = _durationStrToSeconds(durationStr) | ||
media.append(cls(**mKwargs)) | ||
|
||
linkPreview = None | ||
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): | ||
kwargs = {} | ||
|
@@ -125,20 +219,45 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False): | |
else: | ||
_logger.warning(f'Could not process link preview image on {url}') | ||
linkPreview = LinkPreview(**kwargs) | ||
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview) | ||
if kwargs['href'] in outlinks: | ||
outlinks.remove(kwargs['href']) | ||
Comment on lines
+222
to
+223
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer leaving the link preview href in |
||
|
||
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') | ||
views = None if viewsSpan is None else _parse_num(viewsSpan.text) | ||
|
||
outlinks = outlinks if outlinks else None | ||
media = media if media else None | ||
mentions = mentions if mentions else None | ||
hashtags = hashtags if hashtags else None | ||
|
||
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views) | ||
|
||
def get_items(self): | ||
r, soup = self._initial_page() | ||
if '/s/' not in r.url: | ||
_logger.warning('No public post list for this user') | ||
return | ||
nextPageUrl = '' | ||
while True: | ||
yield from self._soup_to_items(soup, r.url) | ||
try: | ||
if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1': | ||
# if message 1 is the first message in the page, terminate scraping | ||
break | ||
except: | ||
pass | ||
Comment on lines
+243
to
+248
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bare |
||
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True}) | ||
if not pageLink: | ||
break | ||
# some pages are missing a "tme_messages_more" tag, causing early termination | ||
if '=' not in nextPageUrl: | ||
nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href'] | ||
nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20 | ||
if nextPostIndex > 20: | ||
pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'} | ||
else: | ||
break | ||
Comment on lines
+251
to
+258
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't this approach lead to duplicates in some cases? When a post includes multiple media, those get their own ID, so there'd be a gap in post IDs. |
||
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) | ||
r = self._get(nextPageUrl, headers = self._headers) | ||
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = _telegramResponseOkCallback) | ||
if r.status_code != 200: | ||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}') | ||
soup = bs4.BeautifulSoup(r.text, 'lxml') | ||
|
@@ -151,9 +270,18 @@ def _get_entity(self): | |
raise snscrape.base.ScraperException(f'Got status code {r.status_code}') | ||
soup = bs4.BeautifulSoup(r.text, 'lxml') | ||
membersDiv = soup.find('div', class_ = 'tgme_page_extra') | ||
if membersDiv.text.endswith(' members'): | ||
kwargs['members'] = int(membersDiv.text[:-8].replace(' ', '')) | ||
kwargs['photo'] = soup.find('img', class_ = 'tgme_page_photo_image').attrs['src'] | ||
if membersDiv is not None: | ||
if membersDiv.text.split(',')[0].endswith((' members', ' subscribers')): | ||
membersStr = ''.join(membersDiv.text.split(',')[0].split(' ')[:-1]) | ||
if membersStr == 'no': | ||
kwargs['members'] = 0 | ||
else: | ||
kwargs['members'] = int(membersStr) | ||
photoImg = soup.find('img', class_ = 'tgme_page_photo_image') | ||
if photoImg is not None: | ||
kwargs['photo'] = photoImg.attrs['src'] | ||
else: | ||
kwargs['photo'] = None | ||
|
||
r, soup = self._initial_page() | ||
if '/s/' not in r.url: # Redirect on channels without public posts | ||
|
@@ -174,17 +302,8 @@ def _get_entity(self): | |
if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')): | ||
kwargs['description'] = descriptionDiv.text | ||
|
||
def parse_num(s): | ||
s = s.replace(' ', '') | ||
if s.endswith('M'): | ||
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) | ||
elif s.endswith('K'): | ||
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) | ||
else: | ||
return int(s), 1 | ||
|
||
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'): | ||
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text) | ||
value, granularity = _parse_num(div.find('span', class_ = 'counter_value').text) | ||
type_ = div.find('span', class_ = 'counter_type').text | ||
if type_ == 'members': | ||
# Already extracted more accurately from /channel, skip | ||
|
@@ -201,3 +320,21 @@ def _cli_setup_parser(cls, subparser): | |
@classmethod | ||
def _cli_from_args(cls, args): | ||
return cls._cli_construct(args, args.channel) | ||
|
||
def _parse_num(s): | ||
s = s.replace(' ', '') | ||
if s.endswith('M'): | ||
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) | ||
elif s.endswith('K'): | ||
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) | ||
return int(s), 1 | ||
|
||
def _durationStrToSeconds(durationStr): | ||
durationList = durationStr.split(':') | ||
return sum([int(s) * int(g) for s, g in zip([1, 60, 3600], reversed(durationList))]) | ||
|
||
def _telegramResponseOkCallback(r): | ||
if r.status_code == 200: | ||
return (True, None) | ||
return (False, f'{r.status_code=}') | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any examples with more than one match (here or a few lines below)?