Перейти к основному содержимому

Before/After трекер

Ключевая ценность

Before/After трекер — это доказательная база для ROI-Content. Вместо "мы сделали красивые фото" клиент получает: "После обновления контента позиция выросла с #85 на #23, продажи увеличились на 47% за 30 дней, ROI = 340%".

Концепция: 5 контрольных точек

Timeline замеров

gantt
title Before/After Timeline
dateFormat YYYY-MM-DD
axisFormat %d.%m

section Baseline
T0 Baseline (1 день до) :milestone, t0, 2026-03-01, 0d

section Content Update
Обновление контента :crit, update, 2026-03-02, 1d

section Tracking
T1 Short-term (7 дней) :milestone, t1, 2026-03-09, 0d
T2 Medium-term (14 дней) :milestone, t2, 2026-03-16, 0d
T3 Long-term (30 дней) :milestone, t3, 2026-04-01, 0d
T4 Extended (90 дней) :milestone, t4, 2026-05-31, 0d

Что замеряем на каждой точке

ТочкаКогдаЦель замераОжидаемый паттерн
T01 день до обновленияBaseline — фиксация текущего состоянияНачальные значения
T1+7 днейБыстрый эффект — индексация, первые реакцииПозиция может упасть (переиндексация)
T2+14 днейСтабилизация — алгоритм WB/Ozon адаптировалсяНачало роста позиции
T3+30 днейПолный эффект — устоявшиеся метрикиОсновной рост продаж
T4+90 днейДолгосрочный тренд — устойчивость результатаСтабильный уровень или продолжение роста

Метрики на каждой точке

@dataclass
class MeasurementSnapshot:
"""Полный снапшот метрик товара в момент замера."""
measured_at: datetime
measurement_type: str # t0, t1, t2, t3, t4

# Позиционные метрики
category_position: int | None
search_positions: dict[str, int] # {keyword: position}

# Продажи (за последние 7 дней от точки замера)
sales_7d: int
revenue_7d: float
orders_7d: int

# Продажи (за последние 30 дней)
sales_30d: int
revenue_30d: float

# Рейтинг и отзывы
rating: float
review_count: int
review_count_delta: int # Новых отзывов с прошлого замера

# Контент метрики
photo_count: int
video_count: int
has_infographics: bool
description_length: int

# Цена
price: float
price_with_discount: float

# ContentScore на момент замера
content_score: float
content_score_details: dict

# Конкурентная среда
category_avg_position: float
category_avg_price: float
category_avg_sales: float

def to_dict(self) -> dict:
"""Сериализация для хранения в JSONB."""
return {
"measured_at": self.measured_at.isoformat(),
"type": self.measurement_type,
"position": {
"category": self.category_position,
"search": self.search_positions,
},
"sales": {
"sales_7d": self.sales_7d,
"revenue_7d": self.revenue_7d,
"orders_7d": self.orders_7d,
"sales_30d": self.sales_30d,
"revenue_30d": self.revenue_30d,
},
"reviews": {
"rating": self.rating,
"count": self.review_count,
"new": self.review_count_delta,
},
"content": {
"photos": self.photo_count,
"videos": self.video_count,
"infographics": self.has_infographics,
"description_length": self.description_length,
},
"price": {
"current": self.price,
"with_discount": self.price_with_discount,
},
"score": {
"total": self.content_score,
"details": self.content_score_details,
},
"category": {
"avg_position": self.category_avg_position,
"avg_price": self.category_avg_price,
"avg_sales": self.category_avg_sales,
}
}

SF API endpoints для замеров

Какие запросы делаем на каждый замер

flowchart LR
subgraph "1 замер = 4 запроса к SF API"
A["GET /products/{mp}/{sku}<br/>Базовая информация"]
B["GET /products/{mp}/{sku}/sales<br/>Продажи за 30 дней"]
C["GET /products/{mp}/{sku}/position<br/>Позиция в категории"]
D["GET /products/{mp}/{sku}/keywords<br/>Ключевые слова"]
end

