ReviseInventoryStatusで複数商品の在庫・価格を同期する

前回の記事はこちら

eBay Trading API:ReviseInventoryStatusで複数商品の在庫・価格を高速同期する

はじめに

本記事は、全42回にわたる「eBay API 実践ガイド」の第8回です。

前回(#7)は ReviseFixedPriceItem を用いた「カタログ情報(タイトル等)の部分更新」を実装しましたが、今回は全く別の要件にフォーカスします。それは 「在庫と価格のリアルタイム同期」 です。

この記事で得られること:

  • なぜ在庫同期に ReviseFixedPriceItem を使ってはいけないのかというアーキテクチャの理解。
  • ReviseInventoryStatus を用いた、軽量かつ一括(Bulk)での在庫・価格更新メソッドの習得。
  • チャンク分割(4件制限)と並列処理(マルチスレッド)を組み合わせた、本番稼働に耐えうる高速同期エンジンの実装。

背景・なぜこれが重要か (Motivation)

自社ECや他モールと併売している越境ECセラーにとって、「在庫がズレて欠品キャンセルが発生する」ことはアカウントの致命傷(Defect Rate悪化)に直結します。そのため、在庫は最短数分〜十数分間隔で eBay に同期し続ける必要があります。

ここで、前回の ReviseFixedPriceItem を使って数千件の在庫を同期しようとすると、以下の壁に激突します。

  • APIが重すぎる: 検索インデックスの再構築が走るため、レスポンスに数秒かかります。
  • Bulk処理ができない: 1リクエストにつき1商品しか更新できません。
  • Rate Limitの枯渇: 1日あたりの Call Limit をあっという間に使い果たします。

この課題を解決するために eBay が用意した専用の軽量 API が ReviseInventoryStatus です。この API は「在庫数(Quantity)」と「価格(StartPrice)」の更新のみに特化しており、検索インデックスの深い更新をバイパスするため、圧倒的な速度で動作します。

基本的な使い方(ベースライン):一括更新XMLの全体像

ReviseInventoryStatus は、1回の API コールで 最大4件まで の商品を同時に更新できます。

第6回(バリエーション出品)で解説した InventoryTrackingMethod=SKU の設定を行っていれば、eBay の ItemID を知らなくても、自社の SKU だけで直接更新をかけることが可能です。

<?xml version="1.0" encoding="utf-8"?> <ReviseInventoryStatusRequest xmlns="urn:ebay:apis:eBLBaseComponents"> <ErrorLanguage>en_US</ErrorLanguage> <WarningLevel>High</WarningLevel> <InventoryStatus> <SKU>TSHIRT-RED-S</SKU> <Quantity>15</Quantity> <StartPrice currencyID="USD">18.99</StartPrice> </InventoryStatus> <InventoryStatus> <ItemID>112233445566</ItemID> <Quantity>0</Quantity> </InventoryStatus> </ReviseInventoryStatusRequest> 

実務で躓く場面・深いポイント (Core)

この API は強力ですが、独自の制約があるため実装時にいくつかの罠が存在します。

1. 厳格な「最大4件」ルール

1つのリクエストに <InventoryStatus> ブロックを5つ以上入れると、API は即座にエラーを返します。プログラム側で、更新対象のリストを必ず4件ずつのチャンク(Chunk)に分割してリクエストを投げるロジックを組む必要があります。

2. レスポンス解析の罠(トップレベルの Ack)

eBay API に慣れてくると、「リクエストで配列を送ったのだから、結果も配列ごとに Ack(成功/失敗)が入っているはずだ」と思い込みがちですが、この API のレスポンスはトップレベルに1つの <Ack> があり、エラーは <Errors> に集約される構造になっています。個別のノードで成否を判定しようとするとバグの温床になります。

3. バリエーション商品の ItemID 指定の罠

InventoryTrackingMethod=SKU を設定していないバリエーション商品の在庫を更新する場合、<ItemID> だけを送ると「どのバリエーションの在庫か分からない」とエラーになります。この場合は <ItemID><SKU> を両方指定しなければなりません。(※だからこそ、第6回で「絶対に SKU ベースでのトラッキングを有効にせよ」と強調しました。)

堅牢な実装:チャンク分割と高速同期エンジン

実務で数千件の在庫を同期するための「バッチプロセッサ」を実装します。入力されたリストを自動で4件ずつに分割(Chunking)し、エラーハンドリングを厳格に行った動的ジェネレータです。

# sync_inventory.py import requests import xml.etree.ElementTree as ET import html import time import random from typing import List, Dict, Any from config import eBayConfig from ebay_token_manager import eBayTokenManager def _get_text(node: ET.Element, tag: str, ns: dict) -> str: """要素が存在しない場合の NoneType クラッシュを防ぐ""" if node is None: return "" el = node.find(tag, ns) return el.text if el is not None and el.text is not None else "" def api_call_with_backoff(fn, max_retries=3): """HTTP 429エラー(Too Many Requests)対策の指数バックオフ""" for attempt in range(max_retries): try: return fn() except requests.exceptions.HTTPError as e: if e.response is not None and e.response.status_code == 429:
                wait = (2 ** attempt) + random.uniform(0, 1) print(f"[WARNING] Rate limited. Retrying in {wait:.2f}s...")
                time.sleep(wait) else: raise raise Exception("Max retries exceeded after HTTP errors") def revise_inventory_batch(config: eBayConfig, token: str, status_list: List[Dict[str, Any]]) -> Dict[str, Any]: """
    最大4件の在庫/価格を一括更新するコア関数
    """ if len(status_list) > 4: raise ValueError("ReviseInventoryStatus accepts a maximum of 4 items per request.") if not status_list: return {"success": [], "errors": []}

    status_tags = "" for item in status_list: # 【深いポイント①】: 必須キーの欠如に対する防御 if 'sku' not in item and 'item_id' not in item: raise ValueError(f"Each item must have 'sku' or 'item_id': {item}")

        sku_tag = f"<SKU>{html.escape(str(item['sku']))}</SKU>" if 'sku' in item else "" item_id_tag = f"<ItemID>{html.escape(str(item['item_id']))}</ItemID>" if 'item_id' in item else "" qty_tag = "" if 'quantity' in item:
            qty_tag = f"<Quantity>{int(item['quantity'])}</Quantity>" price_tag = "" if 'price' in item:
            price_tag = f'<StartPrice currencyID="USD">{float(item["price"])}</StartPrice>' status_tags += f"""
        <InventoryStatus>
            {sku_tag}
            {item_id_tag}
            {qty_tag}
            {price_tag}
        </InventoryStatus>
        """ xml_payload = f"""<?xml version="1.0" encoding="utf-8"?>
    <ReviseInventoryStatusRequest xmlns="urn:ebay:apis:eBLBaseComponents">
        <ErrorLanguage>en_US</ErrorLanguage>
        <WarningLevel>High</WarningLevel>
        {status_tags}
    </ReviseInventoryStatusRequest>
    """ headers = { "X-EBAY-API-CALL-NAME": "ReviseInventoryStatus", "X-EBAY-API-SITEID": "0", "X-EBAY-API-COMPATIBILITY-LEVEL": "1323", "X-EBAY-API-IAF-TOKEN": token, "Content-Type": "text/xml" } def execute():
        res = requests.post(config.trading_api_url, headers=headers, data=xml_payload.encode('utf-8'), timeout=30)
        res.raise_for_status() return res

    response = api_call_with_backoff(execute)

    namespace = {'ns': 'urn:ebay:apis:eBLBaseComponents'}
    root = ET.fromstring(response.text)
    
    results = {"success": [], "errors": []} # 【深いポイント②】: 正しいレスポンス解析ロジック(トップレベルのAckを確認) ack = _get_text(root, 'ns:Ack', namespace) if ack not in ['Success', 'Warning']:
        errors = [{"code": _get_text(e, 'ns:ErrorCode', namespace), "msg": _get_text(e, 'ns:LongMessage', namespace)} for e in root.findall('ns:Errors', namespace)] raise Exception(f"ReviseInventoryStatus failed: {errors}") # 成功した場合、各 InventoryStatus ノードから更新された ItemID / SKU を取得 for node in root.findall('ns:InventoryStatus', namespace):
        sku = _get_text(node, 'ns:SKU', namespace)
        item_id = _get_text(node, 'ns:ItemID', namespace)
        identifier = sku if sku else item_id
        results["success"].append(identifier) return results def run_bulk_inventory_sync(config: eBayConfig, manager: eBayTokenManager, all_updates: List[Dict[str, Any]]): """
    大量の更新リストを4件ずつのチャンクに分割して処理する(直列版)
    """ chunk_size = 4 total_success = 0 total_errors = 0 chunks = [all_updates[i:i + chunk_size] for i in range(0, len(all_updates), chunk_size)] for idx, chunk in enumerate(chunks):
        token = manager.get_token() try:
            res = revise_inventory_batch(config, token, chunk)
            total_success += len(res["success"]) if res["errors"]:
                total_errors += len(res["errors"]) print(f"[ERROR] Chunk {idx+1} Errors: {res['errors']}") else: print(f"[SUCCESS] Chunk {idx+1} Processed successfully.") except Exception as e: print(f"[FATAL ERROR] Fatal Error on Chunk {idx+1}: {e}")
            total_errors += len(chunk)
            
        time.sleep(0.2) # 直列時の簡易Rate Limit対策 print("\n--- Sync Complete ---") print(f"Successfully Updated: {total_success} | Errors: {total_errors}")

パフォーマンス・スケーリング視点 (深度)

さらなる高速化:並列処理(Threading)の導入

数万件規模の SKU を管理するセラーの場合、4件ずつ直列で処理していても時間がかかりすぎます。
I/O 待ちがボトルネックとなるため、チャンク分割した後に concurrent.futures.ThreadPoolExecutor を用いて 複数のチャンクを並列で送信する のが究極のスケーリング手法です。

【注意】: 連載第1回で eBayTokenManagerthreading.Lock を組み込んだのはこのためです。マルチスレッド環境では、各スレッド内get_token() を評価することで、トークン更新時の競合を防ぐことができます。
import concurrent.futures import threading def run_concurrent_inventory_sync(config: eBayConfig, manager: eBayTokenManager, all_updates: List[Dict[str, Any]]):
    chunk_size = 4 chunks = [all_updates[i:i + chunk_size] for i in range(0, len(all_updates), chunk_size)] # 【深いポイント③】: スレッド内でのトークン評価とRate Limit制御 # Semaphoreを用いて同時実行数を制限し、急激なスパイク(429エラー)を防ぐ semaphore = threading.Semaphore(5) # 内部関数として定義し、直列版関数との混同を避ける def _sync_chunk_concurrent(chunk_data): with semaphore:
            time.sleep(0.1) # スレッド起動時の最小間隔を確保 # 必ずスレッド内部で get_token を呼ぶこと(メインスレッドで評価するとタイミングがズレます) token = manager.get_token() return revise_inventory_batch(config, token, chunk_data) with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(_sync_chunk_concurrent, chunk) for chunk in chunks] for future in concurrent.futures.as_completed(futures): try:
                res = future.result() # 成功ログ・エラーログの記録など except Exception as e: print(f"[ERROR] Thread execution failed: {e}") # --- 使い方の例 --- if __name__ == "__main__":
    config = eBayConfig()
    manager = eBayTokenManager(config.client_id, config.client_secret, config.refresh_token, config.env)
    
    sync_data = [
        {"sku": "TS-RED-S", "quantity": 10, "price": 19.99}, # 【注記】: OutOfStockControlが有効な場合は「検索非表示」になるだけですが、 # 無効の場合は quantity=0 を送ると出品が終了(End)します。第6回の設定を必ず確認してください。 {"sku": "TS-RED-M", "quantity": 0},  
        {"sku": "TS-BLU-S", "quantity": 5},
        {"sku": "TS-BLU-M", "quantity": 2, "price": 18.99},
        {"sku": "ACC-WATCH-01", "quantity": 15} # 5件目(自動的に次のチャンクへ) ]
    
    run_concurrent_inventory_sync(config, manager, sync_data)

この実装により、API の Call Limit を節約しつつ、自社システムの在庫変動を「数分以内」に eBay 側の全 SKU へ反映させる強靭な同期パイプラインが完成します。

まとめ

本記事では、eBay 運用における心臓部とも言える「在庫と価格の高速同期」について解説しました。

  • ベースライン: ReviseFixedPriceItem ではなく、軽量な ReviseInventoryStatus を使う意義と、最大4件の制約。
  • 深いポイント: トップレベルの Ack 判定による正しいレスポンス解析。必須項目(SKU/ItemID)の欠落防止。
  • スケーリング: マルチスレッド環境下におけるスレッドセーフなトークン取得と、セマフォを用いた Rate Limit 制御アーキテクチャ。

これで、商品の「出品」から「日々のメンテナンス(カタログ更新・在庫同期)」までを自動化する基盤が完全に整いました。

次のステップ

これまでは「こちらから eBay にデータを送る(Push)」処理を実装してきました。しかし、「今、eBay 上に何が出品されているのか?」という現状をプログラムから把握できなければ、同期システムは成り立ちません。

次回(#9)は、「GetSellerList / GetMyeBaySellingで出品中の一覧・販売状況を自動取得する」 方法について解説します。取得した一覧データをローカルDBにマッピングする実務的な手法をお楽しみに!

技術的なサポートやご質問について

APIの実装や仕様に関してご不明な点がございましたら、以下のeBay Japan 技術サポート窓口までお気軽にお問い合わせください:
ebayjapan-techsupport@ebay.com

トップに戻る