Перейти к основному содержимому

Техническая архитектура

Принцип архитектуры

Simple first, scale later. MVP строится как монолит с чёткими модульными границами. Каждый компонент можно вынести в отдельный сервис, когда нагрузка того потребует. До 100 клиентов — монолит достаточен.

Обзор компонентов

graph TB
subgraph "External APIs"
SF_API["SalesFinder API<br/>REST, JSON<br/>100 req/min"]
WB_API["Wildberries API<br/>(через SF прокси)"]
OZ_API["Ozon API<br/>(через SF прокси)"]
end

subgraph "Ingestion Layer"
API_CLIENT["SF API Client<br/>httpx + retry<br/>Rate Limiter"]
CELERY_BEAT["Celery Beat<br/>Scheduler<br/>cron-расписание"]
CELERY_WORKER["Celery Workers<br/>x3 воркера<br/>prefork pool"]
end

subgraph "Processing Layer"
NORMALIZER["Data Normalizer<br/>Pydantic v2 models<br/>Валидация + очистка"]
CS_ENGINE["ContentScore Engine<br/>5 суб-скоров<br/>Калибровка весов"]
BA_ENGINE["Before/After Engine<br/>Планировщик замеров<br/>Delta-расчёт"]
REPORT_GEN["Report Generator<br/>WeasyPrint<br/>Jinja2 шаблоны"]
end

subgraph "Storage Layer"
PG["PostgreSQL 16<br/>+ TimescaleDB<br/>Основное хранилище"]
REDIS["Redis 7<br/>Кэш + очереди<br/>TTL 6 часов"]
S3["S3/MinIO<br/>PDF отчёты<br/>Статика"]
end

subgraph "Presentation Layer"
STREAMLIT["Streamlit App<br/>Клиентский дашборд<br/>Auth по токену"]
FAST_API["FastAPI<br/>REST API<br/>Интеграция"]
EMAIL["Email Service<br/>SendGrid<br/>Авто-отчёты"]
end

SF_API --> API_CLIENT
WB_API -.-> SF_API
OZ_API -.-> SF_API

CELERY_BEAT --> CELERY_WORKER
CELERY_WORKER --> API_CLIENT
API_CLIENT --> NORMALIZER
NORMALIZER --> PG
NORMALIZER --> REDIS

PG --> CS_ENGINE
PG --> BA_ENGINE
CS_ENGINE --> PG
BA_ENGINE --> PG
CS_ENGINE --> REPORT_GEN
BA_ENGINE --> REPORT_GEN

PG --> STREAMLIT
PG --> FAST_API
REDIS --> STREAMLIT
REDIS --> FAST_API
REPORT_GEN --> S3
REPORT_GEN --> EMAIL
S3 --> STREAMLIT

style SF_API fill:#4CAF50,color:#fff
style PG fill:#336791,color:#fff
style REDIS fill:#DC382D,color:#fff
style CS_ENGINE fill:#2196F3,color:#fff
style STREAMLIT fill:#FF4B4B,color:#fff

Технологический стек

Core

КомпонентТехнологияВерсияОбоснование
ЯзыкPython3.11+Экосистема data science, быстрая разработка
API FrameworkFastAPI0.109+Async, автодокументация, Pydantic нативно
ORMSQLAlchemy2.0+Async support, отличная работа с PostgreSQL
ВалидацияPydanticv2Быстрая валидация, сериализация
HTTP клиентhttpx0.27+Async, HTTP/2, retry из коробки

Data & Storage

КомпонентТехнологияВерсияОбоснование
СУБДPostgreSQL16JSONB, оконные функции, надёжность
Time seriesTimescaleDB2.14+Гипертаблицы, компрессия, continuous aggregates
Кэш/ОчередиRedis7Кэш API, Celery broker, rate limiter
МиграцииAlembic1.13+Проверенный инструмент, batch mode
Объектное хранилищеMinIO/S3PDF отчёты, бэкапы

Workers & Scheduling

КомпонентТехнологияВерсияОбоснование
Task QueueCelery5.3+Надёжные воркеры, cron-расписание
SchedulerCelery Beat5.3+Переодический polling SF API
MonitoringFlower2.0+Web-UI для Celery мониторинга

Frontend & Visualization

КомпонентТехнологияВерсияОбоснование
DashboardStreamlit1.31+Быстрый UI для data apps
ГрафикиPlotly5.18+Интерактивные, красивые
PDFWeasyPrint61+HTML→PDF, CSS поддержка
ШаблоныJinja23.1+Шаблоны отчётов

DevOps

КомпонентТехнологияОбоснование
КонтейнеризацияDocker + ComposeОдинаковое окружение
CI/CDGitHub ActionsАвтоматический деплой
МониторингSentry + PrometheusОшибки + метрики
Логированиеstructlog + LokiСтруктурированные логи

Структура проекта