A --> SNAP["MeasurementSnapshot"]
B --> SNAP
C --> SNAP
D --> SNAP

SNAP --> DB["PostgreSQL<br/>before_after_tracks"]

Пример запросов и ответов

Запрос 1: Базовая информация

# GET /api/v2/products/wildberries/456789012
curl -H "Authorization: Bearer {SF_API_KEY}" \
"https://salesfinder.ru/api/v2/products/wildberries/456789012"
{
"sku": 456789012,
"name": "Кухонный нож Chef Master шеф-повара 20 см",
"brand": "Chef Master",
"category": "Кухонные ножи",
"category_id": 7834,
"price": 2990.00,
"price_with_discount": 2490.00,
"rating": 4.6,
"review_count": 847,
"photo_count": 6,
"video_count": 1,
"in_stock": true,
"category_position": 45,
"category_total": 12500
}

Запрос 2: Продажи

# GET /api/v2/products/wildberries/456789012/sales?days=30
curl -H "Authorization: Bearer {SF_API_KEY}" \
"https://salesfinder.ru/api/v2/products/wildberries/456789012/sales?days=30"
{
"sku": 456789012,
"period_days": 30,
"sales_count": 342,
"revenue": 851580.00,
"avg_daily_sales": 11.4,
"orders_count": 328,
"returns_count": 14,
"return_rate": 4.1,
"daily_sales": [
{"date": "2026-02-01", "sales": 8, "revenue": 19920},
{"date": "2026-02-02", "sales": 12, "revenue": 29880},
{"date": "2026-02-03", "sales": 15, "revenue": 37350}
// ... ещё 27 дней
]
}

Запрос 3: Позиция

# GET /api/v2/products/wildberries/456789012/position
curl -H "Authorization: Bearer {SF_API_KEY}" \
"https://salesfinder.ru/api/v2/products/wildberries/456789012/position"
{
"sku": 456789012,
"category_position": 45,
"category_total": 12500,
"position_change_7d": -3,
"position_change_30d": -12,
"search_positions": {
"кухонный нож": 12,
"нож шеф-повара": 8,
"нож для кухни": 23,
"профессиональный нож": 67,
"нож 20 см": null
}
}

Запрос 4: Ключевые слова

# GET /api/v2/products/wildberries/456789012/keywords
curl -H "Authorization: Bearer {SF_API_KEY}" \
"https://salesfinder.ru/api/v2/products/wildberries/456789012/keywords"
{
"sku": 456789012,
"keywords_count": 48,
"keywords": [
{"keyword": "кухонный нож", "frequency": 45000, "position": 12, "clicks": 3200},
{"keyword": "нож шеф-повара", "frequency": 18000, "position": 8, "clicks": 1500},
{"keyword": "нож для кухни", "frequency": 32000, "position": 23, "clicks": 2100},
{"keyword": "профессиональный нож", "frequency": 8000, "position": 67, "clicks": 320},
{"keyword": "нож 20 см", "frequency": 5000, "position": null, "clicks": null}
]
}

BeforeAfterTracker: полный класс

# src/engine/before_after.py

import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
from uuid import UUID, uuid4
import structlog

from src.api.sf_client import SalesFinderClient
from src.database.session import get_session
from src.engine.content_score import ContentScoreEngine

logger = structlog.get_logger()


class TrackStatus(str, Enum):
ACTIVE = "active"
COMPLETED = "completed"
CANCELLED = "cancelled"


class MeasurementType(str, Enum):
T0 = "t0" # Baseline (1 day before)
T1 = "t1" # +7 days
T2 = "t2" # +14 days
T3 = "t3" # +30 days
T4 = "t4" # +90 days


# Расписание замеров: тип → дней после обновления контента
MEASUREMENT_SCHEDULE = {
MeasurementType.T0: -1, # 1 день ДО обновления
MeasurementType.T1: 7,
MeasurementType.T2: 14,
MeasurementType.T3: 30,
MeasurementType.T4: 90,
}


