Goal: get a crawler task system running locally with the fewest components and the clearest setup possible, and deploy it reliably.
Keywords: FastAPI / Celery / Redis / supervisord
Common task requirements
In crawler or asynchronous-task scenarios, the typical requirements are:
- a web interface that accepts requests (FastAPI)
- tasks that run asynchronously without blocking the API (Celery)
- a task queue and result store (Redis)
- one command that starts every process (supervisord)
The advantage of this stack fits in one line:
mature, stable, few moving parts, horizontally scalable
Overall architecture
```
Client
   │
   ▼
FastAPI ──► Redis ◄── Celery Worker
              ▲
              │
     Task Result / Queue
```
- FastAPI: serves the HTTP API
- Celery: runs the crawl tasks
- Redis: broker + result backend
- supervisord: process management
Project structure
```
sz_rc_crawer_pb/
├── main.py
├── tasks.py
├── config.py
├── conf.yaml
├── logger_config.py
├── supervisord.conf
├── logs/
```
Unified configuration
conf.yaml
```yaml
local:
  app_name: crawler

  log:
    level: DEBUG
    dir: logs

  celery:
    redis_url: "redis://localhost:6379/0"
    key_prefix: "{crawler.local}"

qa:
  app_name: crawler

  log:
    level: INFO
    dir: logs

  celery:
    redis_url: "redis://:password@redis-qa.xxx.com:6379/0"
    key_prefix: "{crawler.qa}"
```
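The key_prefix values matter when local and qa share the same Redis instance: the prefix is handed to the broker transport in tasks.py below (global_keyprefix), so each environment's Celery keys are namespaced and the queues do not collide.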
A single environment variable selects which block is used:
config.py
```python
import os

import yaml

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ENV = os.getenv("APP_ENV", "local")

CONF_PATH = os.path.join(BASE_DIR, "conf.yaml")

with open(CONF_PATH, "r", encoding="utf-8") as f:
    raw = yaml.safe_load(f)

conf = raw[ENV]
```
Key points:
- a single configuration file
- no pile of per-environment Python modules
- behavior very close to viper
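Any other module reads settings by importing the parsed dict. A minimal usage sketch (keys as defined in conf.yaml above):

```python
# illustrative only -- any module in the project
from config import conf

redis_url = conf["celery"]["redis_url"]
log_level = conf["log"]["level"]

# Selecting the environment happens entirely outside the code:
#   APP_ENV=qa uvicorn main:app
print(conf["app_name"], log_level, redis_url)
```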
Logging
logger_config.py
```python
import os
import sys

from loguru import logger

from config import conf


def setup_logger(name: str):
    log_dir = conf["log"]["dir"]
    os.makedirs(log_dir, exist_ok=True)

    # Replace loguru's default handler with a console sink plus a rotating file sink.
    logger.remove()
    logger.add(
        sys.stdout,
        level=conf["log"]["level"],
        format="{time} | {level} | {name}:{function}:{line} - {message}",
    )
    logger.add(
        f"{log_dir}/{name}_{{time:YYYY-MM-DD}}.log",
        rotation="00:00",
        retention="30 days",
        level="DEBUG",
    )
    return logger
```
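Each process calls setup_logger once with its own name, so API and worker logs end up in separate, date-stamped files under logs/ while sharing the same console format. A short usage sketch:

```python
from logger_config import setup_logger

# Console output at the configured level, plus a file such as
# logs/worker_2025-01-01.log, rotated at midnight and kept for 30 days.
logger = setup_logger("worker")
logger.info("worker starting")
```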
FastAPI
main.py

```python
from celery.result import AsyncResult
from fastapi import FastAPI

from logger_config import setup_logger
from tasks import celery_app, crawl

logger = setup_logger("api")
app = FastAPI()


@app.post("/crawl")
def start_crawl(keyword: str):
    # Enqueue the task and return immediately with its id.
    task = crawl.delay(keyword)
    return {"task_id": task.id}


@app.get("/task/{task_id}")
def get_status(task_id: str):
    # Look up state/result in the Redis result backend.
    result = AsyncResult(task_id, app=celery_app)
    return {
        "state": result.state,
        "result": result.result,
    }
```
curl -X POST "http://127.0.0.1:8000/crawl?keyword=python"
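To see the full round trip, a small client sketch (assuming the requests package is installed) submits a keyword and then polls the status endpoint until the task finishes:

```python
import time

import requests

BASE = "http://127.0.0.1:8000"

# Submit a crawl and keep the task id returned by FastAPI.
resp = requests.post(f"{BASE}/crawl", params={"keyword": "python"})
task_id = resp.json()["task_id"]
print("submitted:", task_id)

# Poll until Celery reports a terminal state.
while True:
    status = requests.get(f"{BASE}/task/{task_id}").json()
    print(status["state"])
    if status["state"] in ("SUCCESS", "FAILURE"):
        print(status["result"])
        break
    time.sleep(1)
```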
Celery (crawl task execution)
tasks.py
```python
import time

from celery import Celery

from config import conf
from logger_config import setup_logger

logger = setup_logger("celery")

celery_app = Celery(
    conf["app_name"],
    broker=conf["celery"]["redis_url"],
    backend=conf["celery"]["redis_url"],
)

celery_app.conf.update(
    broker_transport_options={
        # Namespace all broker keys in Redis so environments can share one instance.
        "global_keyprefix": conf["celery"]["key_prefix"],
        "socket_timeout": 5,
    },
    worker_disable_rate_limits=True,
)


@celery_app.task
def crawl(keyword):
    # Placeholder crawl: simulate ten seconds of work.
    for _ in range(10):
        time.sleep(1)
    return {"keyword": keyword, "status": "done"}


logger.info("Celery initialized")
```
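The crawl task above is just a placeholder loop. As a rough sketch of what a real task might look like if added to tasks.py (crawl_url and the requests-based fetch are illustrative, not part of the original code), Celery's built-in retry support fits crawling well:

```python
import requests


@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def crawl_url(self, url):
    # Hypothetical example: fetch a page and retry on network errors.
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
    except requests.RequestException as exc:
        # Re-queues the task; after max_retries Celery marks it FAILURE.
        raise self.retry(exc=exc)
    return {"url": url, "length": len(resp.text), "status": "done"}
```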
supervisord
supervisord.conf

```ini
[supervisord]
nodaemon=true
environment=APP_ENV="local",PYTHONPATH="."
logfile=logs/supervisord.log
pidfile=logs/supervisord.pid

[program:uvicorn]
command=uvicorn main:app --host 127.0.0.1 --port 8000
directory=.
autostart=true
autorestart=true
stdout_logfile=logs/uvicorn.out.log
stderr_logfile=logs/uvicorn.err.log

[program:celery]
command=celery -A tasks worker -l info
directory=.
autostart=true
autorestart=true
stdout_logfile=logs/celery.out.log
stderr_logfile=logs/celery.err.log
```
Startup

```bash
# local
supervisord -c supervisord.conf

# qa
APP_ENV=qa supervisord -c supervisord.conf
```

Note: the [supervisord] section above hard-codes environment=APP_ENV="local" for all child processes, which overrides the value exported from the shell; for the qa variant to take effect, remove APP_ENV from that environment= line (or adjust it per environment).
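Once the stack is running, individual programs can be checked or restarted without stopping the rest, e.g. `supervisorctl -c supervisord.conf status` or `supervisorctl -c supervisord.conf restart celery`.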