注意:学习前请自行掌握Python和MySQL基础知识。
import requests import pymysql import threading import concurrent.futures
import requests import pymysql import threading import concurrent.futures import time #导入time模块 class CompanyShipInDatabase: def __init__(self): self.conn = None self.cursor = None self.session = requests.Session() def create_database_connection(self): # 创建数据库连接 self.conn = pymysql.connect(host="localhost", port=3306, user="root", password="your_password", database="your_database", charset="utf8") self.cursor = self.conn.cursor() def close_database_connection(self): # 关闭数据库连接 if self.cursor: self.cursor.close() if self.conn: self.conn.close() def in_database(self, data_list): # 准备要插入或更新的数据 update_data = [] insert_data = [] for item in data_list: mmsi = item['mmsi'] select_sql = "SELECT mmsi FROM static_data WHERE mmsi = %s" self.cursor.execute(select_sql, (mmsi,)) existing_record = self.cursor.fetchone() if existing_record: # 如果记录已存在,执行更新操作 update_data.append((item['imo'], item['name'], item['callsign'], item['length'], item['width'], item['trail'], item['draught'], mmsi)) else: # 如果记录不存在,执行插入操作 insert_data.append((item['mmsi'], item['imo'], item['name'], item['callsign'], item['length'], item['width'], item['trail'], item['draught'])) if update_data: # 批量执行更新操作 update_sql = ''' UPDATE static_data SET imo = %s, name = %s, callsign = %s, length = %s, width = %s, trail = %s, draught = %s WHERE mmsi = %s ''' self.cursor.executemany(update_sql, update_data) if insert_data: # 批量执行插入操作 insert_sql = ''' INSERT INTO static_data(mmsi, imo, name, callsign, length, width, trail, draught) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ''' self.cursor.executemany(insert_sql, insert_data) # 提交数据库事务 self.conn.commit() def make_http_request(self, mmsi, headers): # 发送HTTP请求获取数据 url = f'https://www.shipxy.com/ship/GetShip?mmsi={mmsi}' try: response = self.session.get(url, headers=headers) if response.status_code == 200: result = response.json() return result else: return None except Exception as e: print(f"HTTP请求过程中发生错误: {str(e)}") return None def fetch_ship_data(self, mmsi_group, headers): # 获取船舶数据 data = [] for item in mmsi_group: mmsi = item[0] result = self.make_http_request(mmsi, headers) if result and 'data' in result and len(result['data']) > 0: data.append(result['data'][0]) print(f"已成功获取 mmsi: {mmsi}") else: print(f"mmsi: {mmsi} 未找到数据") return data #主程序 def company_ship_in_database(self): # 主程序入口 self.create_database_connection() mmsi_group = self.get_mmsi() data = [] Cookie = 'your_Cookie' # 请替换成你的Cookie headers = { 'User-Agent': 'your_User-Agent', # 请替换成你的User-Agent 'Cookie': Cookie, } #记录程序开始的时间 start_time = time.time() # 使用线程池并发获取数据 num_threads = 8 # 可根据需要调整线程数 chunk_size = len(mmsi_group) // num_threads with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [] for i in range(num_threads): start = i * chunk_size end = (i + 1) * chunk_size if i < num_threads - 1 else len(mmsi_group) future = executor.submit(self.fetch_ship_data, mmsi_group[start:end], headers) futures.append(future) for future in concurrent.futures.as_completed(futures): data.extend(future.result()) #记录数据爬取结束的时间 data_fetch_end_time = time.time() print(f"已完成查询,共获取 {len(data)} 条数据") print(data) # 批量插入或更新数据 self.in_database(data) #记录数据写入数据库结束的时间 data_write_end_time = time.time() self.close_database_connection() #计算程序运行时间并打印 elapsed_time = data_fetch_end_time - start_time print(f"数据获取时间:0.0569 秒") elapsed_time = data_write_end_time - data_fetch_end_time print(f"数据写入数据库时间:0.0569 秒") return data def get_mmsi(self): # 获取要查询的MMSI列表 mmsi_sql = "select distinct mmsi from your_mmsi order by mmsi" self.cursor.execute(mmsi_sql) mmsi_group = self.cursor.fetchall() return mmsi_group if __name__ == "__main__": company_ship = CompanyShipInDatabase() data = company_ship.company_ship_in_database()
代码的实际运行速度。
## 4828条船舶数据从爬取到写入数据库仅用了48s。
## 43405条船舶数据从爬取到写入数据库仅用348s。