-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcrawl.py
175 lines (144 loc) · 6.64 KB
/
crawl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python3
# -*- coding: utf_8 -*-
import os
import sqlite3
from time import sleep

import pip
# pip.main(["install", "-r", "requirements.txt"])
from dotenv import load_dotenv
from selenium import webdriver
from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service
from selenium.webdriver.support.ui import Select
from webdriver_manager.microsoft import EdgeChromiumDriverManager
# Load configuration from the .env file; the script reads DB_FILE,
# TIME_SLEEP and WEBSITE through os.getenv().
load_dotenv()
# Configure and start the Edge browser that drives the crawl.
edge_options = Options()
# NOTE(review): use_chromium looks like a Selenium 3-era switch; confirm it is
# still needed with the Service-based constructor used below.
edge_options.use_chromium = True
# Run without a visible window and with GPU acceleration disabled.
edge_options.add_argument("headless")
edge_options.add_argument("disable-gpu")
# webdriver_manager's install() result is handed to Service as the driver
# executable to launch.
browser = webdriver.Edge(service = Service(EdgeChromiumDriverManager().install()), options=edge_options)
browser.maximize_window()
# Connect to the SQLite database file named by DB_FILE.
# check_same_thread=False lifts sqlite3's default restriction that a
# connection may only be used from the thread that created it.
connect = sqlite3.connect(os.getenv("DB_FILE"), check_same_thread = False)
cur = connect.cursor()
# Declare the action chain used to move the pointer onto page elements.
actions = ActionChains(browser)
# Pause the script for a while (duration in seconds)
def pause(seconds=None):
    """Suspend the script for a short, configurable delay.

    Args:
        seconds: Delay in seconds. When omitted, the delay is read from the
            ``TIME_SLEEP`` environment variable, which may be an integer or
            a fractional number.

    Raises:
        RuntimeError: If ``seconds`` is omitted and ``TIME_SLEEP`` is unset
            or blank.
        ValueError: If ``TIME_SLEEP`` cannot be parsed as a number.
    """
    if seconds is None:
        raw = os.getenv("TIME_SLEEP")
        # Fail with a message that names the variable instead of the opaque
        # TypeError that int(None) would produce.
        if raw is None or not raw.strip():
            raise RuntimeError(
                "TIME_SLEEP is not set; define it (in seconds) in the "
                "environment or the .env file"
            )
        try:
            seconds = float(raw)
        except ValueError:
            raise ValueError(
                f"TIME_SLEEP must be a number of seconds, got {raw!r}"
            ) from None
    sleep(seconds)
def _open_post_tab(link):
    """Open *link* in a new browser tab, focus that tab and wait for it."""
    # The URL is passed as a script argument instead of being spliced into
    # the JavaScript source, so a quote or backslash in the link can neither
    # break the script nor inject code into it.
    browser.execute_script("window.open(arguments[0]);", link)
    browser.switch_to.window(browser.window_handles[-1])
    pause()


def _close_post_tab():
    """Close the focused tab and switch back to the first (listing) tab."""
    browser.close()
    browser.switch_to.window(browser.window_handles[0])


def _page_text(class_name, child_tag=None, default=None):
    """Return the text of an element on the current page, or *default*.

    The element is the first one with CSS class *class_name*; when
    *child_tag* is given, its first descendant with that tag name is used
    instead. *default* is returned when the element is missing or has gone
    stale. Other errors (including KeyboardInterrupt) propagate.
    """
    try:
        element = browser.find_element(By.CLASS_NAME, class_name)
        if child_tag is not None:
            element = element.find_element(By.TAG_NAME, child_tag)
        return element.text
    except (NoSuchElementException, StaleElementReferenceException):
        return default


def _save_post(title, link, image, tag, preview, author, timestamp):
    """Insert one post row and commit it.

    INSERT OR IGNORE skips the row when it violates a constraint, e.g. the
    UNIQUE constraint on ``link`` declared when the table is created.
    """
    cur.execute(
        '''INSERT OR IGNORE INTO `posts` (title, link, image, tag, preview, author, timestamp)
        VALUES (?, ?, ?, ?, ?, ?, ?)''',
        (title, link, image, tag, preview, author, timestamp),
    )
    connect.commit()


