×

淘宝商品评论接口技术实现:从评论获取到情感分析全流程方案

Ace Ace 发表于2025-08-29 16:14:16 浏览59 评论0

抢沙发发表评论

商品评论接口是电商数据分析的重要入口,通过评论数据可以挖掘用户需求、分析产品优缺点、监控舆情走向。本文将详细讲解淘宝商品评论接口的技术实现,重点解决评论分页机制、反爬策略应对、数据解析与情感分析等核心问题,提供一套合规、高效的技术方案,同时严格遵守平台规则与数据采集规范。
一、评论接口基础原理与合规要点

淘宝商品评论数据存储在商品详情页的评论模块,通过动态加载方式呈现。实现评论接口需理解其基本原理并遵守以下合规要点:

    数据范围:仅采集商品公开评论(不包含追评、问答等非评论内容)
    请求频率:单商品评论请求间隔不低于 10 秒,单 IP 日请求不超过 1000 次
    使用规范:数据仅用于个人学习、市场调研,不得用于商业竞争或恶意分析
    反爬尊重:不使用破解、绕过等方式获取数据,模拟正常用户浏览行为

评论接口的核心技术流程如下:

plaintext

商品ID解析 → 评论参数构造 → 分页请求发送 → 评论数据提取 → 数据清洗与分析

282cdc6ebfc54cfba1c3e9170eff9241.png
点击获取key和secret
二、核心技术实现:从评论获取到数据解析
1. 商品 ID 解析工具

获取商品评论的前提是解析出正确的商品 ID(item_id),可从商品详情页 URL 或页面内容中提取:

python

运行

    import re
    import requests
    from lxml import etree
     
    class ProductIdParser:
        """商品ID解析器,从URL或页面中提取item_id"""
        
        def __init__(self):
            self.headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Referer": "https://www.taobao.com/"
            }
        
        def parse_from_url(self, product_url):
            """从商品URL中提取item_id"""
            # 匹配常见的商品URL格式
            patterns = [
                r"item\.taobao\.com/item\.htm\?.*id=(\d+)",
                r"detail\.tmall\.com/item\.htm\?.*id=(\d+)",
                r"id=(\d+)"  # 通用匹配
            ]
            
            for pattern in patterns:
                match = re.search(pattern, product_url)
                if match:
                    return match.group(1)
            return None
        
        def parse_from_page(self, product_url):
            """从商品页面内容中提取item_id(URL解析失败时使用)"""
            try:
                response = requests.get(
                    product_url,
                    headers=self.headers,
                    timeout=10,
                    allow_redirects=True
                )
                response.encoding = "utf-8"
                
                # 从页面HTML中提取item_id
                tree = etree.HTML(response.text)
                # 尝试从meta标签提取
                meta_content = tree.xpath('//meta[@name="mobile-agent"]/@content')
                if meta_content:
                    match = re.search(r"item_id=(\d+)", meta_content[0])
                    if match:
                        return match.group(1)
                
                # 尝试从脚本标签提取
                script_content = tree.xpath('//script[contains(text(), "itemId")]/text()')
                for script in script_content:
                    match = re.search(r"itemId\s*=\s*'?(\d+)'?", script)
                    if match:
                        return match.group(1)
                
                return None
            except Exception as e:
                print(f"从页面提取item_id失败: {str(e)}")
                return None
        
        def get_product_id(self, product_url):
            """获取商品ID,先尝试从URL提取,失败则从页面提取"""
            product_id = self.parse_from_url(product_url)
            if product_id:
                return product_id
            return self.parse_from_page(product_url)

2. 评论参数构造器

淘宝评论请求需要特定的参数组合,包括商品 ID、页码、排序方式等,其中部分参数需要动态生成:

python