roi-content-mvp/
├── docker-compose.yml # Все сервисы
├── docker-compose.prod.yml # Прод оверрайды
├── Dockerfile # Python app
├── .env.example # Шаблон конфига
├── alembic/ # Миграции БД
│ ├── alembic.ini
│ ├── env.py
│ └── versions/
│ ├── 001_initial_schema.py
│ └── 002_add_timescale_hypertables.py
├── src/
│ ├── __init__.py
│ ├── config/
│ │ ├── __init__.py
│ │ └── settings.py # Pydantic Settings
│ ├── api/
│ │ ├── __init__.py
│ │ ├── sf_client.py # SalesFinder API клиент
│ │ ├── rate_limiter.py # Token bucket rate limiter
│ │ └── models.py # SF API response models
│ ├── database/
│ │ ├── __init__.py
│ │ ├── models.py # SQLAlchemy models
│ │ ├── session.py # Session factory
│ │ └── repositories/
│ │ ├── products.py # Product CRUD
│ │ ├── measurements.py # Measurement CRUD
│ │ └── scores.py # ContentScore CRUD
│ ├── engine/
│ │ ├── __init__.py
│ │ ├── content_score.py # ContentScore calculator
│ │ ├── before_after.py # Before/After tracker
│ │ ├── normalizer.py # Data normalization
│ │ └── benchmarks.py # Category benchmarks
│ ├── workers/
│ │ ├── __init__.py
│ │ ├── app.py # Celery app
│ │ ├── tasks.py # Celery tasks
│ │ └── schedules.py # Beat schedule config
│ ├── reports/
│ │ ├── __init__.py
│ │ ├── generator.py # Report engine
│ │ ├── templates/
│ │ │ ├── weekly_report.html
│ │ │ └── monthly_report.html
│ │ └── email_sender.py # SendGrid integration
│ ├── dashboard/
│ │ ├── app.py # Streamlit main app
│ │ ├── pages/
│ │ │ ├── 01_overview.py
│ │ │ ├── 02_products.py
│ │ │ ├── 03_before_after.py
│ │ │ └── 04_reports.py
│ │ ├── components/
│ │ │ ├── score_gauge.py
│ │ │ ├── sales_chart.py
│ │ │ └── position_heatmap.py
│ │ └── auth.py # Token auth
│ └── rest_api/
│ ├── __init__.py
│ ├── app.py # FastAPI app
│ ├── routers/
│ │ ├── products.py
│ │ ├── scores.py
│ │ └── reports.py
│ └── deps.py # Dependencies
├── tests/
│ ├── conftest.py
│ ├── test_sf_client.py
│ ├── test_content_score.py
│ ├── test_before_after.py
│ └── test_api.py
└── scripts/
├── seed_test_data.py
└── benchmark_scoring.py

Data Flow: от API до дашборда

1. Ingestion Flow (сбор данных)

sequenceDiagram
participant Beat as Celery Beat
participant Worker as Celery Worker
participant SF as SalesFinder API
participant Redis as Redis Cache
participant PG as PostgreSQL

Beat->>Worker: trigger poll_products (каждые 6ч)
Worker->>PG: SELECT products WHERE active=true
PG-->>Worker: [product_1, product_2, ...]

loop Для каждого продукта (batch по 10)
Worker->>Redis: GET cache:sf:{sku}
alt Cache hit (TTL не истёк)
Redis-->>Worker: cached_data
else Cache miss
Worker->>SF: POST /api/products/get {sku}
SF-->>Worker: product_data
Worker->>Redis: SET cache:sf:{sku} TTL=21600
Worker->>Worker: rate_limit.wait()
end
Worker->>Worker: normalize(data)
Worker->>PG: INSERT INTO measurements (...)
end

Worker->>Worker: calculate_content_scores()
Worker->>PG: INSERT INTO content_scores (...)
Worker->>Beat: task_complete(stats)

2. Scoring Flow (расчёт ContentScore)

flowchart LR
subgraph "Input Data"
M["measurements<br/>(позиция, продажи,<br/>рейтинг, отзывы)"]
P["product_info<br/>(фото, видео,<br/>текст, ключи)"]
C["category_benchmarks<br/>(средние по категории)"]
end

subgraph "Sub-Scores"
PS["PositionScore<br/>25%"]
SS["SalesScore<br/>25%"]
KC["KeywordCoverage<br/>20%"]
VC["VisualComplete<br/>15%"]
CP["CompetitivePos<br/>15%"]
end

subgraph "Output"
CS["ContentScore<br/>0-100"]
REC["Recommendations<br/>Что улучшить"]
end

M --> PS
M --> SS
P --> KC
P --> VC
M --> CP
C --> CP

PS --> CS
SS --> CS
KC --> CS
VC --> CS
CP --> CS
CS --> REC

3. Reporting Flow (генерация отчётов)

sequenceDiagram
participant Beat as Celery Beat
participant Worker as Celery Worker
participant PG as PostgreSQL
participant S3 as S3/MinIO
participant Email as SendGrid

Beat->>Worker: trigger generate_weekly_reports (пн 9:00)
Worker->>PG: SELECT clients WHERE report_day = 'monday'
PG-->>Worker: [client_1, client_2, ...]

loop Для каждого клиента
Worker->>PG: SELECT scores, measurements<br/>WHERE client_id = X AND period = last_week
PG-->>Worker: weekly_data
Worker->>Worker: render_template(weekly_report.html, data)
Worker->>Worker: weasyprint.HTML(html).write_pdf()
Worker->>S3: upload(report.pdf)
S3-->>Worker: report_url
Worker->>Email: send(client.email, report_url)
Worker->>PG: INSERT INTO reports (client_id, url, sent_at)
end

