本项目已弃坑。
先放成品 bgm-ip-viewer
在 bgm 上看到有人说现在的关联图只有一层, 看起来不太方便, 就爬了全站数据做了这么个东西.
爬取数据并生成关联条目网络
爬取数据
爬数据用的是scrapy
, 因为本站有请求速度的限制, 所以数据源是镜像站.
scrapy
的流程是这样的, 首先继承scrapy.Item
来定义你自己爬到的数据的模型.
比如我定义了条目 item
import scrapyfrom scrapy import Fieldclass SubjectItem (scrapy.Item): id = Field() _id = Field() name = Field() image = Field() subject_type = Field() name_cn = Field() tags = Field() info = Field() score = Field() score_details = Field() wishes = Field() done = Field() doings = Field() on_hold = Field() dropped = Field()
每个Field()
中保存的的数据类型可以是 str
, int
, bool
等基础的数据类型, 也可以是list
, dict
这种组合类型.
然后写一个解析函数
from typing import List import pymongofrom scrapy import Requestfrom bgm.items import SubjectItem, RelationItemdef url_from_id (_id ): return 'https://mirror.bgm.rin.cat/subject/{}' .format (_id ) class BgmTvSpider (scrapy.Spider): name = 'bgm_tv' allowed_domains = ['mirror.bgm.rin.cat' ] start_urls = ['https://mirror.bgm.rin.cat/subject/{}' .format (i) for i in range (1 , 270000 )] def parse (self, response ): if '出错了' not in response.text: subject_item = SubjectItem() subject_item['subject_type' ] = get_subject_type(response) if subject_item['subject_type' ] == 'Music' : return subject_item['_id' ] = int (response.url.split('/' )[-1 ]) subject_item['id' ] = subject_item['_id' ] subject_item['info' ] = get_info(response) subject_item['tags' ] = get_teg_from_response(response) subject_item['image' ] = get_image(response) subject_item['score' ] = get_score(response) subject_item['score_details' ] = get_score_details(response) title = response.xpath('//*[@id="headerSubject"]/h1/a' )[0 ] subject_item['name_cn' ] = title.attrib['title' ] subject_item['name' ] = title.xpath('text()' ).extract_first() for edge in get_relation(response, source=subject_item['_id' ]): relation_item = RelationItem(**edge, ) yield relation_item yield Request(url_from_id(relation_item['target' ])) yield subject_item
框架有一个默认的start_requests
函数, 会请求starts_urls
里面的链接. 在获取到内容之后会进行一系列处理(比如解析 http, xpath 之类的), 然后交给parse
函数来处理.
parse
函数实际上是一个生成器函数, 通过不断yield
内容来告诉scrapy
你要做什么.
如果我们需要爬一个新网页, 就yield
一个scrapy.Request
, scrapy
在爬完对应的页面只有会交给对应的回调函数(默认为parse
)
如果你爬到了一个Item
, 就直接把对应的实例给 yield 出去. 然后scrapy
会交给pipeline
来处理.
在这个例子里, 我需要用到的有两种 item, 一种是SubjectItem
, 包含条目的某些信息(比如标题, 封面, 对应的subject_id
, 另一个是每个条目跟其他条目的关系RelationItem
. 这个关系包括源条目, 目标条目, 和条目关系.)
然后要把对应的 item 存到数据库里, 就需要我们定义一个pipeline
了.
最想吐槽的就是这里...
原来我已经用aiohttp
写了一些东西, 自认为还比较熟悉异步了, 结果没想到 python 的异步库真是各自为战. scrapy
是基于twisted
的, 所以基于asyncio
的异步库是不能在这里用的. 我数据库用的是MongoDB
, 原本mongodb
官方有一个数据库motor
, 支持tornado
和asyncio
的 ioloop, 但是不支持twisted
...
所以就算你了解 python 异步标准库的写法, 用另一个异步框架的时候还是可能一脸懵逼...
额外去找了一个twisted
支持的 mongo 库txmongo
, 用来存数据.
数据处理
爬回来的数据不处理别说别人了, 我自己都看不懂.
回到我们一开始的目的, 把有关系的条目放在一起, 显示他们之间的关系.
最后显示出来的是一个力导向图啊, 那直接用 d3.js 好了, 找一个 d3.js 的 demo 来看看他需要的数据结构是什么样的.
参考了这篇文章
【 D3.js 进阶系列 — 2.0 】 力学图 + 人物关系图
最后需要的是一个这样的数据结构
{ "nodes" : [ { "name" : "云天河" , "image" : "tianhe.png" } , { "name" : "韩菱纱" , "image" : "lingsha.png" } , { "name" : "柳梦璃" , "image" : "mengli.png" } , { "name" : "慕容紫英" , "image" : "ziying.png" } ] , "edges" : [ { "source" : 0 , "target" : 1 , "relation" : "挚友" } , { "source" : 0 , "target" : 2 , "relation" : "挚友" } , { "source" : 0 , "target" : 3 , "relation" : "挚友" } ] }
这正好就是我爬下来的数据的结构啊.
那就只剩下一个问题了, 怎么把同一张网节点和关系组合在一起.
本来想要直接一个 for 循环遍历, 发现这样有一个问题, 一个大的网络可能会断成好几个小的网络. 正好受到了scrapy
爬取网站的办法, 在写一个 work 函数, 不停地把下一个需要处理的节点给yield
出来, 然后从yield
出来的节点不断的开始处理.
def worker (start_job=None ): yield_job = [] done_id = set () if start_job is None : start_job = [ x['_id' ] for x in n_subject.find({}, {'_id' : 1 }) ] while True : print ('\r' , len (yield_job) + len (start_job), end='' ) if yield_job: j = yield_job.pop() if j in done_id: continue for node in deal_with_node({'_id' : j}): yield_job.append(node) done_id.add(j) elif start_job: j = start_job.pop() if j in done_id: continue for node in deal_with_node({'_id' : j}): yield_job.append(node) done_id.add(j) else : print ('done' ) break
其中, 用来处理节点的函数deal_with_noed(node)
首先获取节点跟其他节点的关系, 如果某个关系(比如角色歌, op, ed 之类的)不符合需要添加的条件就跳过, 如果符合条件, 先看看有没有一个现成的网络, 有就把对应节点加入到网络里, 没有就新建一个网络, 把对应节点加进去. 然后把对应节点yield
出去.
可以看到, 我在外面for node in deal_with_node({'_id': j}):yield_job.append(node)
, 所以所有yield
出来的节点都会成为下一个处理的节点. 这样一来会保证处理完一个网络只有的所有节点才会进行下一步, 处理下一个网络的节点.
这样一来, 就可以保证一个整体网络不会裂成两个网络.
更新
迫于用来跑这个程序的服务器只有 1C1G, 跑了一堆服务器的情况下再跑一个 Mongo 太勉强了, 再加上腾讯的学生优惠还有 36 每年的 mysql(1G 内存, 50G 硬盘), 就买了一年的 mysql 实例用来存数据, 减轻服务器的内存压力.
然后就牵扯 twisted 异步操作数据库(其实同步也不是不行, 但是如果同步存储的话数据库就会成为爬虫的瓶颈, 所以还是异步比较合适.)
twisted 提供了一个adbapi
, 一看就知道是异步数据库 api 的意思. 操作 mysql 的话, 需要mysqlclient
或者pymysql
+pymysql.install_as_MySQLdb()
跟把数据保存到 mongo 一样, 添加一个mysqlpipeline
的 pipeline.
为了避免手写 sql, 用了peewee
做为 orm. peewee 的每个 query 都有一个sql()
方法, 可以不让 peewee 执行具体的操作, 而是获取对应的 sql 语句, 然后交给 twisted 的adbpi
来执行, 避免阻塞.
Relation
和 Subject
是通过 peewee 定义的数据库 model.
import pymongofrom bgm.items import SubjectItem, RelationItemfrom typing import Union from bgm.models import Subject, Relationfrom twisted.enterprise import adbapiimport bgm.settingsclass MysqlPipeline (object ): def open_spider (self, spider ): self .dbpool = adbapi.ConnectionPool( "MySQLdb" , host=bgm.settings.MYSQL_HOST, db=bgm.settings.MYSQL_DBNAME, user=bgm.settings.MYSQL_USER, password=bgm.settings.MYSQL_PASSWORD, charset='utf8mb4' , ) def process_item (self, item: Union [SubjectItem, RelationItem], spider ): query = self .dbpool.runInteraction(self .do_insert, item) query.addErrback(self .handle_error, item, spider) return item def handle_error (self, failure, item, spider ): print (failure) def do_insert (self, cursor, item ): if isinstance (item, SubjectItem): if not item['name' ]: item['name' ] = item['name_cn' ] insert_sql = Subject.insert( **item ).on_conflict_replace().sql() elif isinstance (item, RelationItem): insert_sql = Relation.insert( id =f'{item["source" ]} -{item["target" ]} ' , **item ).on_conflict_replace().sql() else : return cursor.execute(*insert_sql)
而在处理上, 也跟之前 mongodb 有点不一样...
之前使用 mongo 的时候, 都是直接从 mongo 里面把数据读出来, 处理完了再写会数据库.但是在使用 mysql 的时候就不太行的通了. 因为在我的使用场景下,相比 mongo, mysql 太慢了.如果把对应的关系和条目从 mysql 里读出来再处理, 有 90%的时间都花在 io 上, 而且不是一两个小时能处理完的. 所以只能选择一开始把 mysql 里的所有数据读到内存, 放在 mysql 里面处理.
而大概 20w 条条目, 20w 条条目间关系, 全都读到数据库里大概需要 1400MB 左右的内存, 我的服务器已经处理不了了, 只能先把数据库从服务器上下载下来, 然后再进行处理.
之前发在 bgm 上之后有人提议说增量更新新的条目, 正好现在已经把主要的数据爬完了, 还剩下一些旧的条目, 只要写一个额外的爬虫定期处理bangumi wiki 计划 页面显示的更改就好了.