运行

    import time
    import hashlib
    import random
     
    class CommentParamsGenerator:
        """评论请求参数生成器"""
        
        def __init__(self):
            self.app_key = "12574478"  # 模拟应用标识
            self.sort_types = {
                "default": 0,    # 默认排序
                "latest": 1,     # 最新评论
                "good": 2,       # 好评
                "poor": 3        # 差评
            }
        
        def generate_params(self, item_id, page=1, sort="default", page_size=20):
            """
            生成评论请求参数
            
            :param item_id: 商品ID
            :param page: 页码
            :param sort: 排序方式
            :param page_size: 每页评论数
            :return: 评论请求参数字典
            """
            # 基础参数
            params = {
                "itemId": item_id,
                "pageNum": page,
                "pageSize": page_size,
                "sortType": self.sort_types.get(sort, 0),
                "auctionNumId": item_id,
                "userType": 0,
                "platform": "h5",
                "needFold": 0,
                "callback": f"jsonp_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
            }
            
            # 生成时间戳和签名
            t = str(int(time.time() * 1000))
            params["t"] = t
            params["sign"] = self._generate_sign(params)
            
            return params
        
        def _generate_sign(self, params):
            """生成签名,模拟平台参数验证机制"""
            # 按参数名排序并拼接
            sorted_params = sorted(params.items(), key=lambda x: x[0])
            sign_str = "&".join([f"{k}={v}" for k, v in sorted_params if k != "sign"])
            # 加入固定密钥(仅作示例)
            sign_str += "&secret=taobao_comment_demo_key"
            # 计算MD5签名
            return hashlib.md5(sign_str.encode()).hexdigest().upper()

3. 评论请求发送器

处理评论分页请求,包含反爬机制应对策略:

python

运行

    import time
    import random
    import requests
    from fake_useragent import UserAgent
     
    class CommentRequester:
        """评论请求发送器,负责发送请求并处理反爬"""
        
        def __init__(self, proxy_pool=None):
            self.comment_api = "https://h5api.m.taobao.com/h5/mtop.taobao.review.list.get/1.0/"
            self.proxy_pool = proxy_pool or []
            self.ua = UserAgent()
            self.session = requests.Session()
            self.last_request_time = 0
            self.min_interval = 10  # 评论请求最小间隔(秒)
            
        def _get_headers(self):
            """生成随机请求头"""
            return {
                "User-Agent": self.ua.random,
                "Accept": "*/*",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "Referer": "https://detail.tmall.com/",
                "Origin": "https://detail.tmall.com",
                "X-Requested-With": "XMLHttpRequest",
                "Connection": "keep-alive"
            }
        
        def _get_proxy(self):
            """从代理池获取随机代理"""
            if not self.proxy_pool:
                return None
            return random.choice(self.proxy_pool)
        
        def _check_interval(self):
            """确保请求间隔,避免触发反爬"""
            current_time = time.time()
            elapsed = current_time - self.last_request_time
            if elapsed < self.min_interval:
                sleep_time = self.min_interval - elapsed + random.uniform(0, 2)
                print(f"请求间隔不足,休眠 {sleep_time:.1f} 秒")
                time.sleep(sleep_time)
            self.last_request_time = time.time()
        
        def fetch_comments(self, params):
            """
            发送评论请求
            
            :param params: 评论请求参数
            :return: 响应内容或None
            """
            self._check_interval()
            
            headers = self._get_headers()
            proxy = self._get_proxy()
            proxies = {"http": proxy, "https": proxy} if proxy else None
            
            try:
                response = self.session.get(
                    self.comment_api,
                    params=params,
                    headers=headers,
                    proxies=proxies,
                    timeout=15
                )
                
                # 检查响应状态
                if response.status_code != 200:
                    print(f"评论请求失败,状态码: {response.status_code}")
                    return None
                
                # 检查是否被反爬拦截
                if self._is_blocked(response.text):
                    print("评论请求被拦截,可能需要验证")
                    # 移除可能失效的代理
                    if proxy and proxy in self.proxy_pool:
                        self.proxy_pool.remove(proxy)
                    return None
                    
                return response.text
                
            except Exception as e:
                print(f"评论请求异常: {str(e)}")
                return None
        
        def _is_blocked(self, response_text):
            """判断是否被反爬机制拦截"""
            block_keywords = [
                "请输入验证码",
                "访问过于频繁",
                "系统繁忙",
                "验证"
            ]
            for keyword in block_keywords:
                if keyword in response_text:
                    return True
            return False

4. 评论数据解析器

解析评论响应内容,提取结构化的评论数据:

python

