卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章1829本站已运行4109

业余选手写的网站邮箱提取,请各位大佬指教

import pandas as pd
import requests
from bs4 import BeautifulSoup
import re
import logging
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.exceptions import RequestException
 
# 设置日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
column_name = 'Website'
email_column = 'Email'
visited_links = set()  # 保存已访问的链接,避免重复访问
 
 
def extract_email_from_text(text):
    """从文本中提取邮箱地址,并处理常见的反爬虫技巧"""
    email_pattern = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    text = text.replace('[at]', '@').replace('[dot]', '.').replace('(at)', '@').replace('(dot)', '.')
    return email_pattern.findall(text)
 
 
def get_all_links(soup, base_url):
    """获取页面中的所有链接,转换为绝对路径"""
    links = set()
    for link in soup.find_all('a', href=True):
        href = link['href']
        full_url = urljoin(base_url, href)
        if base_url in full_url and full_url not in visited_links:
            links.add(full_url)
    return links
 
 
def find_impressum_link(soup, base_url):
    """查找包含 'Impressum' 或 'Kontakt' 的链接"""
    links = soup.find_all('a', href=True)
    for link in links:
        href = link['href']
        if any(keyword in href.lower() for keyword in ['impressum', 'kontakt', 'contact', 'about']):
            return urljoin(base_url, href)
    return None
 
 
def find_email_near_keywords(soup):
    """通过关键字在页面文本中查找邮箱"""
    keywords = ['email', 'e-mail', 'contact']
    text = soup.get_text().lower()
    for keyword in keywords:
        if keyword in text:
            emails = extract_email_from_text(text)
            if emails:
                return emails
    return None
 
 
def get_email_from_url(url, depth=0):
    """从指定 URL 及其链接中提取邮箱地址"""
    if url in visited_links or depth > 2:  # 限制递归深度
        return None
    visited_links.add(url)
 
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
 
    try:
        response = requests.get(url, headers=headers, timeout=10, verify=False)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'lxml')
 
            # 1. 先尝试从页面文本中直接提取邮箱
            emails = extract_email_from_text(soup.get_text())
            if emails:
                return emails
 
            # 2. 尝试查找可能的 Impressum 或 Contact 链接
            impressum_link = find_impressum_link(soup, url)
            if impressum_link:
                logging.info(f"找到 Impressum 链接: {impressum_link}")
                email = get_email_from_url(impressum_link, depth + 1)  # 递归访问 Impressum 链接
                if email:
                    return email
 
            # 3. 在页面中关键字附近查找邮箱
            emails = find_email_near_keywords(soup)
            if emails:
                return emails
 
            # 4. 遍历页面中的所有链接,递归访问这些链接
            links = get_all_links(soup, url)
            for link in links:
                email = get_email_from_url(link, depth + 1)
                if email:
                    return email
    except RequestException as e:
        logging.error(f"请求 URL 时发生错误: {e}")
 
    return None
 
 
def process_url(url):
    """处理单个 URL,提取邮箱地址"""
    if pd.notna(url) and url.strip() != '':
        url = 'https://' + url.strip()
        logging.info(f"访问: {url}")
        emails = get_email_from_url(url)
        if emails:
            email = emails[0]  # 如果找到多个邮箱,可以调整为选择最合适的
            logging.info(f"找到邮箱: {email}")
            return url, email
        else:
            logging.info("没有找到邮箱")
            return url, ''
    else:
        logging.info(f"跳过空 URL 或无效 URL: {url}")
        return url, ''
 
 
# 使用上下文管理器来处理 Excel 文件
input_file = 'out3.xlsx'
output_file = 'out3_updated.xlsx'
 
try:
    # 读取 Excel 文件
    df = pd.read_excel(input_file, engine='openpyxl')
 
    # 添加邮箱列
    if email_column not in df.columns:
        df[email_column] = ''
 
    # 使用线程池处理 URL
    with ThreadPoolExecutor(max_workers=1000) as executor:
        future_to_url = {executor.submit(process_url, row[column_name]): index for index, row in df.iterrows()}
 
        for future in as_completed(future_to_url):
            index = future_to_url[future]
            try:
                url, email = future.result()
                df.at[index, email_column] = email
            except Exception as e:
                logging.error(f"处理 URL 时发生错误: {e}")
 
    # 使用上下文管理器来写入 Excel 文件
    with pd.ExcelWriter(output_file, engine='xlsxwriter') as excel_writer:
        df.to_excel(excel_writer, index=False)
        logging.info(f"处理完成,结果已保存到 {output_file}")
 
except Exception as e:
    logging.error(f"处理 Excel 文件时发生错误: {e}")
 
卓越飞翔博客
上一篇: 在PPT上画表盘
下一篇: python获取b站视频选集目录名

相关推荐

留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