京东商品评论接口作为获取用户反馈的核心渠道,包含了丰富的用户评价数据、评分分布、晒图信息等关键内容。与商品详情接口不同,评论接口具有数据量大、更新频繁、情感倾向明显等特点,在竞品分析、产品优化、用户画像构建等场景中具有重要价值。本文将从接口权限配置、分页数据采集、评论内容解析到情感分析,结合完整代码示例,详解京东商品评论接口的开发实战,帮助开发者合规高效地利用评论数据。
一、接口特性与核心数据结构
京东商品评论接口(
jd.comment.list.get
)具有鲜明的 UGC(用户生成内容)特性,主要特点包括:多层级评论体系:包含主评论、追评、回复等层级结构
多维度评分数据:涵盖商品质量、物流速度、服务态度等细分评分
丰富的多媒体内容:支持图片、视频等富媒体评论内容获取
用户行为标签:包含是否匿名、是否认证、购买属性等用户标签
核心参数解析
参数类别 | 具体参数 | 作用说明 | 约束条件 |
---|---|---|---|
基础参数 | app_key | 应用标识 | 开放平台注册获取 |
method | 接口方法名 | 固定为jd.comment.list.get | |
timestamp | 时间戳 | 格式 yyyy-MM-dd HH:mm:ss,误差≤5 分钟 | |
sign | 签名 | HMAC-SHA256 加密 | |
业务参数 | sku_id | 商品 ID | 必选,京东商品唯一标识 |
page | 页码 | 正整数,默认 1 | |
page_size | 每页条数 | 1-100,默认 20 | |
筛选参数 | score | 评分筛选 | 0 - 全部,1 - 差评,2 - 中评,3 - 好评 |
sort_type | 排序方式 | 0 - 推荐,1 - 时间正序,2 - 时间倒序 | |
has_pic | 是否有图 | 0 - 全部,1 - 有图 |
响应数据结构
接口返回数据包含评论列表和统计信息,核心结构如下:
json
{ "jd_comment_list_get_response": { "result": { "total": 1568, // 总评论数 "page": 1, "page_size": 20, "score_count": { // 评分分布 "total": 1568, "good": 1320, "general": 156, "poor": 92 }, "comment_average": 4.8, // 平均评分 "comments": [ { "id": "1234567890123", // 评论ID "user_nickname": "jd_123456", // 用户名 "is_anonymous": true, // 是否匿名 "user_level": 5, // 用户等级 "score": 5, // 评分 "content": "商品质量很好,物流也快,非常满意", // 评论内容 "creation_time": "2024-08-01 15:30:22", // 评论时间 "product_attributes": "颜色:白色;版本:标准版", // 购买属性 "reply_count": 2, // 回复数 "useful_vote_count": 35, // 有用数 "pictures": [ // 评论图片 "https://img10.360buyimg.com/comment/jfs/t1/12345/67/8901/234567/abcdef.jpg" ], "after_comment": { // 追评 "content": "用了一周,效果不错,值得购买", "creation_time": "2024-08-08 10:15:33" }, "score_details": { // 详细评分 "product_quality": 5, "logistics_speed": 5, "service_attitude": 4 } } // 更多评论... ] } }}
点击获取key和secret
二、开发环境与权限配置
环境要求
开发语言:Python 3.8+
核心依赖:
requests
(HTTP 请求)、pandas
(数据处理)、snownlp
(情感分析)开发工具:PyCharm/VS Code
运行环境:支持 Windows/macOS/Linux,需联网访问京东开放平台
依赖安装
bash
pip install requests pandas snownlp redis pycryptodome
权限申请特殊说明
京东评论接口权限申请比普通接口更严格,需注意:
开发者账号需完成企业认证(个人开发者难以获取权限)
需提供详细的使用场景说明,说明评论数据的具体用途
接口调用限制更严格:默认 50 次 / 分钟,日调用上限 10000 次
不得将评论数据用于商业竞争或负面营销
三、接口开发实战实现
步骤 1:签名生成与基础工具
复用京东统一的 HMAC-SHA256 签名算法,并扩展评论专用工具:
python
运行
import timeimport hashlibimport hmacimport urllib.parseimport logging# 配置日志logging.basicConfig( filename='jd_comment_api.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')def generate_jd_sign(params: dict, app_secret: str) -> str: """生成京东接口签名""" sorted_params = sorted(params.items(), key=lambda x: x[0]) query_string = urllib.parse.urlencode(sorted_params) signature = hmac.new( app_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256 ).hexdigest().upper() return signaturedef validate_comment_params(params: dict) -> tuple[bool, str]: """验证评论接口参数""" required = ["sku_id", "page", "page_size"] for param in required: if param not in params: return False, f"缺少必选参数: {param}" # 验证分页参数 if not isinstance(params["page"], int) or params["page"] < 1: return False, "page必须是正整数" if not isinstance(params["page_size"], int) or not (1 <= params["page_size"] <= 100): return False, "page_size必须是1-100的整数" # 验证筛选参数 if "score" in params and params["score"] not in [0, 1, 2, 3]: return False, "score必须是0-3的整数" if "has_pic" in params and params["has_pic"] not in [0, 1]: return False, "has_pic必须是0或1" return True, "参数验证通过"
步骤 2:评论接口客户端封装
python
运行
import requestsimport jsonfrom typing import Dict, Optional, List, Anyclass JdCommentAPI: def __init__(self, app_key: str, app_secret: str): self.app_key = app_key self.app_secret = app_secret self.api_url = "https://api.jd.com/routerjson" self.max_retries = 3 # 最大重试次数 def get_comment_list(self, sku_id: str, page: int = 1, page_size: int = 20, score: int = 0, sort_type: int = 2, has_pic: int = 0) -> Optional[Dict[str, Any]]: """ 获取京东商品评论列表 :param sku_id: 商品SKU ID :param page: 页码 :param page_size: 每页条数 :param score: 评分筛选 0-全部 1-差评 2-中评 3-好评 :param sort_type: 排序方式 0-推荐 1-时间正序 2-时间倒序 :param has_pic: 是否有图 0-全部 1-有图 :return: 评论数据字典 """ # 参数验证 params = { "sku_id": sku_id, "page": page, "page_size": page_size, "score": score, "sort_type": sort_type, "has_pic": has_pic } valid, msg = validate_comment_params(params) if not valid: logging.error(f"参数错误: {msg}") return None # 构建完整参数 full_params = { "method": "jd.comment.list.get", "app_key": self.app_key, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "format": "json", "v": "2.0",** params } # 生成签名 full_params["sign"] = generate_jd_sign(full_params, self.app_secret) try: # 发送请求 response = requests.post( self.api_url, data=full_params, headers={"Content-Type": "application/x-www-form-urlencoded"}, timeout=15 ) response.raise_for_status() result = json.loads(response.text) # 错误处理 if "error_response" in result: error = result["error_response"] logging.error(f"接口错误: {error.get('msg')} (错误码: {error.get('code')})") return None return result.get("jd_comment_list_get_response", {}).get("result", {}) except requests.exceptions.RequestException as e: logging.error(f"HTTP请求异常: {str(e)}") return None except json.JSONDecodeError as e: logging.error(f"JSON解析错误: {str(e)}") return None
步骤 3:评论数据解析与结构化
python
运行
import pandas as pdfrom datetime import datetimedef parse_comment_data(raw_data: Dict) -> tuple[pd.DataFrame, Dict]: """ 解析评论数据为结构化格式 :param raw_data: 接口返回的原始数据 :return: 评论DataFrame和统计信息 """ if not raw_data or "comments" not in raw_data: return pd.DataFrame(), {} # 提取统计信息 stats = { "total_comments": raw_data.get("total", 0), "page": raw_data.get("page", 1), "page_size": raw_data.get("page_size", 20), "total_pages": (raw_data.get("total", 0) + raw_data.get("page_size", 20) - 1) // raw_data.get("page_size", 20), "average_score": raw_data.get("comment_average", 0), "score_distribution": raw_data.get("score_count", {}) } # 解析评论列表 comments = [] for item in raw_data["comments"]: # 处理追评 after_comment = item.get("after_comment", {}) # 处理详细评分 score_details = item.get("score_details", {}) comment_info = { "comment_id": item.get("id"), "user_nickname": item.get("user_nickname"), "is_anonymous": item.get("is_anonymous", False), "user_level": item.get("user_level", 0), "score": item.get("score", 0), "content": item.get("content", ""), "creation_time": item.get("creation_time"), "product_attributes": item.get("product_attributes", ""), "reply_count": item.get("reply_count", 0), "useful_vote_count": item.get("useful_vote_count", 0), "pic_count": len(item.get("pictures", [])), "has_after_comment": 1 if after_comment else 0, "after_content": after_comment.get("content", ""), "after_time": after_comment.get("creation_time"), "product_quality_score": score_details.get("product_quality", 0), "logistics_score": score_details.get("logistics_speed", 0), "service_score": score_details.get("service_attitude", 0) } comments.append(comment_info) # 转换为DataFrame df = pd.DataFrame(comments) # 数据类型转换 if not df.empty: df["is_anonymous"] = df["is_anonymous"].astype(bool) df["user_level"] = df["user_level"].astype(int) df["score"] = df["score"].astype(int) df["reply_count"] = df["reply_count"].astype(int) df["useful_vote_count"] = df["useful_vote_count"].astype(int) df["pic_count"] = df["pic_count"].astype(int) df["has_after_comment"] = df["has_after_comment"].astype(bool) df["creation_time"] = pd.to_datetime(df["creation_time"], errors="coerce") df["after_time"] = pd.to_datetime(df["after_time"], errors="coerce") return df, stats
步骤 4:评论数据批量采集工具
python
运行
import timefrom tqdm import tqdmclass JdCommentCrawler: def __init__(self, api: JdCommentAPI): self.api = api self.delay_seconds = 1 # 请求间隔,避免触发限流 def crawl_comments(self, sku_id: str, max_pages: int = 10, score: int = 0, has_pic: int = 0) -> tuple[pd.DataFrame, Dict]: """ 批量采集商品评论 :param sku_id: 商品SKU ID :param max_pages: 最大采集页数 :param score: 评分筛选 :param has_pic: 是否有图 :return: 合并后的评论DataFrame和总统计信息 """ all_comments = [] total_stats = None # 获取第一页数据,确定总页数 first_page = self.api.get_comment_list( sku_id=sku_id, page=1, page_size=100, score=score, has_pic=has_pic, sort_type=2 # 按时间倒序,获取最新评论 ) if not first_page: return pd.DataFrame(), {} first_df, first_stats = parse_comment_data(first_page) all_comments.append(first_df) total_stats = first_stats # 计算需要采集的总页数 actual_max_pages = min(max_pages, first_stats.get("total_pages", 1)) if actual_max_pages <= 1: return pd.concat(all_comments, ignore_index=True), total_stats # 采集剩余页数 for page in tqdm(range(2, actual_max_pages + 1), desc="采集评论"): # 控制请求频率 time.sleep(self.delay_seconds) page_data = self.api.get_comment_list( sku_id=sku_id, page=page, page_size=100, score=score, has_pic=has_pic, sort_type=2 ) if page_data: page_df, _ = parse_comment_data(page_data) all_comments.append(page_df) else: logging.warning(f"第{page}页评论采集失败,跳过") # 合并所有数据 combined_df = pd.concat(all_comments, ignore_index=True) return combined_df, total_stats
步骤 5:评论情感分析与挖掘
python
运行
from snownlp import SnowNLPimport refrom collections import Counterdef clean_comment_text(text: str) -> str: """清洗评论文本""" if not text: return "" # 移除特殊字符和HTML标签 text = re.sub(r'<[^>]*>', '', text) text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s]', ' ', text) # 去除多余空格 return re.sub(r'\s+', ' ', text).strip()def analyze_sentiment(text: str) -> Dict: """分析评论情感倾向""" if not text: return {"sentiment": 0.5, "positive": False} s = SnowNLP(text) sentiment_score = s.sentiment # 0-1之间,越大越积极 return { "sentiment_score": round(sentiment_score, 4), "is_positive": sentiment_score >= 0.6, "is_negative": sentiment_score <= 0.4 }def extract_keywords(text: str, top_n: int = 5) -> List[str]: """提取评论关键词""" if not text: return [] s = SnowNLP(text) # 提取关键词 keywords = s.keywords(top_n) return [str(keyword) for keyword in keywords]def process_comment_analysis(comment_df: pd.DataFrame) -> pd.DataFrame: """处理评论情感分析和关键词提取""" if comment_df.empty: return comment_df df = comment_df.copy() # 清洗评论内容 df["clean_content"] = df["content"].apply(clean_comment_text) # 情感分析 sentiment_results = df["clean_content"].apply(analyze_sentiment) df["sentiment_score"] = [res["sentiment_score"] for res in sentiment_results] df["is_positive"] = [res["is_positive"] for res in sentiment_results] df["is_negative"] = [res["is_negative"] for res in sentiment_results] # 提取关键词 df["keywords"] = df["clean_content"].apply(lambda x: extract_keywords(x, top_n=3)) return dfdef generate_comment_report(analysis_df: pd.DataFrame) -> Dict: """生成评论分析报告""" if analysis_df.empty: return {} # 总体情感分布 sentiment_dist = { "positive_ratio": round(analysis_df["is_positive"].mean() * 100, 2), "negative_ratio": round(analysis_df["is_negative"].mean() * 100, 2), "neutral_ratio": round((1 - analysis_df["is_positive"].mean() - analysis_df["is_negative"].mean()) * 100, 2), "avg_sentiment_score": round(analysis_df["sentiment_score"].mean(), 4) } # 高频关键词 all_keywords = [] for keywords in analysis_df["keywords"]: all_keywords.extend(keywords) top_keywords = [kw for kw, _ in Counter(all_keywords).most_common(10)] # 评分与情感相关性 score_correlation = analysis_df[["score", "sentiment_score"]].corr().iloc[0, 1] return { "sentiment_distribution": sentiment_dist, "top_keywords": top_keywords, "score_correlation": round(score_correlation, 4), "comment_count": len(analysis_df), "pic_comment_ratio": round(analysis_df[analysis_df["pic_count"] > 0].shape[0] / len(analysis_df) * 100, 2), "after_comment_ratio": round(analysis_df["has_after_comment"].mean() * 100, 2) }
步骤 6:完整调用与分析示例
python
运行
if __name__ == "__main__": # 配置应用信息(替换为实际值) APP_KEY = "your_app_key_here" APP_SECRET = "your_app_secret_here" # 初始化API客户端和采集器 comment_api = JdCommentAPI(APP_KEY, APP_SECRET) crawler = JdCommentCrawler(comment_api) # 目标商品SKU ID TARGET_SKU_ID = "100012345678" # 替换为实际SKU ID # 1. 批量采集评论数据(最多采集5页) print(f"开始采集商品 {TARGET_SKU_ID} 的评论数据...") comment_df, stats = crawler.crawl_comments( sku_id=TARGET_SKU_ID, max_pages=5, score=0 # 0-全部评论,1-差评,2-中评,3-好评 ) if comment_df.empty: print("未采集到评论数据") else: print(f"评论采集完成,共获取 {len(comment_df)} 条评论") print(f"总体评分: {stats.get('average_score', 0)} 分") print(f"评分分布: 好评 {stats['score_distribution'].get('good', 0)} 条, " f"中评 {stats['score_distribution'].get('general', 0)} 条, " f"差评 {stats['score_distribution'].get('poor', 0)} 条") # 2. 保存原始评论数据 comment_df.to_csv(f"jd_comments_{TARGET_SKU_ID}_raw.csv", index=False, encoding="utf-8-sig") print(f"原始评论数据已保存至 jd_comments_{TARGET_SKU_ID}_raw.csv") # 3. 评论情感分析 print("\n开始进行评论情感分析...") analysis_df = process_comment_analysis(comment_df) # 4. 生成分析报告 report = generate_comment_report(analysis_df) print("\n===== 评论分析报告 =====") print(f"情感分布: 正面 {report['sentiment_distribution']['positive_ratio']}%, " f"负面 {report['sentiment_distribution']['negative_ratio']}%, " f"中性 {report['sentiment_distribution']['neutral_ratio']}%") print(f"平均情感得分: {report['sentiment_distribution']['avg_sentiment_score']}") print(f"带图评论比例: {report['pic_comment_ratio']}%") print(f"追评比例: {report['after_comment_ratio']}%") print(f"热门关键词: {', '.join(report['top_keywords'])}") # 5. 保存分析结果 analysis_df.to_csv(f"jd_comments_{TARGET_SKU_ID}_analysis.csv", index=False, encoding="utf-8-sig") print(f"\n评论分析结果已保存至 jd_comments_{TARGET_SKU_ID}_analysis.csv") # 6. 展示部分负面评论 negative_comments = analysis_df[analysis_df["is_negative"]][ ["content", "score", "sentiment_score"] ].head(5) if not negative_comments.empty: print("\n===== 典型负面评论 =====") for idx, row in negative_comments.iterrows(): print(f"评分: {row['score']}分, 情感得分: {row['sentiment_score']}") print(f"内容: {row['content'][:100]}...\n")
四、缓存与性能优化策略
评论数据缓存实现
python
运行
import redisimport picklefrom datetime import timedeltaclass CachedJdCommentAPI(JdCommentAPI): def __init__(self, app_key, app_secret, redis_host="localhost", redis_port=6379): super().__init__(app_key, app_secret) self.redis = redis.Redis(host=redis_host, port=redis_port, db=0) # 评论数据缓存时间较短,因为会不断更新 self.cache_ttl = 300 # 5分钟 def get_cache_key(self, sku_id: str, **params) -> str: """生成缓存键""" sorted_params = sorted(params.items()) params_str = "_".join([f"{k}_{v}" for k, v in sorted_params]) return f"jd_comments:{sku_id}:{params_str}" def get_comment_list(self,** kwargs) -> Optional[Dict]: """带缓存的评论列表获取""" sku_id = kwargs.get("sku_id") cache_key = self.get_cache_key(sku_id, **kwargs) # 尝试从缓存获取 cached_data = self.redis.get(cache_key) if cached_data: return pickle.loads(cached_data) # 缓存未命中,调用接口 comment_data = super().get_comment_list(** kwargs) # 存入缓存 if comment_data: self.redis.setex(cache_key, timedelta(seconds=self.cache_ttl), pickle.dumps(comment_data)) return comment_data
增量采集策略
python
运行
def incremental_crawl_comments(crawler: JdCommentCrawler, sku_id: str, last_time: datetime) -> pd.DataFrame: """ 增量采集新评论 :param crawler: 评论采集器 :param sku_id: 商品ID :param last_time: 上次采集时间 :return: 新增评论DataFrame """ new_comments = [] page = 1 while True: # 获取当前页评论 page_data = crawler.api.get_comment_list( sku_id=sku_id, page=page, page_size=100, sort_type=2 # 按时间倒序 ) if not page_data or "comments" not in page_data: break page_df, _ = parse_comment_data(page_data) if page_df.empty: break # 筛选出上次采集之后的评论 mask = page_df["creation_time"] > last_time if mask.any(): new_comments.append(page_df[mask]) # 如果本页还有更早的评论,继续采集 if page_df[~mask].empty: page += 1 time.sleep(1) else: break # 已到上次采集时间点,停止采集 else: break # 没有新评论了 if not new_comments: return pd.DataFrame() return pd.concat(new_comments, ignore_index=True)
五、常见问题与解决方案
接口调用错误处理
错误码 | 错误信息 | 解决方案 |
---|---|---|
1001 | 签名错误 | 检查签名生成逻辑,确保参数排序正确,时间戳有效 |
1003 | 权限不足 | 确认已申请评论接口权限,企业认证是否通过 |
2001 | SKU 不存在或无评论 | 验证 sku_id 是否正确,商品可能为新品暂无评论 |
429 | 调用频率超限 | 增加请求间隔,优化缓存策略,使用增量采集 |
502 | 服务暂时不可用 | 实现重试机制,记录错误日志,稍后再试 |
110 | IP 不在白名单 | 在京东开放平台添加服务器 IP 到白名单 |
数据采集优化建议
- 分时段采集:python
运行
运行
def filter_abnormal_comments(df: pd.DataFrame) -> pd.DataFrame: """过滤异常评论(如广告、重复内容)""" if df.empty: return df # 过滤过短评论(可能为无效内容) df = df[df["content"].str.len() >= 5] # 过滤重复评论 df = df.drop_duplicates(subset=["content"]) # 过滤包含广告特征的评论 ad_patterns = ["微信", "电话", "网址", "购买链接", "私聊"] def has_ad(text): return any(pattern in str(text) for pattern in ad_patterns) df = df[~df["content"].apply(has_ad)] return df
六、合规使用与场景应用
合规使用规范
- 数据使用限制:
不得展示完整评论内容,可展示脱敏后的片段或统计结果
评论图片不得用于商业用途,需保留原始水印
不得将评论数据用于攻击竞争对手或商品
- 调用规范:
单 SKU 每日采集不超过 1000 条评论
必须在应用中注明 "评论数据来源:京东开放平台"
不得对评论数据进行恶意修改或误导性展示
典型应用场景
- 产品优化分析:
通过评论关键词提取和情感分析,识别产品优缺点,为产品迭代提供依据 - 竞品对比系统:
采集同类商品评论数据,对比分析用户对不同品牌的评价差异 - 用户反馈监控:
实时监控新评论,及时发现产品质量问题或服务投诉 - 智能客服辅助:
基于评论数据构建常见问题库,提升客服响应效率
京东商品评论接口提供了宝贵的用户反馈数据,通过本文介绍的技术方案,开发者可以构建高效的评论采集与分析系统。在实际应用中,应特别注意合规使用,控制调用频率,同时结合缓存和增量采集策略提升性能,让评论数据真正为业务决策提供支持。
def is_peak_hour() -> bool: """判断是否为接口调用高峰期""" hour = datetime.now().hour # 避开京东接口高峰期(假设9:00-12:00, 15:00-18:00为高峰期) return (9 <= hour < 12) or (15 <= hour < 18)def get_optimal_delay() -> float: """根据时段获取最优请求间隔""" return 2.0 if is_peak_hour() else 1.0
- 异常评论过滤:python