#!/usr/bin/env python3.7 # coding=utf-8 ''' # 作者: weimo # 创建日期: 2020-01-04 19:14:43 # 上次编辑时间 : 2020-02-07 17:36:24 # 一个人的命运啊,当然要靠自我奋斗,但是... ''' import re import json import requests from time import localtime from pfunc.cfunc import check_url_locale from basic.vars import qqlive, iqiyiplayer, chrome # 放一些仅通过某个id获取另一个/多个id的方法 #---------------------------------------------qq--------------------------------------------- def get_danmu_target_id_by_vid(vid: str): api_url = "http://bullet.video.qq.com/fcgi-bin/target/regist" params = { "otype":"json", "vid":vid } try: r = requests.get(api_url, params=params, headers=qqlive).content.decode("utf-8") except Exception as e: print("target_id requests error info -->", e) return None data = json.loads(r.lstrip("QZOutputJson=").rstrip(";")) target_id = None if data.get("targetid"): target_id = data["targetid"] return target_id def get_all_vids_by_column_id(): # https://s.video.qq.com/get_playsource?id=85603&plat=2&type=4&data_type=3&video_type=10&year=2019&month=&plname=qq&otype=json # 综艺类型的 pass def get_cid_by_vid(vid): api_url = "http://union.video.qq.com/fcgi-bin/data" params = { "tid": "98", "appid": "10001005", "appkey": "0d1a9ddd94de871b", "idlist": vid, "otype":"json" } r = requests.get(api_url, params=params, headers=qqlive).content.decode("utf-8") data = json.loads(r.lstrip("QZOutputJson=").rstrip(";")) try: cid = data["results"][0]["fields"] except Exception as e: print("load fields error info -->", e) return None if cid.get("sync_cover"): return cid["sync_cover"] elif cid.get("cover_list"): return cid["cover_list"][0] return def get_all_vids_by_cid(cid): api_url = "http://union.video.qq.com/fcgi-bin/data" params = { "tid":"431", "appid":"10001005", "appkey":"0d1a9ddd94de871b", "idlist":cid, "otype":"json" } r = requests.get(api_url, params=params, headers=qqlive).content.decode("utf-8") data = json.loads(r.lstrip("QZOutputJson=").rstrip(";")) try: nomal_ids = json.loads(data["results"][0]["fields"]["nomal_ids"]) except Exception as e: print("load nomal_ids error info -->", e) return None # F 2是免费 7是会员 0是最新正片之前的预告 4是正片之后的预告 vids = [item["V"] for item in nomal_ids if item["F"] in [2, 7]] return vids #---------------------------------------------qq--------------------------------------------- #-------------------------------------------iqiyi-------------------------------------------- def get_vinfos(aid, locale="zh_cn"): api_url = "http://cache.video.iqiyi.com/avlist/{}/0/".format(aid) if locale != "zh_cn": api_url += "?locale=" + locale try: r = requests.get(api_url, headers=chrome, timeout=5).content.decode("utf-8") except Exception as e: print("get_vinfos requests error info -->", e) return None data = json.loads(r[len("var videoListC="):]) try: vlist = data["data"]["vlist"] except Exception as e: print("get_vinfos load vlist error info -->", e) return None vinfos = [[v["shortTitle"] + "_" + str(v["timeLength"]), v["timeLength"], ["id"]] for v in vlist] return vinfos def matchit(patterns, text): ret = None for pattern in patterns: match = re.match(pattern, text) if match: ret = match.group(1) break return ret def duration_to_sec(duration: str): return sum(x * int(t) for x, t in zip([3600, 60, 1][2 - duration.count(":"):], duration.split(":"))) def get_year_range(aid, locale="zh_cn"): # 获取第一个和最新一个视频的年份,生成列表返回,遇到任何错误则返回当前年份 year_start = year_end = localtime().tm_year api_url = "http://pcw-api.iqiyi.com/album/album/baseinfo/{}".format(aid) if locale != "zh_cn": api_url += "?locale=" + locale try: r = requests.get(api_url, headers=chrome, timeout=5).content.decode("utf-8") except Exception as e: print("error info -->", e) return list(range(year_start, year_end + 1)) data = json.loads(r)["data"] if data.get("firstVideo"): year_start = int(data["firstVideo"]["period"][:4]) if data.get("latestVideo"): year_end = int(data["latestVideo"]["period"][:4]) return list(range(year_start, year_end + 1)) def get_vinfo_by_tvid(tvid, locale="zh_cn", isall=False): api_url = "https://pcw-api.iqiyi.com/video/video/baseinfo/{}".format(tvid) if locale != "zh_cn": api_url += "?locale=" + locale try: r = requests.get(api_url, headers=chrome, timeout=5).content.decode("utf-8") except Exception as e: print("error info -->", e) return data = json.loads(r)["data"] if data.__class__ != dict: return None if isall: aid = data.get("albumId") if aid is None: print("通过单集tvid获取合集aid失败,将只下载单集的弹幕") locale = check_video_area_by_tvid(tvid) if locale is None: locale = "zh_cn" return get_vinfos(aid, locale=locale) name = data["name"] duration = data["durationSec"] return [[name + "_" + str(duration), duration, tvid]] def check_video_area_by_tvid(tvid): api_url = "https://pcw-api.iqiyi.com/video/video/playervideoinfo?tvid={}".format(tvid) try: r = requests.get(api_url, headers=chrome, timeout=5).content.decode("utf-8") except Exception as e: print("check_video_area_by_tvid error info -->", e) return None data = json.loads(r)["data"] intl_flag = data["operation_base"]["is_international"] langs = [item["language"].lower() for item in data["operation_language_base"]] locale = "zh_cn" if intl_flag is False and "zh_tw" in langs: locale = "zh_tw" return locale def get_vinfos_by_year(aid, years: list, cid=6, locale="zh_cn"): api_url = "https://pcw-api.iqiyi.com/album/source/svlistinfo?cid={}&sourceid={}&timelist={}".format(cid, aid, ",".join([str(_) for _ in years.copy()])) if locale != "zh_cn": api_url += "&locale=" + locale try: r = requests.get(api_url, headers=chrome, timeout=5).content.decode("utf-8") except Exception as e: print("get_vinfos_by_year error info -->", e) return None data = json.loads(r)["data"] vinfos = [] for year in years: if year.__class__ != str: year = str(year) if data.get(year) is None: continue for ep in data[year]: sec = duration_to_sec(ep["duration"]) vinfos.append([ep["shortTitle"] + "_" + str(sec), sec, ep["tvId"]]) return vinfos def get_vinfos_by_url(url, isall=False): locale = check_url_locale(url) patterns = [".+?/w_(\w+?).html", ".+?/v_(\w+?).html", ".+?/a_(\w+?).html", ".+?/lib/m_(\w+?).html"] isw, isep, isas, isms = [re.match(pattern, url) for pattern in patterns] if isw is None and isep is None and isas is None and isms is None: return None try: r = requests.get(url, headers=chrome, timeout=5).content.decode("utf-8") except Exception as e: print("get_vinfos_by_url error info -->", e) return None cid_patterns = ["[\s\S]+?\.cid.+?(\d+)", "[\s\S]+?cid: \"(\d+)\"", "[\s\S]+?channelID.+?\"(\d+)\""] cid = matchit(cid_patterns, r) aid_patterns = ["[\s\S]+?aid:'(\d+)'", "[\s\S]+?albumid=\"(\d+)\"", "[\s\S]+?movlibalbumaid=\"(\d+)\"", "[\s\S]+?data-score-tvid=\"(\d+)\""] aid = matchit(aid_patterns, r) tvid_patterns = ["[\s\S]+?\"tvid\":\"(\d+)\"", "[\s\S]+?\['tvid'\].+?\"(\d+)\""] tvid = matchit(tvid_patterns, r) if cid is None: cid = "" elif cid == "6" and isas or isms:#对于综艺合集需要获取年份 # year_patterns = ["[\s\S]+?datePublished.+?(\d\d\d\d)-\d\d-\d\d", "[\s\S]+?data-year=\"(\d+)\""] # year = matchit(year_patterns, r) # if year is None: # years = [localtime().tm_year] # else: # years = [year] years = get_year_range(aid, locale=locale) else: pass#暂时没有其他的情况计划特别处理 if isep or isw: if tvid is None: return return get_vinfo_by_tvid(tvid, locale=locale, isall=isall) if isas or isms: if aid is None: return if cid == "6": return get_vinfos_by_year(aid, years, locale=locale) else: return get_vinfos(aid, locale=locale) #-------------------------------------------iqiyi-------------------------------------------- #-------------------------------------------youku-------------------------------------------- def get_vinfos_by_url_youku(url, isall=False): vid_patterns = ["[\s\S]+?youku.com/video/id_(/+?)\.html", "[\s\S]+?youku.com/v_show/id_(.+?)\.html"] video_id = matchit(vid_patterns, url) show_id_patterns = ["[\s\S]+?youku.com/v_nextstage/id_(/+?)\.html", "[\s\S]+?youku.com/show/id_z(.+?)\.html", "[\s\S]+?youku.com/show_page/id_z(.+?)\.html", "[\s\S]+?youku.com/alipay_video/id_(.+?)\.html"] show_id = matchit(show_id_patterns, url) if video_id is None and show_id is None: return None if video_id: return get_vinfos_by_video_id(video_id, isall=isall) if show_id.__len__() == 20 and show_id == show_id.lower(): return get_vinfos_by_show_id(show_id) else: return get_vinfos_by_video_id(show_id, isall=isall) def get_vinfos_by_video_id(video_id, isall=False): api_url = "https://openapi.youku.com/v2/videos/show.json?client_id=53e6cc67237fc59a&package=com.huawei.hwvplayer.youku&ext=show&video_id={}".format(video_id) try: r = requests.get(api_url, headers=chrome, timeout=5).content.decode("utf-8") except Exception as e: print("get_vinfos_by_video_id error info -->", e) return None data = json.loads(r) if isall: show_id = data["show"]["id"] return get_vinfos_by_show_id(show_id) duration = 0 if data.get("duration"): duration = int(float(data["duration"])) if data.get("title"): name = data["title"] + "_" + str(duration) else: name = "优酷未知" + "_" + str(duration) vinfo = [name, duration, video_id] return [vinfo] def get_vinfos_by_show_id(show_id): api_url = "https://openapi.youku.com/v2/shows/videos.json?show_videotype=正片&count=100&client_id=53e6cc67237fc59a&page=1&show_id={}&package=com.huawei.hwvplayer.youku".format(show_id) try: r = requests.get(api_url, headers=chrome, timeout=5).content.decode("utf-8") except Exception as e: print("get_vinfos_by_show_id error info -->", e) return None data = json.loads(r)["videos"] if data.__len__() == 0: return None vinfos = [] for video in data: duration = 0 if video.get("duration"): duration = int(float(video["duration"])) if video.get("title"): name = video["title"] + "_" + str(duration) else: name = "优酷未知_{}".format(video["id"]) + "_" + str(duration) vinfos.append([name, duration, video["id"]]) return vinfos #-------------------------------------------youku--------------------------------------------