简单数据爬取(以B站弹幕为例) 您所在的位置:网站首页 弹幕总数是否属于人气数据 简单数据爬取(以B站弹幕为例)

简单数据爬取(以B站弹幕为例)

2024-06-27 02:53| 来源: 网络整理| 查看: 265

简单数据爬取(以B站弹幕为例)

4 minute read

Published: April 27, 2021

即不需要额外抓取ajax数据包,都在网页检查栏中能找到。

分析网页

点击弹幕列表,查看历史弹幕,并选择任意一天的历史弹幕,此时就能找到存储该日期弹幕的ajax数据包,所有弹幕数据放在一个i标签里。有的网上博客是说能找到弹幕数据,但我2021年初重新试了一下,弹幕内容已经加密了,不过按照下面我用的请求地址,还是能抓取到弹幕的内容,只不过在网页检查中找不到这个请求地址。

所以暂时用这个请求地址,后续如果发现如何解密再来更新。

弹幕内容加密后的请求信息

加密之后的弹幕列表

参考这篇博客给出的请求地址,使用https://api.bilibili.com/x/v1/dm/list.so?oid=%d请求网址,可以得到不加密的弹幕内容:

未加密弹幕内容

可以发现Request URL关键就是 oid 参数,是一个类似于视频标识ID之类的东西,换个oid可以访问其他视频弹幕页面。B站的视频可以看到的ID是用bvid作为标识的,因此需要先对这个进行转换:

def get_cid(bvid): ''' 通过视频的bvid获得视频的cid/oid 输入:视频的bvid 输出:视频的cid/oid ''' url = 'https://api.bilibili.com/x/player/pagelist?bvid=%s&jsonp=jsonp'%bvid res = requests.get(url) data = res.json() return data['data'][0]['cid'] 实际操作

以下分别给出需要爬取内容、解析内容使用的一些函数,使用的包有requests,re等。

import re import time import requests import datetime import pandas as pd def del_repeat(data,key): ''' 字典列表去重 输入:字典列表,去重关键字 输出:去重后的列表 ''' new_data = [] # 用于存储去重后的list values = [] # 用于存储当前已有的值 for d in data: if d[key] not in values: new_data.append(d) values.append(d[key]) return new_data def get_info(bvid): ''' 获取视频信息 输入:视频的bvid 输出:视频的相关信息(视频标题、播放量、弹幕数、上传时间) ''' url = 'https://api.bilibili.com/x/web-interface/view/detail?bvid=%s'%bvid res = requests.get(url) data = res.json() title = data['data']['View']['title'] # 视频标题 view = data['data']['View']['stat']['view'] # 播放量 dm = data['data']['View']['stat']['danmaku'] # 弹幕数 upload = time.strftime('%Y-%m-%d',time.localtime(data['data']['View']['pubdate'])) # 上传日期 info = (title,view,dm,upload) return info test_info = get_info('BV1wy4y1p7Ms') test_info # ('奔驰和宝马的质量到底怎么样?真实买车拆解告诉你真相!', 394097, 6367, '2021-01-21') def get_cid(bvid): ''' 通过视频的bvid获得视频的cid/oid 输入:视频的bvid 输出:视频的cid/oid ''' url = 'https://api.bilibili.com/x/player/pagelist?bvid=%s&jsonp=jsonp'%bvid res = requests.get(url) data = res.json() return data['data'][0]['cid']

通过正则表达式处理,解析HTML网页的内容:

def parse_dm(text): ''' 解析视频弹幕 输入:视频弹幕的原数据 输出:弹幕的解析结果 ''' result = [] # 用于存储解析结果 data = re.findall('(.*?)',text) for d in data: item = {} # 每条弹幕数据 dm = d[0].split(',') # 弹幕的相关详细,如出现时间,用户等 item['出现时间'] = float(dm[0]) item['模式'] = int(dm[1]) item['字号'] = int(dm[2]) item['颜色'] = int(dm[3]) item['评论时间'] = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(dm[4]))) item['弹幕池'] = int(dm[5]) item['用户ID'] = dm[6] # 并非真实用户ID,而是CRC32检验后的十六进制结果 item['rowID'] = dm[7] # 弹幕在数据库中的ID,用于“历史弹幕”功能 item['弹幕内容'] = d[1] result.append(item) return result

下面分两种爬取的情境。第一种,只爬取当前显示的弹幕内容,不指定全部日期。此方法获取的弹幕数量有限制,限制1000条。

def get_dm(bvid): ''' 获取视频弹幕(此方法获取的弹幕数量有限制,限制1000条) 输入:视频的bvid 输出:视频的弹幕 ''' oid = get_cid(bvid) # 这里的cid和oid是一样的 url = 'https://api.bilibili.com/x/v1/dm/list.so?oid=%d'%oid res = requests.get(url) res.encoding = 'utf-8' text = res.text dms = parse_dm(text) # 解析弹幕 return dms