@dataclass
class DeltaResult:
"""Результат сравнения двух замеров."""
metric_name: str
baseline_value: float
current_value: float
absolute_change: float
percentage_change: float
is_improvement: bool
significance: str # "significant", "marginal", "not_significant"

@property
def direction_emoji(self) -> str:
if self.is_improvement:
return "📈" if self.significance == "significant" else "↗️"
else:
return "📉" if self.significance == "significant" else "↘️"


@dataclass
class TrackSummary:
"""Итоговое резюме трека Before/After."""
product_sku: str
product_name: str
content_event_type: str
event_date: datetime
track_duration_days: int
deltas: list[DeltaResult]
roi_estimate: float | None
overall_verdict: str # "positive", "neutral", "negative"
confidence_level: float # 0.0-1.0

@property
def is_positive(self) -> bool:
return self.overall_verdict == "positive"


class BeforeAfterTracker:
"""
Движок трекинга Before/After.

Жизненный цикл:
1. Клиент регистрирует обновление контента (content_event)
2. Система собирает T0 baseline
3. По расписанию собирает T1, T2, T3, T4
4. После T4 — расчёт итогового ROI

Usage:
tracker = BeforeAfterTracker()

# 1. Создать трек
track_id = await tracker.create_track(product_id, event_id)

# 2. Собрать baseline (вызывается автоматически)
await tracker.collect_measurement(track_id, "t0")

# 3. Проверка и сбор по расписанию (Celery task)
await tracker.process_pending_measurements()

# 4. Получить текущий статус
summary = await tracker.get_summary(track_id)
"""

def __init__(self):
self.sf_client = SalesFinderClient()
self.score_engine = ContentScoreEngine()

async def create_track(
self,
product_id: UUID,
content_event_id: UUID,
) -> UUID:
"""
Создать новый Before/After трек.

Автоматически планирует первый замер (T0) через 1 день
или немедленно, если событие уже произошло.
"""
track_id = uuid4()

async with get_session() as session:
# Получаем event для расчёта расписания
event = await session.execute(
"SELECT * FROM content_events WHERE id = :id",
{"id": str(content_event_id)}
)
event = event.fetchone()

if not event:
raise ValueError(f"Content event {content_event_id} not found")

event_date = event.event_date

# Планируем T0 (baseline) — за 1 день до события
t0_date = event_date - timedelta(days=1)

# Если T0 в прошлом — собираем немедленно
if t0_date <= datetime.utcnow():
next_measurement = datetime.utcnow()
next_type = MeasurementType.T0
else:
next_measurement = t0_date
next_type = MeasurementType.T0

await session.execute(
"""
INSERT INTO before_after_tracks
(id, product_id, content_event_id, track_status,
next_measurement_at, next_measurement_type)
VALUES
(:id, :product_id, :event_id, :status,
:next_at, :next_type)
""",
{
"id": str(track_id),
"product_id": str(product_id),
"event_id": str(content_event_id),
"status": TrackStatus.ACTIVE.value,
"next_at": next_measurement,
"next_type": next_type.value,
}
)
await session.commit()

logger.info(
"ba_track_created",
track_id=str(track_id),
product_id=str(product_id),
next_measurement=next_measurement.isoformat(),
)
return track_id

async def collect_measurement(
self,
track_id: UUID,
measurement_type: MeasurementType,
) -> MeasurementSnapshot:
"""
Собрать замер для конкретной контрольной точки.

Делает 4 запроса к SF API и формирует полный снапшот.
"""
async with get_session() as session:
# Получаем трек и продукт
track = await session.execute(
"""
SELECT bat.*, p.sku, p.marketplace, p.name as product_name
FROM before_after_tracks bat
JOIN products p ON p.id = bat.product_id
WHERE bat.id = :id
""",
{"id": str(track_id)}
)
track = track.fetchone()

if not track:
raise ValueError(f"Track {track_id} not found")