Схема базы данных

ER-диаграмма

erDiagram
CLIENTS {
uuid id PK
varchar name
varchar email
varchar phone
varchar api_token UK
varchar plan_tier
boolean is_active
timestamp created_at
timestamp updated_at
}

PRODUCTS {
uuid id PK
uuid client_id FK
varchar sku UK
varchar marketplace
varchar name
varchar category
varchar brand
jsonb metadata
boolean is_active
timestamp created_at
}

MEASUREMENTS {
bigint id PK
uuid product_id FK
timestamp measured_at
integer position
integer sales_count
decimal revenue
decimal price
float rating
integer review_count
integer photo_count
integer video_count
integer keyword_coverage_pct
jsonb raw_data
}

CONTENT_SCORES {
bigint id PK
uuid product_id FK
timestamp scored_at
float total_score
float position_score
float sales_score
float keyword_score
float visual_score
float competitive_score
jsonb weights
jsonb details
}

CONTENT_EVENTS {
uuid id PK
uuid product_id FK
varchar event_type
timestamp event_date
varchar description
jsonb before_state
jsonb after_state
uuid created_by FK
}

BEFORE_AFTER_TRACKS {
uuid id PK
uuid product_id FK
uuid content_event_id FK
varchar track_status
jsonb baseline_t0
jsonb measurement_t1
jsonb measurement_t2
jsonb measurement_t3
jsonb measurement_t4
timestamp next_measurement_at
jsonb delta_summary
}

REPORTS {
uuid id PK
uuid client_id FK
varchar report_type
varchar period
varchar file_url
jsonb summary
timestamp generated_at
timestamp sent_at
}

CATEGORY_BENCHMARKS {
varchar category PK
varchar marketplace PK
float avg_position
float avg_sales
float avg_rating
float avg_photos
integer sample_size
timestamp updated_at
}