运行

    import re
    import json
    from datetime import datetime
     
    class CommentParser:
        """评论数据解析器,提取结构化评论信息"""
        
        def __init__(self):
            # 处理JSONP格式的正则
            self.jsonp_pattern = re.compile(r'jsonp_\d+_\d+\((.*?)\)')
            # 表情符号清理正则
            self.emoji_pattern = re.compile(r'[\U00010000-\U0010ffff]', flags=re.UNICODE)
        
        def parse_jsonp(self, jsonp_text):
            """解析JSONP格式响应为JSON"""
            match = self.jsonp_pattern.search(jsonp_text)
            if not match:
                return None
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError:
                print("JSON解析失败")
                return None
        
        def clean_comment_text(self, text):
            """清理评论文本,去除多余符号和表情"""
            if not text:
                return ""
            # 去除HTML标签
            text = re.sub(r'<[^>]+>', '', text)
            # 去除表情符号
            text = self.emoji_pattern.sub('', text)
            # 去除多余空格和换行
            text = re.sub(r'\s+', ' ', text).strip()
            return text
        
        def parse_comment_item(self, comment_item):
            """解析单个评论项"""
            try:
                # 解析评论时间
                comment_time = comment_item.get("commentTime", "")
                if comment_time:
                    try:
                        comment_time = datetime.strptime(comment_time, "%Y-%m-%d %H:%M:%S")
                    except ValueError:
                        comment_time = None
                
                # 提取商品属性(如颜色、尺寸等)
                auction_params = comment_item.get("auctionParam", "")
                product_attrs = {}
                if auction_params:
                    for param in auction_params.split(";"):
                        if ":" in param:
                            key, value = param.split(":", 1)
                            product_attrs[key.strip()] = value.strip()
                
                return {
                    "comment_id": comment_item.get("id", ""),
                    "user_nick": comment_item.get("userNick", ""),
                    "user_level": comment_item.get("userVipLevel", 0),
                    "comment_text": self.clean_comment_text(comment_item.get("content", "")),
                    "comment_time": comment_time,
                    "star_rating": comment_item.get("star", 0),  # 星级评分
                    "product_attrs": product_attrs,  # 购买的商品属性
                    "praise_count": comment_item.get("useful", 0),  # 有用数
                    "reply_count": comment_item.get("replyCount", 0),  # 回复数
                    "has_image": len(comment_item.get("images", [])) > 0  # 是否有图
                }
            except Exception as e:
                print(f"解析单个评论失败: {str(e)}")
                return None
        
        def parse_comments(self, jsonp_text):
            """
            解析评论列表
            
            :param jsonp_text: JSONP格式的评论响应
            :return: 包含评论列表和分页信息的字典
            """
            json_data = self.parse_jsonp(jsonp_text)
            if not json_data or json_data.get("ret", [""])[0] != "SUCCESS::调用成功":
                return None
            
            result = {
                "total_comments": 0,
                "total_pages": 0,
                "current_page": 0,
                "comments": []
            }
            
            data = json_data.get("data", {})
            comments = data.get("comments", [])
            
            # 提取分页信息
            result["total_comments"] = data.get("total", 0)
            result["current_page"] = data.get("pageNum", 1)
            page_size = data.get("pageSize", 20)
            result["total_pages"] = (result["total_comments"] + page_size - 1) // page_size
            
            # 解析评论列表
            for item in comments:
                comment = self.parse_comment_item(item)
                if comment:
                    result["comments"].append(comment)
            
            return result

5. 评论情感分析工具

对评论内容进行情感倾向分析,判断好评、中评、差评:

python