def _crawl_vietnamnet():
    """Crawl every result page of the vietnamnet.vn search listing."""
    # Select time range to crawl
    #   Value | Type
    #   all     All
    #   1       Day
    #   2       Week
    #   3       Month
    #   4       Year
    time_select = Select(browser.find_element(By.CLASS_NAME, "filter-select-big"))
    time_select.select_by_value("2")
    pause()

    while True:
        for post in browser.find_elements(By.CLASS_NAME, "feature-box"):
            # Get post data from the listing entry
            img_tag = post.find_element(By.CLASS_NAME, "lazy")
            post_tag = post.find_element(By.TAG_NAME, "a")
            div_tag = post.find_element(By.CLASS_NAME, "feature-box__content--desc")
            a_tag = post.find_element(
                By.CLASS_NAME, "feature-box__content--brand"
            ).find_element(By.TAG_NAME, "a")
            title = post_tag.get_attribute("title")
            link = post_tag.get_attribute("href")
            image = img_tag.get_attribute("data-src")
            tag = a_tag.text
            preview = div_tag.text

            # Timestamp and author are only available on the article page.
            _open_post_tab(link)
            # Two markups are tried for the timestamp, in this order.
            timestamp = _page_text("breadcrumb-box__time", "span")
            if timestamp is None:
                timestamp = _page_text("bread-crumb__detail-time", "p", "Unknown")
            author = _page_text("newsFeature__author-info", "a", "Anonymous")
            _close_post_tab()

            _save_post(title, link, image, tag, preview, author, timestamp)

        # Go to next page if exist: the last pagination item is the "next"
        # control, and it carries the "active" class on the final page.
        pager_items = browser.find_elements(By.CLASS_NAME, "panination__content-item")
        if not pager_items:
            # No pagination rendered at all: there is nothing more to visit.
            break
        next_page = pager_items[-1]
        if "active" in (next_page.get_attribute("class") or "").split():
            break
        next_page.click()
        pause()


def _crawl_zingnews():
    """Crawl the zingnews.vn search listing after scrolling to load results."""
    # Scroll to the bottom up to 5 times, stopping early once the page
    # height no longer grows.
    last_height = browser.execute_script("return document.body.scrollHeight")
    for _ in range(5):
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        # Wait to load page
        pause()
        new_height = browser.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            break
        last_height = new_height

    for post in browser.find_elements(By.TAG_NAME, "article"):
        img_tag = post.find_element(By.CLASS_NAME, "article-thumbnail").find_element(
            By.TAG_NAME, "a"
        )
        post_tag = post.find_element(By.TAG_NAME, "header")
        # Move the pointer onto the thumbnail before reading it
        # (presumably so a lazily loaded image exposes its real src).
        actions.move_to_element(img_tag).perform()
        title = post_tag.find_element(By.CLASS_NAME, "article-title").find_element(
            By.TAG_NAME, "a"
        ).text
        link = img_tag.get_attribute("href")
        image = img_tag.find_element(By.TAG_NAME, "img").get_attribute("src")
        preview = post_tag.find_element(By.CLASS_NAME, "article-summary").text

        _open_post_tab(link)
        # Store the URL the browser actually ended up on.
        link = browser.current_url
        tag = _page_text("parent_cate", default="").upper()
        author = _page_text("the-article-author", "a", "Anonymous")
        timestamp = _page_text("the-article-publish")
        if timestamp is None:
            timestamp = "Unknown"
        elif "," in timestamp:
            # Keep only what follows the first comma and the character
            # after it.
            timestamp = timestamp[timestamp.find(",") + 2:]
        _close_post_tab()

        _save_post(title, link, image, tag, preview, author, timestamp)
        # Throttle between articles.
        pause()


if __name__ == "__main__":
    try:
        # Create table if not exist
        cur.execute('''CREATE TABLE IF NOT EXISTS posts
            (id INTEGER PRIMARY KEY, title TEXT, link TEXT UNIQUE, image TEXT,
            tag TEXT, preview TEXT, author TEXT, timestamp TEXT)''')

        # Open site in browser
        website = os.getenv("WEBSITE")
        browser.get(website)
        pause()

        # Any other WEBSITE value is opened but not crawled.
        if website == "https://vietnamnet.vn/tim-kiem":
            _crawl_vietnamnet()
        elif website == "https://zingnews.vn/a-tim-kiem.html":
            _crawl_zingnews()
    finally:
        # quit() ends the whole driver session (close() only closes one
        # window); the database is closed even if quitting fails.
        try:
            browser.quit()
        finally:
            connect.close()