CLIENTS ||--o{ PRODUCTS : "has"
PRODUCTS ||--o{ MEASUREMENTS : "tracked"
PRODUCTS ||--o{ CONTENT_SCORES : "scored"
PRODUCTS ||--o{ CONTENT_EVENTS : "events"
CONTENT_EVENTS ||--o{ BEFORE_AFTER_TRACKS : "tracked"
CLIENTS ||--o{ REPORTS : "receives"

SQL Schema

-- =============================================================
-- Основные таблицы
-- =============================================================

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Клиенты
CREATE TABLE clients (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
phone VARCHAR(50),
company VARCHAR(255),
api_token VARCHAR(64) UNIQUE NOT NULL DEFAULT encode(gen_random_bytes(32), 'hex'),
plan_tier VARCHAR(20) NOT NULL DEFAULT 'start'
CHECK (plan_tier IN ('audit', 'start', 'growth', 'scale', 'enterprise')),
max_products INTEGER NOT NULL DEFAULT 10,
is_active BOOLEAN NOT NULL DEFAULT true,
settings JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_clients_api_token ON clients(api_token);
CREATE INDEX idx_clients_active ON clients(is_active) WHERE is_active = true;

-- Продукты (товары клиентов на маркетплейсах)
CREATE TABLE products (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
sku VARCHAR(50) NOT NULL,
marketplace VARCHAR(20) NOT NULL CHECK (marketplace IN ('wildberries', 'ozon')),
name VARCHAR(500),
category VARCHAR(255),
brand VARCHAR(255),
url VARCHAR(1000),
metadata JSONB DEFAULT '{}',
is_active BOOLEAN NOT NULL DEFAULT true,
monitoring_started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),

UNIQUE(sku, marketplace)
);

CREATE INDEX idx_products_client ON products(client_id);
CREATE INDEX idx_products_sku ON products(sku);
CREATE INDEX idx_products_active ON products(is_active) WHERE is_active = true;
CREATE INDEX idx_products_marketplace ON products(marketplace);

-- Замеры метрик (TimescaleDB hypertable)
CREATE TABLE measurements (
id BIGSERIAL,
product_id UUID NOT NULL REFERENCES products(id) ON DELETE CASCADE,
measured_at TIMESTAMPTZ NOT NULL DEFAULT now(),

-- Позиционные метрики
position INTEGER, -- Позиция в категории
position_in_search INTEGER, -- Позиция в поиске

-- Продажи
sales_count INTEGER DEFAULT 0, -- Продажи за период
sales_revenue DECIMAL(15,2), -- Выручка за период
orders_count INTEGER DEFAULT 0, -- Заказы

-- Ценовые метрики
price DECIMAL(10,2), -- Текущая цена
price_with_discount DECIMAL(10,2), -- Цена со скидкой
discount_pct SMALLINT DEFAULT 0, -- Процент скидки

-- Рейтинг и отзывы
rating FLOAT, -- Рейтинг 0-5
review_count INTEGER DEFAULT 0, -- Количество отзывов
review_count_new INTEGER DEFAULT 0, -- Новых отзывов за период

-- Контент метрики
photo_count SMALLINT DEFAULT 0, -- Фото на карточке
video_count SMALLINT DEFAULT 0, -- Видео на карточке
has_360 BOOLEAN DEFAULT false, -- Есть 360-фото
has_infographics BOOLEAN DEFAULT false, -- Есть инфографика
description_length INTEGER DEFAULT 0, -- Длина описания
keyword_coverage_pct FLOAT DEFAULT 0, -- % покрытия ключевых слов

-- Конкурентные метрики
category_position_pct FLOAT, -- Перцентиль в категории (0-100)
price_vs_category_avg FLOAT, -- Цена / средняя по категории

-- Сырые данные SF API
raw_data JSONB,

PRIMARY KEY (id, measured_at)
);

-- TimescaleDB: превращаем в гипертаблицу
SELECT create_hypertable('measurements', 'measured_at',
chunk_time_interval => INTERVAL '7 days'
);

CREATE INDEX idx_measurements_product ON measurements(product_id, measured_at DESC);
CREATE INDEX idx_measurements_product_time ON measurements(product_id, measured_at);

-- ContentScore — расчётные скоры
CREATE TABLE content_scores (
id BIGSERIAL,
product_id UUID NOT NULL REFERENCES products(id) ON DELETE CASCADE,
scored_at TIMESTAMPTZ NOT NULL DEFAULT now(),

-- Итоговый скор
total_score FLOAT NOT NULL CHECK (total_score >= 0 AND total_score <= 100),

-- Суб-скоры (каждый 0-100)
position_score FLOAT NOT NULL DEFAULT 0,
sales_score FLOAT NOT NULL DEFAULT 0,
keyword_score FLOAT NOT NULL DEFAULT 0,
visual_score FLOAT NOT NULL DEFAULT 0,
competitive_score FLOAT NOT NULL DEFAULT 0,

-- Весовые коэффициенты (для аудита)
weights JSONB NOT NULL DEFAULT '{"position": 0.25, "sales": 0.25, "keyword": 0.20, "visual": 0.15, "competitive": 0.15}',

-- Детали расчёта (для отладки)
details JSONB DEFAULT '{}',

-- Рекомендации (generated)
recommendations JSONB DEFAULT '[]',

PRIMARY KEY (id, scored_at)
);

SELECT create_hypertable('content_scores', 'scored_at',
chunk_time_interval => INTERVAL '7 days'
);

CREATE INDEX idx_scores_product ON content_scores(product_id, scored_at DESC);

-- События контента (обновление фото, видео, текстов)
CREATE TABLE content_events (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
product_id UUID NOT NULL REFERENCES products(id) ON DELETE CASCADE,
event_type VARCHAR(50) NOT NULL
CHECK (event_type IN ('photo_update', 'video_update', 'text_update',
'infographic_add', 'full_redesign', 'seo_optimization',
'price_change', 'other')),
event_date TIMESTAMPTZ NOT NULL DEFAULT now(),
description TEXT,
before_state JSONB, -- Снапшот до изменения
after_state JSONB, -- Снапшот после изменения
created_by UUID REFERENCES clients(id),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_events_product ON content_events(product_id, event_date DESC);
CREATE INDEX idx_events_type ON content_events(event_type);

-- Before/After трекинг
CREATE TABLE before_after_tracks (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
product_id UUID NOT NULL REFERENCES products(id) ON DELETE CASCADE,
content_event_id UUID NOT NULL REFERENCES content_events(id) ON DELETE CASCADE,

track_status VARCHAR(20) NOT NULL DEFAULT 'active'
CHECK (track_status IN ('active', 'completed', 'cancelled')),

-- Замеры на контрольных точках (JSONB с полными снапшотами)
baseline_t0 JSONB, -- За 1 день до изменения
measurement_t1 JSONB, -- +7 дней
measurement_t2 JSONB, -- +14 дней
measurement_t3 JSONB, -- +30 дней
measurement_t4 JSONB, -- +90 дней

-- Расписание следующего замера
next_measurement_at TIMESTAMPTZ,
next_measurement_type VARCHAR(5), -- t1, t2, t3, t4

-- Результат
delta_summary JSONB, -- Итоговое сравнение
roi_calculated DECIMAL(10,2), -- Расчётный ROI

created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
completed_at TIMESTAMPTZ
);

CREATE INDEX idx_ba_tracks_status ON before_after_tracks(track_status)
WHERE track_status = 'active';
CREATE INDEX idx_ba_tracks_next ON before_after_tracks(next_measurement_at)
WHERE track_status = 'active';

-- Отчёты
CREATE TABLE reports (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
report_type VARCHAR(30) NOT NULL
CHECK (report_type IN ('weekly', 'monthly', 'quarterly', 'ad_hoc', 'before_after')),
period_start DATE,
period_end DATE,
file_url VARCHAR(1000),
file_size_bytes INTEGER,
summary JSONB DEFAULT '{}',
generated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
sent_at TIMESTAMPTZ,
opened_at TIMESTAMPTZ -- Tracking: когда клиент открыл
);

CREATE INDEX idx_reports_client ON reports(client_id, generated_at DESC);

-- Бенчмарки по категориям
CREATE TABLE category_benchmarks (
category VARCHAR(255) NOT NULL,
marketplace VARCHAR(20) NOT NULL,

-- Средние показатели
avg_position FLOAT,
median_position FLOAT,
avg_sales_per_day FLOAT,
avg_revenue_per_day DECIMAL(12,2),
avg_rating FLOAT,
avg_review_count FLOAT,
avg_photo_count FLOAT,
avg_price DECIMAL(10,2),

-- Распределение
p25_position INTEGER, -- 25-й перцентиль позиции
p75_position INTEGER, -- 75-й перцентиль
p90_position INTEGER, -- 90-й перцентиль (топ 10%)

-- Мета
sample_size INTEGER NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),

PRIMARY KEY (category, marketplace)
);

-- =============================================================
-- Continuous Aggregates (TimescaleDB)
-- =============================================================

-- Дневные агрегаты продаж
CREATE MATERIALIZED VIEW daily_sales_agg
WITH (timescaledb.continuous) AS
SELECT
product_id,
time_bucket('1 day', measured_at) AS day,
AVG(position) AS avg_position,
SUM(sales_count) AS total_sales,
SUM(sales_revenue) AS total_revenue,
AVG(rating) AS avg_rating,
MAX(review_count) AS max_reviews,
MAX(photo_count) AS max_photos
FROM measurements
GROUP BY product_id, time_bucket('1 day', measured_at)
WITH NO DATA;

-- Недельные агрегаты
CREATE MATERIALIZED VIEW weekly_sales_agg
WITH (timescaledb.continuous) AS
SELECT
product_id,
time_bucket('7 days', measured_at) AS week,
AVG(position) AS avg_position,
SUM(sales_count) AS total_sales,
SUM(sales_revenue) AS total_revenue,
AVG(rating) AS avg_rating
FROM measurements
GROUP BY product_id, time_bucket('7 days', measured_at)
WITH NO DATA;

-- Политики обновления
SELECT add_continuous_aggregate_policy('daily_sales_agg',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);

SELECT add_continuous_aggregate_policy('weekly_sales_agg',
start_offset => INTERVAL '14 days',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day'
);

-- Политика компрессии (данные старше 30 дней)
ALTER TABLE measurements SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'product_id',
timescaledb.compress_orderby = 'measured_at DESC'
);

SELECT add_compression_policy('measurements', INTERVAL '30 days');

-- Политика ретенции (удаление данных старше 2 лет)
SELECT add_retention_policy('measurements', INTERVAL '2 years');

SalesFinder API клиент

Rate Limiting

SF API ограничение: 100 запросов в минуту (по API-ключу).

# src/api/rate_limiter.py

import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


@dataclass
class TokenBucketRateLimiter:
"""
Token bucket rate limiter для SalesFinder API.

Гарантирует не более max_requests запросов за window секунд.
Использует sliding window для более равномерного распределения.
"""
max_requests: int = 90 # 90 из 100 (запас 10%)
window_seconds: float = 60.0 # Окно в секундах
_timestamps: deque = field(default_factory=deque, init=False)
_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)

async def acquire(self) -> float:
"""
Получить разрешение на запрос.
Возвращает время ожидания (0.0 если запрос выполнен сразу).
"""
async with self._lock:
now = time.monotonic()

# Удаляем устаревшие timestamps
while self._timestamps and (now - self._timestamps[0]) > self.window_seconds:
self._timestamps.popleft()

if len(self._timestamps) >= self.max_requests:
# Нужно подождать
oldest = self._timestamps[0]
wait_time = self.window_seconds - (now - oldest) + 0.1
await asyncio.sleep(wait_time)
now = time.monotonic()

# Повторная очистка после ожидания
while self._timestamps and (now - self._timestamps[0]) > self.window_seconds:
self._timestamps.popleft()

self._timestamps.append(now)
return 0.0

@property
def available_tokens(self) -> int:
"""Сколько запросов доступно прямо сейчас."""
now = time.monotonic()
active = sum(1 for ts in self._timestamps if (now - ts) <= self.window_seconds)
return max(0, self.max_requests - active)

SF API Client

# src/api/sf_client.py

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from src.config.settings import settings
from src.api.rate_limiter import TokenBucketRateLimiter
from src.api.models import (
ProductInfo, ProductSales, ProductPosition,
CategoryInfo, KeywordData
)

logger = structlog.get_logger()


class SalesFinderClient:
"""
Клиент SalesFinder API с rate limiting, retry, кэшированием.

Usage:
async with SalesFinderClient() as sf:
product = await sf.get_product("wb", 12345678)
sales = await sf.get_product_sales("wb", 12345678, days=30)
"""

BASE_URL = "https://salesfinder.ru/api/v2"

def __init__(self):
self.api_key = settings.sf_api_key
self.rate_limiter = TokenBucketRateLimiter(max_requests=90, window_seconds=60)
self._client: httpx.AsyncClient | None = None
self._request_count = 0
self._error_count = 0

async def __aenter__(self):
self._client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"User-Agent": "ROI-Content-MVP/1.0"
},
timeout=httpx.Timeout(30.0, connect=10.0),
follow_redirects=True,
)
return self

async def __aexit__(self, *args):
if self._client:
await self._client.aclose()

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=2, min=1, max=30),
retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError)),
before_sleep=lambda retry_state: logger.warning(
"sf_api_retry",
attempt=retry_state.attempt_number,
wait=retry_state.next_action.sleep
)
)
async def _request(self, method: str, endpoint: str, **kwargs) -> dict:
"""Базовый запрос с rate limiting и retry."""
await self.rate_limiter.acquire()
self._request_count += 1