运行

    import jieba
    import jieba.analyse
    from snownlp import SnowNLP
     
    class CommentSentimentAnalyzer:
        """评论情感分析工具"""
        
        def __init__(self):
            # 加载情感分析所需的词典
            jieba.initialize()
        
        def get_sentiment_score(self, comment_text):
            """获取评论情感得分(0-1,越高越正面)"""
            if not comment_text:
                return 0.5  # 无内容默认中性
            try:
                return SnowNLP(comment_text).sentiments
            except:
                return 0.5
        
        def analyze_sentiment(self, comment_text):
            """分析评论情感倾向"""
            score = self.get_sentiment_score(comment_text)
            if score >= 0.7:
                return "positive", score
            elif score <= 0.3:
                return "negative", score
            else:
                return "neutral", score
        
        def extract_keywords(self, comment_text, top_k=5):
            """提取评论关键词"""
            if not comment_text:
                return []
            try:
                return jieba.analyse.extract_tags(comment_text, topK=top_k)
            except:
                return []
        
        def process_comments(self, comments):
            """批量处理评论,添加情感分析结果"""
            processed = []
            for comment in comments:
                sentiment, score = self.analyze_sentiment(comment["comment_text"])
                keywords = self.extract_keywords(comment["comment_text"])
                
                processed_comment = comment.copy()
                processed_comment["sentiment"] = sentiment
                processed_comment["sentiment_score"] = round(score, 4)
                processed_comment["keywords"] = keywords
                
                processed.append(processed_comment)
            return processed

三、完整评论采集服务封装

将上述组件整合为完整的评论采集服务:

python

运行

    class TaobaoCommentService:
        """淘宝商品评论采集服务"""
        
        def __init__(self, proxy_pool=None):
            self.product_id_parser = ProductIdParser()
            self.params_generator = CommentParamsGenerator()
            self.requester = CommentRequester(proxy_pool=proxy_pool)
            self.parser = CommentParser()
            self.sentiment_analyzer = CommentSentimentAnalyzer()
        
        def get_comments(self, product_url, max_pages=5, sort="default", analyze_sentiment=True):
            """
            采集商品评论
            
            :param product_url: 商品详情页URL
            :param max_pages: 最大采集页数
            :param sort: 评论排序方式
            :param analyze_sentiment: 是否进行情感分析
            :return: 包含评论数据的字典
            """
            # 1. 获取商品ID
            print("正在解析商品ID...")
            item_id = self.product_id_parser.get_product_id(product_url)
            if not item_id:
                print("无法获取商品ID,采集失败")
                return None
            print(f"获取商品ID成功: {item_id}")
            
            all_comments = []
            current_page = 1
            total_pages = 1
            
            # 2. 分页采集评论
            while current_page <= max_pages and current_page <= total_pages:
                print(f"正在采集第 {current_page}/{max_pages} 页评论...")
                
                # 生成请求参数
                params = self.params_generator.generate_params(
                    item_id=item_id,
                    page=current_page,
                    sort=sort
                )
                
                # 发送请求
                response_text = self.requester.fetch_comments(params)
                if not response_text:
                    print(f"第 {current_page} 页评论获取失败,跳过该页")
                    current_page += 1
                    continue
                
                # 解析评论
                result = self.parser.parse_comments(response_text)
                if not result:
                    print(f"第 {current_page} 页评论解析失败,跳过该页")
                    current_page += 1
                    continue
                
                # 更新总页数
                total_pages = result["total_pages"]
                # 添加到结果列表
                all_comments.extend(result["comments"])
                
                print(f"第 {current_page} 页解析完成,获取 {len(result['comments'])} 条评论")
                
                # 检查是否已采集所有评论
                if len(all_comments) >= result["total_comments"]:
                    print("已获取全部评论,停止采集")
                    break
                
                current_page += 1
            
            # 3. 情感分析
            if analyze_sentiment and all_comments:
                print("正在进行评论情感分析...")
                all_comments = self.sentiment_analyzer.process_comments(all_comments)
            
            # 4. 返回结果
            return {
                "item_id": item_id,
                "product_url": product_url,
                "total_collected": len(all_comments),
                "total_available": result["total_comments"] if result else 0,
                "pages_collected": current_page - 1,
                "comments": all_comments
            }

四、使用示例与数据存储
1. 基本使用示例

python

