Scrape https://news.nd.edu/ for the following fields: title, content, author, publish time, link URL, and an HTML snapshot of each article (the site may require a proxy/VPN to access).
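Concretely, each article should end up as one stored record. A minimal sketch of the fields we are after (the values below are made up; the actual keys used later in the code are the ones in `card_data`):

```python
# Rough sketch of the record we want per article (illustrative values only).
article_record = {
    'title': 'Example headline',                          # article title
    'content': 'Plain-text body ...',                     # cleaned article text
    'author': 'Jane Doe',                                 # byline
    'updatetime': '2024-03-01T09:00:00',                  # publish/update time
    'url': 'https://news.nd.edu/news/example-article/',   # link address
    'html_content': '<article>...</article>',             # HTML snapshot of the page
}
```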
On the homepage, click "View more latest news", then click "View the archives".
There are two ways to browse all the news: by category and by date. On closer inspection, browsing by date turns out to be more complete, so we crawl by year (crawling by month gives the same results as crawling by year).
To keep the naming and logic of the crawler consistent, we split the crawl into three levels: modules, catalogues, and articles, as follows.
All of a site's news is organized into several modules; once we have crawled every module, we have every article on the site.
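As a rough sketch of that decomposition (the helper functions and the example year URL below are hypothetical stand-ins, not part of the real crawler):

```python
# Outline of the three-level crawl: module -> catalogue page -> article.
# Everything here is illustrative; the real logic lives in MitnewsScraper below.

def get_module_urls(root_url):
    # The real crawler parses https://news.nd.edu/news/archives/ for one URL per year.
    return [root_url + 'news/archives/2024/']      # hypothetical example value

def get_catalogue_urls(module_url, page):
    # The real crawler parses one paginated catalogue listing here.
    return []                                       # stub for the sketch

def crawl_site(root_url):
    for module_url in get_module_urls(root_url):             # every module (archive year)
        for page in range(1, 3):                              # every catalogue page of the module
            for catalogue_url in get_catalogue_urls(module_url, page):
                print('would fetch and store the article at', catalogue_url)

crawl_site('https://news.nd.edu/')
```

The real entry point that implements this traversal is shown next.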
```python
class MitnewsScraper:
    def __init__(self, root_url, model_url, img_output_dir):
        self.root_url = root_url
        self.model_url = model_url
        self.img_output_dir = img_output_dir
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/122.0.0.0 Safari/537.36',
            'Cookie': 'replace with your own',
        }

    ...


def run():
    # Site root URL
    root_url = 'https://news.nd.edu/'
    # Directory for saving article images
    output_dir = 'D://imgs//nd-news'

    response = requests.get('https://news.nd.edu/news/archives/')
    soup = BeautifulSoup(response.text, 'html.parser')

    # List of module URLs (one per archive year)
    model_urls = []
    model_url_array = soup.find('ul', 'archives-by-year archives-list').find_all('li')
    for item in model_url_array:
        model_url = root_url + item.find('a').get('href')
        model_urls.append(model_url)

    for model_url in model_urls:
        # Initialize the scraper for this module
        scraper = MitnewsScraper(root_url, model_url, output_dir)
        # Iterate over this module's catalogue pages
        scraper.catalogue_all_pages()


if __name__ == "__main__":
    run()
```
```python
# How many catalogue pages does this module have?
def catalogue_all_pages(self):
    response = requests.get(self.model_url, headers=self.headers)
    soup = BeautifulSoup(response.text, 'html.parser')
    try:
        list_catalogues_page = soup.find('div', 'pagination').find_all('a')
        len_catalogues_page = len(list_catalogues_page)
        num_pages = list_catalogues_page[len_catalogues_page - 2].get_text()
        print(self.model_url + ' module has ' + num_pages + ' catalogue pages')
        for page in range(1, int(num_pages) + 1):
            print(f"========start catalogues page {page}" + "/" + str(num_pages) + "========")
            self.parse_catalogues(page)
            print(f"========Finished catalogues page {page}" + "/" + str(num_pages) + "========")
    except Exception as e:
        print(f'Error: {e}')
        traceback.print_exc()
```
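The page count comes from the pagination bar: on this kind of listing the last link is typically a "next" arrow, so the second-to-last `<a>` carries the number of the final page. A small illustration with made-up pagination HTML (not copied from news.nd.edu):

```python
from bs4 import BeautifulSoup

# Hypothetical pagination markup, just to show which <a> holds the page count.
sample = '''
<div class="pagination">
  <a href="/page/1">1</a>
  <a href="/page/2">2</a>
  <a href="/page/17">17</a>
  <a href="/page/2">Next</a>
</div>
'''
links = BeautifulSoup(sample, 'html.parser').find('div', 'pagination').find_all('a')
num_pages = links[len(links) - 2].get_text()
print(num_pages)  # -> 17
```

With the page count in hand, each catalogue page is parsed in turn.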
```python
# Parse the catalogues in the catalogue listing
def parse_catalogues(self, page):
    url = self.model_url + '/page/' + str(page)
    response = requests.get(url, headers=self.headers)
    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'html.parser')
        catalogue_list = soup.find('ol', 'no-bullets list-news')
        catalogues_list = catalogue_list.find_all('li')
        for index, catalogue in enumerate(catalogues_list):
            print(f"========start catalogue {index+1}" + "/" + "30========")
```
```python
            # Catalogue title
            catalogue_title = catalogue.find('div', 'card-body hover-more').find('h2').find('a').get_text(strip=True)
```
```python
            # Operation time
            date = datetime.now()
            # Update (publish) time
            publish_time = catalogue.find('time').get('datetime')
            # Convert the ISO date string into a datetime object
            updatetime = datetime.fromisoformat(publish_time)
```
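The `datetime` attribute of the `<time>` element is an ISO-8601 string, which `datetime.fromisoformat` parses directly; for example (the timestamp is made up):

```python
from datetime import datetime

# Hypothetical value of <time datetime="...">; real pages supply their own.
publish_time = '2024-03-01T09:30:00-05:00'
updatetime = datetime.fromisoformat(publish_time)
print(updatetime.year, updatetime.month, updatetime.day)  # -> 2024 3 1
```

Next, the catalogue URL and id are extracted: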
```python
            # Catalogue url
            catalogue_href = catalogue.find('h2').find('a').get('href')
            catalogue_url = self.root_url + catalogue_href
            # Regular expression for the URL slug
            pattern = r'/news/(.+?)/$'
            # Search for a match
            match = re.search(pattern, catalogue_url)
            # Catalogue id
            catalogue_id = match.group(1)
```
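The regular expression simply takes the URL slug as the catalogue id; for example (hypothetical URL):

```python
import re

pattern = r'/news/(.+?)/$'
catalogue_url = 'https://news.nd.edu/news/example-article-title/'  # hypothetical URL
match = re.search(pattern, catalogue_url)
print(match.group(1))  # -> example-article-title
```

The catalogue is then written to MongoDB: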
```python
            # Connect to the MongoDB server
            client = MongoClient('mongodb://localhost:27017/')
            # Create or select the database
            db = client['nd-news']
            # Create or select the collection
            catalogues_collection = db['catalogues']
            # Document to insert into the catalogues collection
            catalogue_data = {
                'id': catalogue_id,
                'date': date,
                'title': catalogue_title,
                'url': catalogue_url,
                'cardSize': 1,
                'updatetime': updatetime
            }

            # Check whether a document with the same id already exists
            existing_document = catalogues_collection.find_one({'id': catalogue_id})

            # Insert only if no document with the same id exists
            if existing_document is None:
                catalogues_collection.insert_one(catalogue_data)
                print("[Catalogue] " + catalogue_url + " inserted successfully!")
            else:
                print("[Catalogue] " + catalogue_url + " already exists!")
            print(f"========finish catalogue {index+1}" + "/" + "30========")
        return True
    else:
        raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")
```
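The find-then-insert check above prevents duplicates, but it is not atomic. If you also want the database itself to enforce uniqueness, one optional addition (not part of the original code) is a unique index on `id`:

```python
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['nd-news']
# Optional hardening: let MongoDB reject duplicate catalogue ids outright.
db['catalogues'].create_index('id', unique=True)
```

With a unique index in place, a second insert of the same id raises `DuplicateKeyError` instead of silently creating a duplicate.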
Since one catalogue corresponds to exactly one article, the catalogue and the article share the same URL, update time, and title, and by design the catalogue id and the article id differ only by a trailing "01". So we simply pass four arguments into the article-parsing function: the catalogue URL, the catalogue id, the update time, and the title.
Get the article id, the article URL, the article update time, and the current operation time.
```python
# Parse the catalogues in the catalogue listing
def parse_catalogues(self, page):
    ...
    self.parse_cards_list(catalogue_url, catalogue_id, updatetime, catalogue_title)
    ...

# Parse the article behind a catalogue
def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):
    card_response = requests.get(url, headers=self.headers)
    soup = BeautifulSoup(card_response.text, 'html.parser')
    # Corresponding catalogue id
    card_id = catalogue_id
    # Article title
    card_title = cardtitle
    # Article update time
    updateTime = cardupdatetime
    # Operation time
    date = datetime.now()
```
```python
    # Article author
    author = soup.find('article', 'article span-md-2').find('p', 'author').find('span', property='name').get_text()
```
```python
    # Original HTML DOM structure
    html_dom = soup.find('article', 'article span-md-2')
    html_cut1 = html_dom.find_all('div', 'meta-share-group')[0].find('div', 'social-share')
    html_cut2 = html_dom.find_all('div', 'meta-share-group')[1]
    html_cut3 = html_dom.find('div', 'section-profile profile-1')
    # Remove the share widgets and the author-profile block
    if html_cut1:
        html_cut1.extract()
    if html_cut2:
        html_cut2.extract()
    if html_cut3:
        html_cut3.extract()
    html_content = html_dom
```
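`Tag.extract()` detaches an element from the parse tree, so the share widgets and the profile block no longer show up in the saved HTML or in the extracted text. A tiny self-contained illustration (the markup is made up):

```python
from bs4 import BeautifulSoup

# Hypothetical article markup with a share widget we do not want to keep.
soup = BeautifulSoup('<article><p>Body text</p><div class="social-share">Share</div></article>',
                     'html.parser')
share = soup.find('div', 'social-share')
if share:
    share.extract()             # detach the element from the tree
print(soup.article.get_text())  # -> Body text
```

The remaining DOM is then turned into clean text: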
```python
# Parse the article behind a catalogue
def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):
    ...
    # Keep the original HTML (with styling) as source text
    origin_html = html_dom.prettify()  # String
    # Escape the <img> tags in the page
    str_html = self.transcoding_tags(origin_html)
    # Wrap it in BeautifulSoup again
    temp_soup = BeautifulSoup(str_html, 'html.parser')
    # Un-escape the illustrations in the text
    str_html = self.translate_tags(temp_soup.text)
    # Bind the cleaned content
    content = self.clean_content(str_html)
    ...

# Tool: escape <img> tags
def transcoding_tags(self, htmlstr):
    re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)
    s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr)  # escape IMG tags
    return s

# Tool: un-escape <img> tags
def translate_tags(self, htmlstr):
    re_img = re.compile(r'@@##(img.*?)##@@', re.M)
    s = re_img.sub(r'<\1>', htmlstr)  # restore IMG tags
    return s

# Clean up the article text
def clean_content(self, content):
    if content is not None:
        content = re.sub(r'\r', r'\n', content)
        content = re.sub(r'\n{2,}', '', content)
        content = re.sub(r' {6,}', '', content)
        content = re.sub(r' {3,}\n', '', content)
        ...
    return content
```

The images referenced in the article are downloaded into a folder named after the article id:

```python
# Download images
def download_images(self, img_urls, card_id):
    result = re.search(r'[^/]+$', card_id)
    last_word = result.group(0)

    # Create a sub-directory named after the card_id
    images_dir = os.path.join(self.img_output_dir, str(last_word))
    if not os.path.exists(images_dir):
        os.makedirs(images_dir)
        downloaded_images = []
        for index, img_url in enumerate(img_urls):
            try:
                response = requests.get(img_url, stream=True, headers=self.headers)
                if response.status_code == 200:
                    # Take the file name from the image URL, dropping any query string
                    img_name_with_extension = img_url.split('/')[-1]
                    pattern = r'^[^?]*'
                    match = re.search(pattern, img_name_with_extension)
                    img_name = match.group(0)
                    # Save the image
                    with open(os.path.join(images_dir, img_name), 'wb') as f:
                        f.write(response.content)
                    downloaded_images.append([img_url, os.path.join(images_dir, img_name)])
                    print(f'[Article images] images for article id {card_id} saved locally')
            except requests.exceptions.RequestException as e:
                print(f'Request error while downloading an image: {e}')
            except Exception as e:
                print(f'Error while saving an image: {e}')
        return downloaded_images
    # Skip if the folder already exists
    else:
        print(f'[Article images] image folder for article id {card_id} already exists')
        return []
```
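To step back to the `transcoding_tags` / `translate_tags` pair above: `temp_soup.text` strips every tag, so `<img>` tags are first rewritten into plain-text markers that survive the stripping and are then turned back into tags afterwards. A small round-trip illustration (the image path is made up):

```python
import re
from bs4 import BeautifulSoup

def transcoding_tags(htmlstr):
    return re.compile(r'\s*<(img.*?)>\s*', re.M).sub(r'\n @@##\1##@@ \n', htmlstr)

def translate_tags(htmlstr):
    return re.compile(r'@@##(img.*?)##@@', re.M).sub(r'<\1>', htmlstr)

html = '<p>Before</p><img src="/assets/example.jpg"/><p>After</p>'   # hypothetical snippet
escaped = transcoding_tags(html)                         # <img ...> becomes @@##img ...##@@
text_only = BeautifulSoup(escaped, 'html.parser').text   # tags stripped, marker survives
print(translate_tags(text_only))                         # the <img ...> tag is restored
```

Finally, the article record is written to the `cards` collection: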
```python
    # Connect to the MongoDB server
    client = MongoClient('mongodb://localhost:27017/')
    # Create or select the database
    db = client['nd-news']
    # Create or select the collection
    cards_collection = db['cards']
    # Document to insert into the cards collection
    card_data = {
        'id': card_id,
        'catalogueId': catalogue_id,
        'type': 'nd-news',
        'date': date,
        'title': card_title,
        'author': author,
        'updatetime': updateTime,
        'url': url,
        'html_content': str(html_content),
        'content': content,
        'illustrations': illustrations,
    }

    # Check whether a document with the same id already exists
    existing_document = cards_collection.find_one({'id': card_id})

    # Insert only if no document with the same id exists
    if existing_document is None:
        cards_collection.insert_one(card_data)
        print("[Article] " + url + " inserted successfully!")
    else:
        print("[Article] " + url + " already exists!")
```
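Once articles have been stored, they can be read back with ordinary PyMongo queries; for example, counting what has been saved and listing the titles under one catalogue id (a sketch, the id is hypothetical):

```python
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
cards = client['nd-news']['cards']

# Count everything stored so far and list titles for one catalogue id.
print(cards.count_documents({}))
for doc in cards.find({'catalogueId': 'example-article-title'}, {'title': 1, 'url': 1}):
    print(doc['title'], doc['url'])
```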
The complete code:

```python
import os
import re
import traceback
from datetime import datetime

import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient


class MitnewsScraper:
    def __init__(self, root_url, model_url, img_output_dir):
        self.root_url = root_url
        self.model_url = model_url
        self.img_output_dir = img_output_dir
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/122.0.0.0 Safari/537.36',
            'Cookie': ''
        }

    # How many catalogue pages does this module have?
    def catalogue_all_pages(self):
        response = requests.get(self.model_url, headers=self.headers)
        soup = BeautifulSoup(response.text, 'html.parser')
        try:
            list_catalogues_page = soup.find('div', 'pagination').find_all('a')
            len_catalogues_page = len(list_catalogues_page)
            num_pages = list_catalogues_page[len_catalogues_page - 2].get_text()
            print(self.model_url + ' module has ' + num_pages + ' catalogue pages')
            for page in range(1, int(num_pages) + 1):
                print(f"========start catalogues page {page}" + "/" + str(num_pages) + "========")
                self.parse_catalogues(page)
                print(f"========Finished catalogues page {page}" + "/" + str(num_pages) + "========")
        except Exception as e:
            print(f'Error: {e}')
            traceback.print_exc()

    # Parse the catalogues in the catalogue listing
    def parse_catalogues(self, page):
        url = self.model_url + '/page/' + str(page)
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            catalogue_list = soup.find('ol', 'no-bullets list-news')
            catalogues_list = catalogue_list.find_all('li')

            for index, catalogue in enumerate(catalogues_list):
                print(f"========start catalogue {index+1}" + "/" + "30========")

                # Catalogue title
                catalogue_title = catalogue.find('div', 'card-body hover-more').find('h2').find('a').get_text(strip=True)

                # Operation time
                date = datetime.now()
                # Update (publish) time
                publish_time = catalogue.find('time').get('datetime')
                # Convert the ISO date string into a datetime object
                updatetime = datetime.fromisoformat(publish_time)

                # Catalogue url
                catalogue_href = catalogue.find('h2').find('a').get('href')
                catalogue_url = self.root_url + catalogue_href
                # Regular expression for the URL slug
                pattern = r'/news/(.+?)/$'
                match = re.search(pattern, catalogue_url)
                # Catalogue id
                catalogue_id = match.group(1)

                self.parse_cards_list(catalogue_url, catalogue_id, updatetime, catalogue_title)

                # Connect to the MongoDB server
                client = MongoClient('mongodb://localhost:27017/')
                # Create or select the database
                db = client['nd-news']
                # Create or select the collection
                catalogues_collection = db['catalogues']
                # Document to insert into the catalogues collection
                catalogue_data = {
                    'id': catalogue_id,
                    'date': date,
                    'title': catalogue_title,
                    'url': catalogue_url,
                    'cardSize': 1,
                    'updatetime': updatetime
                }

                # Check whether a document with the same id already exists
                existing_document = catalogues_collection.find_one({'id': catalogue_id})

                # Insert only if no document with the same id exists
                if existing_document is None:
                    catalogues_collection.insert_one(catalogue_data)
                    print("[Catalogue] " + catalogue_url + " inserted successfully!")
                else:
                    print("[Catalogue] " + catalogue_url + " already exists!")
                print(f"========finish catalogue {index+1}" + "/" + "30========")
            return True
        else:
            raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")

    # Parse the article behind a catalogue
    def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):
        card_response = requests.get(url, headers=self.headers)
        soup = BeautifulSoup(card_response.text, 'html.parser')
        # Corresponding catalogue id
        card_id = catalogue_id
        # Article title
        card_title = cardtitle
        # Article update time
        updateTime = cardupdatetime
        # Operation time
        date = datetime.now()

        # Article author
        author = soup.find('article', 'article span-md-2').find('p', 'author').find('span', property='name').get_text()

        # Original HTML DOM structure
        html_dom = soup.find('article', 'article span-md-2')
        html_cut1 = html_dom.find_all('div', 'meta-share-group')[0].find('div', 'social-share')
        html_cut2 = html_dom.find_all('div', 'meta-share-group')[1]
        html_cut3 = html_dom.find('div', 'section-profile profile-1')
        # Remove the share widgets and the author-profile block
        if html_cut1:
            html_cut1.extract()
        if html_cut2:
            html_cut2.extract()
        if html_cut3:
            html_cut3.extract()
        html_content = html_dom

        # Keep the original HTML (with styling) as source text
        origin_html = html_dom.prettify()  # String
        # Escape the <img> tags in the page
        str_html = self.transcoding_tags(origin_html)
        # Wrap it in BeautifulSoup again
        temp_soup = BeautifulSoup(str_html, 'html.parser')
        # Un-escape the illustrations in the text
        str_html = self.translate_tags(temp_soup.text)
        # Bind the cleaned content
        content = self.clean_content(str_html)

        # Collect the article's image URLs
        imgs = []
        img_array = soup.find('div', 'article-content entry-content').find_all('img')
        if len(img_array) != 0:
            for item in img_array:
                img_url = self.root_url + item.get('src')
                imgs.append(img_url)
        illustrations = []
        if len(imgs) != 0:
            # Download the images
            illustrations = self.download_images(imgs, card_id)

        # Connect to the MongoDB server
        client = MongoClient('mongodb://localhost:27017/')
        # Create or select the database
        db = client['nd-news']
        # Create or select the collection
        cards_collection = db['cards']
        # Document to insert into the cards collection
        card_data = {
            'id': card_id,
            'catalogueId': catalogue_id,
            'type': 'nd-news',
            'date': date,
            'title': card_title,
            'author': author,
            'updatetime': updateTime,
            'url': url,
            'html_content': str(html_content),
            'content': content,
            'illustrations': illustrations,
        }

        # Check whether a document with the same id already exists
        existing_document = cards_collection.find_one({'id': card_id})

        # Insert only if no document with the same id exists
        if existing_document is None:
            cards_collection.insert_one(card_data)
            print("[Article] " + url + " inserted successfully!")
        else:
            print("[Article] " + url + " already exists!")

    # Download images
    def download_images(self, img_urls, card_id):
        result = re.search(r'[^/]+$', card_id)
        last_word = result.group(0)

        # Create a sub-directory named after the card_id
        images_dir = os.path.join(self.img_output_dir, str(last_word))
        if not os.path.exists(images_dir):
            os.makedirs(images_dir)
            downloaded_images = []
            for index, img_url in enumerate(img_urls):
                try:
                    response = requests.get(img_url, stream=True, headers=self.headers)
                    if response.status_code == 200:
                        # Take the file name from the image URL, dropping any query string
                        img_name_with_extension = img_url.split('/')[-1]
                        pattern = r'^[^?]*'
                        match = re.search(pattern, img_name_with_extension)
                        img_name = match.group(0)
                        # Save the image
                        with open(os.path.join(images_dir, img_name), 'wb') as f:
                            f.write(response.content)
                        downloaded_images.append([img_url, os.path.join(images_dir, img_name)])
                        print(f'[Article images] images for article id {card_id} saved locally')
                except requests.exceptions.RequestException as e:
                    print(f'Request error while downloading an image: {e}')
                except Exception as e:
                    print(f'Error while saving an image: {e}')
            return downloaded_images
        # Skip if the folder already exists
        else:
            print(f'[Article images] image folder for article id {card_id} already exists')
            return []

    # Tool: escape <img> tags
    def transcoding_tags(self, htmlstr):
        re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)
        s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr)  # escape IMG tags
        return s

    # Tool: un-escape <img> tags
    def translate_tags(self, htmlstr):
        re_img = re.compile(r'@@##(img.*?)##@@', re.M)
        s = re_img.sub(r'<\1>', htmlstr)  # restore IMG tags
        return s

    # Clean up the article text
    def clean_content(self, content):
        if content is not None:
            content = re.sub(r'\r', r'\n', content)
            content = re.sub(r'\n{2,}', '', content)
            content = re.sub(r' {6,}', '', content)
            content = re.sub(r' {3,}\n', '', content)
            ...
        return content


def run():
    # Site root URL
    root_url = 'https://news.nd.edu/'
    # Directory for saving article images
    output_dir = 'D://imgs//nd-news'

    response = requests.get('https://news.nd.edu/news/archives/')
    soup = BeautifulSoup(response.text, 'html.parser')

    # List of module URLs (one per archive year)
    model_urls = []
    model_url_array = soup.find('ul', 'archives-by-year archives-list').find_all('li')
    for item in model_url_array:
        model_url = root_url + item.find('a').get('href')
        model_urls.append(model_url)

    for model_url in model_urls:
        # Initialize the scraper for this module
        scraper = MitnewsScraper(root_url, model_url, output_dir)
        # Iterate over this module's catalogue pages
        scraper.catalogue_all_pages()


if __name__ == "__main__":
    run()
```
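A few practical notes for running the script: it needs the requests, beautifulsoup4, and pymongo packages, a MongoDB server listening on localhost:27017, and write access to the image directory configured in run() (D://imgs//nd-news); the Cookie value in the headers is left for you to fill in from your own browser session.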