response = await self._client.request(method, endpoint, **kwargs)
response.raise_for_status()

data = response.json()
logger.debug("sf_api_request", endpoint=endpoint, status=response.status_code)
return data

# ----- Product endpoints -----

async def get_product(self, marketplace: str, sku: int) -> ProductInfo:
"""Получить информацию о товаре."""
data = await self._request(
"GET",
f"/products/{marketplace}/{sku}"
)
return ProductInfo.model_validate(data)

async def get_product_sales(
self, marketplace: str, sku: int, days: int = 30
) -> ProductSales:
"""Получить продажи товара за период."""
data = await self._request(
"GET",
f"/products/{marketplace}/{sku}/sales",
params={"days": days}
)
return ProductSales.model_validate(data)

async def get_product_position(
self, marketplace: str, sku: int, keyword: str | None = None
) -> ProductPosition:
"""Получить позицию товара в категории или по ключевому слову."""
params = {}
if keyword:
params["keyword"] = keyword
data = await self._request(
"GET",
f"/products/{marketplace}/{sku}/position",
params=params
)
return ProductPosition.model_validate(data)

async def get_product_keywords(
self, marketplace: str, sku: int
) -> list[KeywordData]:
"""Получить ключевые слова товара с позициями."""
data = await self._request(
"GET",
f"/products/{marketplace}/{sku}/keywords"
)
return [KeywordData.model_validate(item) for item in data.get("keywords", [])]

