京东平台的店铺商品接口是获取商家全量商品信息的核心通道,相比单商品接口,它能批量获取店铺内所有商品的基础信息、价格、库存和促销状态,在店铺竞品分析、品类结构研究、价格监测等场景中具有重要应用价值。本文将聚焦京东店铺商品接口的批量采集技术,从接口权限申请、分页递归采集、数据去重清洗到品类结构可视化,结合完整代码实现,帮助开发者合规高效地获取店铺全量商品数据。
一、接口特性与核心数据结构
京东店铺商品接口(
jd.shop.item.list.get
)具有鲜明的批量处理特性,主要特点包括:分页批量返回:支持按页获取店铺商品,单页最大 100 条
多维度筛选:可按商品状态、品类、价格区间等筛选
完整商品属性:包含基础信息、价格、库存、促销等核心字段
店铺关联数据:返回商品与店铺的关联信息,如是否主推商品
核心参数解析
参数类别 | 具体参数 | 作用说明 | 约束条件 |
---|---|---|---|
基础参数 | app_key | 应用标识 | 开放平台注册获取 |
method | 接口方法名 | 固定为jd.shop.item.list.get | |
timestamp | 时间戳 | 格式 yyyy-MM-dd HH:mm:ss,误差≤5 分钟 | |
sign | 签名 | HMAC-SHA256 加密 | |
业务参数 | shop_id | 店铺 ID | 必选,京东店铺唯一标识 |
page | 页码 | 正整数,默认 1 | |
page_size | 每页条数 | 1-100,默认 20 | |
筛选参数 | item_status | 商品状态 | 0 - 全部,1 - 在售,2 - 下架 |
cid3 | 三级分类 ID | 可选,筛选指定分类商品 | |
price_from | 最低价格 | 可选,单位元 | |
price_to | 最高价格 | 可选,单位元 |
响应数据结构
接口返回数据包含商品列表和分页信息,核心结构如下:
json
{ "jd_shop_item_list_get_response": { "result": { "total": 856, // 店铺商品总数 "page": 1, "page_size": 50, "items": [ { "sku_id": "100012345678", // 商品SKU ID "spu_id": "1234567", // 商品SPU ID "title": "XX品牌无线蓝牙耳机 主动降噪", "brand_name": "XX品牌", "cid1": 122, // 一级分类ID "cid1_name": "数码", "cid2": 1223, // 二级分类ID "cid2_name": "耳机", "cid3": 12234, // 三级分类ID "cid3_name": "无线耳机", "price": "299.00", // 价格 "market_price": "399.00", // 市场价 "stock": 1250, // 库存 "stock_state": 33, // 库存状态(33-有货) "is_main": 1, // 是否为主推商品(1-是) "is_hot": 0, // 是否为热销商品(0-否) "create_time": "2024-05-15 09:30:00", // 上架时间 "update_time": "2024-08-10 14:20:30", // 更新时间 "sales_count": 3560, // 30天销量 "comment_count": 1250 // 评论数 } // 更多商品... ] } }}
二、开发环境与权限配置
环境要求
开发语言:Python 3.8+
核心依赖:
requests
(HTTP 请求)、pandas
(数据处理)、matplotlib
(可视化)开发工具:PyCharm/VS Code
运行环境:支持 Windows/macOS/Linux,需联网访问京东开放平台
依赖安装
bash
pip install requests pandas matplotlib redis pycryptodome tqdm
店铺商品接口权限说明
京东店铺商品接口权限获取需注意:
个人开发者几乎无法获取,需企业认证账号
需明确说明使用场景,不得用于恶意爬取竞争店铺数据
调用限制严格:默认 30 次 / 分钟,单店铺日采集上限 1000 次
部分高销量店铺可能需要特殊权限申请
三、接口开发实战实现
步骤 1:基础工具与签名生成
python
运行
import timeimport hashlibimport hmacimport urllib.parseimport loggingfrom typing import Dict, Tuple# 配置日志logging.basicConfig( filename='jd_shop_items.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')def generate_jd_sign(params: Dict, app_secret: str) -> str: """生成京东接口签名(HMAC-SHA256)""" sorted_params = sorted(params.items(), key=lambda x: x[0]) query_string = urllib.parse.urlencode(sorted_params) signature = hmac.new( app_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256 ).hexdigest().upper() return signaturedef validate_shop_item_params(params: Dict) -> Tuple[bool, str]: """验证店铺商品接口参数""" required = ["shop_id", "page", "page_size"] for param in required: if param not in params: return False, f"缺少必选参数: {param}" # 验证分页参数 if not isinstance(params["page"], int) or params["page"] < 1: return False, "page必须是正整数" if not isinstance(params["page_size"], int) or not (1 <= params["page_size"] <= 100): return False, "page_size必须是1-100的整数" # 验证商品状态参数 if "item_status" in params and params["item_status"] not in [0, 1, 2]: return False, "item_status必须是0-2的整数" return True, "参数验证通过"
步骤 2:店铺商品接口客户端
python
运行
import requestsimport jsonfrom typing import Dict, Optional, Anyclass JdShopItemAPI: def __init__(self, app_key: str, app_secret: str): self.app_key = app_key self.app_secret = app_secret self.api_url = "https://api.jd.com/routerjson" def get_shop_items(self, shop_id: str, page: int = 1, page_size: int = 50, item_status: int = 1, # 默认获取在售商品 cid3: int = None, price_from: float = None, price_to: float = None) -> Optional[Dict[str, Any]]: """ 获取店铺商品列表 :param shop_id: 店铺ID :param page: 页码 :param page_size: 每页条数 :param item_status: 商品状态 0-全部 1-在售 2-下架 :param cid3: 三级分类ID :param price_from: 最低价格 :param price_to: 最高价格 :return: 商品列表数据 """ # 构建业务参数 params = { "shop_id": shop_id, "page": page, "page_size": page_size, "item_status": item_status } # 添加可选参数 if cid3: params["cid3"] = cid3 if price_from is not None: params["price_from"] = price_from if price_to is not None: params["price_to"] = price_to # 参数验证 valid, msg = validate_shop_item_params(params) if not valid: logging.error(f"参数错误: {msg}") return None # 构建完整请求参数 full_params = { "method": "jd.shop.item.list.get", "app_key": self.app_key, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "format": "json", "v": "2.0",** params } # 生成签名 full_params["sign"] = generate_jd_sign(full_params, self.app_secret) try: # 发送请求 response = requests.post( self.api_url, data=full_params, headers={"Content-Type": "application/x-www-form-urlencoded"}, timeout=15 ) response.raise_for_status() result = json.loads(response.text) # 错误处理 if "error_response" in result: error = result["error_response"] logging.error(f"接口错误: {error.get('msg')} (错误码: {error.get('code')})") return None return result.get("jd_shop_item_list_get_response", {}).get("result", {}) except requests.exceptions.RequestException as e: logging.error(f"HTTP请求异常: {str(e)}") return None except json.JSONDecodeError as e: logging.error(f"JSON解析错误: {str(e)}") return None
步骤 3:全量商品数据采集器
python
运行
import timeimport pandas as pdfrom tqdm import tqdmfrom typing import Tuple, Optionalclass JdShopItemCrawler: def __init__(self, api: JdShopItemAPI): self.api = api self.delay_seconds = 2 # 请求间隔,避免触发限流 self.max_retry = 3 # 单页最大重试次数 def crawl_shop_all_items(self, shop_id: str, item_status: int = 1, max_items: int = None) -> Tuple[pd.DataFrame, Dict]: """ 采集店铺所有商品 :param shop_id: 店铺ID :param item_status: 商品状态 :param max_items: 最大采集数量,None表示采集全部 :return: 商品DataFrame和统计信息 """ all_items = [] total_stats = None page = 1 page_size = 100 # 单页最大数量,提高采集效率 # 获取第一页数据,确定总数量 first_page = self._get_with_retry( shop_id=shop_id, page=1, page_size=page_size, item_status=item_status ) if not first_page: return pd.DataFrame(), {} # 解析第一页数据 first_df, first_stats = self.parse_shop_items(first_page) all_items.append(first_df) total_stats = first_stats # 计算需要采集的总页数 total_items = first_stats.get("total", 0) if max_items and total_items > max_items: total_items = max_items total_pages = (total_items + page_size - 1) // page_size if total_pages <= 1: combined_df = pd.concat(all_items, ignore_index=True) return (combined_df.iloc[:max_items] if max_items else combined_df), total_stats # 采集剩余页数 for page in tqdm(range(2, total_pages + 1), desc=f"采集店铺 {shop_id} 商品"): # 控制请求频率 time.sleep(self.delay_seconds) # 获取当前页数据 page_data = self._get_with_retry( shop_id=shop_id, page=page, page_size=page_size, item_status=item_status ) if page_data: page_df, _ = self.parse_shop_items(page_data) all_items.append(page_df) # 检查是否达到最大采集数量 if max_items and len(pd.concat(all_items, ignore_index=True)) >= max_items: break else: logging.warning(f"第{page}页商品采集失败,跳过") # 合并所有数据 combined_df = pd.concat(all_items, ignore_index=True) # 截取最大数量 if max_items and len(combined_df) > max_items: combined_df = combined_df.iloc[:max_items] return combined_df, total_stats def _get_with_retry(self, **kwargs) -> Optional[Dict]: """带重试机制的页面获取""" for retry in range(self.max_retry): data = self.api.get_shop_items(** kwargs) if data: return data logging.warning(f"第{retry+1}次重试获取数据") time.sleep(self.delay_seconds * (retry + 1)) # 指数退避 return None @staticmethod def parse_shop_items(raw_data: Dict) -> Tuple[pd.DataFrame, Dict]: """解析店铺商品数据""" if not raw_data or "items" not in raw_data: return pd.DataFrame(), {} # 提取统计信息 stats = { "total": raw_data.get("total", 0), "page": raw_data.get("page", 1), "page_size": raw_data.get("page_size", 20), "total_pages": (raw_data.get("total", 0) + raw_data.get("page_size", 20) - 1) // raw_data.get("page_size", 20) } # 解析商品列表 items = [] for item in raw_data["items"]: item_info = { "sku_id": item.get("sku_id"), "spu_id": item.get("spu_id"), "title": item.get("title"), "brand_name": item.get("brand_name"), "cid1": item.get("cid1"), "cid1_name": item.get("cid1_name"), "cid2": item.get("cid2"), "cid2_name": item.get("cid2_name"), "cid3": item.get("cid3"), "cid3_name": item.get("cid3_name"), "price": item.get("price"), "market_price": item.get("market_price"), "stock": item.get("stock"), "stock_state": item.get("stock_state"), "is_main": item.get("is_main", 0), "is_hot": item.get("is_hot", 0), "create_time": item.get("create_time"), "update_time": item.get("update_time"), "sales_count": item.get("sales_count", 0), "comment_count": item.get("comment_count", 0) } items.append(item_info) # 转换为DataFrame df = pd.DataFrame(items) # 数据类型转换 if not df.empty: df["price"] = pd.to_numeric(df["price"], errors="coerce") df["market_price"] = pd.to_numeric(df["market_price"], errors="coerce") df["stock"] = pd.to_numeric(df["stock"], errors="coerce") df["sales_count"] = pd.to_numeric(df["sales_count"], errors="coerce") df["comment_count"] = pd.to_numeric(df["comment_count"], errors="coerce") df["create_time"] = pd.to_datetime(df["create_time"], errors="coerce") df["update_time"] = pd.to_datetime(df["update_time"], errors="coerce") df["is_main"] = df["is_main"].astype(bool) df["is_hot"] = df["is_hot"].astype(bool) return df, stats
步骤 4:商品数据清洗与去重
python
运行
import redef clean_shop_item_data(df: pd.DataFrame) -> pd.DataFrame: """清洗店铺商品数据""" if df.empty: return df df_clean = df.copy() # 1. 去重(根据sku_id去重) df_clean = df_clean.drop_duplicates(subset=["sku_id"], keep="last") # 2. 处理缺失值 # 填充价格缺失值(用市场价代替) price_mask = df_clean["price"].isna() & ~df_clean["market_price"].isna() df_clean.loc[price_mask, "price"] = df_clean.loc[price_mask, "market_price"] # 填充品牌缺失值 df_clean["brand_name"] = df_clean["brand_name"].fillna("未知品牌") # 3. 计算折扣率 def calculate_discount(row): if pd.notna(row["price"]) and pd.notna(row["market_price"]) and row["market_price"] > 0: return round(row["price"] / row["market_price"], 2) return None df_clean["discount_rate"] = df_clean.apply(calculate_discount, axis=1) # 4. 提取商品标题关键词 def extract_title_keywords(title): if not title: return [] # 简单提取数字和品牌相关关键词 keywords = re.findall(r'(\d+[寸|英寸|g|kg|ml|L|W|H|mm|cm|m])', str(title)) # 提取品牌关键词 if df_clean["brand_name"].notna().any(): brand = row.get("brand_name", "") if brand and brand != "未知品牌" and brand in str(title): keywords.append(brand) return list(set(keywords)) df_clean["title_keywords"] = df_clean.apply( lambda row: extract_title_keywords(row["title"]), axis=1 ) # 5. 标记库存状态 df_clean["in_stock"] = df_clean["stock_state"] == 33 return df_clean
步骤 5:店铺商品数据分析与可视化
python
运行
import matplotlib.pyplot as pltimport seaborn as snsfrom matplotlib.font_manager import FontPropertiesimport pandas as pd# 设置中文字体plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]sns.set(font_scale=1.2)sns.set_style("whitegrid")def analyze_shop_category(df: pd.DataFrame) -> Tuple[Dict, pd.DataFrame]: """分析店铺品类结构""" if df.empty: return {}, pd.DataFrame() # 一级分类分布 cid1_dist = df["cid1_name"].value_counts().to_dict() # 二级分类分布(按一级分类分组) cid2_dist = df.groupby("cid1_name")["cid2_name"].value_counts().unstack().fillna(0).to_dict() # 分类销量分析 category_sales = df.groupby(["cid1_name", "cid2_name"]).agg( total_sales=("sales_count", "sum"), item_count=("sku_id", "count") ).reset_index() return {"cid1_dist": cid1_dist, "cid2_dist": cid2_dist}, category_salesdef visualize_category_distribution(cid1_dist: Dict): """可视化一级分类分布""" if not cid1_dist: return plt.figure(figsize=(12, 8)) # 饼图:分类占比 plt.subplot(1, 2, 1) labels = list(cid1_dist.keys()) sizes = list(cid1_dist.values()) plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90) plt.title('店铺一级分类商品数量占比') # 柱状图:分类数量 plt.subplot(1, 2, 2) sns.barplot(x=list(cid1_dist.values()), y=labels, palette="viridis") plt.xlabel('商品数量') plt.title('各分类商品数量分布') plt.tight_layout() plt.savefig('category_distribution.png', dpi=300, bbox_inches='tight') plt.close() print("分类分布图表已保存至 category_distribution.png")def analyze_price_distribution(df: pd.DataFrame) -> pd.DataFrame: """分析价格分布""" if df.empty: return pd.DataFrame() # 价格区间划分 bins = [0, 50, 100, 200, 500, 1000, float('inf')] labels = ['0-50元', '50-100元', '100-200元', '200-500元', '500-1000元', '1000元以上'] df["price_range"] = pd.cut(df["price"], bins=bins, labels=labels) # 按价格区间统计 price_analysis = df.groupby("price_range").agg( item_count=("sku_id", "count"), avg_price=("price", "mean"), total_sales=("sales_count", "sum") ).reset_index() # 可视化价格分布 plt.figure(figsize=(12, 6)) sns.barplot(x="price_range", y="item_count", data=price_analysis, palette="coolwarm") plt.title('商品价格区间分布') plt.xlabel('价格区间') plt.ylabel('商品数量') plt.xticks(rotation=45) plt.tight_layout() plt.savefig('price_distribution.png', dpi=300, bbox_inches='tight') plt.close() print("价格分布图表已保存至 price_distribution.png") return price_analysisdef analyze_top_selling_items(df: pd.DataFrame, top_n: int = 10) -> pd.DataFrame: """分析热销商品""" if df.empty: return pd.DataFrame() # 按销量排序 top_selling = df.sort_values("sales_count", ascending=False).head(top_n).copy() # 可视化热销商品 plt.figure(figsize=(14, 8)) sns.barplot(x="sales_count", y="title", data=top_selling, palette="rocket") plt.title(f'店铺Top {top_n} 热销商品') plt.xlabel('30天销量') plt.ylabel('商品名称') plt.tight_layout() plt.savefig('top_selling_items.png', dpi=300, bbox_inches='tight') plt.close() print(f"Top {top_n} 热销商品图表已保存至 top_selling_items.png") return top_selling
步骤 6:完整调用与分析示例
python
运行
if __name__ == "__main__": # 配置应用信息(替换为实际值) APP_KEY = "your_app_key_here" APP_SECRET = "your_app_secret_here" # 目标店铺ID TARGET_SHOP_ID = "1000000000" # 替换为实际店铺ID # 初始化API客户端和采集器 shop_api = JdShopItemAPI(APP_KEY, APP_SECRET) crawler = JdShopItemCrawler(shop_api) # 1. 采集店铺所有在售商品(最多采集1000件) print(f"开始采集店铺 {TARGET_SHOP_ID} 的商品数据...") raw_items_df, stats = crawler.crawl_shop_all_items( shop_id=TARGET_SHOP_ID, item_status=1, # 只采集在售商品 max_items=1000 # 限制最大采集数量 ) if raw_items_df.empty: print("未采集到任何商品数据") else: print(f"商品采集完成,共获取 {len(raw_items_df)} 件商品(店铺总商品数: {stats.get('total', 0)})") # 2. 保存原始数据 raw_items_df.to_csv(f"jd_shop_{TARGET_SHOP_ID}_raw.csv", index=False, encoding="utf-8-sig") print(f"原始商品数据已保存至 jd_shop_{TARGET_SHOP_ID}_raw.csv") # 3. 清洗数据 cleaned_df = clean_shop_item_data(raw_items_df) print(f"数据清洗完成,去重后剩余 {len(cleaned_df)} 件商品") # 4. 保存清洗后的数据 cleaned_df.to_csv(f"jd_shop_{TARGET_SHOP_ID}_cleaned.csv", index=False, encoding="utf-8-sig") print(f"清洗后商品数据已保存至 jd_shop_{TARGET_SHOP_ID}_cleaned.csv") # 5. 品类分析 print("\n开始进行店铺品类分析...") category_data, category_sales = analyze_shop_category(cleaned_df) visualize_category_distribution(category_data["cid1_dist"]) # 6. 价格分布分析 print("\n开始进行价格分布分析...") price_analysis = analyze_price_distribution(cleaned_df) print("价格区间分析结果:") print(price_analysis[["price_range", "item_count", "avg_price"]].to_string(index=False)) # 7. 热销商品分析 print("\n开始进行热销商品分析...") top_selling = analyze_top_selling_items(cleaned_df, top_n=10) print("Top 10 热销商品:") print(top_selling[["title", "price", "sales_count", "cid1_name"]].to_string(index=False)) # 8. 折扣分析 discount_analysis = cleaned_df[cleaned_df["discount_rate"].notna()]["discount_rate"].describe() print("\n商品折扣分析:") print(f"平均折扣率: {discount_analysis['mean']:.2f}") print(f"最低折扣率: {discount_analysis['min']:.2f}") print(f"最高折扣率: {discount_analysis['max']:.2f}")
四、缓存与增量更新策略
店铺商品缓存实现
python
运行
import redisimport picklefrom datetime import timedeltaclass CachedJdShopItemAPI(JdShopItemAPI): def __init__(self, app_key, app_secret, redis_host="localhost", redis_port=6379): super().__init__(app_key, app_secret) self.redis = redis.Redis(host=redis_host, port=redis_port, db=0) # 店铺商品缓存时间(商品信息更新频率较低) self.cache_ttl = 3600 # 1小时 def get_cache_key(self, shop_id: str, **params) -> str: """生成缓存键""" sorted_params = sorted(params.items()) params_str = "_".join([f"{k}_{v}" for k, v in sorted_params]) return f"jd_shop_items:{shop_id}:{params_str}" def get_shop_items(self,** kwargs) -> Optional[Dict]: """带缓存的店铺商品获取""" shop_id = kwargs.get("shop_id") cache_key = self.get_cache_key(shop_id, **kwargs) # 尝试从缓存获取 cached_data = self.redis.get(cache_key) if cached_data: return pickle.loads(cached_data) # 缓存未命中,调用接口 item_data = super().get_shop_items(** kwargs) # 存入缓存 if item_data: self.redis.setex(cache_key, timedelta(seconds=self.cache_ttl), pickle.dumps(item_data)) return item_data
增量更新实现
python
运行
def incremental_update_shop_items(crawler: JdShopItemCrawler, shop_id: str, last_update_time: str) -> pd.DataFrame: """ 增量更新店铺商品(只获取更新时间在last_update_time之后的商品) :param crawler: 商品采集器 :param shop_id: 店铺ID :param last_update_time: 上次更新时间(字符串格式) :return: 新增或更新的商品DataFrame """ updated_items = [] page = 1 page_size = 100 has_more = True # 转换上次更新时间为datetime try: last_update_dt = pd.to_datetime(last_update_time) except: logging.error("上次更新时间格式错误,返回全量数据") return crawler.crawl_shop_all_items(shop_id)[0] while has_more: # 获取当前页数据 page_data = crawler._get_with_retry( shop_id=shop_id, page=page, page_size=page_size, item_status=1 ) if not page_data: break page_df, page_stats = crawler.parse_shop_items(page_data) if page_df.empty: break # 筛选出更新时间在last_update_time之后的商品 mask = page_df["update_time"] > last_update_dt if mask.any(): updated_items.append(page_df[mask]) # 检查是否还有更多页 total_pages = page_stats.get("total_pages", 1) has_more = page < total_pages page += 1 time.sleep(2) else: # 本页没有更新的商品,且按更新时间排序的话可以停止 has_more = False if not updated_items: return pd.DataFrame() return pd.concat(updated_items, ignore_index=True)
五、常见问题与解决方案
接口调用错误处理
错误码 | 错误信息 | 解决方案 |
---|---|---|
1001 | 签名错误 | 检查签名生成逻辑,确保参数排序正确,时间戳有效 |
1003 | 权限不足 | 确认已申请店铺商品接口权限,企业认证是否通过 |
2002 | 店铺不存在 | 验证 shop_id 是否正确,店铺可能已关闭 |
429 | 调用频率超限 | 增加请求间隔,使用缓存,错峰采集 |
500 | 服务器内部错误 | 实现重试机制,记录详细日志,稍后再试 |
110 | IP 不在白名单 | 在京东开放平台添加服务器 IP 到白名单 |
采集效率优化建议
- 分时段采集:python
运行
运行
from concurrent.futures import ThreadPoolExecutor, as_completeddef parallel_crawl_shops(shop_ids: list, api: JdShopItemAPI, max_workers: int = 3) -> Dict: """并行采集多个店铺商品""" results = {} crawler = JdShopItemCrawler(api) with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(crawler.crawl_shop_all_items, shop_id): shop_id for shop_id in shop_ids } for future in as_completed(futures): shop_id = futures[future] try: df, stats = future.result() results[shop_id] = {"data": df, "stats": stats} print(f"店铺 {shop_id} 采集完成,获取 {len(df)} 件商品") except Exception as e: logging.error(f"店铺 {shop_id} 采集失败: {str(e)}") results[shop_id] = {"error": str(e)} return results
六、合规使用与场景应用
合规使用规范
- 数据使用限制:
不得将采集的店铺商品数据用于恶意竞争
展示店铺商品信息时必须注明来源店铺
不得批量下载商品图片用于其他商业用途
- 调用规范:
单店铺每日采集次数不超过平台限制
采集间隔不得小于 1 秒,避免给服务器造成压力
不得伪造请求来源,需使用真实的 app_key
典型应用场景
- 竞品店铺分析:
采集竞争对手店铺商品数据,分析其品类结构、价格策略和热销商品 - 市场行情监测:
定期采集多个同类店铺商品,监控市场价格波动和新品上架情况 - 供应商评估:
分析店铺商品的品牌分布、价格区间,评估潜在供应商实力 - 店铺运营优化:
对比自身店铺与优秀同行的商品结构差异,优化品类布局
京东店铺商品接口为商家分析提供了全面的数据基础,通过本文介绍的技术方案,开发者可以构建高效的店铺商品采集与分析系统。在实际应用中,应特别注意合规使用,控制采集频率,同时结合缓存和增量更新策略提升性能,让店铺商品数据真正为商业决策提供支持。
def get_optimal_crawl_time() -> bool: """判断是否为最佳采集时间(避开京东接口高峰期)""" hour = time.localtime().tm_hour # 最佳采集时间:凌晨2-6点,接口压力小 return 2 <= hour < 6
- 并行采集控制:python