From 55f419916c872df3c2d132c661595bcc227f23e6 Mon Sep 17 00:00:00 2001 From: sunjiaang <1015564607@qq.com> Date: Wed, 11 Mar 2026 18:59:22 +0800 Subject: [PATCH] fix: align zhihu and douyin postgres persistence # Conflicts: # database/models.py --- database/models.py | 190 ++++++++++++++++++------------------ store/douyin/__init__.py | 12 ++- store/douyin/_store_impl.py | 4 +- store/zhihu/__init__.py | 9 ++ 4 files changed, 117 insertions(+), 98 deletions(-) diff --git a/database/models.py b/database/models.py index ac8d1cdc3..a420a568b 100644 --- a/database/models.py +++ b/database/models.py @@ -49,116 +49,116 @@ class BilibiliVideo(Base): class BilibiliVideoComment(Base): __tablename__ = 'bilibili_video_comment' - id = Column(Integer, primary_key=True, comment='主键ID') - user_id = Column(String(255), comment='用户ID') - nickname = Column(Text, comment='用户昵称') - sex = Column(Text, comment='性别') - sign = Column(Text, comment='签名') - avatar = Column(Text, comment='头像') - add_ts = Column(BigInteger, comment='添加时间戳') - last_modify_ts = Column(BigInteger, comment='最后修改时间戳') - comment_id = Column(BigInteger, index=True, comment='评论ID') - video_id = Column(BigInteger, index=True, comment='视频ID') - content = Column(Text, comment='评论内容') - create_time = Column(BigInteger, comment='创建时间戳') - sub_comment_count = Column(Text, comment='子评论数') - parent_comment_id = Column(String(255), comment='父评论ID') - like_count = Column(Text, default='0', comment='点赞数') + id = Column(Integer, primary_key=True, comment='??ID') + user_id = Column(String(255), comment='??ID') + nickname = Column(Text, comment='????') + sex = Column(Text, comment='??') + sign = Column(Text, comment='??') + avatar = Column(Text, comment='??') + add_ts = Column(BigInteger, comment='?????') + last_modify_ts = Column(BigInteger, comment='???????') + comment_id = Column(BigInteger, index=True, comment='??ID') + video_id = Column(BigInteger, index=True, comment='??ID') + content = Column(Text, comment='????') + create_time = Column(BigInteger, comment='?????') + sub_comment_count = Column(Text, comment='????') + parent_comment_id = Column(String(255), comment='???ID') + like_count = Column(Text, default='0', comment='???') class BilibiliUpInfo(Base): __tablename__ = 'bilibili_up_info' - id = Column(Integer, primary_key=True, comment='主键ID') - user_id = Column(BigInteger, index=True, comment='用户ID') - nickname = Column(Text, comment='用户昵称') - sex = Column(Text, comment='性别') - sign = Column(Text, comment='签名') - avatar = Column(Text, comment='头像') - add_ts = Column(BigInteger, comment='添加时间戳') - last_modify_ts = Column(BigInteger, comment='最后修改时间戳') - total_fans = Column(Integer, comment='总粉丝数') - total_liked = Column(Integer, comment='总获赞数') - user_rank = Column(Integer, comment='用户等级') - is_official = Column(Integer, comment='是否官方认证') + id = Column(Integer, primary_key=True, comment='??ID') + user_id = Column(BigInteger, index=True, comment='??ID') + nickname = Column(Text, comment='????') + sex = Column(Text, comment='??') + sign = Column(Text, comment='??') + avatar = Column(Text, comment='??') + add_ts = Column(BigInteger, comment='?????') + last_modify_ts = Column(BigInteger, comment='???????') + total_fans = Column(Integer, comment='????') + total_liked = Column(Integer, comment='????') + user_rank = Column(Integer, comment='????') + is_official = Column(Integer, comment='??????') class BilibiliContactInfo(Base): __tablename__ = 'bilibili_contact_info' - id = Column(Integer, primary_key=True, comment='主键ID') - up_id = Column(BigInteger, index=True, comment='UP主ID') - fan_id = Column(BigInteger, index=True, comment='粉丝ID') - up_name = Column(Text, comment='UP主名称') - fan_name = Column(Text, comment='粉丝名称') - up_sign = Column(Text, comment='UP主签名') - fan_sign = Column(Text, comment='粉丝签名') - up_avatar = Column(Text, comment='UP主头像') - fan_avatar = Column(Text, comment='粉丝头像') - add_ts = Column(BigInteger, comment='添加时间戳') - last_modify_ts = Column(BigInteger, comment='最后修改时间戳') + id = Column(Integer, primary_key=True, comment='??ID') + up_id = Column(BigInteger, index=True, comment='UP?ID') + fan_id = Column(BigInteger, index=True, comment='??ID') + up_name = Column(Text, comment='UP???') + fan_name = Column(Text, comment='????') + up_sign = Column(Text, comment='UP???') + fan_sign = Column(Text, comment='????') + up_avatar = Column(Text, comment='UP???') + fan_avatar = Column(Text, comment='????') + add_ts = Column(BigInteger, comment='?????') + last_modify_ts = Column(BigInteger, comment='???????') class BilibiliUpDynamic(Base): __tablename__ = 'bilibili_up_dynamic' - id = Column(Integer, primary_key=True, comment='主键ID') - dynamic_id = Column(BigInteger, index=True, comment='动态ID') - user_id = Column(String(255), comment='用户ID') - user_name = Column(Text, comment='用户名称') - text = Column(Text, comment='动态内容') - type = Column(Text, comment='动态类型') - pub_ts = Column(BigInteger, comment='发布时间戳') - total_comments = Column(Integer, comment='总评论数') - total_forwards = Column(Integer, comment='总转发数') - total_liked = Column(Integer, comment='总点赞数') - add_ts = Column(BigInteger, comment='添加时间戳') - last_modify_ts = Column(BigInteger, comment='最后修改时间戳') + id = Column(Integer, primary_key=True, comment='??ID') + dynamic_id = Column(BigInteger, index=True, comment='??ID') + user_id = Column(String(255), comment='??ID') + user_name = Column(Text, comment='????') + text = Column(Text, comment='????') + type = Column(Text, comment='????') + pub_ts = Column(BigInteger, comment='?????') + total_comments = Column(Integer, comment='????') + total_forwards = Column(Integer, comment='????') + total_liked = Column(Integer, comment='????') + add_ts = Column(BigInteger, comment='?????') + last_modify_ts = Column(BigInteger, comment='???????') class DouyinAweme(Base): __tablename__ = 'douyin_aweme' - id = Column(Integer, primary_key=True, comment='主键ID') - user_id = Column(String(255), comment='用户ID') - sec_uid = Column(String(255), comment='安全用户ID') - short_user_id = Column(String(255), comment='短用户ID') - user_unique_id = Column(String(255), comment='用户唯一ID') - nickname = Column(Text, comment='用户昵称') - avatar = Column(Text, comment='用户头像') - user_signature = Column(Text, comment='用户签名') - ip_location = Column(Text, comment='IP地址位置') - add_ts = Column(BigInteger, comment='添加时间戳') - last_modify_ts = Column(BigInteger, comment='最后修改时间戳') - aweme_id = Column(BigInteger, index=True, comment='作品ID') - aweme_type = Column(Text, comment='作品类型') - title = Column(Text, comment='作品标题') - desc = Column(Text, comment='作品描述') - create_time = Column(BigInteger, index=True, comment='创建时间戳') - liked_count = Column(Text, comment='点赞数') - comment_count = Column(Text, comment='评论数') - share_count = Column(Text, comment='分享数') - collected_count = Column(Text, comment='收藏数') - aweme_url = Column(Text, comment='作品URL') - cover_url = Column(Text, comment='封面URL') - video_download_url = Column(Text, comment='视频下载URL') - music_download_url = Column(Text, comment='音乐下载URL') - note_download_url = Column(Text, comment='笔记下载URL') - source_keyword = Column(Text, default='', comment='来源关键词') + id = Column(Integer, primary_key=True, comment='??ID') + user_id = Column(String(255), comment='??ID') + sec_uid = Column(String(255), comment='????ID') + short_user_id = Column(String(255), comment='???ID') + user_unique_id = Column(String(255), comment='????ID') + nickname = Column(Text, comment='????') + avatar = Column(Text, comment='????') + user_signature = Column(Text, comment='????') + ip_location = Column(Text, comment='IP????') + add_ts = Column(BigInteger, comment='?????') + last_modify_ts = Column(BigInteger, comment='???????') + aweme_id = Column(String(64), index=True, comment='??ID') + aweme_type = Column(Text, comment='????') + title = Column(Text, comment='????') + desc = Column(Text, comment='????') + create_time = Column(BigInteger, index=True, comment='?????') + liked_count = Column(Text, comment='???') + comment_count = Column(Text, comment='???') + share_count = Column(Text, comment='???') + collected_count = Column(Text, comment='???') + aweme_url = Column(Text, comment='??URL') + cover_url = Column(Text, comment='??URL') + video_download_url = Column(Text, comment='????URL') + music_download_url = Column(Text, comment='????URL') + note_download_url = Column(Text, comment='????URL') + source_keyword = Column(Text, default='', comment='?????') class DouyinAwemeComment(Base): __tablename__ = 'douyin_aweme_comment' - id = Column(Integer, primary_key=True, comment='主键ID') - user_id = Column(String(255), comment='用户ID') - sec_uid = Column(String(255), comment='安全用户ID') - short_user_id = Column(String(255), comment='短用户ID') - user_unique_id = Column(String(255), comment='用户唯一ID') - nickname = Column(Text, comment='用户昵称') - avatar = Column(Text, comment='用户头像') - user_signature = Column(Text, comment='用户签名') - ip_location = Column(Text, comment='IP地址位置') - add_ts = Column(BigInteger, comment='添加时间戳') - last_modify_ts = Column(BigInteger, comment='最后修改时间戳') - comment_id = Column(BigInteger, index=True, comment='评论ID') - aweme_id = Column(BigInteger, index=True, comment='作品ID') - content = Column(Text, comment='评论内容') - create_time = Column(BigInteger, comment='创建时间戳') - sub_comment_count = Column(Text, comment='子评论数') - parent_comment_id = Column(String(255), comment='父评论ID') - like_count = Column(Text, default='0', comment='点赞数') - pictures = Column(Text, default='', comment='图片') + id = Column(Integer, primary_key=True, comment='??ID') + user_id = Column(String(255), comment='??ID') + sec_uid = Column(String(255), comment='????ID') + short_user_id = Column(String(255), comment='???ID') + user_unique_id = Column(String(255), comment='????ID') + nickname = Column(Text, comment='????') + avatar = Column(Text, comment='????') + user_signature = Column(Text, comment='????') + ip_location = Column(Text, comment='IP????') + add_ts = Column(BigInteger, comment='?????') + last_modify_ts = Column(BigInteger, comment='???????') + comment_id = Column(String(64), index=True, comment='??ID') + aweme_id = Column(String(64), index=True, comment='??ID') + content = Column(Text, comment='????') + create_time = Column(BigInteger, comment='?????') + sub_comment_count = Column(Text, comment='????') + parent_comment_id = Column(String(255), comment='???ID') + like_count = Column(Text, default='0', comment='???') + pictures = Column(Text, default='', comment='??') class DyCreator(Base): __tablename__ = 'dy_creator' diff --git a/store/douyin/__init__.py b/store/douyin/__init__.py index 9e4c21c52..b34c6bc8b 100644 --- a/store/douyin/__init__.py +++ b/store/douyin/__init__.py @@ -30,6 +30,13 @@ from .douyin_store_media import * +def _normalize_str_fields(payload: Dict, fields: List[str]): + for field in fields: + value = payload.get(field) + if value is not None and value != "": + payload[field] = str(value) + + class DouyinStoreFactory: STORES = { "csv": DouyinCsvStoreImplement, @@ -184,6 +191,7 @@ async def update_douyin_aweme(aweme_item: Dict): "note_download_url": ",".join(_extract_note_image_list(aweme_item)), "source_keyword": source_keyword_var.get(), } + _normalize_str_fields(save_content_item, ["aweme_id", "user_id", "sec_uid", "short_user_id", "user_unique_id"]) utils.logger.info(f"[store.douyin.update_douyin_aweme] douyin aweme id:{aweme_id}, title:{save_content_item.get('title')}") await DouyinStoreFactory.create_store().store_content(content_item=save_content_item) @@ -218,11 +226,12 @@ async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict): "nickname": user_info.get("nickname"), "avatar": avatar_info.get("url_list", [""])[0], "sub_comment_count": str(comment_item.get("reply_comment_total", 0)), - "like_count": (comment_item.get("digg_count") if comment_item.get("digg_count") else 0), + "like_count": str(comment_item.get("digg_count") if comment_item.get("digg_count") else 0), "last_modify_ts": utils.get_current_timestamp(), "parent_comment_id": parent_comment_id, "pictures": ",".join(_extract_comment_image_list(comment_item)), } + _normalize_str_fields(save_comment_item, ["comment_id", "aweme_id", "user_id", "sec_uid", "short_user_id", "user_unique_id", "parent_comment_id"]) utils.logger.info(f"[store.douyin.update_dy_aweme_comment] douyin aweme comment: {comment_id}, content: {save_comment_item.get('content')}") await DouyinStoreFactory.create_store().store_comment(comment_item=save_comment_item) @@ -245,6 +254,7 @@ async def save_creator(user_id: str, creator: Dict): "videos_count": user_info.get("aweme_count", 0), "last_modify_ts": utils.get_current_timestamp(), } + _normalize_str_fields(local_db_item, ["user_id"]) utils.logger.info(f"[store.douyin.save_creator] creator:{local_db_item}") await DouyinStoreFactory.create_store().store_creator(local_db_item) diff --git a/store/douyin/_store_impl.py b/store/douyin/_store_impl.py index 93f7fa813..b5adc06ba 100644 --- a/store/douyin/_store_impl.py +++ b/store/douyin/_store_impl.py @@ -97,7 +97,7 @@ async def store_content(self, content_item: Dict): Args: content_item: content item dict """ - aweme_id = int(content_item.get("aweme_id")) + aweme_id = str(content_item.get("aweme_id")) async with get_session() as session: result = await session.execute(select(DouyinAweme).where(DouyinAweme.aweme_id == aweme_id)) aweme_detail = result.scalar_one_or_none() @@ -118,7 +118,7 @@ async def store_comment(self, comment_item: Dict): Args: comment_item: comment item dict """ - comment_id = int(comment_item.get("comment_id")) + comment_id = str(comment_item.get("comment_id")) async with get_session() as session: result = await session.execute(select(DouyinAwemeComment).where(DouyinAwemeComment.comment_id == comment_id)) comment_detail = result.scalar_one_or_none() diff --git a/store/zhihu/__init__.py b/store/zhihu/__init__.py index 5b749634e..5b4bace9a 100644 --- a/store/zhihu/__init__.py +++ b/store/zhihu/__init__.py @@ -35,6 +35,13 @@ from var import source_keyword_var +def _normalize_zhihu_time_fields(local_db_item: dict, fields: List[str]): + for field in fields: + value = local_db_item.get(field) + if value is not None and value != "": + local_db_item[field] = str(value) + + class ZhihuStoreFactory: STORES = { "csv": ZhihuCsvStoreImplement, @@ -80,6 +87,7 @@ async def update_zhihu_content(content_item: ZhihuContent): """ content_item.source_keyword = source_keyword_var.get() local_db_item = content_item.model_dump() + _normalize_zhihu_time_fields(local_db_item, ["created_time", "updated_time"]) local_db_item.update({"last_modify_ts": utils.get_current_timestamp()}) utils.logger.info(f"[store.zhihu.update_zhihu_content] zhihu content: {local_db_item}") await ZhihuStoreFactory.create_store().store_content(local_db_item) @@ -112,6 +120,7 @@ async def update_zhihu_content_comment(comment_item: ZhihuComment): """ local_db_item = comment_item.model_dump() + _normalize_zhihu_time_fields(local_db_item, ["publish_time"]) local_db_item.update({"last_modify_ts": utils.get_current_timestamp()}) utils.logger.info(f"[store.zhihu.update_zhihu_note_comment] zhihu content comment:{local_db_item}") await ZhihuStoreFactory.create_store().store_comment(local_db_item)