API-контракты и Интеграция
Protobuf + buf.build для контрактов, ConnectRPC для синхронных вызовов, RabbitMQ для асинхронного обмена.
API-контракты и Интеграция
Все межсервисные контракты описываются в Protocol Buffers (Protobuf) и управляются через buf.build. Синхронные вызовы — через ConnectRPC/gRPC. Асинхронный обмен командами и событиями — через RabbitMQ.
Почему Schema-First для ISP? В телеком-платформе BSS и OSS слои разрабатываются на разных языках (NestJS для BSS, Go для OSS high-load) разными командами. Protobuf-контракты — единственный source of truth, гарантирующий, что
subscription.activatedиз Product Service иActivateServiceCommandв Provisioning имеют одинаковую структуру. Это критично, поскольку рассогласование контрактов в ISP = абонент без интернета или потеря платежа. В отличие от монолитных ISP-систем (Hydra Billing, Splynx), где интеграция — внутренние вызовы функций, наш schema-first подход обеспечивает breaking change detection в CI до деплоя.
Schema-First: Protobuf + buf.build
Почему Protobuf, а не JSON/OpenAPI
| Критерий | Protobuf | JSON / OpenAPI |
|---|---|---|
| Типобезопасность | Строгая, compile-time | Runtime валидация |
| Производительность | Бинарный, ~10× быстрее сериализации | Текстовый, verbose |
| Кодогенерация | Нативная (Go, TS, Swift, Kotlin) | Сторонние генераторы |
| Обратная совместимость | Встроенные правила (buf breaking) | Ручная проверка |
| Документация | Комментарии в .proto = документация | Отдельный файл |
| Экосистема | buf.build BSR, lint, format | Разрозненная |
Структура proto-репозитория
proto/
├── buf.yaml # Конфигурация buf модуля
├── buf.gen.yaml # Правила кодогенерации
├── buf.lock # Lock-файл зависимостей
├── common/
│ └── v1/
│ ├── pagination.proto # Курсорная пагинация
│ ├── metadata.proto # RequestMetadata (correlation/causation)
│ ├── money.proto # Money (amount + currency as string)
│ └── events.proto # EventEnvelope, CommandEnvelope
├── customer/
│ └── v1/
│ ├── customer.proto # Модели: Customer, Contract, Contact
│ ├── customer_service.proto # RPC: CustomerService
│ └── events.proto # CustomerCreated, ContractSigned...
├── product/
│ └── v1/
│ ├── product.proto # ProductOffering, Subscription
│ ├── product_service.proto # RPC: ProductService
│ └── events.proto # SubscriptionActivated, TariffChanged...
├── billing/
│ └── v1/
│ ├── billing.proto # Account, Transaction, Invoice
│ ├── billing_service.proto # RPC: BillingService
│ ├── events.proto # PaymentReceived, DunningStageChanged...
│ └── commands.proto # ChargeAccountCommand, RefundCommand
├── provisioning/
│ └── v1/
│ ├── provisioning.proto # ProvisioningTask
│ ├── provisioning_service.proto # RPC (read-only)
│ ├── events.proto # ProvisioningSuccess/Failed/Rollback
│ └── commands.proto # ActivateService, SuspendAccess...
├── inventory/
│ └── v1/
│ ├── inventory.proto # Device, Port, LogicalResource
│ ├── inventory_service.proto # RPC: InventoryService
│ └── events.proto # PortReserved, ResourceExhausted...
├── aaa/
│ └── v1/
│ ├── aaa.proto # RadiusProfile, Session, RadiusAttribute
│ └── aaa_service.proto # RPC: AAAService (CoA, profiles, sessions)
├── orchestration/
│ └── v1/
│ ├── order.proto # Order, OrderStep, OrderStatus
│ └── order_service.proto # RPC: OrderService
└── notification/
└── v1/
├── notification_service.proto # RPC: NotificationService
└── commands.proto # SendNotificationCommandbuf.yaml
version: v2
modules:
- path: proto
name: buf.build/isp-platform/api
lint:
use:
- STANDARD
- COMMENTS
except:
- PACKAGE_VERSION_SUFFIX
breaking:
use:
- PACKAGEbuf.gen.yaml (кодогенерация)
version: v2
managed:
enabled: true
override:
- file_option: go_package_prefix
value: github.com/isp-platform/api/gen/go
plugins:
# Go Protobuf
- remote: buf.build/protocolbuffers/go
out: gen/go
opt: paths=source_relative
# ConnectRPC (Go)
- remote: buf.build/connectrpc/go
out: gen/go
opt: paths=source_relative
# ConnectRPC (TypeScript — для фронтенда)
- remote: buf.build/connectrpc/es
out: gen/ts
opt: target=ts
# Protovalidate (валидация полей)
- remote: buf.build/bufbuild/protovalidate-go
out: gen/go
opt: paths=source_relativeCI Pipeline: buf в CI/CD
# .gitlab-ci.yml / GitHub Actions
buf-check:
steps:
- buf lint # Проверка стиля и именования
- buf format --diff # Форматирование
- buf breaking # Детекция ломающих изменений (против main)
- buf generate # Кодогенерация
- buf push # Публикация в BSR (Buf Schema Registry)Правила:
buf lint— обязателен в CI. Блокирует merge при нарушении стандартов.buf breaking— сравнивает сmainветкой. Ломающие изменения требуют ревью и bump версии пакета (v1→v2).buf push— публикует в BSR при merge в main. Все сервисы тянут сгенерированный код из BSR.
Пример: Protobuf-контракт
common/v1/metadata.proto
syntax = "proto3";
package common.v1;
import "google/protobuf/timestamp.proto";
// Метаданные, обязательные для каждого запроса и события.
message RequestMetadata {
// Уникальный ID запроса (UUID v4).
string request_id = 1;
// Сквозной ID процесса для трейсинга.
string correlation_id = 2;
// ID события/запроса, породившего текущий.
string causation_id = 3;
}
// Стандартная обёртка для событий RabbitMQ.
message EventEnvelope {
string event_id = 1;
string event_type = 2; // e.g. "billing.payment.received"
string source = 3; // e.g. "billing-service"
string subject = 4; // e.g. "account/acc-123"
google.protobuf.Timestamp occurred_at = 5;
RequestMetadata metadata = 6;
bytes payload = 7; // Сериализованный Protobuf конкретного события
}customer/v1/customer_service.proto
syntax = "proto3";
package customer.v1;
import "common/v1/pagination.proto";
import "common/v1/metadata.proto";
import "buf/validate/validate.proto";
// Клиент (физлицо или юрлицо).
message Customer {
string id = 1;
CustomerType type = 2;
string full_name = 3;
string inn = 4;
repeated Contact contacts = 5;
repeated Contract contracts = 6;
}
enum CustomerType {
CUSTOMER_TYPE_UNSPECIFIED = 0;
CUSTOMER_TYPE_INDIVIDUAL = 1;
CUSTOMER_TYPE_LEGAL_ENTITY = 2;
}
message Contact {
string id = 1;
ContactType type = 2;
string value = 3 [(buf.validate.field).string.min_len = 1];
bool is_verified = 4;
}
enum ContactType {
CONTACT_TYPE_UNSPECIFIED = 0;
CONTACT_TYPE_PHONE = 1;
CONTACT_TYPE_EMAIL = 2;
}
message Contract {
string id = 1;
string number = 2;
string status = 3;
google.protobuf.Timestamp signed_at = 4;
}
// --- RPC ---
service CustomerService {
// Получить клиента по ID.
rpc GetCustomer(GetCustomerRequest) returns (GetCustomerResponse);
// Список клиентов с пагинацией и фильтрацией.
rpc ListCustomers(ListCustomersRequest) returns (ListCustomersResponse);
// Создать клиента.
rpc CreateCustomer(CreateCustomerRequest) returns (CreateCustomerResponse);
// Обновить клиента (partial update).
rpc UpdateCustomer(UpdateCustomerRequest) returns (UpdateCustomerResponse);
// Полнотекстовый поиск.
rpc SearchCustomers(SearchCustomersRequest) returns (SearchCustomersResponse);
// Проверка технической возможности по адресу.
rpc CheckAvailability(CheckAvailabilityRequest) returns (CheckAvailabilityResponse);
}
message GetCustomerRequest {
string id = 1 [(buf.validate.field).string.uuid = true];
}
message GetCustomerResponse {
Customer customer = 1;
}
message ListCustomersRequest {
common.v1.PaginationRequest pagination = 1;
string filter = 2; // CEL-выражение или строка фильтра
}
message ListCustomersResponse {
repeated Customer customers = 1;
common.v1.PaginationResponse pagination = 2;
}common/v1/money.proto
syntax = "proto3";
package common.v1;
// Денежная сумма (precise). Не используем float/double для денег!
message Money {
string amount = 1; // Decimal as string: "1500.50"
string currency = 2; // ISO 4217: "RUB", "USD"
}common/v1/pagination.proto
syntax = "proto3";
package common.v1;
message PaginationRequest {
int32 page_size = 1 [(buf.validate.field).int32 = {gte: 1, lte: 100}];
string page_token = 2; // Cursor-based (opaque token)
}
message PaginationResponse {
string next_page_token = 1;
int32 total_count = 2;
}common/v1/events.proto
syntax = "proto3";
package common.v1;
import "google/protobuf/timestamp.proto";
// Обёртка для ВСЕХ событий в RabbitMQ. Единый конверт.
message EventEnvelope {
string event_id = 1; // UUID — ключ идемпотентности (inbox)
string event_type = 2; // "billing.payment.received"
string source = 3; // "billing-service"
string subject = 4; // "account/acc-123"
int32 schema_version = 5; // Версия схемы payload (1, 2, ...)
google.protobuf.Timestamp occurred_at = 6;
RequestMetadata metadata = 7;
bytes payload = 8; // Сериализованный Protobuf конкретного события
}
// Обёртка для ВСЕХ команд в RabbitMQ.
message CommandEnvelope {
string command_id = 1; // UUID — ключ идемпотентности
string command_type = 2; // "provisioning.activate_service"
string source = 3; // "oms-service"
string target = 4; // "provisioning-service"
google.protobuf.Timestamp issued_at = 5;
RequestMetadata metadata = 6;
bytes payload = 7;
}product/v1/product_service.proto
syntax = "proto3";
package product.v1;
import "common/v1/pagination.proto";
import "common/v1/money.proto";
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";
// --- Агрегат: ProductOffering (read model, не мутируется клиентом) ---
message ProductOffering {
string id = 1;
string code = 2; // "inet-100-gpon"
string name = 3; // "Интернет 100 Мбит/с (GPON)"
ProductCategory category = 4;
CustomerSegment segment = 5;
common.v1.Money base_price = 6;
repeated ProductCharacteristic characteristics = 7;
repeated string availability_zones = 8; // Коды зон покрытия
bool is_active = 9;
}
enum ProductCategory {
PRODUCT_CATEGORY_UNSPECIFIED = 0;
PRODUCT_CATEGORY_INTERNET = 1;
PRODUCT_CATEGORY_IPTV = 2;
PRODUCT_CATEGORY_TELEPHONY = 3;
PRODUCT_CATEGORY_BUNDLE = 4;
PRODUCT_CATEGORY_EQUIPMENT_RENTAL = 5;
}
enum CustomerSegment {
CUSTOMER_SEGMENT_UNSPECIFIED = 0;
CUSTOMER_SEGMENT_INDIVIDUAL = 1;
CUSTOMER_SEGMENT_LEGAL_ENTITY = 2;
CUSTOMER_SEGMENT_ALL = 3;
}
message ProductCharacteristic {
string name = 1; // "download_speed_mbps"
string value = 2; // "100"
string unit = 3; // "Mbps"
}
// --- Агрегат: Subscription (жизненный цикл услуги клиента) ---
message Subscription {
string id = 1;
string customer_id = 2;
string product_offering_id = 3;
SubscriptionStatus status = 4;
common.v1.Money effective_price = 5; // С учётом скидок
map<string, string> parameters = 6; // runtime-параметры (ip, vlan, speed)
google.protobuf.Timestamp activated_at = 7;
google.protobuf.Timestamp suspended_at = 8;
google.protobuf.Timestamp terminated_at = 9;
}
enum SubscriptionStatus {
SUBSCRIPTION_STATUS_UNSPECIFIED = 0;
SUBSCRIPTION_STATUS_PENDING = 1;
SUBSCRIPTION_STATUS_ACTIVE = 2;
SUBSCRIPTION_STATUS_SUSPENDED = 3;
SUBSCRIPTION_STATUS_TERMINATED = 4;
}
// --- RPC ---
service ProductService {
rpc GetProduct(GetProductRequest) returns (GetProductResponse);
rpc ListProducts(ListProductsRequest) returns (ListProductsResponse);
rpc CreateProduct(CreateProductRequest) returns (CreateProductResponse);
rpc CreateSubscription(CreateSubscriptionRequest) returns (CreateSubscriptionResponse);
rpc GetSubscription(GetSubscriptionRequest) returns (GetSubscriptionResponse);
rpc ChangeSubscription(ChangeSubscriptionRequest) returns (ChangeSubscriptionResponse);
rpc SuspendSubscription(SuspendSubscriptionRequest) returns (SuspendSubscriptionResponse);
rpc ResumeSubscription(ResumeSubscriptionRequest) returns (ResumeSubscriptionResponse);
rpc TerminateSubscription(TerminateSubscriptionRequest) returns (TerminateSubscriptionResponse);
}
message CreateSubscriptionRequest {
string customer_id = 1 [(buf.validate.field).string.uuid = true];
string product_offering_id = 2 [(buf.validate.field).string.uuid = true];
map<string, string> parameters = 3;
string idempotency_key = 4 [(buf.validate.field).string.uuid = true];
}
message ChangeSubscriptionRequest {
string subscription_id = 1 [(buf.validate.field).string.uuid = true];
string new_product_offering_id = 2 [(buf.validate.field).string.uuid = true];
google.protobuf.Timestamp effective_date = 3; // Когда вступает в силу
string idempotency_key = 4 [(buf.validate.field).string.uuid = true];
}billing/v1/billing_service.proto
syntax = "proto3";
package billing.v1;
import "common/v1/pagination.proto";
import "common/v1/money.proto";
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";
// --- Агрегат: Account (финансовый аккаунт клиента) ---
message Account {
string id = 1;
string customer_id = 2;
common.v1.Money balance = 3;
AccountStatus status = 4;
DunningStage dunning_stage = 5;
string legacy_ls = 6; // Legacy лицевой счёт (для миграции)
google.protobuf.Timestamp created_at = 7;
}
enum AccountStatus {
ACCOUNT_STATUS_UNSPECIFIED = 0;
ACCOUNT_STATUS_ACTIVE = 1;
ACCOUNT_STATUS_SUSPENDED = 2;
ACCOUNT_STATUS_CLOSED = 3;
}
enum DunningStage {
DUNNING_STAGE_NONE = 0;
DUNNING_STAGE_SOFT = 1; // D+0: уведомление + captive portal
DUNNING_STAGE_HARD = 2; // D+3: блокировка (64kbps)
DUNNING_STAGE_TERMINATE = 3; // D+N: расторжение
}
// --- Value Object: Transaction (double-entry ledger) ---
message Transaction {
string id = 1;
string account_id = 2;
TransactionType type = 3;
common.v1.Money amount = 4;
common.v1.Money balance_after = 5;
string description = 6;
string external_id = 7; // ID от внешней системы (банк, 1С)
google.protobuf.Timestamp created_at = 8;
}
enum TransactionType {
TRANSACTION_TYPE_UNSPECIFIED = 0;
TRANSACTION_TYPE_PAYMENT = 1; // Зачисление
TRANSACTION_TYPE_CHARGE = 2; // Списание (рекуррент)
TRANSACTION_TYPE_REFUND = 3; // Возврат
TRANSACTION_TYPE_ADJUSTMENT = 4; // Ручная корректировка
TRANSACTION_TYPE_CONNECTION_FEE = 5; // Разовый платёж за подключение
}
// --- Value Object: Invoice ---
message Invoice {
string id = 1;
string account_id = 2;
string number = 3; // "ИСП-2025-000123"
common.v1.Money total = 4;
InvoiceStatus status = 5;
google.protobuf.Timestamp period_start = 6;
google.protobuf.Timestamp period_end = 7;
google.protobuf.Timestamp issued_at = 8;
google.protobuf.Timestamp paid_at = 9;
}
enum InvoiceStatus {
INVOICE_STATUS_UNSPECIFIED = 0;
INVOICE_STATUS_DRAFT = 1;
INVOICE_STATUS_ISSUED = 2;
INVOICE_STATUS_PAID = 3;
INVOICE_STATUS_OVERDUE = 4;
INVOICE_STATUS_CANCELLED = 5;
}
// --- RPC ---
service BillingService {
rpc GetAccount(GetAccountRequest) returns (GetAccountResponse);
rpc GetBalance(GetBalanceRequest) returns (GetBalanceResponse);
rpc ListTransactions(ListTransactionsRequest) returns (ListTransactionsResponse);
// Мутации (все идемпотентные)
rpc RegisterPayment(RegisterPaymentRequest) returns (RegisterPaymentResponse);
rpc ManualAdjust(ManualAdjustRequest) returns (ManualAdjustResponse);
rpc GenerateInvoice(GenerateInvoiceRequest) returns (GenerateInvoiceResponse);
rpc GetInvoice(GetInvoiceRequest) returns (GetInvoiceResponse);
}
message RegisterPaymentRequest {
string account_id = 1 [(buf.validate.field).string.uuid = true];
common.v1.Money amount = 2;
string source = 3; // "sberbank", "tinkoff", "cash"
string external_id = 4 [(buf.validate.field).string.min_len = 1];
string idempotency_key = 5 [(buf.validate.field).string.uuid = true];
}
message ManualAdjustRequest {
string account_id = 1 [(buf.validate.field).string.uuid = true];
common.v1.Money amount = 2; // Может быть отрицательным
string reason = 3 [(buf.validate.field).string.min_len = 5];
string idempotency_key = 4 [(buf.validate.field).string.uuid = true];
}orchestration/v1/order_service.proto
syntax = "proto3";
package orchestration.v1;
import "common/v1/pagination.proto";
import "common/v1/metadata.proto";
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";
// --- Агрегат: Order (оркестрационный процесс) ---
message Order {
string id = 1;
string customer_id = 2;
OrderType type = 3;
OrderStatus status = 4;
OrderPriority priority = 5;
string title = 6;
string description = 7;
repeated OrderStep steps = 8;
map<string, string> context = 9; // Данные для шагов (subscription_id, address, etc.)
int32 legacy_id = 10; // Маппинг из Legacy CRM
google.protobuf.Timestamp created_at = 11;
google.protobuf.Timestamp updated_at = 12;
google.protobuf.Timestamp deadline = 13;
}
enum OrderType {
ORDER_TYPE_UNSPECIFIED = 0;
ORDER_TYPE_NEW_CONNECTION = 1;
ORDER_TYPE_TARIFF_CHANGE = 2;
ORDER_TYPE_DISCONNECTION = 3;
ORDER_TYPE_EQUIPMENT_REPLACEMENT = 4;
ORDER_TYPE_REPAIR = 5;
ORDER_TYPE_SUSPENSION = 6;
ORDER_TYPE_RESUMPTION = 7;
}
enum OrderStatus {
ORDER_STATUS_UNSPECIFIED = 0;
ORDER_STATUS_DRAFT = 1;
ORDER_STATUS_SUBMITTED = 2;
ORDER_STATUS_IN_PROGRESS = 3;
ORDER_STATUS_WAITING_EXTERNAL = 4;
ORDER_STATUS_COMPENSATING = 5;
ORDER_STATUS_COMPLETED = 6;
ORDER_STATUS_FAILED = 7;
ORDER_STATUS_CANCELLED = 8;
}
enum OrderPriority {
ORDER_PRIORITY_UNSPECIFIED = 0;
ORDER_PRIORITY_LOW = 1;
ORDER_PRIORITY_MEDIUM = 2;
ORDER_PRIORITY_HIGH = 3;
ORDER_PRIORITY_CRITICAL = 4;
}
// Шаг Saga-оркестрации
message OrderStep {
string name = 1; // "reserve_port", "create_profile", "activate"
StepStatus status = 2;
string error_message = 3;
int32 retry_count = 4;
google.protobuf.Timestamp started_at = 5;
google.protobuf.Timestamp completed_at = 6;
}
enum StepStatus {
STEP_STATUS_UNSPECIFIED = 0;
STEP_STATUS_PENDING = 1;
STEP_STATUS_RUNNING = 2;
STEP_STATUS_COMPLETED = 3;
STEP_STATUS_FAILED = 4;
STEP_STATUS_COMPENSATED = 5;
STEP_STATUS_SKIPPED = 6;
}
// --- RPC ---
service OrderService {
rpc CreateOrder(CreateOrderRequest) returns (CreateOrderResponse);
rpc GetOrder(GetOrderRequest) returns (GetOrderResponse);
rpc ListOrders(ListOrdersRequest) returns (ListOrdersResponse);
rpc CancelOrder(CancelOrderRequest) returns (CancelOrderResponse);
rpc RetryOrder(RetryOrderRequest) returns (RetryOrderResponse);
}
message CreateOrderRequest {
OrderType type = 1;
string customer_id = 2 [(buf.validate.field).string.uuid = true];
string title = 3;
map<string, string> context = 4;
OrderPriority priority = 5;
string idempotency_key = 6 [(buf.validate.field).string.uuid = true];
}inventory/v1/inventory_service.proto
syntax = "proto3";
package inventory.v1;
import "common/v1/pagination.proto";
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";
// --- Агрегат: Device ---
message Device {
string id = 1;
string hostname = 2;
string management_ip = 3;
DeviceType type = 4;
string vendor = 5; // "huawei", "mikrotik", "eltex", "cisco"
string model = 6;
string firmware_version = 7;
DeviceStatus status = 8;
string location = 9; // Физическое расположение
repeated Port ports = 10;
}
enum DeviceType {
DEVICE_TYPE_UNSPECIFIED = 0;
DEVICE_TYPE_OLT = 1;
DEVICE_TYPE_SWITCH = 2;
DEVICE_TYPE_ROUTER = 3;
DEVICE_TYPE_BRAS = 4;
DEVICE_TYPE_CPE = 5;
DEVICE_TYPE_SPLITTER = 6;
}
enum DeviceStatus {
DEVICE_STATUS_UNSPECIFIED = 0;
DEVICE_STATUS_ACTIVE = 1;
DEVICE_STATUS_MAINTENANCE = 2;
DEVICE_STATUS_DECOMMISSIONED = 3;
}
// --- Value Object: Port ---
message Port {
string id = 1;
string device_id = 2;
string number = 3; // "0/1/3"
PortStatus status = 4;
string subscriber_id = 5; // Кому выделен (если occupied)
string subscription_id = 6;
google.protobuf.Timestamp reserved_until = 7; // TTL резерва
}
enum PortStatus {
PORT_STATUS_UNSPECIFIED = 0;
PORT_STATUS_FREE = 1;
PORT_STATUS_RESERVED = 2;
PORT_STATUS_OCCUPIED = 3;
PORT_STATUS_FAULTY = 4;
}
// --- Value Object: LogicalResource ---
message LogicalResource {
string id = 1;
ResourceType type = 2;
string value = 3; // "192.168.1.100", "AA:BB:CC:DD:EE:FF", "100"
string pool_id = 4;
string subscription_id = 5;
}
enum ResourceType {
RESOURCE_TYPE_UNSPECIFIED = 0;
RESOURCE_TYPE_IPV4 = 1;
RESOURCE_TYPE_IPV6 = 2;
RESOURCE_TYPE_VLAN = 3;
RESOURCE_TYPE_MAC = 4;
}
// --- RPC ---
service InventoryService {
rpc ListDevices(ListDevicesRequest) returns (ListDevicesResponse);
rpc GetDevice(GetDeviceRequest) returns (GetDeviceResponse);
rpc GetPortStatus(GetPortStatusRequest) returns (GetPortStatusResponse);
// Resource management
rpc CheckAvailability(CheckAvailabilityRequest) returns (CheckAvailabilityResponse);
rpc ReservePort(ReservePortRequest) returns (ReservePortResponse);
rpc ReleasePort(ReleasePortRequest) returns (ReleasePortResponse);
rpc BindServiceInstance(BindServiceInstanceRequest) returns (BindServiceInstanceResponse);
rpc AllocateIP(AllocateIPRequest) returns (AllocateIPResponse);
rpc ReleaseIP(ReleaseIPRequest) returns (ReleaseIPResponse);
}
message CheckAvailabilityRequest {
string address_id = 1; // Ссылка на адрес из Customer Core
string product_offering_id = 2; // Для проверки технических требований
}
message CheckAvailabilityResponse {
bool available = 1;
string device_id = 2;
string port_id = 3;
string reason = 4; // Если unavailable — причина
}
message ReservePortRequest {
string port_id = 1 [(buf.validate.field).string.uuid = true];
string order_id = 2 [(buf.validate.field).string.uuid = true];
int32 ttl_hours = 3; // Через сколько часов автоосвобождение
string idempotency_key = 4 [(buf.validate.field).string.uuid = true];
}provisioning/v1/provisioning_service.proto
syntax = "proto3";
package provisioning.v1;
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";
// --- Агрегат: ProvisioningTask ---
message ProvisioningTask {
string id = 1;
string order_id = 2;
ProvisioningAction action = 3;
ProvisioningStatus status = 4;
string device_id = 5;
string port_id = 6;
map<string, string> parameters = 7; // vendor-specific params
int32 retry_count = 8;
string error_message = 9;
google.protobuf.Timestamp created_at = 10;
google.protobuf.Timestamp completed_at = 11;
}
enum ProvisioningAction {
PROVISIONING_ACTION_UNSPECIFIED = 0;
PROVISIONING_ACTION_ACTIVATE = 1;
PROVISIONING_ACTION_DEACTIVATE = 2;
PROVISIONING_ACTION_CHANGE_SPEED = 3;
PROVISIONING_ACTION_SUSPEND = 4;
PROVISIONING_ACTION_RESUME = 5;
PROVISIONING_ACTION_REPLACE_DEVICE = 6;
}
enum ProvisioningStatus {
PROVISIONING_STATUS_UNSPECIFIED = 0;
PROVISIONING_STATUS_RECEIVED = 1;
PROVISIONING_STATUS_VALIDATING = 2;
PROVISIONING_STATUS_EXECUTING = 3;
PROVISIONING_STATUS_VERIFYING = 4;
PROVISIONING_STATUS_COMPLETED = 5;
PROVISIONING_STATUS_FAILED = 6;
PROVISIONING_STATUS_ROLLING_BACK = 7;
}
// --- RPC (минимальный, т.к. основной вход через RabbitMQ) ---
service ProvisioningService {
rpc GetTaskStatus(GetTaskStatusRequest) returns (GetTaskStatusResponse);
rpc RetryTask(RetryTaskRequest) returns (RetryTaskResponse);
rpc ListTasks(ListTasksRequest) returns (ListTasksResponse);
}aaa/v1/aaa_service.proto
syntax = "proto3";
package aaa.v1;
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";
// --- Агрегат: RadiusProfile ---
message RadiusProfile {
string id = 1;
string subscription_id = 2;
string username = 3;
AuthType auth_type = 4;
repeated RadiusAttribute check_attrs = 5; // radcheck
repeated RadiusAttribute reply_attrs = 6; // radreply
google.protobuf.Timestamp created_at = 7;
google.protobuf.Timestamp updated_at = 8;
}
enum AuthType {
AUTH_TYPE_UNSPECIFIED = 0;
AUTH_TYPE_PPPOE = 1;
AUTH_TYPE_IPOE = 2;
AUTH_TYPE_HOTSPOT = 3;
}
message RadiusAttribute {
string name = 1; // "Mikrotik-Rate-Limit", "Framed-IP-Address"
string value = 2; // "100M/100M", "192.168.1.100"
string op = 3; // ":=", "==", "+="
}
// --- Value Object: Session ---
message Session {
string id = 1; // acct_session_id
string username = 2;
string nas_ip = 3;
string framed_ip = 4;
string mac_address = 5;
string nas_port_id = 6;
google.protobuf.Timestamp started_at = 7;
int64 bytes_in = 8;
int64 bytes_out = 9;
}
// --- RPC ---
service AAAService {
rpc CreateProfile(CreateProfileRequest) returns (CreateProfileResponse);
rpc UpdateProfile(UpdateProfileRequest) returns (UpdateProfileResponse);
rpc DeleteProfile(DeleteProfileRequest) returns (DeleteProfileResponse);
rpc SendCoA(SendCoARequest) returns (SendCoAResponse);
rpc DisconnectSession(DisconnectSessionRequest) returns (DisconnectSessionResponse);
rpc ListActiveSessions(ListActiveSessionsRequest) returns (ListActiveSessionsResponse);
rpc GetSessionHistory(GetSessionHistoryRequest) returns (GetSessionHistoryResponse);
}
message SendCoARequest {
string username = 1;
string nas_ip = 2;
repeated RadiusAttribute attributes = 3; // Новые атрибуты для CoA
}notification/v1/notification_service.proto
syntax = "proto3";
package notification.v1;
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";
message NotificationRequest {
string customer_id = 1 [(buf.validate.field).string.uuid = true];
Channel channel = 2;
string template_id = 3;
map<string, string> variables = 4; // Переменные для шаблона
Priority priority = 5;
string idempotency_key = 6 [(buf.validate.field).string.uuid = true];
}
enum Channel {
CHANNEL_UNSPECIFIED = 0;
CHANNEL_SMS = 1;
CHANNEL_EMAIL = 2;
CHANNEL_PUSH = 3;
CHANNEL_TELEGRAM = 4;
CHANNEL_WEBHOOK = 5;
}
enum Priority {
PRIORITY_UNSPECIFIED = 0;
PRIORITY_LOW = 1; // Batch (1 раз в час)
PRIORITY_NORMAL = 2; // < 5 мин
PRIORITY_HIGH = 3; // < 1 мин
PRIORITY_CRITICAL = 4; // Мгновенно
}
service NotificationService {
rpc Send(NotificationRequest) returns (NotificationResponse);
rpc ListTemplates(ListTemplatesRequest) returns (ListTemplatesResponse);
rpc GetDeliveryStatus(GetDeliveryStatusRequest) returns (GetDeliveryStatusResponse);
}Синхронные вызовы: ConnectRPC
Для service-to-service и BFF-to-service используем ConnectRPC — современный RPC-фреймворк поверх Protobuf.
Почему ConnectRPC, а не чистый gRPC
| Критерий | ConnectRPC | gRPC |
|---|---|---|
| HTTP-протокол | HTTP/1.1 + HTTP/2 + HTTP/3 | Только HTTP/2 |
| Браузеры | Нативная поддержка (без прокси) | Нужен grpc-web прокси |
| JSON fallback | Автоматический (curl-friendly) | Только бинарный |
| Совместимость с gRPC | Полная (тот же .proto, те же клиенты) | — |
| Streaming | Поддержка (server/client/bidi) | Поддержка |
| Interceptors | Go-идиоматичные | Тяжелый middleware API |
Архитектура вызовов
Принцип:
- Frontend → BFF: ConnectRPC через HTTP/1.1 + JSON (curl-friendly, легко дебажить).
- BFF → Microservice: ConnectRPC через HTTP/2 + Protobuf binary (максимальная производительность).
- Microservice → Microservice: ConnectRPC через HTTP/2 + Protobuf binary.
Карта RPC-сервисов
| Protobuf Service | Владелец | Ключевые RPC |
|---|---|---|
CustomerService | Customer Core | GetCustomer, ListCustomers, CreateCustomer, CheckAvailability |
ProductService | Product & Subscription | GetProduct, ListProducts, CreateSubscription, ChangeSubscription |
BillingService | Billing & Finance | GetAccount, GetBalance, ListTransactions, RegisterPayment |
InventoryService | Network Inventory | ListDevices, GetDevicePorts, ReservePort, ReleasePort, AllocateIP |
ProvisioningService | Provisioning | GetProvisioningStatus, RetryProvisioning |
OrderService | OMS | CreateOrder, GetOrder, CancelOrder |
NotificationService | Notification | SendNotification, ListTemplates |
Валидация (protovalidate)
Валидация полей описывается в .proto, генерируется автоматически и выполняется на сервере через interceptor:
message RegisterPaymentRequest {
string account_id = 1 [(buf.validate.field).string.uuid = true];
string amount = 2 [(buf.validate.field).string.min_len = 1];
string currency = 3 [(buf.validate.field).string = {in: ["RUB", "USD", "EUR"]}];
string external_id = 4 [(buf.validate.field).string.min_len = 1];
string idempotency_key = 5 [(buf.validate.field).string.uuid = true];
}Асинхронный обмен: RabbitMQ
Все межсервисные команды и события передаются через RabbitMQ (Quorum Queues, Topic Exchanges).
Почему RabbitMQ
| Критерий | RabbitMQ 4.x | Kafka |
|---|---|---|
| Модель | Smart broker, flexible routing | Dumb broker, smart consumer |
| Routing | Topic/Direct/Fanout/Headers exchanges | Только партиции |
| Приоритеты сообщений | Нативная поддержка | Нет |
| Dead Letter | DLX + DLQ из коробки | Ручная реализация |
| TTL сообщений | Per-message и per-queue | Только retention |
| Quorum Queues | Raft-based, durable, replicated | — |
| Streams | Append-only log (как Kafka) | Основная модель |
| Операционная сложность | Проще (один кластер) | ZooKeeper/KRaft + больше tunning |
| Масштаб нашей задачи | Идеален для ISP (~100K абонентов) | Overkill для нашего масштаба |
Топология RabbitMQ
Карта Exchanges и Queues
| Exchange | Тип | Routing Key Pattern | Продюсер | Quorum Queue (пример) |
|---|---|---|---|---|
customer.events | Topic | customer.created, contract.signed | Customer Core | billing.q.customer.events |
product.events | Topic | subscription.activated, subscription.suspended | Product & Sub | provisioning.q.product.events |
billing.events | Topic | payment.received, balance.updated, access.block | Billing | oms.q.billing.events |
provisioning.events | Topic | provisioning.success, provisioning.failed | Provisioning | oms.q.provisioning.events |
provisioning.commands | Direct | service.activate, access.suspend, access.resume | OMS, Billing | provisioning.q.commands |
billing.commands | Direct | charge.account, refund | OMS | billing.q.commands |
notification.commands | Direct | send.sms, send.email | Все домены | notification.q.commands |
mediation.events | Topic | usage.reported | Mediation | billing.q.mediation.events |
fsm.events | Topic | work.completed, work.failed | FSM | oms.q.fsm.events |
Конфигурация очередей
Все production-очереди — Quorum Queues (Raft-based, replicated, durable):
# Пример декларации через RabbitMQ definitions или Terraform
queues:
provisioning.q.commands:
type: quorum
arguments:
x-delivery-limit: 5 # Макс. ретраев перед DLQ
x-dead-letter-exchange: dlx # Dead Letter Exchange
x-dead-letter-routing-key: provisioning.q.commands.dlq
x-queue-leader-locator: balanced
provisioning.q.commands.dlq:
type: quorum
arguments:
x-message-ttl: 604800000 # 7 дней хранения в DLQФормат сообщений (Protobuf EventEnvelope)
Все сообщения в RabbitMQ сериализуются в Protobuf через EventEnvelope:
// billing/v1/billing_events.proto
message PaymentReceivedEvent {
string account_id = 1;
string amount = 2; // Decimal as string (точность!)
string currency = 3;
string payment_id = 4;
string source = 5; // "sberbank", "tinkoff", "cash"
string external_id = 6; // ID от платёжного шлюза
}
message AccessBlockEvent {
string account_id = 1;
string reason = 2; // "finance", "admin", "fraud"
string subscription_id = 3;
}Публикация:
Exchange: billing.events
Routing Key: payment.received
Content-Type: application/protobuf
Headers:
x-event-id: evt-2025-001-abc (UUID, для дедупликации)
x-correlation-id: order-2025-001
x-causation-id: webhook-bank-xyz
x-source: billing-service
x-timestamp: 2025-01-15T12:00:00Z
Body: <serialized PaymentReceivedEvent>Гарантии доставки
- Publisher Confirms: Каждый продюсер использует publisher confirms для гарантии записи в RabbitMQ.
- Quorum Queues: Raft-репликация. Сообщение считается принятым после записи на majority нод.
- Manual Ack: Консьюмеры используют manual acknowledgment. Сообщение подтверждается после успешной обработки + записи в inbox.
- Dead Letter (DLX): После
x-delivery-limitретраев сообщение переходит в DLQ. Алерт в мониторинг. Ручной разбор через UI. - Idempotency: Inbox-паттерн на стороне каждого консьюмера (по
x-event-id).
Паттерны интеграции
Transactional Outbox Pattern
Гарантирует атомарную публикацию события после коммита в БД. Без outbox — при падении между коммитом и publish сообщение теряется.
Таблица outbox:
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
event_id UUID NOT NULL DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL, -- 'payment.received'
routing_key TEXT NOT NULL, -- routing key для exchange
exchange TEXT NOT NULL, -- 'billing.events'
payload BYTEA NOT NULL, -- Serialized Protobuf
headers JSONB NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'pending', -- pending → published
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_pending ON outbox (status, id) WHERE status = 'pending';Inbox Pattern (Дедупликация)
Защищает от повторной обработки при at-least-once доставке:
CREATE TABLE inbox (
event_id UUID PRIMARY KEY,
event_type TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);1. Consumer получает message
2. Извлекает event_id из header x-event-id
3. BEGIN TX
4. INSERT INTO inbox (event_id, event_type) ON CONFLICT DO NOTHING
5. Если inserted — обрабатывает бизнес-логику
6. Если conflict — SKIP (уже обработано)
7. COMMIT
8. ACK messageSaga Pattern (Orchestration)
Для длительных процессов, требующих компенсации при ошибке. OMS выступает оркестратором:
Каждый шаг = RabbitMQ command → ожидание event. OMS хранит состояние саги в БД и реагирует на входящие события.
Идемпотентность
Все мутирующие RPC поддерживают поле idempotency_key в запросе:
message RegisterPaymentRequest {
string account_id = 1;
string amount = 2;
string currency = 3;
string external_id = 4;
// Ключ идемпотентности. Повторный вызов с тем же ключом
// возвращает тот же результат, не создавая дубликат.
string idempotency_key = 5 [(buf.validate.field).string.uuid = true];
}Правила:
- Повторный вызов с тем же
idempotency_keyвозвращает закэшированный результат. - Ключ хранится 24 часа (настраивается).
- Для webhook'ов от банков:
idempotency_key=external_payment_id.
Rate Limiting и Throttling
| Тип клиента | Лимит | Burst |
|---|---|---|
| Internal Service | 10,000 req/min | 500 |
| Operator (CRM) | 1,000 req/min | 100 |
| Customer Portal (ЛК) | 300 req/min | 50 |
| External Partner | 100 req/min | 20 |
Rate limiting реализуется на уровне Kubernetes NGINX Ingress (annotations nginx.ingress.kubernetes.io/limit-rps) + ConnectRPC interceptor. При превышении возвращается gRPC status RESOURCE_EXHAUSTED.
Полный каталог Domain Events (Protobuf)
Все доменные события типизированы в Protobuf. Payload сериализуется в EventEnvelope.payload.
customer/v1/events.proto
syntax = "proto3";
package customer.v1;
import "google/protobuf/timestamp.proto";
message CustomerCreatedEvent {
string customer_id = 1;
string full_name = 2;
string type = 3; // "individual" / "legal_entity"
}
message CustomerUpdatedEvent {
string customer_id = 1;
repeated string changed_fields = 2; // ["full_name", "inn"]
}
message ContractSignedEvent {
string contract_id = 1;
string customer_id = 2;
string contract_number = 3;
google.protobuf.Timestamp signed_at = 4;
}
message ContractTerminatedEvent {
string contract_id = 1;
string customer_id = 2;
string reason = 3;
}product/v1/events.proto
syntax = "proto3";
package product.v1;
import "google/protobuf/timestamp.proto";
message SubscriptionActivatedEvent {
string subscription_id = 1;
string customer_id = 2;
string product_offering_id = 3;
map<string, string> parameters = 4; // speed, ip, vlan...
}
message SubscriptionSuspendedEvent {
string subscription_id = 1;
string reason = 2; // "dunning", "user_request", "admin"
}
message SubscriptionResumedEvent {
string subscription_id = 1;
string reason = 2; // "payment", "manual"
}
message SubscriptionTerminatedEvent {
string subscription_id = 1;
string reason = 2;
}
message TariffChangedEvent {
string subscription_id = 1;
string old_product_id = 2;
string new_product_id = 3;
google.protobuf.Timestamp effective_at = 4;
}billing/v1/events.proto
syntax = "proto3";
package billing.v1;
import "common/v1/money.proto";
import "google/protobuf/timestamp.proto";
message PaymentReceivedEvent {
string account_id = 1;
string payment_id = 2;
common.v1.Money amount = 3;
string source = 4; // "sberbank", "tinkoff", "cash"
string external_id = 5;
}
message BalanceNegativeEvent {
string account_id = 1;
common.v1.Money balance = 2;
string customer_id = 3;
}
message DunningStageChangedEvent {
string account_id = 1;
string customer_id = 2;
string old_stage = 3; // "none", "soft", "hard"
string new_stage = 4;
}
message InvoiceIssuedEvent {
string invoice_id = 1;
string account_id = 2;
common.v1.Money total = 3;
}
message ChargeCompletedEvent {
string account_id = 1;
string subscription_id = 2;
common.v1.Money amount = 3;
string period = 4; // "2025-02"
}provisioning/v1/events.proto
syntax = "proto3";
package provisioning.v1;
message ProvisioningSuccessEvent {
string task_id = 1;
string order_id = 2;
string subscription_id = 3;
string action = 4; // "activate", "suspend", "change_speed"
}
message ProvisioningFailedEvent {
string task_id = 1;
string order_id = 2;
string subscription_id = 3;
string action = 4;
string error_code = 5;
string error_message = 6;
bool is_retryable = 7;
}
message ProvisioningRollbackEvent {
string task_id = 1;
string order_id = 2;
string action = 3;
bool success = 4;
}inventory/v1/events.proto
syntax = "proto3";
package inventory.v1;
import "google/protobuf/timestamp.proto";
message PortReservedEvent {
string port_id = 1;
string device_id = 2;
string order_id = 3;
google.protobuf.Timestamp reserved_until = 4;
}
message PortReleasedEvent {
string port_id = 1;
string reason = 2; // "order_cancelled", "ttl_expired", "manual"
}
message ResourceExhaustedEvent {
string device_id = 1;
string resource_type = 2; // "port", "ip", "vlan"
string location = 3;
}
message DeviceStatusChangedEvent {
string device_id = 1;
string old_status = 2;
string new_status = 3;
}Полный каталог Command Messages (Protobuf)
Команды — это намерения (intent), которые один сервис отправляет другому через RabbitMQ Direct Exchange.
provisioning/v1/commands.proto
syntax = "proto3";
package provisioning.v1;
// Команда: активировать услугу на оборудовании
message ActivateServiceCommand {
string order_id = 1;
string subscription_id = 2;
string device_id = 3;
string port_id = 4;
map<string, string> parameters = 5; // speed, vlan, ip, ont_sn...
}
// Команда: приостановить доступ (dunning / admin)
message SuspendAccessCommand {
string subscription_id = 1;
string reason = 2;
string redirect_url = 3; // URL captive portal (для soft block)
}
// Команда: возобновить доступ (после оплаты)
message ResumeAccessCommand {
string subscription_id = 1;
string reason = 2;
}
// Команда: изменить скорость (смена тарифа)
message ChangeSpeedCommand {
string subscription_id = 1;
int32 download_mbps = 2;
int32 upload_mbps = 3;
}
// Команда: деактивировать (расторжение)
message DeactivateServiceCommand {
string subscription_id = 1;
string order_id = 2;
}billing/v1/commands.proto
syntax = "proto3";
package billing.v1;
import "common/v1/money.proto";
// Команда: списать средства (рекуррент / разовый)
message ChargeAccountCommand {
string account_id = 1;
common.v1.Money amount = 2;
string reason = 3; // "monthly_fee", "connection_fee"
string subscription_id = 4;
string idempotency_key = 5;
}
// Команда: возврат средств (компенсация Saga)
message RefundCommand {
string account_id = 1;
common.v1.Money amount = 2;
string reason = 3;
string original_transaction_id = 4;
string idempotency_key = 5;
}notification/v1/commands.proto
syntax = "proto3";
package notification.v1;
// Команда: отправить уведомление клиенту
message SendNotificationCommand {
string customer_id = 1;
string template_id = 2;
string channel = 3; // "sms", "email", "push", "telegram"
map<string, string> variables = 4;
string priority = 5; // "critical", "high", "normal", "low"
string idempotency_key = 6;
}Полная матрица Exchange → Queue → Consumer
Подробная привязка каждого exchange к каждому consumer через конкретные queues и routing keys:
Event Exchanges (Topic)
| Exchange | Routing Key | Queue | Consumer | Назначение |
|---|---|---|---|---|
customer.events | customer.created | billing.q.customer-created | Billing | Создать Account для нового клиента |
customer.events | customer.created | notification.q.customer-created | Notification | Welcome-уведомление |
customer.events | contract.signed | oms.q.contract-signed | OMS | Инициировать процесс подключения |
customer.events | contract.terminated | billing.q.contract-terminated | Billing | Финальный счёт + закрытие |
product.events | subscription.activated | provisioning.q.subscription-activated | Provisioning | Провижининг новой услуги |
product.events | subscription.activated | billing.q.subscription-activated | Billing | Начало тарификации |
product.events | subscription.suspended | provisioning.q.subscription-suspended | Provisioning | Блокировка на оборудовании |
product.events | subscription.terminated | provisioning.q.subscription-terminated | Provisioning | Деактивация |
product.events | subscription.terminated | billing.q.subscription-terminated | Billing | Стоп-тарификация |
product.events | subscription.tariff_changed | provisioning.q.tariff-changed | Provisioning | CoA (смена скорости) |
product.events | subscription.tariff_changed | billing.q.tariff-changed | Billing | Перерасчёт |
billing.events | payment.received | oms.q.payment-received | OMS | Проверка: ожидает ли процесс оплату |
billing.events | payment.received | notification.q.payment-received | Notification | SMS "Оплата зачислена" |
billing.events | balance.negative | notification.q.balance-negative | Notification | SMS "Пополните баланс" |
billing.events | dunning.stage_changed | provisioning.q.dunning | Provisioning | Suspend/Resume по dunning |
billing.events | dunning.stage_changed | notification.q.dunning | Notification | "Доступ ограничен" |
provisioning.events | provisioning.success | oms.q.provisioning-result | OMS | Saga: шаг завершён |
provisioning.events | provisioning.failed | oms.q.provisioning-result | OMS | Saga: шаг провален → компенсация |
inventory.events | resource.exhausted | notification.q.resource-alert | Notification | Алерт операторам |
mediation.events | usage.reported | billing.q.usage | Billing | Usage-based тарификация |
fsm.events | work.completed | oms.q.fsm-result | OMS | Монтаж завершён → продолжить Saga |
fsm.events | work.failed | oms.q.fsm-result | OMS | Монтаж провален → компенсация |
Command Exchanges (Direct)
| Exchange | Routing Key | Queue | Consumer | Назначение |
|---|---|---|---|---|
provisioning.commands | service.activate | provisioning.q.commands | Provisioning | Активация на оборудовании |
provisioning.commands | service.deactivate | provisioning.q.commands | Provisioning | Деактивация |
provisioning.commands | access.suspend | provisioning.q.commands | Provisioning | Блокировка (dunning/admin) |
provisioning.commands | access.resume | provisioning.q.commands | Provisioning | Разблокировка |
provisioning.commands | speed.change | provisioning.q.commands | Provisioning | CoA (смена скорости) |
billing.commands | charge.account | billing.q.commands | Billing | Списание (OMS Saga) |
billing.commands | refund | billing.q.commands | Billing | Возврат (Saga compensation) |
notification.commands | send.sms | notification.q.sms | Notification | Отправка SMS |
notification.commands | send.email | notification.q.email | Notification | Отправка Email |
notification.commands | send.push | notification.q.push | Notification | Отправка Push |
notification.commands | send.telegram | notification.q.telegram | Notification | Отправка в Telegram |
Background Jobs: river (Go)
river — PostgreSQL-based фоновый обработчик задач для Go. Аналог BullMQ (Node.js), но использует ту же БД что и приложение. Транзакционная постановка задач — задача гарантированно создаётся вместе с бизнес-данными (атомарно).
Почему river
| Критерий | river (Go + PostgreSQL) | BullMQ (Node.js + Redis) | Temporal |
|---|---|---|---|
| Транзакционность | Задача в той же TX что и бизнес-данные | Отдельный Redis, нет TX | Отдельный кластер |
| Инфраструктура | PostgreSQL (уже есть) | Redis (доп. компонент) | Temporal Server (тяжёлый) |
| Cron/Scheduled | Встроенный PeriodicJobs | Встроенный repeatable | Встроенный schedule |
| Retry | Configurable per-job | Configurable | Configurable |
| Приоритеты | Встроенные queue priorities | Встроенные | — |
| UI | River UI (веб-панель) | Bull Board | Temporal UI |
| Язык | Go-native | TypeScript/Node.js | Любой (через SDK) |
Когда river, когда RabbitMQ
| Задача | river | RabbitMQ |
|---|---|---|
| Генерация PDF-счёта | ✅ | — |
| Рекуррентное списание (cron) | ✅ | — |
| Очистка expired резервов | ✅ | — |
| Reconciliation (сверка) | ✅ | — |
| Отправка email/SMS | — | ✅ (notification.commands) |
| Provisioning команды | — | ✅ (provisioning.commands) |
| Domain events (межсервисные) | — | ✅ (topic exchanges) |
| Saga-оркестрация | — | ✅ (OMS ↔ services) |
Правило: river = внутрисервисные фоновые задачи. RabbitMQ = межсервисная коммуникация.
Пример: рекуррентный биллинг через river
// internal/billing/jobs/recurring_charge.go
package jobs
import (
"context"
"github.com/riverqueue/river"
)
// Args — аргументы задачи (сериализуются в JSON в PostgreSQL)
type RecurringChargeArgs struct {
AccountID string `json:"account_id"`
SubscriptionID string `json:"subscription_id"`
Period string `json:"period"` // "2025-02"
}
func (RecurringChargeArgs) Kind() string { return "billing.recurring_charge" }
// Worker — обработчик задачи
type RecurringChargeWorker struct {
river.WorkerDefaults[RecurringChargeArgs]
billingService *BillingService
}
func (w *RecurringChargeWorker) Work(ctx context.Context, job *river.Job[RecurringChargeArgs]) error {
return w.billingService.ChargeRecurring(ctx, job.Args.AccountID, job.Args.SubscriptionID, job.Args.Period)
}Пример: cron-задачи через river PeriodicJobs
// internal/billing/app.go
func setupRiver(pool *pgxpool.Pool) *river.Client[pgx.Tx] {
workers := river.NewWorkers()
river.AddWorker(workers, &RecurringChargeWorker{billingService: svc})
river.AddWorker(workers, &InvoiceGeneratorWorker{billingService: svc})
river.AddWorker(workers, &DunningCheckWorker{billingService: svc})
client, _ := river.NewClient(riverpgxv5.New(pool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 10},
"billing_critical": {MaxWorkers: 5},
"billing_background": {MaxWorkers: 3},
},
PeriodicJobs: []*river.PeriodicJob{
// Каждую ночь в 03:00 — рекуррентные списания
river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return &BatchRecurringChargeArgs{Date: time.Now().Format("2006-01-02")},
&river.InsertOpts{Queue: "billing_critical"}
},
&river.PeriodicJobOpts{RunOnStart: false},
),
// Каждые 4 часа — проверка dunning
river.NewPeriodicJob(
river.PeriodicInterval(4*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return &DunningCheckArgs{}, &river.InsertOpts{Queue: "billing_background"}
},
nil,
),
// Каждый час — очистка expired port reservations
river.NewPeriodicJob(
river.PeriodicInterval(1*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return &CleanExpiredReservationsArgs{}, nil
},
nil,
),
},
})
return client
}Транзакционная постановка задачи (ключевое преимущество)
// Задача создаётся В ТОЙ ЖЕ транзакции что и бизнес-данные.
// Если TX откатится — задача тоже исчезнет. Нет "потерянных" задач.
func (s *BillingService) RegisterPayment(ctx context.Context, req *RegisterPaymentRequest) error {
tx, _ := s.pool.Begin(ctx)
defer tx.Rollback(ctx)
// 1. Бизнес-логика
_ = s.repo.CreditAccount(ctx, tx, req.AccountID, req.Amount)
// 2. Создать задачу на генерацию квитанции (в той же TX!)
_, _ = s.riverClient.InsertTx(ctx, tx, &GenerateReceiptArgs{
AccountID: req.AccountID,
PaymentID: paymentID,
}, nil)
// 3. Создать outbox-запись для domain event (в той же TX!)
_ = s.outbox.Insert(ctx, tx, "billing.events", "payment.received", payloadBytes)
return tx.Commit(ctx)
}Event Bus: Watermill (абстракция над RabbitMQ)
Watermill — Go-библиотека для event-driven приложений. Абстрагирует pub/sub от конкретного брокера (RabbitMQ, Kafka, PostgreSQL). Позволяет легко переключиться между брокерами и добавляет middleware.
Зачем Watermill поверх amqp091-go
| Без Watermill | С Watermill |
|---|---|
| Ручное управление connections, channels, reconnect | Автоматический reconnect + health check |
| Ручная сериализация/десериализация | Middleware: marshal/unmarshal |
| Ручной retry, DLQ, logging | Middleware: retry, poison queue, throttle, correlation |
| Код привязан к RabbitMQ | Код привязан к интерфейсу Publisher/Subscriber |
Пример: Publisher (Billing → RabbitMQ)
// internal/billing/infra/publisher.go
package infra
import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
)
func NewEventPublisher(amqpURI string) (*amqp.Publisher, error) {
config := amqp.NewDurableQueueConfig(amqpURI)
// Кастомизация: Topic Exchange + Publisher Confirms
config.Exchange = amqp.ExchangeConfig{
GenerateName: func(topic string) string { return "billing.events" },
Type: "topic",
Durable: true,
}
config.Publish.GenerateRoutingKey = func(topic string) string { return topic }
config.Publish.ConfirmDelivery = true
return amqp.NewPublisher(config, watermill.NewSlogLogger(slog.Default()))
}Пример: Subscriber (OMS ← RabbitMQ)
// internal/oms/infra/subscriber.go
func NewEventSubscriber(amqpURI string) (*amqp.Subscriber, error) {
config := amqp.NewDurableQueueConfig(amqpURI)
config.Queue = amqp.QueueConfig{
GenerateName: func(topic string) string {
return "oms.q." + strings.ReplaceAll(topic, ".", "-")
},
Durable: true,
Arguments: amqp.Table{
"x-queue-type": "quorum",
"x-delivery-limit": int32(5),
"x-dead-letter-exchange": "dlx",
},
}
return amqp.NewSubscriber(config, watermill.NewSlogLogger(slog.Default()))
}Пример: Router (маршрутизация событий к обработчикам)
// internal/oms/app.go
func setupRouter(pub *amqp.Publisher, sub *amqp.Subscriber) *message.Router {
router, _ := message.NewRouter(message.RouterConfig{}, watermill.NewSlogLogger(slog.Default()))
// Middleware
router.AddMiddleware(
middleware.CorrelationID, // Прокидывание correlation_id
middleware.Retry{MaxRetries: 3, InitialInterval: time.Second}.Middleware,
middleware.Recoverer, // Panic recovery
middleware.NewThrottle(100, time.Second).Middleware, // Rate limit
)
// Обработчики событий
router.AddHandler("provisioning-result",
"provisioning.success", // Подписка на routing key
sub,
"oms.saga.next-step", // Публикация (если нужно)
pub,
omsHandlers.HandleProvisioningResult,
)
router.AddNoPublisherHandler("payment-received",
"payment.received",
sub,
omsHandlers.HandlePaymentReceived,
)
return router
}ConnectRPC: Interceptor Stack
Каждый ConnectRPC-сервер использует стек interceptor'ов (middleware) для сквозных задач:
// internal/pkg/server/interceptors.go
func BuildInterceptors(keycloakKey *rsa.PublicKey, rpcRoles map[string][]string) []connect.Interceptor {
return []connect.Interceptor{
// 1. Recovery (паника → gRPC INTERNAL)
otelconnect.NewInterceptor(), // OpenTelemetry tracing
// 2. Logging (slog)
NewLoggingInterceptor(slog.Default()),
// 3. Auth (JWT from Keycloak + RBAC)
NewAuthInterceptor(keycloakKey, rpcRoles),
// 4. Validation (protovalidate)
NewValidationInterceptor(),
// 5. Rate Limiting
NewRateLimitInterceptor(),
// 6. Idempotency (для мутирующих RPC)
NewIdempotencyInterceptor(),
// 7. Circuit Breaker (для исходящих вызовов)
// Используется на ConnectRPC CLIENT, не на сервере
}
}Circuit Breaker (sony/gobreaker)
Для исходящих ConnectRPC-вызовов (service → service) используем circuit breaker:
// internal/pkg/client/circuit_breaker.go
import "github.com/sony/gobreaker/v2"
func NewCircuitBreaker(name string) *gobreaker.CircuitBreaker[any] {
return gobreaker.NewCircuitBreaker[any](gobreaker.Settings{
Name: name,
MaxRequests: 5, // Полуоткрытое состояние: 5 пробных запросов
Interval: 30 * time.Second, // Сброс счётчиков
Timeout: 60 * time.Second, // Время в Open → Half-Open
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.ConsecutiveFailures > 3 // 3 подряд → Open
},
OnStateChange: func(name string, from, to gobreaker.State) {
slog.Warn("circuit breaker state change",
slog.String("name", name),
slog.String("from", string(from)),
slog.String("to", string(to)),
)
},
})
}Retry с Exponential Backoff (cenkalti/backoff)
// internal/pkg/retry/retry.go
import "github.com/cenkalti/backoff/v4"
func WithRetry(ctx context.Context, operation func() error) error {
b := backoff.NewExponentialBackOff()
b.InitialInterval = 200 * time.Millisecond
b.MaxInterval = 30 * time.Second
b.MaxElapsedTime = 5 * time.Minute
b.Multiplier = 2.0
return backoff.Retry(operation, backoff.WithContext(b, ctx))
}DDD: Границы агрегатов в API
Каждый Protobuf service соответствует одному Bounded Context. Правила DDD для API:
Правила агрегатов
- Один агрегат — один сервис.
CustomerServiceуправляет только агрегатомCustomer(+ вложенные Value Objects:Contact,Address,Contract). - Ссылки между контекстами — только по ID.
Subscription.customer_id— этоstring(UUID), не вложенныйCustomermessage. Данные клиента достаются отдельным RPC-вызовом. - Мутации — только через Aggregate Root. Нельзя напрямую менять
Transaction— только черезBillingService.RegisterPayment(который создаёт Transaction внутри Account). - События — факты, команды — намерения.
PaymentReceivedEvent= факт (уже случилось).ChargeAccountCommand= намерение (просьба выполнить). - Eventual Consistency. После
SubscriptionActivatedEventBilling со временем создаст запись тарификации. Немедленная консистентность — только внутри одного агрегата.
Карта зависимостей (кто кого вызывает)
Правило: Sync (ConnectRPC) — для чтений и пользовательских мутаций (через BFF). Async (RabbitMQ) — для межсервисных реакций на события и Saga-шагов.
Полный стек Go-библиотек
| Категория | Библиотека | Версия | Назначение |
|---|---|---|---|
| RPC | connectrpc.com/connect | v1.x | ConnectRPC сервер/клиент |
| Protobuf | google.golang.org/protobuf | v1.x | Protobuf runtime |
| Validation | github.com/bufbuild/protovalidate-go | v0.x | Валидация полей из .proto |
| Messaging | github.com/ThreeDotsLabs/watermill | v1.x | Event bus абстракция |
| Messaging | github.com/ThreeDotsLabs/watermill-amqp/v3 | v3.x | Watermill → RabbitMQ adapter |
| Background Jobs | github.com/riverqueue/river | v0.x | PostgreSQL job queue (аналог BullMQ) |
| Database | github.com/jackc/pgx/v5 | v5.x | PostgreSQL driver |
| Logging | log/slog | stdlib | Structured logging |
| Tracing | go.opentelemetry.io/otel | v1.x | OpenTelemetry SDK |
| Circuit Breaker | github.com/sony/gobreaker/v2 | v2.x | Circuit breaker для исходящих вызовов |
| Retry | github.com/cenkalti/backoff/v4 | v4.x | Exponential backoff |
| HTTP Router | github.com/go-chi/chi/v5 | v5.x | HTTP mux (для health/metrics endpoints) |
| Config | github.com/caarlos0/env/v11 | v11.x | Env-based config |
| Migrations | github.com/pressly/goose/v3 | v3.x | SQL миграции |
| Testing | github.com/stretchr/testify | v1.x | Assertions + mocks |
| Testing | github.com/testcontainers/testcontainers-go | v0.x | Интеграционные тесты с Docker |
Ссылки по теме
- Бизнес-процессы: Saga-оркестрация, компенсация, FSM — Workflows.
- Доменные модели: ER-диаграммы, Context Map, Data Ownership — Доменная логика.
- Legacy маппинг: Как Legacy API маппится на новые Protobuf-контракты — Маппинг Legacy CRM.
- Архитектурные принципы: SOLID, DDD, UoW, CQRS, Clean Architecture — Принципы архитектуры.
- BSS-сервисы: ConnectRPC API поверхности, RBAC per-method — BSS Layer.
- OSS-сервисы: Vendor Adapter, Provisioning FSM — OSS Layer.
- Микросервисы: Каждый сервис подробно — Каталог сервисов.
- Технологии: buf.build, ConnectRPC, RabbitMQ, river, Watermill — Технологический стек.
- Мониторинг: RabbitMQ queue depth, OTel tracing через AMQP headers — Observability.
- Безопасность: JWT interceptor, mTLS, Network Policies — Безопасность.
- Термины: Outbox, Inbox, DLQ, Saga, Circuit Breaker, river — Глоссарий.