# Собираем данные из SF API
async with self.sf_client as sf:
product_info, sales_data, position_data, keywords_data = await asyncio.gather(
sf.get_product(track.marketplace, int(track.sku)),
sf.get_product_sales(track.marketplace, int(track.sku), days=30),
sf.get_product_position(track.marketplace, int(track.sku)),
sf.get_product_keywords(track.marketplace, int(track.sku)),
)

# Формируем снапшот
snapshot = MeasurementSnapshot(
measured_at=datetime.utcnow(),
measurement_type=measurement_type.value,
category_position=product_info.category_position,
search_positions={kw.keyword: kw.position for kw in keywords_data if kw.position},
sales_7d=sum(d.sales for d in sales_data.daily_sales[-7:]),
revenue_7d=sum(d.revenue for d in sales_data.daily_sales[-7:]),
orders_7d=sales_data.orders_count,
sales_30d=sales_data.sales_count,
revenue_30d=sales_data.revenue,
rating=product_info.rating,
review_count=product_info.review_count,
review_count_delta=0, # Рассчитывается из предыдущего замера
photo_count=product_info.photo_count,
video_count=product_info.video_count,
has_infographics=product_info.photo_count >= 5, # Эвристика для MVP
description_length=len(product_info.description or ""),
price=product_info.price,
price_with_discount=product_info.price_with_discount or product_info.price,
content_score=0, # Рассчитывается ниже
content_score_details={},
category_avg_position=position_data.category_total / 2 if position_data.category_total else 0,
category_avg_price=0, # Из бенчмарков
category_avg_sales=0, # Из бенчмарков
)

# Рассчитываем ContentScore
score_result = self.score_engine.calculate(
product_info.to_dict(),
sales_data.to_dict(),
[kw.to_dict() for kw in keywords_data],
{}, # Бенчмарки — из БД
)
snapshot.content_score = score_result.total_score
snapshot.content_score_details = score_result.sub_scores.to_dict()

# Сохраняем в БД
await self._save_measurement(track_id, measurement_type, snapshot)

# Планируем следующий замер
await self._schedule_next(track_id, measurement_type)

logger.info(
"ba_measurement_collected",
track_id=str(track_id),
type=measurement_type.value,
content_score=snapshot.content_score,
position=snapshot.category_position,
sales_7d=snapshot.sales_7d,
)

return snapshot

async def process_pending_measurements(self) -> int:
"""
Обработать все замеры, время которых наступило.
Вызывается из Celery task каждый час.
"""
processed = 0

async with get_session() as session:
pending = await session.execute(
"""
SELECT id, next_measurement_type
FROM before_after_tracks
WHERE track_status = 'active'
AND next_measurement_at <= :now
ORDER BY next_measurement_at
LIMIT 50
""",
{"now": datetime.utcnow()}
)
pending_tracks = pending.fetchall()

for track in pending_tracks:
try:
await self.collect_measurement(
UUID(track.id),
MeasurementType(track.next_measurement_type),
)
processed += 1
except Exception as e:
logger.error(
"ba_measurement_failed",
track_id=track.id,
error=str(e),
)

return processed

async def calculate_delta(
self,
track_id: UUID,
from_type: MeasurementType = MeasurementType.T0,
to_type: MeasurementType = MeasurementType.T3,
) -> list[DeltaResult]:
"""
Рассчитать delta между двумя замерами.
По умолчанию: T0 (baseline) vs T3 (30 дней).
"""
async with get_session() as session:
track = await session.execute(
"SELECT * FROM before_after_tracks WHERE id = :id",
{"id": str(track_id)}
)
track = track.fetchone()

if not track:
raise ValueError(f"Track {track_id} not found")

# Получаем снапшоты
baseline = getattr(track, f"baseline_{from_type.value}")
current = getattr(track, f"measurement_{to_type.value}")

if not baseline or not current:
raise ValueError(f"Missing measurement: {from_type.value} or {to_type.value}")