# ----- Category endpoints -----

async def get_category_stats(
self, marketplace: str, category_id: int
) -> CategoryInfo:
"""Получить статистику категории (бенчмарки)."""
data = await self._request(
"GET",
f"/categories/{marketplace}/{category_id}/stats"
)
return CategoryInfo.model_validate(data)

# ----- Batch operations -----

async def get_products_batch(
self, marketplace: str, skus: list[int]
) -> list[ProductInfo]:
"""
Пакетное получение данных по нескольким товарам.
Разбивает на chunks по 10 для rate limiting.
"""
results = []
for i in range(0, len(skus), 10):
chunk = skus[i:i + 10]
tasks = [self.get_product(marketplace, sku) for sku in chunk]
chunk_results = await asyncio.gather(*tasks, return_exceptions=True)

for sku, result in zip(chunk, chunk_results):
if isinstance(result, Exception):
logger.error("sf_batch_error", sku=sku, error=str(result))
self._error_count += 1
else:
results.append(result)

return results

@property
def stats(self) -> dict:
"""Статистика использования API."""
return {
"total_requests": self._request_count,
"errors": self._error_count,
"available_tokens": self.rate_limiter.available_tokens,
"error_rate": self._error_count / max(self._request_count, 1),
}

Кэширование

Стратегия кэширования

flowchart TB
REQ["Запрос данных<br/>по SKU"] --> CACHE_CHECK{"Redis cache<br/>существует?"}

CACHE_CHECK -->|"HIT (TTL не истёк)"| RETURN_CACHE["Вернуть из кэша<br/>⚡ ~1ms"]
CACHE_CHECK -->|"MISS"| SF_REQUEST["Запрос к SF API<br/>🌐 ~200ms"]

SF_REQUEST --> NORMALIZE["Нормализация данных"]
NORMALIZE --> SAVE_CACHE["Сохранить в Redis<br/>TTL = 6 часов"]
NORMALIZE --> SAVE_DB["Сохранить в PostgreSQL<br/>(measurement)"]
SAVE_CACHE --> RETURN_FRESH["Вернуть свежие данные"]

Redis Schema

# Ключи Redis:

# 1. Кэш SF API ответов
# Key: sf:cache:{marketplace}:{sku}
# Value: JSON с данными продукта
# TTL: 6 часов (21600 секунд)

# 2. Rate limiter
# Key: sf:ratelimit:{api_key}
# Value: sorted set с timestamps
# TTL: 2 минуты

# 3. Последний ContentScore
# Key: cs:latest:{product_id}
# Value: JSON с последним скором
# TTL: 1 час (обновляется при пересчёте)

# 4. Клиентская сессия (дашборд)
# Key: dash:session:{token}
# Value: JSON с client_id, permissions
# TTL: 24 часа

# 5. Лок на задачи (идемпотентность)
# Key: lock:poll:{product_id}
# Value: worker_id
# TTL: 10 минут

REDIS_KEY_PATTERNS = {
"sf_cache": "sf:cache:{marketplace}:{sku}",
"rate_limit": "sf:ratelimit:{api_key}",
"latest_score": "cs:latest:{product_id}",
"session": "dash:session:{token}",
"task_lock": "lock:poll:{product_id}",
}

REDIS_TTL = {
"sf_cache": 21600, # 6 часов
"rate_limit": 120, # 2 минуты
"latest_score": 3600, # 1 час
"session": 86400, # 24 часа
"task_lock": 600, # 10 минут
}

