Python+MySQL爬取船讯网AIS静态数据
作者:mmseoamin日期:2024-04-27

 注意:学习前请自行掌握Python和MySQL基础知识。

  • 以下代码为原创,完整且可执行。

    1、了解船讯网的信息

    • 打开船讯网界面 https://www.shipxy.com
    • 随机搜索一只船舶—设置—网络—Getship—预览。根据自己的需要,选择一会需要保存的数据项。

      Python+MySQL爬取船讯网AIS静态数据,第1张

      • 切换至“标头”,圈起来的都是要用的。

        Python+MySQL爬取船讯网AIS静态数据,第2张

        Python+MySQL爬取船讯网AIS静态数据,第3张

        2、准备工具

        • Pycharm (爬取数据用)+MySQL workbench(存储数据用)
        • 需要爬取的船舶mmsi数据(可以是csv,也可以是MySQL数据表)
        • Python+MySQL爬取船讯网AIS静态数据,第4张
          需要爬取的船舶mmsi表
        •  创建需要存储数据的表

          Python+MySQL爬取船讯网AIS静态数据,第5张

        •  需要的Python库
          import requests
          import pymysql
          import threading
          import concurrent.futures

          3、编写代码

          import requests
          import pymysql
          import threading
          import concurrent.futures
          import time #导入time模块
          class CompanyShipInDatabase:
              def __init__(self):
                  self.conn = None
                  self.cursor = None
                  self.session = requests.Session()
              def create_database_connection(self):
                  # 创建数据库连接
                  self.conn = pymysql.connect(host="localhost", port=3306, user="root", password="your_password", database="your_database", charset="utf8")
                  self.cursor = self.conn.cursor()
              def close_database_connection(self):
                  # 关闭数据库连接
                  if self.cursor:
                      self.cursor.close()
                  if self.conn:
                      self.conn.close()
              def in_database(self, data_list):
                  # 准备要插入或更新的数据
                  update_data = []
                  insert_data = []
                  for item in data_list:
                      mmsi = item['mmsi']
                      select_sql = "SELECT mmsi FROM static_data WHERE mmsi = %s"
                      self.cursor.execute(select_sql, (mmsi,))
                      existing_record = self.cursor.fetchone()
                      if existing_record:
                          # 如果记录已存在,执行更新操作
                          update_data.append((item['imo'], item['name'], item['callsign'], item['length'], item['width'], item['trail'], item['draught'], mmsi))
                      else:
                          # 如果记录不存在,执行插入操作
                          insert_data.append((item['mmsi'], item['imo'], item['name'], item['callsign'], item['length'], item['width'], item['trail'], item['draught']))
                  if update_data:
                      # 批量执行更新操作
                      update_sql = '''
                                  UPDATE static_data
                                  SET imo = %s, name = %s, callsign = %s, length = %s, width = %s, trail = %s, draught = %s
                                  WHERE mmsi = %s
                              '''
                      self.cursor.executemany(update_sql, update_data)
                  if insert_data:
                      # 批量执行插入操作
                      insert_sql = '''
                                  INSERT INTO static_data(mmsi, imo, name, callsign, length, width, trail, draught)
                                  VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                              '''
                      self.cursor.executemany(insert_sql, insert_data)
                  # 提交数据库事务
                  self.conn.commit()
              def make_http_request(self, mmsi, headers):
                  # 发送HTTP请求获取数据
                  url = f'https://www.shipxy.com/ship/GetShip?mmsi={mmsi}'
                  try:
                      response = self.session.get(url, headers=headers)
                      if response.status_code == 200:
                          result = response.json()
                          return result
                      else:
                          return None
                  except Exception as e:
                      print(f"HTTP请求过程中发生错误: {str(e)}")
                      return None
              def fetch_ship_data(self, mmsi_group, headers):
                  # 获取船舶数据
                  data = []
                  for item in mmsi_group:
                      mmsi = item[0]
                      result = self.make_http_request(mmsi, headers)
                      if result and 'data' in result and len(result['data']) > 0:
                          data.append(result['data'][0])
                          print(f"已成功获取 mmsi: {mmsi}")
                      else:
                          print(f"mmsi: {mmsi} 未找到数据")
                  return data
          #主程序
              def company_ship_in_database(self):
                  # 主程序入口
                  self.create_database_connection()
                  mmsi_group = self.get_mmsi()
                  data = []
                  Cookie = 'your_Cookie'  # 请替换成你的Cookie
                  headers = {
                      'User-Agent': 'your_User-Agent',  # 请替换成你的User-Agent
                      'Cookie': Cookie,
                  }
                  #记录程序开始的时间
                  start_time = time.time()
                  # 使用线程池并发获取数据
                  num_threads = 8  # 可根据需要调整线程数
                  chunk_size = len(mmsi_group) // num_threads
                  with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
                      futures = []
                      for i in range(num_threads):
                          start = i * chunk_size
                          end = (i + 1) * chunk_size if i < num_threads - 1 else len(mmsi_group)
                          future = executor.submit(self.fetch_ship_data, mmsi_group[start:end], headers)
                          futures.append(future)
                      for future in concurrent.futures.as_completed(futures):
                          data.extend(future.result())
                  #记录数据爬取结束的时间
                  data_fetch_end_time = time.time()
                  print(f"已完成查询,共获取 {len(data)} 条数据")
                  print(data)
                  # 批量插入或更新数据
                  self.in_database(data)
                  #记录数据写入数据库结束的时间
                  data_write_end_time = time.time()
                  self.close_database_connection()
                  #计算程序运行时间并打印
                  elapsed_time = data_fetch_end_time - start_time
                  print(f"数据获取时间:0.0569 秒")
                  elapsed_time = data_write_end_time - data_fetch_end_time
                  print(f"数据写入数据库时间:0.0569 秒")
                  return data
              def get_mmsi(self):
                  # 获取要查询的MMSI列表
                  mmsi_sql = "select distinct mmsi from your_mmsi order by mmsi"
                  self.cursor.execute(mmsi_sql)
                  mmsi_group = self.cursor.fetchall()
                  return mmsi_group
          if __name__ == "__main__":
              company_ship = CompanyShipInDatabase()
              data = company_ship.company_ship_in_database()
          • 以上代码为完整代码,替换成对应的参数即可运行。
          • 代码设计时优先考虑'运行速率',采用多线程池和减少循环的手段。因为爬取数据涉及的数据量庞大,再加上本人的电脑垃圾,所以以最低的时间爬取同等的信息非常重要。
          • 代码的实际运行速度。

                            ## 4828条船舶数据从爬取到写入数据库仅用了48s。

            Python+MySQL爬取船讯网AIS静态数据,第6张

                            ## 43405条船舶数据从爬取到写入数据库仅用348s。

            Python+MySQL爬取船讯网AIS静态数据,第7张

            •  结果(我只取了9个数据,大家根据自己的需求筛选就行)Python+MySQL爬取船讯网AIS静态数据,第8张