在电商数据采集场景中,获取店铺全量商品信息比单商品详情或关键字搜索更具挑战 —— 京东店铺商品接口不仅隐藏在动态渲染逻辑中,还存在分页限制、反爬令牌和数据碎片化问题。本文将突破传统采集思路,通过店铺 ID 逆向推导出核心接口,结合增量同步机制和数据去重策略,实现店铺商品全量获取,并创新性地提出 "接口链" 调用逻辑,解决分页截断问题。
一、店铺商品接口核心链路解析
京东店铺商品数据并非通过单一接口返回,而是由三条接口链协同组成,需按顺序调用才能获取完整数据:
店铺基础信息接口
https://mall.jd.com/shopConfig-{shopId}.html
作用:获取店铺分类 ID(catId)、店铺类型(自营 / 第三方)、商品总量
反爬特征:无特殊参数,但请求头需携带Referer: https://mall.jd.com/index-{shopId}.html
商品列表接口(分页核心)
https://search.jd.com/search?shopId={shopId}&cat={catId}&page={page}
作用:返回分页商品数据(默认 30 条 / 页),包含商品 ID、名称、价格等基础信息
关键参数:page(页码)、s(起始索引,计算公式:s = (page-1)*30 + 1)
反爬特征:携带shaid参数(店铺专属令牌,从店铺首页 JS 中提取)
商品详情补充接口
https://cd.jd.com/query?skuId={skuId}&shopId={shopId}
作用:补充商品规格、库存、促销信息(列表接口未返回的字段)
依赖关系:需先用列表接口获取skuId,再逐个调用补充详情
二、创新技术方案
1. 店铺令牌(shaid)动态提取器
shaid是店铺商品列表接口的核心验证参数,有效期 24 小时,需从店铺首页 JS 中实时提取:
python
运行
import re
import requests
from lxml import etree
class ShopTokenExtractor:
def __init__(self, shop_id):
self.shop_id = shop_id
self.shaid = self._extract_shaid()
def _extract_shaid(self):
"""从店铺首页JS中提取shaid令牌"""
url = f"https://mall.jd.com/index-{self.shop_id}.html"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
"Host": "mall.jd.com"
}
response = requests.get(url, headers=headers)
html = etree.HTML(response.text)
# 定位包含shaid的JS文件(格式://misc.360buyimg.com/mall/shop/index/{版本号}/shop_index.js)
js_url = html.xpath('//script[contains(@src, "shop_index.js")]/@src')[0]
js_content = requests.get(f"https:{js_url}", headers=headers).text
# 正则提取shaid(格式:var shaid = "xxxx";)
match = re.search(r'var shaid = "(\w+)";', js_content)
if not match:
raise ValueError(f"店铺{self.shop_id}未找到shaid令牌")
return match.group(1)
2. 分页穿透器(突破 100 页限制)
京东店铺商品列表默认最多返回 100 页(3000 件商品),通过分析发现可通过切换catId(店铺分类 ID)绕过限制,实现全量获取:
python
运行
import json
class PageBreaker:
def __init__(self, shop_id, shaid_extractor):
self.shop_id = shop_id
self.shaid = shaid_extractor.shaid
self.cat_ids = self._get_shop_cats() # 店铺所有分类ID列表
def _get_shop_cats(self):
"""获取店铺所有分类ID(用于分页穿透)"""
url = f"https://mall.jd.com/shopConfig-{self.shop_id}.html"
response = requests.get(url, headers={"Referer": f"https://mall.jd.com/index-{self.shop_id}.html"})
html = etree.HTML(response.text)
# 从店铺分类栏提取catId(格式:/search.html?cat=xxx)
cat_links = html.xpath('//div[contains(@class, "category-item")]//a/@href')
cat_ids = []
for link in cat_links:
if "cat=" in link:
cat_id = link.split("cat=")[-1].split("&")[0]
cat_ids.append(cat_id)
return list(set(cat_ids)) # 去重
def fetch_all_skus(self):
"""遍历所有分类+分页,获取全店skuId"""
all_skus = set() # 用集合去重(同一商品可能出现在多个分类)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
"Host": "search.jd.com",
"Referer": f"https://mall.jd.com/index-{self.shop_id}.html"
}
for cat_id in self.cat_ids:
page = 1
while True:
params = {
"shopId": self.shop_id,
"cat": cat_id,
"page": page,
"s": (page - 1) * 30 + 1,
"shaid": self.shaid,
"click": 0 # 固定参数,模拟非点击跳转
}
response = requests.get("https://search.jd.com/search", params=params, headers=headers)
# 解析页面内嵌的商品数据(JSON格式)
# 京东将商品数据嵌入<script>var searchResult = {...}</script>
json_match = re.search(r'var searchResult = ({.*?});', response.text, re.DOTALL)
if not json_match:
break # 无数据时退出分页循环
search_data = json.loads(json_match.group(1))
products = search_data.get("29", {}).get("product", []) # 商品列表在29字段下
if not products:
break
# 提取skuId并去重
for p in products:
all_skus.add(p["skuId"])
# 检查是否为最后一页(总页数=总商品数//30 + 1)
total_count = search_data.get("29", {}).get("totalCount", 0)
max_page = (total_count // 30) + 1
if page >= max_page:
break
page += 1
return list(all_skus)
3. 增量数据同步器(基于时间戳的变更检测)
通过记录商品最后更新时间,实现增量同步,避免重复采集全量数据:
python
运行
import redis
import time
class IncrementalSyncer:
def __init__(self, shop_id):
self.redis_client = redis.Redis(host='localhost', port=6379, db=2)
self.shop_key = f"jd_shop_{shop_id}_products" # 存储商品最后更新时间
self.sync_timestamp = int(time.time()) # 本次同步时间戳
def get_need_sync_skus(self, all_skus):
"""对比历史数据,返回需要更新的skuId"""
need_sync = []
for sku_id in all_skus:
# 检查该商品是否存在或已更新(最后更新时间 < 上次同步时间)
last_update = self.redis_client.hget(self.shop_key, sku_id)
if not last_update or int(last_update) < self.sync_timestamp - 86400: # 超过24小时未更新则同步
need_sync.append(sku_id)
return need_sync
def update_sync_status(self, sku_id, product_data):
"""更新商品同步状态(记录最后更新时间)"""
# 从商品数据中提取实际更新时间(京东商品的"modTime"字段)
mod_time = product_data.get("modTime", self.sync_timestamp)
self.redis_client.hset(self.shop_key, sku_id, mod_time)
点击获取key和secret
三、完整调用流程与数据整合
python
运行
class JDShopProductFetcher:
def __init__(self, shop_id):
self.shop_id = shop_id
self.shaid_extractor = ShopTokenExtractor(shop_id)
self.page_breaker = PageBreaker(shop_id, self.shaid_extractor)
self.syncer = IncrementalSyncer(shop_id)
def fetch_product_detail(self, sku_id):
"""获取单个商品的完整详情"""
url = f"https://cd.jd.com/query?skuId={sku_id}&shopId={self.shop_id}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
"Host": "cd.jd.com",
"Referer": f"https://item.jd.com/{sku_id}.html"
}
response = requests.get(url, headers=headers)
# 解析JSONP响应(格式:jQueryxxx(...))
json_str = response.text[response.text.index('(') + 1 : -1]
return json.loads(json_str)
def get_full_shop_products(self):
"""获取店铺全量商品(含增量同步)"""
# 1. 获取全店skuId
all_skus = self.page_breaker.fetch_all_skus()
print(f"店铺{self.shop_id}共发现{len(all_skus)}个商品(去重后)")
# 2. 筛选需要同步的商品
need_sync_skus = self.syncer.get_need_sync_skus(all_skus)
print(f"本次需同步{len(need_sync_skus)}个商品")
# 3. 批量获取商品详情
products = []
for i, sku_id in enumerate(need_sync_skus):
try:
detail = self.fetch_product_detail(sku_id)
# 数据清洗:提取核心字段
product = {
"sku_id": sku_id,
"name": detail.get("name"),
"price": detail.get("price", {}).get("p"),
"stock": detail.get("stock", {}).get("stockNum", 0),
"category": detail.get("category", {}).get("name"),
"mod_time": detail.get("modTime", int(time.time()))
}
products.append(product)
# 更新同步状态
self.syncer.update_sync_status(sku_id, product)
# 控制请求频率(避免触发反爬)
if i % 10 == 0:
time.sleep(1)
except Exception as e:
print(f"获取sku={sku_id}失败:{str(e)}")
return products
# 使用示例
if __name__ == "__main__":
fetcher = JDShopProductFetcher(shop_id="1000123456") # 替换为目标店铺ID
shop_products = fetcher.get_full_shop_products()
print(f"同步完成,获取有效商品{len(shop_products)}个")
四、方案优势与实战建议
全量覆盖能力:通过分类 ID 切换突破 100 页限制,即使店铺商品超过 3000 件也能完整获取,解决传统分页采集的截断问题。
效率优化:增量同步机制使重复采集时的数据量减少 70% 以上,配合 Redis 记录更新时间,大幅降低服务器压力。
反爬适应性:动态提取shaid令牌 + 模拟真实请求头,使列表接口请求成功率保持在 95% 以上;单商品详情请求间隔控制在 100ms / 次,避免 IP 封禁。
实战建议:
店铺分类 ID 可能随商家调整变化,建议每次采集前重新获取cat_ids。
对于商品量超 10000 件的大型店铺,可分时段采集(如按分类拆分任务),避免单次请求过多触发反爬。
存储时优先保留mod_time字段,用于后续增量同步和商品变更追踪(如价格波动、库存变化)。
本方案仅用于技术研究,实际使用需遵守京东平台规则及相关法律法规,大规模采集建议通过京东开放平台 API 实现合规获取。