运行

    def main():
        # 代理池(实际使用中替换为有效代理)
        proxy_pool = [
            # "http://123.123.123.123:8080",
            # "http://111.111.111.111:8888"
        ]
        
        # 初始化评论采集服务
        comment_service = TaobaoCommentService(proxy_pool=proxy_pool)
        
        # 商品详情页URL
        product_url = "https://item.taobao.com/item.htm?id=1234567890"  # 替换为实际商品URL
        
        # 采集评论(最多3页,按最新排序,进行情感分析)
        result = comment_service.get_comments(
            product_url=product_url,
            max_pages=3,
            sort="latest",
            analyze_sentiment=True
        )
        
        # 处理采集结果
        if result:
            print(f"\n采集完成!共获取 {result['total_collected']} 条评论")
            
            # 打印部分结果
            if result["comments"]:
                print("\n前3条评论摘要:")
                for i, comment in enumerate(result["comments"][:3], 1):
                    print(f"{i}. {comment['comment_text'][:50]}...")
                    print(f"   情感倾向:{comment['sentiment']}(得分:{comment['sentiment_score']})")
                    print(f"   关键词:{','.join(comment['keywords'])}")
                    print(f"   发布时间:{comment['comment_time']}\n")
        else:
            print("评论采集失败")
     
    if __name__ == "__main__":
        main()

2. 评论数据存储工具

将采集的评论数据存储为 JSON 和 CSV 格式:

python

运行

    import json
    import csv
    from pathlib import Path
    from datetime import datetime
     
    class CommentStorage:
        """评论数据存储工具"""
        
        def __init__(self, storage_dir="./taobao_comments"):
            self.storage_dir = Path(storage_dir)
            self.storage_dir.mkdir(exist_ok=True, parents=True)
        
        def save_to_json(self, comment_data):
            """保存评论数据到JSON文件"""
            item_id = comment_data["item_id"]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"comments_{item_id}_{timestamp}.json"
            file_path = self.storage_dir / filename
            
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(comment_data, f, ensure_ascii=False, indent=2, default=str)
            
            print(f"评论数据已保存至JSON文件:{file_path}")
            return file_path
        
        def save_to_csv(self, comment_data):
            """保存评论数据到CSV文件"""
            if not comment_data["comments"]:
                print("没有评论数据可保存到CSV")
                return None
                
            item_id = comment_data["item_id"]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"comments_{item_id}_{timestamp}.csv"
            file_path = self.storage_dir / filename
            
            # 评论字段
            fields = [
                "comment_id", "user_nick", "user_level", "comment_text",
                "comment_time", "star_rating", "praise_count", "reply_count",
                "has_image", "sentiment", "sentiment_score", "keywords"
            ]
            
            with open(file_path, "w", encoding="utf-8-sig", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fields)
                writer.writeheader()
                
                for comment in comment_data["comments"]:
                    # 处理嵌套字段
                    row = {k: comment.get(k, "") for k in fields}
                    # 关键词列表转为字符串
                    if "keywords" in row and isinstance(row["keywords"], list):
                        row["keywords"] = ",".join(row["keywords"])
                    # 时间转为字符串
                    if isinstance(row["comment_time"], datetime):
                        row["comment_time"] = row["comment_time"].strftime("%Y-%m-%d %H:%M:%S")
                    
                    writer.writerow(row)
            
            print(f"评论数据已保存至CSV文件:{file_path}")
            return file_path

五、进阶优化与合规提示
1. 系统优化策略

    评论缓存机制:对已采集的商品评论进行缓存,避免重复请求

    python

运行

    def get_cached_comments(self, item_id, max_age=3600):
        """从缓存获取评论(实际实现需结合Redis或文件缓存)"""
        # 缓存逻辑实现...
        return None

 

    分布式采集:大规模采集时采用分布式架构,分散请求压力

    动态调整策略:根据反爬强度动态调整请求间隔和代理使用频率

2. 合规与风险提示

    商业应用前必须获得平台授权,遵守《电子商务法》相关规定
    评论数据不得用于生成与原平台竞争的产品或服务
    避免采集包含用户隐私的评论内容(如手机号、地址等)
    当平台明确限制评论采集时,应立即停止相关操作
    采集行为不得对平台正常运营造成影响,尊重 robots 协议限制

通过本文提供的技术方案,可构建一套功能完善的淘宝商品评论接口系统。该方案注重合规性和可扩展性,能够有效应对电商平台的反爬机制,为商品分析、用户反馈挖掘等场景提供数据支持。在实际应用中,需根据平台规则动态调整策略,确保系统的稳定性和合法性。

群贤毕至

访客