# email-scrapper-linux.py
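"""Scrape e-mail addresses from a list of websites.

The script asks for a CSV file containing a 'Website' column (via a tkinter
file dialog), visits each site with Playwright, collects e-mail addresses from
the landing page and from any links whose text mentions "contact", and writes
the results to a new CSV with an 'Email' column placed right after 'Website'.
Errors are appended to error_log.txt.

Third-party dependencies, inferred from the imports below: pandas, playwright
and lxml (tkinter ships with most Python installs). The browser launch below
expects Google Chrome at /usr/bin/google-chrome.
"""
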
import pandas as pd
import re
import asyncio
from playwright.async_api import async_playwright
import tkinter as tk
from tkinter import filedialog
import traceback
import os
from urllib.parse import urljoin


async def scrape_emails_from_page(page):
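    """Return the set of e-mail addresses found in the text content of an already-loaded page."""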
    try:
        content = await page.content()
        # lxml is only needed here, so it is imported lazily
        from lxml import html
        tree = html.fromstring(content)
        page_text = tree.text_content()
        email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        emails = set(re.findall(email_regex, page_text))
        # Keep only addresses that also match the stricter full-string pattern
        valid_emails = set()
        for email in emails:
            if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
                valid_emails.add(email)
        return valid_emails
    except Exception:
        # If the page content cannot be retrieved or parsed, report no e-mails
        return set()


async def scrape_emails_from_contact_page(browser, href, base_url):
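    """Open a contact link in its own browser context and return any e-mail addresses found there."""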
    contact_emails = set()
    try:
        # Create a new browser context and page for the contact link
        context = await browser.new_context()
        page = await context.new_page()
        # Convert href to an absolute URL if it is relative
        if not href.startswith(('http://', 'https://')):
            href = urljoin(base_url, href)
        try:
            await page.goto(href, wait_until='networkidle')
            emails = await scrape_emails_from_page(page)
            if emails:
                contact_emails.update(emails)
        except Exception as e:
            print(f"Error navigating to contact link {href}: {e}")
            with open('error_log.txt', 'a') as log_file:
                log_file.write(f"Error navigating to contact link {href}: {e}\n")
        finally:
            await page.close()
            await context.close()
    except Exception as e:
        print(f"Error processing contact link: {e}")
        with open('error_log.txt', 'a') as log_file:
            log_file.write(f"Error processing contact link: {e}\n")
    return contact_emails


async def process_website(url, browser):
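    """Visit a website's landing page plus its contact pages and return the e-mails found, comma-separated."""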
    context = None
    page = None
    try:
        # Create a new browser context and page for the main website
        context = await browser.new_context()
        page = await context.new_page()
        await page.goto(url, wait_until='networkidle')
        main_page_emails = await scrape_emails_from_page(page)
        contact_page_emails = set()
        contact_links = await page.query_selector_all('a')
        for link in contact_links:
            try:
                link_text = await link.text_content()
                href = await link.get_attribute('href')
                # Follow links whose visible text mentions "contact"
                if href and link_text and 'contact' in link_text.lower():
                    contact_emails = await scrape_emails_from_contact_page(browser, href, url)
                    if contact_emails:
                        contact_page_emails.update(contact_emails)
            except Exception as e:
                print(f"Error processing contact link: {e}")
                with open('error_log.txt', 'a') as log_file:
                    log_file.write(f"Error processing contact link: {e}\n")
        all_emails = main_page_emails.union(contact_page_emails)
        if all_emails:
            return ', '.join(all_emails)
        else:
            return ''  # Return empty string if no emails found
    except Exception as e:
        print(f"Error processing website {url}: {e}")
        with open('error_log.txt', 'a') as log_file:
            log_file.write(f"Error processing website {url}: {e}\n")
        return ''  # Return empty string in case of error
    finally:
        # Close the page and context even if something failed part-way through
        if page:
            await page.close()
        if context:
            await context.close()


async def main():
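    """Ask for an input CSV, scrape every listed website, and write the results to a new CSV."""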
    root = tk.Tk()
    root.withdraw()  # Hide the root window
    input_file = filedialog.askopenfilename(
        title="Select CSV File",
        filetypes=[("CSV files", "*.csv")]
    )
    if not input_file:
        print("No file selected. Exiting.")
        return
    print(f"Selected file: {input_file}")
    try:
        df = pd.read_csv(input_file)
        print("CSV file read successfully.")
        if 'Website' not in df.columns:
            raise ValueError("The 'Website' column is missing from the CSV file.")
        df = df.dropna(subset=['Website'])
        # Prepend https:// to bare domains, leaving URLs that already carry a scheme alone
        df['Website'] = df['Website'].apply(
            lambda url: url if url.startswith(('http://', 'https://')) else 'https://' + url
        )
        print(f"Found {len(df)} valid website links. Starting scraping...")
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                executable_path='/usr/bin/google-chrome',  # Update this path if needed
                headless=False  # Show browser window
            )
            results = []
            for url in df['Website']:
                result = await process_website(url, browser)
                results.append(result)
            await browser.close()
        df['Email'] = results
        # Move the 'Email' column so it sits directly after 'Website'
        columns = list(df.columns)
        website_index = columns.index('Website')
        columns.insert(website_index + 1, columns.pop(columns.index('Email')))
        df = df[columns]
        base_name, ext = os.path.splitext(input_file)
        output_file = f"{base_name}_email_scrapped{ext}"
        df.to_csv(output_file, index=False)
        print(f"Scraping complete. Results saved to {output_file}")
    except Exception as e:
        error_message = f"An error occurred: {str(e)}\n{traceback.format_exc()}"
        print(error_message)
        with open('error_log.txt', 'a') as log_file:
            log_file.write(error_message + '\n')
    print("Press Enter to exit...")
    input()


# Run the main coroutine with asyncio when executed as a script
if __name__ == "__main__":
    asyncio.run(main())