Celery Workers

Task Definitions

# src/workers/tasks.py

from celery import shared_task
from datetime import datetime, timedelta
import structlog

logger = structlog.get_logger()


@shared_task(
name="poll_product_metrics",
bind=True,
max_retries=3,
default_retry_delay=300, # 5 минут
rate_limit="10/m", # Макс 10 задач в минуту
acks_late=True, # Подтверждение после выполнения
reject_on_worker_lost=True, # Повторить при падении воркера
)
def poll_product_metrics(self, product_id: str):
"""
Собирает метрики одного товара из SF API.
Запускается по расписанию для каждого активного товара.
"""
try:
# Реализация в engine/collector.py
from src.engine.collector import collect_product_metrics
result = collect_product_metrics(product_id)
logger.info("poll_complete", product_id=product_id, metrics=result.metrics_count)
return result.to_dict()
except Exception as exc:
logger.error("poll_failed", product_id=product_id, error=str(exc))
raise self.retry(exc=exc)


@shared_task(name="calculate_content_scores")
def calculate_content_scores(product_ids: list[str] | None = None):
"""
Пересчитывает ContentScore для указанных товаров.
Если product_ids не указан — пересчитывает для всех активных.
"""
from src.engine.content_score import ContentScoreEngine
engine = ContentScoreEngine()
scores = engine.calculate_batch(product_ids)
logger.info("scores_calculated", count=len(scores))
return {"calculated": len(scores)}


@shared_task(name="check_before_after_measurements")
def check_before_after_measurements():
"""
Проверяет, есть ли before/after треки с наступившим временем замера.
Выполняет замер и планирует следующий.
"""
from src.engine.before_after import BeforeAfterEngine
engine = BeforeAfterEngine()
processed = engine.process_pending_measurements()
logger.info("ba_measurements_processed", count=processed)
return {"processed": processed}


@shared_task(name="generate_weekly_reports")
def generate_weekly_reports():
"""Генерирует и рассылает еженедельные отчёты для всех клиентов."""
from src.reports.generator import ReportGenerator
generator = ReportGenerator()
reports = generator.generate_weekly_all()
logger.info("weekly_reports_generated", count=len(reports))
return {"reports": len(reports)}


@shared_task(name="update_category_benchmarks")
def update_category_benchmarks():
"""Обновляет бенчмарки категорий (раз в сутки)."""
from src.engine.benchmarks import BenchmarkEngine
engine = BenchmarkEngine()
updated = engine.update_all()
logger.info("benchmarks_updated", categories=updated)
return {"updated": updated}

Schedule Configuration

# src/workers/schedules.py

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
# Polling метрик — каждые 6 часов
"poll-all-products": {
"task": "poll_all_active_products",
"schedule": crontab(minute=0, hour="*/6"),
"options": {"queue": "polling"},
},

# Пересчёт ContentScore — после каждого polling
"recalculate-scores": {
"task": "calculate_content_scores",
"schedule": crontab(minute=30, hour="*/6"),
"options": {"queue": "scoring"},
},

# Before/After замеры — каждый час (проверка расписания)
"check-ba-measurements": {
"task": "check_before_after_measurements",
"schedule": crontab(minute=0, hour="*"),
"options": {"queue": "tracking"},
},

# Еженедельные отчёты — понедельник 9:00 MSK
"weekly-reports": {
"task": "generate_weekly_reports",
"schedule": crontab(minute=0, hour=6, day_of_week=1), # UTC
"options": {"queue": "reports"},
},

# Бенчмарки категорий — ежедневно в 3:00 MSK
"update-benchmarks": {
"task": "update_category_benchmarks",
"schedule": crontab(minute=0, hour=0), # UTC
"options": {"queue": "maintenance"},
},

# Очистка устаревшего кэша — ежедневно
"cleanup-cache": {
"task": "cleanup_expired_cache",
"schedule": crontab(minute=0, hour=2),
"options": {"queue": "maintenance"},
},
}

Deployment

Docker Compose

# docker-compose.yml

version: "3.9"

services:
# PostgreSQL + TimescaleDB
db:
image: timescale/timescaledb:latest-pg16
environment:
POSTGRES_DB: roi_content
POSTGRES_USER: ${DB_USER:-roi}
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- pgdata:/var/lib/postgresql/data
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U roi -d roi_content"]
interval: 10s
timeout: 5s
retries: 5

# Redis
redis:
image: redis:7-alpine
command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
volumes:
- redisdata:/data
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s

# FastAPI app
api:
build: .
command: uvicorn src.rest_api.app:app --host 0.0.0.0 --port 8000 --workers 2
environment:
- DATABASE_URL=postgresql+asyncpg://${DB_USER:-roi}:${DB_PASSWORD}@db:5432/roi_content
- REDIS_URL=redis://redis:6379/0
- SF_API_KEY=${SF_API_KEY}
ports:
- "8000:8000"
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy

# Streamlit Dashboard
dashboard:
build: .
command: streamlit run src/dashboard/app.py --server.port 8501 --server.address 0.0.0.0
environment:
- DATABASE_URL=postgresql+asyncpg://${DB_USER:-roi}:${DB_PASSWORD}@db:5432/roi_content
- REDIS_URL=redis://redis:6379/0
ports:
- "8501:8501"
depends_on:
- db
- redis