# Рассчитываем delta по каждой метрике
deltas = []
metrics_to_compare = [
("Позиция в категории", "position.category", True), # True = lower is better
("Продажи (7 дней)", "sales.sales_7d", False),
("Выручка (7 дней)", "sales.revenue_7d", False),
("Продажи (30 дней)", "sales.sales_30d", False),
("Рейтинг", "reviews.rating", False),
("Количество отзывов", "reviews.count", False),
("ContentScore", "score.total", False),
]

for metric_name, json_path, lower_is_better in metrics_to_compare:
baseline_val = self._get_nested(baseline, json_path)
current_val = self._get_nested(current, json_path)

if baseline_val is None or current_val is None:
continue

absolute = current_val - baseline_val
pct = ((current_val - baseline_val) / max(abs(baseline_val), 0.001)) * 100

# Определяем, улучшение ли это
if lower_is_better:
is_improvement = current_val < baseline_val
else:
is_improvement = current_val > baseline_val

# Статистическая значимость (упрощённо для MVP)
if abs(pct) > 20:
significance = "significant"
elif abs(pct) > 5:
significance = "marginal"
else:
significance = "not_significant"

deltas.append(DeltaResult(
metric_name=metric_name,
baseline_value=round(baseline_val, 2),
current_value=round(current_val, 2),
absolute_change=round(absolute, 2),
percentage_change=round(pct, 1),
is_improvement=is_improvement,
significance=significance,
))

return deltas

async def calculate_roi(self, track_id: UUID) -> float | None:
"""
Расчёт ROI обновления контента.

ROI = (Дополнительная выручка - Стоимость контента) / Стоимость контента * 100%

Дополнительная выручка = (Revenue_T3 - Revenue_T0) * 3 (проекция на 90 дней)
"""
deltas = await self.calculate_delta(track_id)

revenue_delta = None
for d in deltas:
if d.metric_name == "Выручка (30 дней)":
revenue_delta = d

if not revenue_delta:
return None

# Дополнительная месячная выручка
additional_monthly = revenue_delta.absolute_change

# Проекция на 90 дней (с затуханием: 100% первый месяц, 80% второй, 60% третий)
projected_90d = additional_monthly * (1.0 + 0.8 + 0.6)

# Стоимость контента (берём из content_event)
async with get_session() as session:
track = await session.execute(
"""
SELECT ce.metadata->>'cost' as content_cost
FROM before_after_tracks bat
JOIN content_events ce ON ce.id = bat.content_event_id
WHERE bat.id = :id
""",
{"id": str(track_id)}
)
row = track.fetchone()

content_cost = float(row.content_cost) if row and row.content_cost else 30000.0 # default

if content_cost <= 0:
return None

roi = ((projected_90d - content_cost) / content_cost) * 100
return round(roi, 1)

# ----- Private methods -----

async def _save_measurement(
self, track_id: UUID, mtype: MeasurementType, snapshot: MeasurementSnapshot
):
"""Сохранить замер в БД."""
import json
snapshot_json = json.dumps(snapshot.to_dict(), ensure_ascii=False, default=str)

field_map = {
MeasurementType.T0: "baseline_t0",
MeasurementType.T1: "measurement_t1",
MeasurementType.T2: "measurement_t2",
MeasurementType.T3: "measurement_t3",
MeasurementType.T4: "measurement_t4",
}

field_name = field_map[mtype]

async with get_session() as session:
await session.execute(
f"UPDATE before_after_tracks SET {field_name} = :data WHERE id = :id",
{"data": snapshot_json, "id": str(track_id)}
)
await session.commit()

async def _schedule_next(self, track_id: UUID, current_type: MeasurementType):
"""Запланировать следующий замер."""
type_order = [
MeasurementType.T0, MeasurementType.T1, MeasurementType.T2,
MeasurementType.T3, MeasurementType.T4,
]
current_idx = type_order.index(current_type)

async with get_session() as session:
if current_idx >= len(type_order) - 1:
# Последний замер (T4) — завершаем трек
deltas = await self.calculate_delta(track_id)
roi = await self.calculate_roi(track_id)
delta_json = [d.__dict__ for d in deltas]

