冰凌汇编

 找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
查看: 47|回复: 0
收起左侧

[其它] 抖音直播录像程序 v2.0

[复制链接]
Kenyon 发表于 2022-8-5 10:40:38
主要是代码方面的重构,因为是学习python有一段时间,回头看了眼自己写的代码,真的好垃圾,所以重构一下以前代码,想学习的朋友,也可以看看,重构后的代码更便于维护与理解。
抖音直播录像程序 v2.0 - Kenyon_冰凌汇编
说明一下各个文件作用:
# 抖音直播录屏2.0.py 执行源码
douyin_address.txt  直播间分享地址复制到此处
douyin_config.ini 配置文件(想要保存的目录复制到此处就可以,程序会自动创建)
run.bat windows 批处理文件,使用windows 双击运行此文件运行程序
readme.txt 说明文件
抖音直播录像程序 v2.0 - Kenyon_冰凌汇编
源码:
[Python] 纯文本查看 复制代码
# 抖音直播录屏2.0
 
import subprocess
from pathlib import Path
import configparser
from threading import Timer,Thread
from loguru import logger
import requests
import re,time
from datetime import datetime
 
dir_filename = lambda filename:Path(__file__).parent.joinpath(filename)
_ffmepg = dir_filename('ffmpeg.exe')
logger.remove(handler_id=None) # 移除控制台输出
logger.add(dir_filename('douyin_log.log'))
headers = {
    'User-Agent':'Mozilla/5.0 (Linux; U; Android 8.1.0; en-US; Nexus 6P Build/OPM7.181205.001) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/57.0.2987.108 UCBrowser/12.11.1.1197 Mobile Safari/537.36'
}
 
def reading_a_configuration_file():
    """读取配置文件"""
    config = configparser.ConfigParser()
    config.read(dir_filename('douyin_config.ini'),encoding='utf8')
    save_dir = config.get('DouYin','dirpath')
    logger.debug(f"配置文件路径为:{save_dir}")
    return Path(save_dir)
 
def __ffmepg_command(urls,file_path_name):
    """录制命令"""
    cmd = [str(_ffmepg), "-y",
            "-v","verbose", 
            "-timeout","2000000",
            "-loglevel","error",
            "-hide_banner",
            "-user_agent",headers["User-Agent"],
            "-analyzeduration","2147483647",
            "-probesize","2147483647",
            "-i",urls,
            '-bufsize','5000k',
            "-map","0",
            "-sn","-dn",
            '-max_muxing_queue_size','64',
            str(file_path_name)]
    return cmd
 
@logger.catch
def requests_func(url,urlparams=False,json_params=False):
    """请求函数"""
    res = requests.get(url,headers=headers,timeout=10)
    res.encoding = 'utf-8-sig'
    if urlparams == True:
        return res.url
    else:
        if json_params == True:
            return res.json()
        else:
            return res.text
     
@logger.catch
def get_roomid(shorturl):
    """获取roomid号"""
    long_url = requests_func(shorturl[0],urlparams=True)
    return re.findall(r'reflow\/(\d+)\?',long_url)[0]
 
def title_filter(title: str):
    """
    转为Windows合法文件名
    """
    # 非法字符
    lst = ['\r', '\n', '\\', '/', ':', '*', '?', '"', '<', '>', '|']
    # 非法字符处理方式1
    for key in lst:
        title = title.replace(key, '-')
    # 非法字符处理方式2
    # title = title.translate(None, ''.join(lst))
    # 文件名+路径长度最大255,汉字*2,取60
    if len(title) > 60:
        title = title[:60]
    return title.strip()
     
 
def heartbeat_detection_content():
    """心跳检测内容"""
    global recording
    live_info_url = lambda roomid:f"https://webcast.amemv.com/webcast/room/reflow/info/?type_id=0&live_id=1&room_id={roomid}&app_id=1128"
 
    res_json = requests_func(live_info_url(get_roomid(shorturl)),json_params=True)
    try:
        room_ids_str = res_json["data"]["room"]["owner"]["own_room"]["room_ids_str"][0]
        res_json = requests_func(live_info_url(room_ids_str),json_params=True)
        flv_rtmp = res_json['data']['room']['stream_url']['flv_pull_url'].get('FULL_HD1') or \
            res_json['data']['room']['stream_url']['flv_pull_url'].get('HD1') or \
                res_json['data']['room']['stream_url'].get('rtmp_pull_url')
        nickname = res_json["data"]["room"]["owner"]['nickname']
        nickname = title_filter(nickname)
        recording = True
        logger.info(f'开始录制主播{nickname}')
        return flv_rtmp,nickname
    except:
        recording = False
 
def the_recording_function(datas):
    """录制函数"""
    if datas:
        live_fielename = f'{datas[1]}-{datetime.now().strftime("%Y-%m-%d_%H_%M_%S")}.mp4'
        makedir = reading_a_configuration_file()
        makedir = Path(makedir)
        makedir.mkdir(parents=True,exist_ok=True) # 创建文件夹
        _path = makedir.joinpath(live_fielename) # 文件保存路径
        try:
            subprocess.Popen(__ffmepg_command(datas[0],_path)).wait()# 录制
        except Exception as e:
            logger.info('开播准备中')
             
def the_heartbeat_detection(times):
    """心跳检测执行函数"""
    datas = heartbeat_detection_content()
    Timer(times,the_heartbeat_detection,args=(times,))
    if recording == True:
        the_recording_function(datas) 
    else:
        logger.info('还未开播')
 
def time_monitoring_function():
    """录制状态显示"""
    start_time = datetime.now()
    while True:
        time.sleep(5)
        if recording == False:
            start_time = datetime.now()
            print('\r等待录播中ing')
        else:
            print(f'\r正在录制:{datetime.now()-start_time}')
 
if __name__ == "__main__":    
    with open(dir_filename('douyin_address.txt'),'r',encoding='utf8') as f:
        shorturl = f.readlines()
    print(f'共享链接地址:{shorturl[0]}')
    input('回车开始录制')
    t = Thread(target = time_monitoring_function,args=()) # 监控直播已经多少秒
    t.start()
    the_heartbeat_detection(10) # 每10秒检测一次

抖音直播录像 2.0.zip

17.59 MB, 下载次数: 0, 下载积分: B币 -2

冰凌汇编免责声明
以上内容均来自网友转发或原创,如存在侵权请发送到站方邮件9003554@qq.com处理。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|小黑屋|站点统计|Archiver|小黑屋|RSS|冰凌汇编 ( 滇ICP备2022002049号 滇公网安备 53032102000029号)|网站地图

GMT+8, 2022-10-6 22:52 , Processed in 0.128004 second(s), 8 queries , Redis On.

冰凌汇编 - 建立于2021年12月20日

Powered by Discuz! © 2001-2022 Comsenz Inc.

快速回复 返回顶部 返回列表