第二种,爬取当前日期开始计算,所有的历史弹幕内容。

# 用上面那个函数,因为git_history暂时没有用 def get_dms(bvid): ''' 获取视频弹幕(此方法获取的弹幕数量更多) 输入:视频的bvid 输出:视频的弹幕 ''' print('视频解析中...') info = get_info(bvid) print('视频解析完成!') print('【视频标题】: %s\n【视频播放量】:%d\n【弹幕数量】: %d\n【上传日期】: %s'%(info[0],info[1],info[2],info[3])) dms = get_dm(bvid) # 存储弹幕 if len(dms) >= info[2]: # 如果弹幕数量已抓满 return dms else: dms = [] date = time.strftime('%Y-%m-%d',time.localtime(time.time())) # 从今天开始 while True: dm = get_history(bvid,date) dms.extend(dm) print('"%s"弹幕爬取完成!(%d条)'%(date,len(dm))) if len(dm) == 0: # 如果为空 break end = dm[-1]['评论时间'].split()[0] # 取最后一条弹幕的日期 if end == date: # 如果最后一条仍为当天,则往下推一天 end = (datetime.datetime.strptime(end,'%Y-%m-%d')-datetime.timedelta(days=1)).strftime('%Y-%m-%d') if end == info[3]: # 如果已经到达上传那天 break else: date = end dm = get_history(bvid,info[3]) # 避免忽略上传那天的部分数据 dms.extend(dm) print('弹幕爬取完成!(共%d条)'%len(dms)) print('数据去重中...') dms = del_repeat(dms,'rowID') # 按rowID给弹幕去重 print('数据去重完成!(共%d条)'%len(dms)) return dms 一些说明

弹幕数据中的用户ID经过加密,使用这篇博客中给出的解码方法,对爬取的弹幕发送用户ID进行解码。p.s. 解码需要时间较长(大约2000个ID需要20-30分钟)

# run.py # utf-8 import sys import time import pandas as pd CRCPOLYNOMIAL = 0xEDB88320 crctable = [0 for x in range(256)] def create_table(): for i in range(256): crcreg = i for _ in range(8): if (crcreg & 1) != 0: crcreg = CRCPOLYNOMIAL ^ (crcreg >> 1) else: crcreg = crcreg >> 1 crctable[i] = crcreg def crc32(string): crcstart = 0xFFFFFFFF for i in range(len(str(string))): index = (crcstart ^ ord(str(string)[i])) & 255 crcstart = (crcstart >> 8) ^ crctable[index] return crcstart def crc32_last_index(string): crcstart = 0xFFFFFFFF for i in range(len(str(string))): index = (crcstart ^ ord(str(string)[i])) & 255 crcstart = (crcstart >> 8) ^ crctable[index] return index def get_crc_index(t): for i in range(256): if crctable[i] >> 24 == t: return i return -1 def deep_check(i, index): string = "" tc=0x00 hashcode = crc32(i) tc = hashcode & 0xff ^ index[2] if not (tc = 48): return [0] string += str(tc - 48) hashcode = crctable[index[2]] ^ (hashcode >>8) tc = hashcode & 0xff ^ index[1] if not (tc = 48): return [0] string += str(tc - 48) hashcode = crctable[index[1]] ^ (hashcode >> 8) tc = hashcode & 0xff ^ index[0] if not (tc = 48): return [0] string += str(tc - 48) hashcode = crctable[index[0]] ^ (hashcode >> 8) return [1, string] def main(string): index = [0 for x in range(4)] i = 0 ht = int(f'0x{string}', 16) ^ 0xffffffff for i in range(3,-1,-1): index[3-i] = get_crc_index(ht >> (i*8)) snum = crctable[index[3-i]] ht ^= snum >> ((3-i)*8) for i in range(100000000): lastindex = crc32_last_index(i) if lastindex == index[3]: deepCheckData = deep_check(i, index) if deepCheckData[0]: break if i == 100000000: return -1 return f"{i}{deepCheckData[1]}" if __name__ == "__main__": create_table() # start_time = time.time() df = pd.read_csv(sys.argv[1]) user_hash_ID = list(df.用户ID) user_id = [] for hash_id in user_hash_ID: # print(main(hash_id)) user_id.append(main(hash_id)) unique_user_id = list(set(user_id)) print("完成{}个用户ID的转换,不重复的用户ID共{}个".format(len(user_id), len(unique_user_id))) # 包含重复的User ID (rowID unique) user_id_df_dup = pd.DataFrame(user_id) user_id_df_dup.to_csv(sys.argv[2], header = False, index = False) # 不重复的User ID user_id_df = pd.DataFrame(unique_user_id) user_id_df.to_csv(sys.argv[3], header = False, index = False) # print(main(sys.argv[1])) # end_time = time.time() # print(f"耗时: {round(end_time - start_time, 2)}秒")

Tags: data-collection, data-crawler, python

Categories: Tools

Share on Twitter Facebook LinkedIn


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有