await session.execute(
"""
UPDATE before_after_tracks
SET track_status = 'completed',
completed_at = :now,
delta_summary = :deltas,
roi_calculated = :roi,
next_measurement_at = NULL,
next_measurement_type = NULL
WHERE id = :id
""",
{
"id": str(track_id),
"now": datetime.utcnow(),
"deltas": str(delta_json),
"roi": roi,
}
)
else:
# Планируем следующий
next_type = type_order[current_idx + 1]
days_offset = MEASUREMENT_SCHEDULE[next_type]

# Получаем дату события
track = await session.execute(
"""
SELECT ce.event_date
FROM before_after_tracks bat
JOIN content_events ce ON ce.id = bat.content_event_id
WHERE bat.id = :id
""",
{"id": str(track_id)}
)
row = track.fetchone()
event_date = row.event_date

next_date = event_date + timedelta(days=days_offset)

await session.execute(
"""
UPDATE before_after_tracks
SET next_measurement_at = :next_at,
next_measurement_type = :next_type
WHERE id = :id
""",
{
"id": str(track_id),
"next_at": next_date,
"next_type": next_type.value,
}
)

await session.commit()

@staticmethod
def _get_nested(data: dict, path: str):
"""Получить вложенное значение по dot-path."""
keys = path.split(".")
value = data
for key in keys:
if isinstance(value, dict):
value = value.get(key)
else:
return None
return value

Статистическая значимость

Проблема: шум vs реальный эффект

Продажи на маркетплейсах крайне волатильны. Изменение на +15% может быть просто случайным колебанием. Нужно отличать реальный эффект от шума.

Подход для MVP: Bootstrap Confidence Interval

import numpy as np
from scipy import stats


def is_change_significant(
daily_sales_before: list[float], # Дневные продажи ДО обновления (14-30 дней)
daily_sales_after: list[float], # Дневные продажи ПОСЛЕ обновления (14-30 дней)
confidence: float = 0.95, # Уровень доверия
) -> dict:
"""
Проверяет статистическую значимость изменения продаж.

Использует:
1. Welch's t-test (parametric)
2. Mann-Whitney U test (non-parametric)
3. Bootstrap confidence interval

Возвращает:
{
"is_significant": bool,
"p_value": float,
"confidence_interval": (lower, upper),
"effect_size": float,
"method": str,
}
"""
before = np.array(daily_sales_before)
after = np.array(daily_sales_after)

# Минимальный размер выборки
if len(before) < 7 or len(after) < 7:
return {
"is_significant": False,
"p_value": 1.0,
"confidence_interval": (None, None),
"effect_size": 0,
"method": "insufficient_data",
"message": "Нужно минимум 7 дней данных до и после изменения"
}

# 1. Welch's t-test
t_stat, p_value = stats.ttest_ind(after, before, equal_var=False)

# 2. Effect size (Cohen's d)
pooled_std = np.sqrt((before.std()**2 + after.std()**2) / 2)
effect_size = (after.mean() - before.mean()) / pooled_std if pooled_std > 0 else 0

# 3. Bootstrap confidence interval для разницы средних
n_bootstrap = 10000
diffs = []
combined = np.concatenate([before, after])

for _ in range(n_bootstrap):
boot_before = np.random.choice(before, size=len(before), replace=True)
boot_after = np.random.choice(after, size=len(after), replace=True)
diffs.append(boot_after.mean() - boot_before.mean())

diffs = np.array(diffs)
ci_lower = np.percentile(diffs, (1 - confidence) / 2 * 100)
ci_upper = np.percentile(diffs, (1 + confidence) / 2 * 100)

# Значимо, если p < 0.05 И доверительный интервал не содержит 0
is_significant = p_value < (1 - confidence) and (ci_lower > 0 or ci_upper < 0)

# Интерпретация effect size
if abs(effect_size) < 0.2:
effect_label = "negligible"
elif abs(effect_size) < 0.5:
effect_label = "small"
elif abs(effect_size) < 0.8:
effect_label = "medium"
else:
effect_label = "large"