# Celery Worker
worker:
build: .
command: celery -A src.workers.app worker -l info -c 3 -Q polling,scoring,tracking,reports,maintenance
environment:
- DATABASE_URL=postgresql+asyncpg://${DB_USER:-roi}:${DB_PASSWORD}@db:5432/roi_content
- REDIS_URL=redis://redis:6379/0
- SF_API_KEY=${SF_API_KEY}
depends_on:
- db
- redis

# Celery Beat (Scheduler)
beat:
build: .
command: celery -A src.workers.app beat -l info
environment:
- DATABASE_URL=postgresql+asyncpg://${DB_USER:-roi}:${DB_PASSWORD}@db:5432/roi_content
- REDIS_URL=redis://redis:6379/0
depends_on:
- db
- redis

# Flower (Celery мониторинг)
flower:
build: .
command: celery -A src.workers.app flower --port=5555
ports:
- "5555:5555"
depends_on:
- redis
- worker

volumes:
pgdata:
redisdata:

Production оверрайды

# docker-compose.prod.yml

version: "3.9"

services:
db:
deploy:
resources:
limits:
memory: 2G

api:
command: gunicorn src.rest_api.app:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
deploy:
resources:
limits:
memory: 512M
restart: always

dashboard:
deploy:
resources:
limits:
memory: 512M
restart: always

worker:
command: celery -A src.workers.app worker -l warning -c 4 -Q polling,scoring,tracking,reports,maintenance --max-tasks-per-child=1000
deploy:
resources:
limits:
memory: 1G
restart: always

beat:
restart: always

# Nginx reverse proxy
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- api
- dashboard

Мониторинг и алерты

Health Checks

# src/rest_api/routers/health.py

from fastapi import APIRouter, Depends
from datetime import datetime, timedelta

router = APIRouter(tags=["health"])


@router.get("/health")
async def health_check(db=Depends(get_db), redis=Depends(get_redis)):
"""Полная проверка здоровья системы."""
checks = {}

# Database
try:
await db.execute("SELECT 1")
checks["database"] = {"status": "ok"}
except Exception as e:
checks["database"] = {"status": "error", "detail": str(e)}

# Redis
try:
await redis.ping()
checks["redis"] = {"status": "ok"}
except Exception as e:
checks["redis"] = {"status": "error", "detail": str(e)}

# SF API (проверяем последний успешный запрос)
last_poll = await db.fetch_one(
"SELECT MAX(measured_at) as last FROM measurements"
)
if last_poll and last_poll["last"]:
hours_ago = (datetime.utcnow() - last_poll["last"]).total_seconds() / 3600
if hours_ago > 12:
checks["sf_polling"] = {"status": "warning", "hours_since_last": round(hours_ago, 1)}
else:
checks["sf_polling"] = {"status": "ok", "hours_since_last": round(hours_ago, 1)}
else:
checks["sf_polling"] = {"status": "no_data"}

# Overall status
all_ok = all(c.get("status") == "ok" for c in checks.values())

return {
"status": "healthy" if all_ok else "degraded",
"timestamp": datetime.utcnow().isoformat(),
"checks": checks,
}

Telegram-алерты

# src/monitoring/alerts.py

import httpx
from src.config.settings import settings


async def send_telegram_alert(message: str, level: str = "warning"):
"""Отправить алерт в Telegram-канал мониторинга."""
emoji = {"info": "ℹ️", "warning": "⚠️", "error": "🔴", "critical": "🚨"}
prefix = emoji.get(level, "📢")

text = f"{prefix} *ROI-Content {level.upper()}*\n\n{message}"

async with httpx.AsyncClient() as client:
await client.post(
f"https://api.telegram.org/bot{settings.telegram_bot_token}/sendMessage",
json={
"chat_id": settings.telegram_alert_chat_id,
"text": text,
"parse_mode": "Markdown",
}
)

Безопасность

Изоляция данных клиентов

# Каждый запрос к данным ОБЯЗАТЕЛЬНО фильтруется по client_id.
# Row-Level Security на уровне PostgreSQL:

"""
-- RLS для таблицы products
ALTER TABLE products ENABLE ROW LEVEL SECURITY;

CREATE POLICY products_client_isolation ON products
FOR ALL
USING (client_id = current_setting('app.current_client_id')::uuid);

-- Устанавливается при каждом запросе:
SET app.current_client_id = '{client_uuid}';
"""

API Authentication

# src/rest_api/deps.py

from fastapi import Header, HTTPException, Depends

async def get_current_client(
authorization: str = Header(..., alias="X-API-Token")
) -> Client:
"""Аутентификация клиента по API-токену."""
client = await db.fetch_one(
"SELECT * FROM clients WHERE api_token = $1 AND is_active = true",
authorization
)
if not client:
raise HTTPException(status_code=401, detail="Invalid or inactive API token")
return Client(**client)

Связанные документы

ДокументСодержание
MVP ОбзорВидение, roadmap, бюджет
ContentScore алгоритмФормула скоринга
Before/After трекерСистема замеров
Клиентский дашбордUI/UX дашборда
Финансовая модельUnit-экономика