详情内容
python抖音批量下载作者所有视频源码,纯官方api,无第三方接口,直接放源码文件已重构,旧版本主代码依旧可用,新版本代码,改为多文件了,可以直接压缩包下载。
直接运行,然后输入分享复制的链接就好
2022.5.20日更新
1. 增加重试,如果下载失败,会重试3次,每次会随机一个新的user-agent,如果还是失败会保存失败的链接到作者目录的"下载失败视频.txt"
2. 评论区指出的作者名称有非法字符文件夹建立失败问题,增加了正则过滤,非法字符串替换为_
3. 下载模式添加了关键字匹配,匹配视频的简介文字
4. 添加下载进度条和视频序号
2022.5.21日更新
1. 添加下载成功的视频不会二次下载,原理是下载完成后会在作者同级文件夹写入下载成功的视频唯一id,下载前检查是否有此id,有的话就跳过。
2. 添加图集下载,会自己判断作品是否为图集,如果是的话,会在作者同级目录下建立一个新的文件夹,以序号存储图片
3. 修复了当文件名不合法时,存储为0byte文件的问题
2022.5.22日更新
1. 增加批量下载模式,请在代码或者单文件程序同级目录下放置一个 ”作者主页链接.txt“ 的文件,格式为每行一个作者主页链接,批量模式是检测到该文件有内容时自动触发,如果想再次使用单作者批量下载请清空文件内容或者删除文件即可
2022.5.23日更新
1. 根据 话痨司机啊 的建议,修改存储已下载视频id的方式改为数据库存储,文件为同级目录下的 "douyin.db" 。如果不想重复下载视频,请不要删除这个文件,或者你想下载之前下载成功的视频,请删除这个文件
2. 解决了部分作者无法下载的问题,原因为小概率会出现视频不存在,丢失视频vid,然后直接报错keyerror
3. 解决了当部分视频简介出现"\n"换行符时导致文件无法创建的问题
2022.5.25日更新
1. 感谢 话痨司机啊 的推荐,使用 gooey 库重构为图形界面,由于界面的原因,代码分为了三个文件,帖子的代码依旧可用,新版本代码改为压缩包分享,下面可下载
2. 修复了部分作者拉取报错的问题,控制台版本也修复了这个问题,可以直接复制使用
2022.5.26日更新
1. 再再再次更新,修复了一个老bug了,这个bug前前后后修了三四次了,这次应该是彻底修复了。就是大家下载的时候需要一个sec_uid参数,这个参数的来源是短链接跳转后的长链接里面的一个参数,前前后后我用了正则,数组,都有bug导致提取不到该参数,今天我才发现有个自带的urllib.parse可以直接把path转为字典,这样就不会解析错误了,我是傻逼....
import linecache
import os
import re
from faker import Faker
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from tqdm import tqdm
import requests
import sqlite3
import urllib.parse
def creat_table(table_name):
conn = sqlite3.connect('douyin.db')
c = conn.cursor()
c.execute(f'''CREATE TABLE IF NOT EXISTS t_{table_name}
(ID INTEGER PRIMARY KEY AUTOINCREMENT,VID TEXT NOT NULL);''')
conn.commit()
conn.close()
def insert_data(table_name, vid):
conn = sqlite3.connect('douyin.db')
c = conn.cursor()
cursor = c.execute(f"SELECT vid from t_{table_name}")
already_have = False
for row in cursor:
if vid in row:
already_have = True
if already_have is False:
c.execute(f"INSERT INTO t_{table_name} (ID,VID) VALUES (null,{vid})")
conn.commit()
conn.close()
def selet_data(table_name):
conn = sqlite3.connect('douyin.db')
c = conn.cursor()
cursor = c.execute(f"SELECT VID from t_{table_name}")
vid_list = [out_exp[0] for out_exp in cursor]
return vid_list
class Douyin:
def __init__(self, url):
self.share_url = url
self.headers = {
'User-Agent': "Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420+ (KHTML, like Gecko) Version/3.0 Mobile/1C28 Safari/419.3"
}
self.sec_uid = None
self.uid = None
self.nick_name = None
def get_user_info(self):
resp = requests.get(self.share_url, headers=self.headers)
self.sec_uid = 'sec_uid=' + urllib.parse.parse_qs(urllib.parse.urlparse(resp.url).query)['sec_uid'][0]
user_info = f'https://www.iesdouyin.com/web/api/v2/user/info/?{self.sec_uid}'
resp = requests.get(user_info, headers=self.headers)
user_data = {
'signature': resp.json()['user_info']['signature'],
'nickname': resp.json()['user_info']['nickname'],
'aweme_count': resp.json()['user_info']['aweme_count'],
'following_count': resp.json()['user_info']['following_count'],
'total_favorited': resp.json()['user_info']['total_favorited'],
'avatar': resp.json()['user_info']['avatar_larger']['url_list'][0],
}
self.uid = resp.json()['user_info']['uid']
self.nick_name = re.sub(r'[<|>\/:"*?]', '_', resp.json()['user_info']['nickname'])
creat_table(self.uid)
return user_data
def get_all_video(self):
max_cursor = 0
video_has_more = True
all_video_list = []
if self.sec_uid is None:
self.get_user_info()
while video_has_more is True:
json_url = f'https://www.iesdouyin.com/web/api/v2/aweme/post/?{self.sec_uid}&' \
f'count=21&max_cursor={max_cursor}'
resp = requests.get(json_url, headers=self.headers)
video_has_more = resp.json()['has_more']
max_cursor = resp.json()['max_cursor']
video_list = resp.json()['aweme_list']
for i in video_list:
try:
all_video_list.append({'desc': i['desc'], 'vid': i['video']['vid'], 'aweme_id': i['aweme_id']})
except KeyError:
all_video_list.append({'desc': i['desc'], 'vid': None, 'aweme_id': i['aweme_id']})
return all_video_list
def down_video(self, down_info, index):
alreday_down = False
retry = 0
session = requests.session()
session.mount('https://', HTTPAdapter(max_retries=5))
alreday_down_video = selet_data(self.uid)
for n in alreday_down_video:
if down_info['aweme_id'] in n:
alreday_down = True
break
if alreday_down is False:
if os.path.exists(self.nick_name) is False:
try:
os.makedirs(self.nick_name)
except FileExistsError:
pass
if down_info['desc'] == '':
down_info['desc'] = down_info['aweme_id']
down_info['desc'] = re.sub(r'[<|>\/:"*?\n]', '_', down_info['desc'])
save_name = f'{self.nick_name}/{index.zfill(2)}_{down_info["desc"]}.mp4'
if down_info["vid"]:
download_url = f'https://aweme.snssdk.com/aweme/v1/play/?video_id={down_info["vid"]}&ratio=1080p'
response = session.get(download_url, headers=self.headers, stream=True)
while response.content == b'' and retry < 3:
self.headers = {
'User-Agent': Faker().chrome()
}
response = session.get(download_url, headers=self.headers)
retry += 1
if response.content:
with open(save_name, 'wb') as file:
file.write(response.content)
insert_data(self.uid, down_info['aweme_id'])
else:
with open(f'{self.nick_name}/下载失败视频.txt', 'a+') as file:
file.write(download_url + '\n')
else:
n = 0
img_json_url = f'https://www.douyin.com/web/api/v2/aweme/iteminfo/?item_ids={down_info["aweme_id"]}'
if os.path.exists(f'{self.nick_name}/{down_info["desc"]}') is False:
try:
os.makedirs(f'{self.nick_name}/{down_info["desc"]}')
except FileExistsError:
pass
response = session.get(img_json_url, headers=self.headers)
for i in response.json()["item_list"][0]["images"]:
n += 1
img_url = i["url_list"][0]
img_content = session.get(img_url, headers=self.headers).content
img_save_name = f'{self.nick_name}/{down_info["desc"]}/{n}.jpg'
with open(img_save_name, 'wb') as file:
file.write(img_content)
def main(share_url, down_all=False):
share_url = re.search(r'[a-zA-z]+://[^\s]*', share_url).group()
douyin = Douyin(share_url)
info = douyin.get_user_info()
print(f'作者:{info["nickname"]}\n视频数:{info["aweme_count"]}\n{"-" * 20}\n拉取作者所有作品中...')
down_list = douyin.get_all_video()
down_mode = '1'
if not down_all:
down_mode = input(f'{"-" * 20}\n选择下载模式:\n1.全部下载\n2.关键词匹配下载\n')
def down_task(down_load):
with ThreadPoolExecutor(max_workers=10) as t:
obj_list = []
for i in range((len(down_load))):
obj = t.submit(douyin.down_video, down_load[i], str(i))
obj_list.append(obj)
with tqdm(total=len(down_load), ncols=100) as bar:
for x in as_completed(obj_list):
bar.update(1)
if down_mode == '1':
if down_list:
down_task(down_list)
else:
print(f'无视频可下载')
elif down_mode == '2':
k = 0
filter_down_list = []
keyword = input('请输入关键词:')
for v in down_list:
if keyword in v['desc']:
filter_down_list.append(v)
if len(filter_down_list) == 0:
print('无匹配记录')
else:
print(f'共找到 {len(filter_down_list)} 条匹配记录, 开始下载')
down_task(filter_down_list)
else:
print('输入错误')
if __name__ == '__main__':
share_url_list = linecache.getlines('作者主页链接.txt')
if share_url_list:
print('已检测到有批量下载文件,进入批量下载模式\n')
if input('请选择批量下载模式:\n1.下载所有作者所有视频\n2.手动选择每个作者下载模式\n') == '1':
down_all = True
else:
down_all = False
for i in share_url_list:
main(i, down_all)
print(f'当前任务完成\n{"*" * 30}\n')
input('所有任务已完成')
else:
url = input('输入作者主页分享链接:')
main(url)
input('任务已完成')
123资源整合网-全网知识付费项目平台虚拟资源淘宝虚拟货源网赚资源整合中心 » python抖音批量下载作者所有视频源码