return {
"is_significant": is_significant,
"p_value": round(p_value, 4),
"confidence_interval": (round(ci_lower, 2), round(ci_upper, 2)),
"effect_size": round(effect_size, 3),
"effect_label": effect_label,
"mean_before": round(before.mean(), 1),
"mean_after": round(after.mean(), 1),
"change_pct": round((after.mean() - before.mean()) / before.mean() * 100, 1) if before.mean() > 0 else 0,
"method": "welch_t_test + bootstrap_ci",
}

Минимальные размеры выборки

Ожидаемый эффектМин. дней данныхConfidence
Большой (> 50% рост)7 дней до + 7 дней после95%
Средний (20-50% рост)14 дней до + 14 дней после95%
Малый (5-20% рост)30 дней до + 30 дней после90%

Confounding factors: что может исказить результат

Факторы и как их учитываем

flowchart TB
CF["Confounding Factors"]
CF --> S["Сезонность<br/>Праздники, погода"]
CF --> P["Промо-акции<br/>Скидки WB/Ozon"]
CF --> C["Конкуренты<br/>Новые товары, цены"]
CF --> PR["Цена<br/>Изменение цены товара"]
CF --> ST["Остатки<br/>Out of stock"]

S --> S_FIX["Коррекция:<br/>YoY сравнение"]
P --> P_FIX["Коррекция:<br/>Исключение промо-дней"]
C --> C_FIX["Коррекция:<br/>Категорийный индекс"]
PR --> PR_FIX["Коррекция:<br/>Фиксация цены<br/>в условиях замера"]
ST --> ST_FIX["Коррекция:<br/>Нормализация<br/>по дням в наличии"]

style CF fill:#FF5722,color:#fff
style S_FIX fill:#4CAF50,color:#fff
style P_FIX fill:#4CAF50,color:#fff
style C_FIX fill:#4CAF50,color:#fff
style PR_FIX fill:#4CAF50,color:#fff
style ST_FIX fill:#4CAF50,color:#fff

1. Сезонность

def adjust_for_seasonality(
current_sales: list[float],
same_period_last_year: list[float] | None,
category_trend: float, # Рост/падение категории в %
) -> list[float]:
"""
Корректировка на сезонность.

Метод: вычитаем категорийный тренд.
Если продажи категории выросли на 20%, а товара на 30% —
реальный эффект контента = 10%.
"""
if category_trend != 0:
# Нормализуем продажи на категорийный тренд
adjustment_factor = 1.0 / (1.0 + category_trend / 100)
adjusted = [s * adjustment_factor for s in current_sales]
return adjusted
return current_sales

2. Промо-акции

def exclude_promo_days(
daily_sales: list[dict], # [{date, sales, revenue, price}]
normal_price: float,
discount_threshold: float = 0.1, # 10% скидка = промо
) -> list[dict]:
"""
Исключаем дни с активными промо-акциями.
Промо искажает продажи и не связано с качеством контента.
"""
non_promo = []
for day in daily_sales:
discount = (normal_price - day["price"]) / normal_price
if discount < discount_threshold:
non_promo.append(day)
else:
logger.debug("promo_day_excluded", date=day["date"], discount=f"{discount:.0%}")

return non_promo

3. Категорийный индекс

Категорийный индекс

Для нейтрализации общих рыночных трендов сравниваем изменение товара с изменением всей категории. Если категория "Ножи" выросла на 25% за месяц (предновогодний сезон), а наш товар на 35% — реальный эффект контента = +10%.

def calculate_category_adjusted_growth(
product_sales_before: float,
product_sales_after: float,
category_sales_before: float,
category_sales_after: float,
) -> float:
"""
Рост товара с поправкой на категорию.

Formula:
Adjusted Growth = Product Growth - Category Growth

Example:
Product: 100 → 135 (+35%)
Category: 1000 → 1250 (+25%)
Adjusted: 35% - 25% = +10% (реальный эффект контента)
"""
if product_sales_before <= 0 or category_sales_before <= 0:
return 0.0

