Featured image of post python中将函数设置为定时任务

python中将函数设置为定时任务

将函数设置为定时任务有几种方法。以下是几种常用的实现方案:

方案1:使用Python的schedule库

这是一个在Python应用程序内部实现的轻量级定时任务方案:

import schedule
import time
import threading
from src.db import refresh_content

def run_scheduler():
    """启动定时任务调度器"""
    # 每小时执行一次刷新
    schedule.every(1).hour.do(refresh_content)
    
    # 也可以设置特定时间执行,例如每天凌晨2点
    # schedule.every().day.at("02:00").do(refresh_content)
    
    while True:
        schedule.run_pending()
        time.sleep(60)  # 每分钟检查一次是否有待执行的任务

# 在单独的线程中运行调度器,这样不会阻塞主应用
def start_scheduler_thread():
    scheduler_thread = threading.Thread(target=run_scheduler)
    scheduler_thread.daemon = True  # 设置为守护线程,这样主程序退出时线程也会退出
    scheduler_thread.start()

将此代码添加到应用程序启动过程中:

if __name__ == "__main__":
    # 启动定时任务
    start_scheduler_thread()
    
    # 启动主应用(例如Gradio界面)
    app.launch()

方案2:使用APScheduler库

APScheduler是一个更强大的Python调度库,支持持久化任务和多种调度器后端:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from src.db import refresh_content

def setup_scheduler():
    """设置并启动APScheduler调度器"""
    scheduler = BackgroundScheduler()
    
    # 添加刷新任务,每小时执行一次
    scheduler.add_job(
        func=refresh_content,
        trigger=IntervalTrigger(hours=1),
        id='refresh_content_job',
        name='刷新订阅内容',
        replace_existing=True
    )
    
    # 启动调度器
    scheduler.start()
    return scheduler

在主应用中集成:

if __name__ == "__main__":
    # 启动定时任务调度器
    scheduler = setup_scheduler()
    
    # 启动主应用
    try:
        app.launch()
    finally:
        # 在应用关闭时优雅地关闭调度器
        scheduler.shutdown()

方案3:使用系统级定时任务

Linux/Mac上使用cron

  1. 创建一个独立的Python脚本,例如refresh_subscriptions.py
#!/usr/bin/env python3
# refresh_subscriptions.py
from src.db import refresh_content

if __name__ == "__main__":
    result = refresh_content()
    print(result)
  1. 使用crontab设置定时任务:
# 编辑crontab
crontab -e

# 添加以下行,每小时执行一次
0 * * * * /path/to/python /path/to/refresh_subscriptions.py >> /path/to/logs/refresh.log 2>&1

Windows上使用任务计划程序

  1. 创建类似的Python脚本
  2. 打开任务计划程序,创建基本任务
  3. 设置触发器为每小时执行一次
  4. 操作设置为运行Python解释器,参数为脚本路径

方案4:使用Celery(对于大型应用)

如果你的应用较大,可以考虑使用Celery:

# 设置Celery
from celery import Celery
from celery.schedules import crontab

app = Celery('subscription_manager', broker='redis://localhost:6379/0')

# 配置定时任务
app.conf.beat_schedule = {
    'refresh-content-hourly': {
        'task': 'tasks.refresh_content_task',
        'schedule': crontab(minute=0, hour='*'),  # 每小时整点执行
    },
}

# 定义任务
@app.task
def refresh_content_task():
    from src.db import refresh_content
    return refresh_content()

实现建议

  1. 对于简单部署:使用方案1或方案2,将定时任务集成到主应用中
  2. 对于生产环境:使用方案3,将刷新任务与Web应用分离,降低相互影响
  3. 对于高负载场景:使用方案4,实现任务队列和分布式处理
最后修改于 Mar 18, 2025 18:23 +0800
使用 Hugo 构建
主题 StackJimmy 设计