京东商品评论接口作为获取用户反馈的核心渠道,包含了丰富的用户评价数据、评分分布、晒图信息等关键内容。与商品详情接口不同,评论接口具有数据量大、更新频繁、情感倾向明显等特点,在竞品分析、产品优化、用户画像构建等场景中具有重要价值。本文将从接口权限配置、分页数据采集、评论内容解析到情感分析,结合完整代码示例,详解京东商品评论接口的开发实战,帮助开发者合规高效地利用评论数据。
一、接口特性与核心数据结构
京东商品评论接口(
jd.comment.list.get)具有鲜明的 UGC(用户生成内容)特性,主要特点包括:多层级评论体系:包含主评论、追评、回复等层级结构
多维度评分数据:涵盖商品质量、物流速度、服务态度等细分评分
丰富的多媒体内容:支持图片、视频等富媒体评论内容获取
用户行为标签:包含是否匿名、是否认证、购买属性等用户标签
核心参数解析
| 参数类别 | 具体参数 | 作用说明 | 约束条件 |
|---|---|---|---|
| 基础参数 | app_key | 应用标识 | 开放平台注册获取 |
method | 接口方法名 | 固定为jd.comment.list.get | |
timestamp | 时间戳 | 格式 yyyy-MM-dd HH:mm:ss,误差≤5 分钟 | |
sign | 签名 | HMAC-SHA256 加密 | |
| 业务参数 | sku_id | 商品 ID | 必选,京东商品唯一标识 |
page | 页码 | 正整数,默认 1 | |
page_size | 每页条数 | 1-100,默认 20 | |
| 筛选参数 | score | 评分筛选 | 0 - 全部,1 - 差评,2 - 中评,3 - 好评 |
sort_type | 排序方式 | 0 - 推荐,1 - 时间正序,2 - 时间倒序 | |
has_pic | 是否有图 | 0 - 全部,1 - 有图 |
响应数据结构
接口返回数据包含评论列表和统计信息,核心结构如下:
json
{
"jd_comment_list_get_response": {
"result": {
"total": 1568, // 总评论数
"page": 1,
"page_size": 20,
"score_count": { // 评分分布
"total": 1568,
"good": 1320,
"general": 156,
"poor": 92
},
"comment_average": 4.8, // 平均评分
"comments": [
{
"id": "1234567890123", // 评论ID
"user_nickname": "jd_123456", // 用户名
"is_anonymous": true, // 是否匿名
"user_level": 5, // 用户等级
"score": 5, // 评分
"content": "商品质量很好,物流也快,非常满意", // 评论内容
"creation_time": "2024-08-01 15:30:22", // 评论时间
"product_attributes": "颜色:白色;版本:标准版", // 购买属性
"reply_count": 2, // 回复数
"useful_vote_count": 35, // 有用数
"pictures": [ // 评论图片
"https://img10.360buyimg.com/comment/jfs/t1/12345/67/8901/234567/abcdef.jpg"
],
"after_comment": { // 追评
"content": "用了一周,效果不错,值得购买",
"creation_time": "2024-08-08 10:15:33"
},
"score_details": { // 详细评分
"product_quality": 5,
"logistics_speed": 5,
"service_attitude": 4
}
}
// 更多评论...
]
}
}}点击获取key和secret
二、开发环境与权限配置
环境要求
开发语言:Python 3.8+
核心依赖:
requests(HTTP 请求)、pandas(数据处理)、snownlp(情感分析)开发工具:PyCharm/VS Code
运行环境:支持 Windows/macOS/Linux,需联网访问京东开放平台
依赖安装
bash
pip install requests pandas snownlp redis pycryptodome
权限申请特殊说明
京东评论接口权限申请比普通接口更严格,需注意:
开发者账号需完成企业认证(个人开发者难以获取权限)
需提供详细的使用场景说明,说明评论数据的具体用途
接口调用限制更严格:默认 50 次 / 分钟,日调用上限 10000 次
不得将评论数据用于商业竞争或负面营销
三、接口开发实战实现
步骤 1:签名生成与基础工具
复用京东统一的 HMAC-SHA256 签名算法,并扩展评论专用工具:
python
运行
import timeimport hashlibimport hmacimport urllib.parseimport logging# 配置日志logging.basicConfig(
filename='jd_comment_api.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')def generate_jd_sign(params: dict, app_secret: str) -> str:
"""生成京东接口签名"""
sorted_params = sorted(params.items(), key=lambda x: x[0])
query_string = urllib.parse.urlencode(sorted_params)
signature = hmac.new(
app_secret.encode('utf-8'),
query_string.encode('utf-8'),
hashlib.sha256 ).hexdigest().upper()
return signaturedef validate_comment_params(params: dict) -> tuple[bool, str]:
"""验证评论接口参数"""
required = ["sku_id", "page", "page_size"]
for param in required:
if param not in params:
return False, f"缺少必选参数: {param}"
# 验证分页参数
if not isinstance(params["page"], int) or params["page"] < 1:
return False, "page必须是正整数"
if not isinstance(params["page_size"], int) or not (1 <= params["page_size"] <= 100):
return False, "page_size必须是1-100的整数"
# 验证筛选参数
if "score" in params and params["score"] not in [0, 1, 2, 3]:
return False, "score必须是0-3的整数"
if "has_pic" in params and params["has_pic"] not in [0, 1]:
return False, "has_pic必须是0或1"
return True, "参数验证通过"步骤 2:评论接口客户端封装
python
运行
import requestsimport jsonfrom typing import Dict, Optional, List, Anyclass JdCommentAPI:
def __init__(self, app_key: str, app_secret: str):
self.app_key = app_key
self.app_secret = app_secret
self.api_url = "https://api.jd.com/routerjson"
self.max_retries = 3 # 最大重试次数
def get_comment_list(self,
sku_id: str,
page: int = 1,
page_size: int = 20,
score: int = 0,
sort_type: int = 2,
has_pic: int = 0) -> Optional[Dict[str, Any]]:
"""
获取京东商品评论列表
:param sku_id: 商品SKU ID
:param page: 页码
:param page_size: 每页条数
:param score: 评分筛选 0-全部 1-差评 2-中评 3-好评
:param sort_type: 排序方式 0-推荐 1-时间正序 2-时间倒序
:param has_pic: 是否有图 0-全部 1-有图
:return: 评论数据字典
"""
# 参数验证
params = {
"sku_id": sku_id,
"page": page,
"page_size": page_size,
"score": score,
"sort_type": sort_type,
"has_pic": has_pic }
valid, msg = validate_comment_params(params)
if not valid:
logging.error(f"参数错误: {msg}")
return None
# 构建完整参数
full_params = {
"method": "jd.comment.list.get",
"app_key": self.app_key,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "2.0",** params }
# 生成签名
full_params["sign"] = generate_jd_sign(full_params, self.app_secret)
try:
# 发送请求
response = requests.post(
self.api_url,
data=full_params,
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=15
)
response.raise_for_status()
result = json.loads(response.text)
# 错误处理
if "error_response" in result:
error = result["error_response"]
logging.error(f"接口错误: {error.get('msg')} (错误码: {error.get('code')})")
return None
return result.get("jd_comment_list_get_response", {}).get("result", {})
except requests.exceptions.RequestException as e:
logging.error(f"HTTP请求异常: {str(e)}")
return None
except json.JSONDecodeError as e:
logging.error(f"JSON解析错误: {str(e)}")
return None步骤 3:评论数据解析与结构化
python
运行
import pandas as pdfrom datetime import datetimedef parse_comment_data(raw_data: Dict) -> tuple[pd.DataFrame, Dict]:
"""
解析评论数据为结构化格式
:param raw_data: 接口返回的原始数据
:return: 评论DataFrame和统计信息
"""
if not raw_data or "comments" not in raw_data:
return pd.DataFrame(), {}
# 提取统计信息
stats = {
"total_comments": raw_data.get("total", 0),
"page": raw_data.get("page", 1),
"page_size": raw_data.get("page_size", 20),
"total_pages": (raw_data.get("total", 0) + raw_data.get("page_size", 20) - 1)
// raw_data.get("page_size", 20),
"average_score": raw_data.get("comment_average", 0),
"score_distribution": raw_data.get("score_count", {})
}
# 解析评论列表
comments = []
for item in raw_data["comments"]:
# 处理追评
after_comment = item.get("after_comment", {})
# 处理详细评分
score_details = item.get("score_details", {})
comment_info = {
"comment_id": item.get("id"),
"user_nickname": item.get("user_nickname"),
"is_anonymous": item.get("is_anonymous", False),
"user_level": item.get("user_level", 0),
"score": item.get("score", 0),
"content": item.get("content", ""),
"creation_time": item.get("creation_time"),
"product_attributes": item.get("product_attributes", ""),
"reply_count": item.get("reply_count", 0),
"useful_vote_count": item.get("useful_vote_count", 0),
"pic_count": len(item.get("pictures", [])),
"has_after_comment": 1 if after_comment else 0,
"after_content": after_comment.get("content", ""),
"after_time": after_comment.get("creation_time"),
"product_quality_score": score_details.get("product_quality", 0),
"logistics_score": score_details.get("logistics_speed", 0),
"service_score": score_details.get("service_attitude", 0)
}
comments.append(comment_info)
# 转换为DataFrame
df = pd.DataFrame(comments)
# 数据类型转换
if not df.empty:
df["is_anonymous"] = df["is_anonymous"].astype(bool)
df["user_level"] = df["user_level"].astype(int)
df["score"] = df["score"].astype(int)
df["reply_count"] = df["reply_count"].astype(int)
df["useful_vote_count"] = df["useful_vote_count"].astype(int)
df["pic_count"] = df["pic_count"].astype(int)
df["has_after_comment"] = df["has_after_comment"].astype(bool)
df["creation_time"] = pd.to_datetime(df["creation_time"], errors="coerce")
df["after_time"] = pd.to_datetime(df["after_time"], errors="coerce")
return df, stats步骤 4:评论数据批量采集工具
python
运行
import timefrom tqdm import tqdmclass JdCommentCrawler:
def __init__(self, api: JdCommentAPI):
self.api = api
self.delay_seconds = 1 # 请求间隔,避免触发限流
def crawl_comments(self,
sku_id: str,
max_pages: int = 10,
score: int = 0,
has_pic: int = 0) -> tuple[pd.DataFrame, Dict]:
"""
批量采集商品评论
:param sku_id: 商品SKU ID
:param max_pages: 最大采集页数
:param score: 评分筛选
:param has_pic: 是否有图
:return: 合并后的评论DataFrame和总统计信息
"""
all_comments = []
total_stats = None
# 获取第一页数据,确定总页数
first_page = self.api.get_comment_list(
sku_id=sku_id,
page=1,
page_size=100,
score=score,
has_pic=has_pic,
sort_type=2 # 按时间倒序,获取最新评论
)
if not first_page:
return pd.DataFrame(), {}
first_df, first_stats = parse_comment_data(first_page)
all_comments.append(first_df)
total_stats = first_stats
# 计算需要采集的总页数
actual_max_pages = min(max_pages, first_stats.get("total_pages", 1))
if actual_max_pages <= 1:
return pd.concat(all_comments, ignore_index=True), total_stats
# 采集剩余页数
for page in tqdm(range(2, actual_max_pages + 1), desc="采集评论"):
# 控制请求频率
time.sleep(self.delay_seconds)
page_data = self.api.get_comment_list(
sku_id=sku_id,
page=page,
page_size=100,
score=score,
has_pic=has_pic,
sort_type=2
)
if page_data:
page_df, _ = parse_comment_data(page_data)
all_comments.append(page_df)
else:
logging.warning(f"第{page}页评论采集失败,跳过")
# 合并所有数据
combined_df = pd.concat(all_comments, ignore_index=True)
return combined_df, total_stats步骤 5:评论情感分析与挖掘
python
运行
from snownlp import SnowNLPimport refrom collections import Counterdef clean_comment_text(text: str) -> str:
"""清洗评论文本"""
if not text:
return ""
# 移除特殊字符和HTML标签
text = re.sub(r'<[^>]*>', '', text)
text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s]', ' ', text)
# 去除多余空格
return re.sub(r'\s+', ' ', text).strip()def analyze_sentiment(text: str) -> Dict:
"""分析评论情感倾向"""
if not text:
return {"sentiment": 0.5, "positive": False}
s = SnowNLP(text)
sentiment_score = s.sentiment # 0-1之间,越大越积极
return {
"sentiment_score": round(sentiment_score, 4),
"is_positive": sentiment_score >= 0.6,
"is_negative": sentiment_score <= 0.4
}def extract_keywords(text: str, top_n: int = 5) -> List[str]:
"""提取评论关键词"""
if not text:
return []
s = SnowNLP(text)
# 提取关键词
keywords = s.keywords(top_n)
return [str(keyword) for keyword in keywords]def process_comment_analysis(comment_df: pd.DataFrame) -> pd.DataFrame:
"""处理评论情感分析和关键词提取"""
if comment_df.empty:
return comment_df
df = comment_df.copy()
# 清洗评论内容
df["clean_content"] = df["content"].apply(clean_comment_text)
# 情感分析
sentiment_results = df["clean_content"].apply(analyze_sentiment)
df["sentiment_score"] = [res["sentiment_score"] for res in sentiment_results]
df["is_positive"] = [res["is_positive"] for res in sentiment_results]
df["is_negative"] = [res["is_negative"] for res in sentiment_results]
# 提取关键词
df["keywords"] = df["clean_content"].apply(lambda x: extract_keywords(x, top_n=3))
return dfdef generate_comment_report(analysis_df: pd.DataFrame) -> Dict:
"""生成评论分析报告"""
if analysis_df.empty:
return {}
# 总体情感分布
sentiment_dist = {
"positive_ratio": round(analysis_df["is_positive"].mean() * 100, 2),
"negative_ratio": round(analysis_df["is_negative"].mean() * 100, 2),
"neutral_ratio": round((1 - analysis_df["is_positive"].mean() - analysis_df["is_negative"].mean()) * 100, 2),
"avg_sentiment_score": round(analysis_df["sentiment_score"].mean(), 4)
}
# 高频关键词
all_keywords = []
for keywords in analysis_df["keywords"]:
all_keywords.extend(keywords)
top_keywords = [kw for kw, _ in Counter(all_keywords).most_common(10)]
# 评分与情感相关性
score_correlation = analysis_df[["score", "sentiment_score"]].corr().iloc[0, 1]
return {
"sentiment_distribution": sentiment_dist,
"top_keywords": top_keywords,
"score_correlation": round(score_correlation, 4),
"comment_count": len(analysis_df),
"pic_comment_ratio": round(analysis_df[analysis_df["pic_count"] > 0].shape[0] / len(analysis_df) * 100, 2),
"after_comment_ratio": round(analysis_df["has_after_comment"].mean() * 100, 2)
}步骤 6:完整调用与分析示例
python
运行
if __name__ == "__main__":
# 配置应用信息(替换为实际值)
APP_KEY = "your_app_key_here"
APP_SECRET = "your_app_secret_here"
# 初始化API客户端和采集器
comment_api = JdCommentAPI(APP_KEY, APP_SECRET)
crawler = JdCommentCrawler(comment_api)
# 目标商品SKU ID
TARGET_SKU_ID = "100012345678" # 替换为实际SKU ID
# 1. 批量采集评论数据(最多采集5页)
print(f"开始采集商品 {TARGET_SKU_ID} 的评论数据...")
comment_df, stats = crawler.crawl_comments(
sku_id=TARGET_SKU_ID,
max_pages=5,
score=0 # 0-全部评论,1-差评,2-中评,3-好评
)
if comment_df.empty:
print("未采集到评论数据")
else:
print(f"评论采集完成,共获取 {len(comment_df)} 条评论")
print(f"总体评分: {stats.get('average_score', 0)} 分")
print(f"评分分布: 好评 {stats['score_distribution'].get('good', 0)} 条, "
f"中评 {stats['score_distribution'].get('general', 0)} 条, "
f"差评 {stats['score_distribution'].get('poor', 0)} 条")
# 2. 保存原始评论数据
comment_df.to_csv(f"jd_comments_{TARGET_SKU_ID}_raw.csv", index=False, encoding="utf-8-sig")
print(f"原始评论数据已保存至 jd_comments_{TARGET_SKU_ID}_raw.csv")
# 3. 评论情感分析
print("\n开始进行评论情感分析...")
analysis_df = process_comment_analysis(comment_df)
# 4. 生成分析报告
report = generate_comment_report(analysis_df)
print("\n===== 评论分析报告 =====")
print(f"情感分布: 正面 {report['sentiment_distribution']['positive_ratio']}%, "
f"负面 {report['sentiment_distribution']['negative_ratio']}%, "
f"中性 {report['sentiment_distribution']['neutral_ratio']}%")
print(f"平均情感得分: {report['sentiment_distribution']['avg_sentiment_score']}")
print(f"带图评论比例: {report['pic_comment_ratio']}%")
print(f"追评比例: {report['after_comment_ratio']}%")
print(f"热门关键词: {', '.join(report['top_keywords'])}")
# 5. 保存分析结果
analysis_df.to_csv(f"jd_comments_{TARGET_SKU_ID}_analysis.csv", index=False, encoding="utf-8-sig")
print(f"\n评论分析结果已保存至 jd_comments_{TARGET_SKU_ID}_analysis.csv")
# 6. 展示部分负面评论
negative_comments = analysis_df[analysis_df["is_negative"]][
["content", "score", "sentiment_score"]
].head(5)
if not negative_comments.empty:
print("\n===== 典型负面评论 =====")
for idx, row in negative_comments.iterrows():
print(f"评分: {row['score']}分, 情感得分: {row['sentiment_score']}")
print(f"内容: {row['content'][:100]}...\n")四、缓存与性能优化策略
评论数据缓存实现
python
运行
import redisimport picklefrom datetime import timedeltaclass CachedJdCommentAPI(JdCommentAPI):
def __init__(self, app_key, app_secret, redis_host="localhost", redis_port=6379):
super().__init__(app_key, app_secret)
self.redis = redis.Redis(host=redis_host, port=redis_port, db=0)
# 评论数据缓存时间较短,因为会不断更新
self.cache_ttl = 300 # 5分钟
def get_cache_key(self, sku_id: str, **params) -> str:
"""生成缓存键"""
sorted_params = sorted(params.items())
params_str = "_".join([f"{k}_{v}" for k, v in sorted_params])
return f"jd_comments:{sku_id}:{params_str}"
def get_comment_list(self,** kwargs) -> Optional[Dict]:
"""带缓存的评论列表获取"""
sku_id = kwargs.get("sku_id")
cache_key = self.get_cache_key(sku_id, **kwargs)
# 尝试从缓存获取
cached_data = self.redis.get(cache_key)
if cached_data:
return pickle.loads(cached_data)
# 缓存未命中,调用接口
comment_data = super().get_comment_list(** kwargs)
# 存入缓存
if comment_data:
self.redis.setex(cache_key, timedelta(seconds=self.cache_ttl), pickle.dumps(comment_data))
return comment_data增量采集策略
python
运行
def incremental_crawl_comments(crawler: JdCommentCrawler, sku_id: str, last_time: datetime) -> pd.DataFrame: """ 增量采集新评论 :param crawler: 评论采集器 :param sku_id: 商品ID :param last_time: 上次采集时间 :return: 新增评论DataFrame """ new_comments = [] page = 1 while True: # 获取当前页评论 page_data = crawler.api.get_comment_list( sku_id=sku_id, page=page, page_size=100, sort_type=2 # 按时间倒序 ) if not page_data or "comments" not in page_data: break page_df, _ = parse_comment_data(page_data) if page_df.empty: break # 筛选出上次采集之后的评论 mask = page_df["creation_time"] > last_time if mask.any(): new_comments.append(page_df[mask]) # 如果本页还有更早的评论,继续采集 if page_df[~mask].empty: page += 1 time.sleep(1) else: break # 已到上次采集时间点,停止采集 else: break # 没有新评论了 if not new_comments: return pd.DataFrame() return pd.concat(new_comments, ignore_index=True)
五、常见问题与解决方案
接口调用错误处理
| 错误码 | 错误信息 | 解决方案 |
|---|---|---|
| 1001 | 签名错误 | 检查签名生成逻辑,确保参数排序正确,时间戳有效 |
| 1003 | 权限不足 | 确认已申请评论接口权限,企业认证是否通过 |
| 2001 | SKU 不存在或无评论 | 验证 sku_id 是否正确,商品可能为新品暂无评论 |
| 429 | 调用频率超限 | 增加请求间隔,优化缓存策略,使用增量采集 |
| 502 | 服务暂时不可用 | 实现重试机制,记录错误日志,稍后再试 |
| 110 | IP 不在白名单 | 在京东开放平台添加服务器 IP 到白名单 |
数据采集优化建议
- 分时段采集:python
运行
运行
def filter_abnormal_comments(df: pd.DataFrame) -> pd.DataFrame: """过滤异常评论(如广告、重复内容)""" if df.empty: return df # 过滤过短评论(可能为无效内容) df = df[df["content"].str.len() >= 5] # 过滤重复评论 df = df.drop_duplicates(subset=["content"]) # 过滤包含广告特征的评论 ad_patterns = ["微信", "电话", "网址", "购买链接", "私聊"] def has_ad(text): return any(pattern in str(text) for pattern in ad_patterns) df = df[~df["content"].apply(has_ad)] return df
六、合规使用与场景应用
合规使用规范
- 数据使用限制:
不得展示完整评论内容,可展示脱敏后的片段或统计结果
评论图片不得用于商业用途,需保留原始水印
不得将评论数据用于攻击竞争对手或商品
- 调用规范:
单 SKU 每日采集不超过 1000 条评论
必须在应用中注明 "评论数据来源:京东开放平台"
不得对评论数据进行恶意修改或误导性展示
典型应用场景
- 产品优化分析:
通过评论关键词提取和情感分析,识别产品优缺点,为产品迭代提供依据 - 竞品对比系统:
采集同类商品评论数据,对比分析用户对不同品牌的评价差异 - 用户反馈监控:
实时监控新评论,及时发现产品质量问题或服务投诉 - 智能客服辅助:
基于评论数据构建常见问题库,提升客服响应效率
京东商品评论接口提供了宝贵的用户反馈数据,通过本文介绍的技术方案,开发者可以构建高效的评论采集与分析系统。在实际应用中,应特别注意合规使用,控制调用频率,同时结合缓存和增量采集策略提升性能,让评论数据真正为业务决策提供支持。
def is_peak_hour() -> bool: """判断是否为接口调用高峰期""" hour = datetime.now().hour # 避开京东接口高峰期(假设9:00-12:00, 15:00-18:00为高峰期) return (9 <= hour < 12) or (15 <= hour < 18)def get_optimal_delay() -> float: """根据时段获取最优请求间隔""" return 2.0 if is_peak_hour() else 1.0
- 异常评论过滤:python