product_growth = (product_sales_after - product_sales_before) / product_sales_before
category_growth = (category_sales_after - category_sales_before) / category_sales_before

adjusted = product_growth - category_growth
return round(adjusted * 100, 1) # В процентах

Практический пример: 90-дневный цикл

Товар: Набор полотенец "SoftHome" (WB SKU 789012345)

Событие: Полная пересъёмка карточки (7 новых фото, видеообзор, инфографика) Стоимость контента: 45 000 руб.

МетрикаT0 (baseline)T1 (+7д)T2 (+14д)T3 (+30д)T4 (+90д)
Позиция#142#138#89#52#38
Продажи/день3.22.8 (-13%)5.1 (+59%)7.8 (+144%)8.5 (+166%)
Выручка/день5 760 руб.5 040 руб.9 180 руб.14 040 руб.15 300 руб.
Рейтинг4.34.34.44.54.6
Отзывов8992103128184
ContentScore3462687882
T1 — Просадка после обновления

Обратите внимание: на T1 (через 7 дней) продажи снизились на 13%. Это нормально! Маркетплейс переиндексирует карточку, алгоритм ранжирования пересчитывает скоры. Клиента нужно предупреждать об этом заранее.

ROI расчёт

Дополнительная выручка (30 дней):
T3 revenue/day - T0 revenue/day = 14 040 - 5 760 = 8 280 руб./день
За 30 дней: 8 280 × 30 = 248 400 руб.

Проекция на 90 дней (с затуханием):
Месяц 1: 248 400 руб.
Месяц 2: 248 400 × 0.8 = 198 720 руб.
Месяц 3: 248 400 × 0.6 = 149 040 руб.
Итого 90 дней: 596 160 руб.

ROI = (596 160 - 45 000) / 45 000 × 100% = 1 225%

Payback period: 45 000 / 8 280 = 5.4 дня

Визуализация trajectory

Продажи/день

9 │ ●─────●
│ ●──
7 │ ●───

5 │ ●───

3 │ ●─────● ●
│ ●─────●
1 │
└──┬──────┬──────┬──────┬──────┬──────────────
T0 T1 T2 T3 T4
base +7д +14д +30д +90д

← Просадка →← Рост →← Стабилизация →

Автоматические уведомления

Когда отправляем уведомления клиенту

СобытиеКаналШаблон
T0 baseline собранEmail"Начали отслеживание товара {name}, baseline зафиксирован"
T1 — просадкаTelegram"Нормальная просадка после обновления. Это временно, следующий замер через 7 дней"
T2 — рост началсяEmail + Telegram"Первые результаты: позиция {pos_before}{pos_after}, продажи {change}%"
T3 — итоги 30 днейEmail (PDF)Полный отчёт Before/After с ROI
T4 — долгосрочные итогиEmail (PDF)Итоговый отчёт с рекомендациями на следующий цикл
# Шаблон уведомления T3
T3_NOTIFICATION_TEMPLATE = """
📊 **Before/After Отчёт — 30 дней**

Товар: {product_name}
SKU: {sku}
Обновление контента: {event_date}

📈 **Результаты:**
• Позиция: {pos_before} → {pos_after} ({pos_change})
• Продажи/день: {sales_before} → {sales_after} ({sales_change})
• Выручка/день: {rev_before} → {rev_after} ({rev_change})
• ContentScore: {score_before} → {score_after}
• Рейтинг: {rating_before} → {rating_after}

💰 **ROI:**
• Стоимость контента: {content_cost} руб.
• Дополнительная выручка (30д): {additional_revenue} руб.
• ROI: {roi}%
• Окупаемость: {payback_days} дней

📋 Полный отчёт: {report_url}

Следующий замер через 60 дней (T4 — долгосрочный эффект).
"""

Связанные документы

ДокументСодержание
MVP ОбзорВидение, roadmap, бюджет
Техническая архитектураКомпоненты, схема БД
ContentScore алгоритмФормула скоринга
Клиентский дашбордВизуализация Before/After данных
Финансовая модельROI и unit-экономика