Deploying Crawler Tasks with Celery + FastAPI + Redis + supervisord

Goal: using the fewest components and the clearest possible layout, get a crawler task system running locally and deploy it reliably.

Keywords: FastAPI / Celery / Redis / supervisord

Common task requirements

In crawler or asynchronous task scenarios, the typical requirements are:

  • A web endpoint receives requests (FastAPI)
  • Tasks run asynchronously without blocking the endpoint (Celery)
  • A task queue and result store (Redis)
  • One command starts all processes (supervisord)

The advantage of this stack comes down to one line:
mature, stable, few components, horizontally scalable

Overall architecture


Client
   │
   ▼
FastAPI ──► Redis ◄── Celery Worker
              │
              ▼
       Task Result / Queue


  • FastAPI: exposes the HTTP API
  • Celery: runs the crawl tasks
  • Redis: broker + result backend
  • supervisord: process management

Project structure

sz_rc_crawer_pb/
├── main.py            # FastAPI entry point
├── tasks.py           # Celery task definitions
├── config.py          # unified config loading
├── conf.yaml          # single config file (viper-style, as in Go)
├── logger_config.py   # loguru logging
├── supervisord.conf   # process management
├── logs/              # log directory

Unified configuration

conf.yaml

local:
  app_name: crawler

  log:
    level: DEBUG
    dir: logs

  celery:
    redis_url: "redis://localhost:6379/0"
    key_prefix: "{crawler.local}"


qa:
  app_name: crawler

  log:
    level: INFO
    dir: logs

  celery:
    redis_url: "redis://:password@redis-qa.xxx.com:6379/0"
    key_prefix: "{crawler.qa}"

The active environment is selected with a single environment variable:

export APP_ENV=local

config.py

import os
import yaml


BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ENV = os.getenv("APP_ENV", "local")


CONF_PATH = os.path.join(BASE_DIR, "conf.yaml")


with open(CONF_PATH, "r", encoding="utf-8") as f:
    raw = yaml.safe_load(f)


conf = raw[ENV]

Characteristics:

  1. Only one configuration file
  2. No pile of extra Python modules
  3. Behavior very close to viper
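
A quick way to check the loader (a hypothetical snippet; assumes the project root is on PYTHONPATH and APP_ENV is set as above):

from config import conf, ENV

print(ENV)                           # e.g. "local"
print(conf["app_name"])              # "crawler"
print(conf["celery"]["redis_url"])   # Redis URL for the current environment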

Logging

logger_config.py

from loguru import logger
import sys
import os
from config import conf

def setup_logger(name: str):
    log_dir = conf["log"]["dir"]
    os.makedirs(log_dir, exist_ok=True)
    logger.remove()
    # console sink, level taken from conf.yaml
    logger.add(
        sys.stdout,
        level=conf["log"]["level"],
        format="{time} | {level} | {name}:{function}:{line} - {message}",
    )
    # file sink: one file per process name, rotated daily at midnight
    logger.add(
        f"{log_dir}/{name}_{{time:YYYY-MM-DD}}.log",
        rotation="00:00",
        retention="30 days",
        level="DEBUG",
    )
    return logger
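
A minimal check (the name "demo" is just an example):

from logger_config import setup_logger

log = setup_logger("demo")
log.info("hello")   # printed to stdout and written to logs/demo_<YYYY-MM-DD>.log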

FastAPI

from fastapi import FastAPI
from celery.result import AsyncResult

from logger_config import setup_logger
from tasks import celery_app, crawl


logger = setup_logger("api")
app = FastAPI()


@app.post("/crawl")
def start_crawl(keyword: str):
    # enqueue the crawl task and return its id immediately
    task = crawl.delay(keyword)
    return {"task_id": task.id}


@app.get("/task/{task_id}")
def get_status(task_id: str):
    # look up the task state/result in the Redis backend
    result = AsyncResult(task_id, app=celery_app)
    return {
        "state": result.state,
        "result": result.result,
    }

curl -X POST "http://127.0.0.1:8000/crawl?keyword=test"
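
The returned task_id can then be polled against the status endpoint (the id below is a placeholder):

curl http://127.0.0.1:8000/task/<task_id>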

Celery (crawl task execution)

tasks.py

import time

from celery import Celery

from config import conf
from logger_config import setup_logger


logger = setup_logger("celery")


celery_app = Celery(
    conf["app_name"],
    broker=conf["celery"]["redis_url"],
    backend=conf["celery"]["redis_url"],
)


celery_app.conf.update(
    broker_transport_options={
        "global_keyprefix": conf["celery"]["key_prefix"],
        "socket_timeout": 5,
    },
    worker_disable_rate_limits=True,
)


@celery_app.task
def crawl(keyword):
    # placeholder for the real crawl work
    for _ in range(10):
        time.sleep(1)
    return {"keyword": keyword, "status": "done"}


logger.info("Celery initialized")
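
To exercise the task without going through the API, enqueue it from a Python shell (a sketch; assumes Redis is reachable and a worker is running):

from tasks import crawl

res = crawl.delay("test")
print(res.id)               # task id, also usable with GET /task/{task_id}
print(res.get(timeout=30))  # {"keyword": "test", "status": "done"}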

supervisord

[supervisord]
nodaemon=true
environment=APP_ENV="local",PYTHONPATH="."
logfile=logs/supervisord.log
pidfile=logs/supervisord.pid


[program:uvicorn]
command=uvicorn main:app --host 127.0.0.1 --port 8000
directory=.
autostart=true
autorestart=true
stdout_logfile=logs/uvicorn.out.log
stderr_logfile=logs/uvicorn.err.log


[program:celery]
command=celery -A tasks worker -l info
directory=.
autostart=true
autorestart=true
stdout_logfile=logs/celery.out.log
stderr_logfile=logs/celery.err.log

How to start

supervisord -c supervisord.conf
# to switch environments:
APP_ENV=qa supervisord -c supervisord.conf
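
For day-to-day management, the standard supervisorctl commands can inspect or restart each program; note this assumes the config also defines the usual [unix_http_server], [rpcinterface:supervisor] and [supervisorctl] sections, which the minimal file above omits:

supervisorctl -c supervisord.conf status
supervisorctl -c supervisord.conf restart celery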