Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from contextlib import asynccontextmanager

from src.config import settings
from src.redis import start_subscriber, rds

from src.anno import router as anno_router
from src.plan.summary import router as plan_router
from src.plan.details import router as plan_details_router
from src.health_check import router as health_check_router
from src.project import router as project_router
from src.ICP import router as icp_router
from src.icp import router as icp_router
from src.notify_admin import router as notify_admin_router
from src.contact_us import router as contact_us_router

app = FastAPI()

@asynccontextmanager
async def lifespan(a: FastAPI):
await start_subscriber()
yield
await rds.close()


app = FastAPI(lifespan=lifespan)

if settings.static_app_dir:
app.mount("/static", StaticFiles(directory=settings.static_app_dir), name="static")
Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ peewee
pymysql
python-dotenv
pydantic_settings
playhouse
playhouse
redis
aioredis
5 changes: 5 additions & 0 deletions src/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ class Settings(BaseSettings):

notify_admin_url: str

redis_host: str = "redis"
redis_port: int = 6379
redis_db: int = 0
redis_password: str = ""

class Config:
env_file = ".env"

Expand Down
15 changes: 10 additions & 5 deletions src/contact_us/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
from fastapi import APIRouter, Request
from loguru import logger
from fastapi import APIRouter
from time import time
import asyncio

from src.database import ContactUs

router = APIRouter()

CacheExpiration = 600 # 秒
CacheExpiration = 600
cache = None
cache_lock = asyncio.Lock()


async def get_cache():
async def get_contact_cache():
global cache

async with cache_lock:
Expand All @@ -22,10 +21,16 @@ async def get_cache():
return cache[0]


async def clean_contact_cache():
global cache
async with cache_lock:
cache = None


@router.get("/contact_us")
async def contact_us():
data = {}
for c in await get_cache():
for c in await get_contact_cache():
data[c.channel] = c.detail

return {"ec": 200, "code": 0, "data": data}
File renamed without changes.
9 changes: 7 additions & 2 deletions src/notify_admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fastapi import Request, APIRouter
from fastapi import Request, APIRouter, HTTPException
from aiohttp import ClientSession
from loguru import logger

Expand All @@ -8,7 +8,12 @@

@router.post("/notify_admin")
async def notify_admin(request: Request):
body = await request.json()
try:
body = await request.json()
except Exception as e:
logger.error(f"notify_admin invalid json: {e}")
raise HTTPException(status_code=400, detail="Invalid JSON")

logger.info(str(body))

if not settings.notify_admin_url:
Expand Down
8 changes: 7 additions & 1 deletion src/plan/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@


async def get_plan_cache():

global _plan_cache

async with cache_lock:
Expand All @@ -17,3 +16,10 @@ async def get_plan_cache():
_plan_cache = (list(Plan.select().order_by(Plan.plan_index)), now)

return _plan_cache[0]


async def clean_plan_cache():
global _plan_cache

async with cache_lock:
_plan_cache = None
4 changes: 4 additions & 0 deletions src/redis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .client import rds, start_subscriber
from .subscriber import cache_clear_subscriber

__all__ = ["rds", "start_subscriber", "cache_clear_subscriber"]
67 changes: 67 additions & 0 deletions src/redis/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from redis import asyncio as aioredis
from loguru import logger
from src.config import settings
import asyncio


class RedisClient:
def __init__(self):
self.redis = None
self.pubsub = None

async def connect(self):
try:
redis_url = f"redis://:{settings.redis_password}@{settings.redis_host}:{settings.redis_port}/{settings.redis_db}"
if not settings.redis_password:
redis_url = f"redis://{settings.redis_host}:{settings.redis_port}/{settings.redis_db}"

self.redis = aioredis.from_url(redis_url, decode_responses=True)
logger.info(f"rds connect: {settings.redis_host}:{settings.redis_port}")
return True
except Exception as e:
logger.error(f"rds connect: {e}")
return False

async def close(self):
if self.redis:
await self.redis.close()

async def publish(self, channel: str, message: str):
if not self.redis:
logger.warning("rds is None")
return False

try:
await self.redis.publish(channel, message)
logger.info(f"publish {channel}: {message}")
return True
except Exception as e:
logger.error(f"publish error: {e}")
return False

async def subscribe(self, channel: str):
if not self.redis:
logger.warning("rds is None")
return None

try:
self.pubsub = self.redis.pubsub()
await self.pubsub.subscribe(channel)
logger.info(f"subscribe: {channel}")
return self.pubsub
except Exception as e:
logger.error(f"subscribe: {e}")
return None


rds = RedisClient()


async def start_subscriber():
from .subscriber import cache_clear_subscriber

if not await rds.connect():
logger.error("rds connect failed")
return

asyncio.create_task(cache_clear_subscriber())
30 changes: 30 additions & 0 deletions src/redis/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from loguru import logger

from src.contact_us import clean_contact_cache
from src.plan.cache import clean_plan_cache
from src.redis.client import rds

channel = "misc"


async def cache_clear_subscriber():
try:
pubsub = await rds.subscribe(channel)
if not pubsub:
logger.error("pubsub is None")
return

logger.info("start subscribe misc cache listener")

async for message in pubsub.listen():
if message["type"] == "message":
logger.info("evict message received: {}".format(message))
await clean_plan_cache()
await clean_contact_cache()

except Exception as e:
logger.error(f"subscribe with error: {e}")
finally:
if rds.pubsub:
await rds.pubsub.unsubscribe(channel)
logger.info(f"unsubscribe: {channel}")