G-SERVICE Docs
Архитектура ISP (OSS/BSS)

API-контракты и Интеграция

Protobuf + buf.build для контрактов, ConnectRPC для синхронных вызовов, RabbitMQ для асинхронного обмена.

API-контракты и Интеграция

Все межсервисные контракты описываются в Protocol Buffers (Protobuf) и управляются через buf.build. Синхронные вызовы — через ConnectRPC/gRPC. Асинхронный обмен командами и событиями — через RabbitMQ.

Почему Schema-First для ISP? В телеком-платформе BSS и OSS слои разрабатываются на разных языках (NestJS для BSS, Go для OSS high-load) разными командами. Protobuf-контракты — единственный source of truth, гарантирующий, что subscription.activated из Product Service и ActivateServiceCommand в Provisioning имеют одинаковую структуру. Это критично, поскольку рассогласование контрактов в ISP = абонент без интернета или потеря платежа. В отличие от монолитных ISP-систем (Hydra Billing, Splynx), где интеграция — внутренние вызовы функций, наш schema-first подход обеспечивает breaking change detection в CI до деплоя.


Schema-First: Protobuf + buf.build

Почему Protobuf, а не JSON/OpenAPI

КритерийProtobufJSON / OpenAPI
ТипобезопасностьСтрогая, compile-timeRuntime валидация
ПроизводительностьБинарный, ~10× быстрее сериализацииТекстовый, verbose
КодогенерацияНативная (Go, TS, Swift, Kotlin)Сторонние генераторы
Обратная совместимостьВстроенные правила (buf breaking)Ручная проверка
ДокументацияКомментарии в .proto = документацияОтдельный файл
Экосистемаbuf.build BSR, lint, formatРазрозненная

Структура proto-репозитория

proto/
├── buf.yaml                        # Конфигурация buf модуля
├── buf.gen.yaml                    # Правила кодогенерации
├── buf.lock                        # Lock-файл зависимостей
├── common/
│   └── v1/
│       ├── pagination.proto        # Курсорная пагинация
│       ├── metadata.proto          # RequestMetadata (correlation/causation)
│       ├── money.proto             # Money (amount + currency as string)
│       └── events.proto            # EventEnvelope, CommandEnvelope
├── customer/
│   └── v1/
│       ├── customer.proto          # Модели: Customer, Contract, Contact
│       ├── customer_service.proto  # RPC: CustomerService
│       └── events.proto            # CustomerCreated, ContractSigned...
├── product/
│   └── v1/
│       ├── product.proto           # ProductOffering, Subscription
│       ├── product_service.proto   # RPC: ProductService
│       └── events.proto            # SubscriptionActivated, TariffChanged...
├── billing/
│   └── v1/
│       ├── billing.proto           # Account, Transaction, Invoice
│       ├── billing_service.proto   # RPC: BillingService
│       ├── events.proto            # PaymentReceived, DunningStageChanged...
│       └── commands.proto          # ChargeAccountCommand, RefundCommand
├── provisioning/
│   └── v1/
│       ├── provisioning.proto      # ProvisioningTask
│       ├── provisioning_service.proto # RPC (read-only)
│       ├── events.proto            # ProvisioningSuccess/Failed/Rollback
│       └── commands.proto          # ActivateService, SuspendAccess...
├── inventory/
│   └── v1/
│       ├── inventory.proto         # Device, Port, LogicalResource
│       ├── inventory_service.proto # RPC: InventoryService
│       └── events.proto            # PortReserved, ResourceExhausted...
├── aaa/
│   └── v1/
│       ├── aaa.proto               # RadiusProfile, Session, RadiusAttribute
│       └── aaa_service.proto       # RPC: AAAService (CoA, profiles, sessions)
├── orchestration/
│   └── v1/
│       ├── order.proto             # Order, OrderStep, OrderStatus
│       └── order_service.proto     # RPC: OrderService
└── notification/
    └── v1/
        ├── notification_service.proto # RPC: NotificationService
        └── commands.proto           # SendNotificationCommand

buf.yaml

version: v2
modules:
  - path: proto
    name: buf.build/isp-platform/api
lint:
  use:
    - STANDARD
    - COMMENTS
  except:
    - PACKAGE_VERSION_SUFFIX
breaking:
  use:
    - PACKAGE

buf.gen.yaml (кодогенерация)

version: v2
managed:
  enabled: true
  override:
    - file_option: go_package_prefix
      value: github.com/isp-platform/api/gen/go
plugins:
  # Go Protobuf
  - remote: buf.build/protocolbuffers/go
    out: gen/go
    opt: paths=source_relative
  # ConnectRPC (Go)
  - remote: buf.build/connectrpc/go
    out: gen/go
    opt: paths=source_relative
  # ConnectRPC (TypeScript — для фронтенда)
  - remote: buf.build/connectrpc/es
    out: gen/ts
    opt: target=ts
  # Protovalidate (валидация полей)
  - remote: buf.build/bufbuild/protovalidate-go
    out: gen/go
    opt: paths=source_relative

CI Pipeline: buf в CI/CD

# .gitlab-ci.yml / GitHub Actions
buf-check:
  steps:
    - buf lint # Проверка стиля и именования
    - buf format --diff # Форматирование
    - buf breaking # Детекция ломающих изменений (против main)
    - buf generate # Кодогенерация
    - buf push # Публикация в BSR (Buf Schema Registry)

Правила:

  • buf lint — обязателен в CI. Блокирует merge при нарушении стандартов.
  • buf breaking — сравнивает с main веткой. Ломающие изменения требуют ревью и bump версии пакета (v1v2).
  • buf push — публикует в BSR при merge в main. Все сервисы тянут сгенерированный код из BSR.

Пример: Protobuf-контракт

common/v1/metadata.proto

syntax = "proto3";
package common.v1;

import "google/protobuf/timestamp.proto";

// Метаданные, обязательные для каждого запроса и события.
message RequestMetadata {
  // Уникальный ID запроса (UUID v4).
  string request_id = 1;
  // Сквозной ID процесса для трейсинга.
  string correlation_id = 2;
  // ID события/запроса, породившего текущий.
  string causation_id = 3;
}

// Стандартная обёртка для событий RabbitMQ.
message EventEnvelope {
  string event_id = 1;
  string event_type = 2;       // e.g. "billing.payment.received"
  string source = 3;           // e.g. "billing-service"
  string subject = 4;          // e.g. "account/acc-123"
  google.protobuf.Timestamp occurred_at = 5;
  RequestMetadata metadata = 6;
  bytes payload = 7;           // Сериализованный Protobuf конкретного события
}

customer/v1/customer_service.proto

syntax = "proto3";
package customer.v1;

import "common/v1/pagination.proto";
import "common/v1/metadata.proto";
import "buf/validate/validate.proto";

// Клиент (физлицо или юрлицо).
message Customer {
  string id = 1;
  CustomerType type = 2;
  string full_name = 3;
  string inn = 4;
  repeated Contact contacts = 5;
  repeated Contract contracts = 6;
}

enum CustomerType {
  CUSTOMER_TYPE_UNSPECIFIED = 0;
  CUSTOMER_TYPE_INDIVIDUAL = 1;
  CUSTOMER_TYPE_LEGAL_ENTITY = 2;
}

message Contact {
  string id = 1;
  ContactType type = 2;
  string value = 3 [(buf.validate.field).string.min_len = 1];
  bool is_verified = 4;
}

enum ContactType {
  CONTACT_TYPE_UNSPECIFIED = 0;
  CONTACT_TYPE_PHONE = 1;
  CONTACT_TYPE_EMAIL = 2;
}

message Contract {
  string id = 1;
  string number = 2;
  string status = 3;
  google.protobuf.Timestamp signed_at = 4;
}

// --- RPC ---

service CustomerService {
  // Получить клиента по ID.
  rpc GetCustomer(GetCustomerRequest) returns (GetCustomerResponse);
  // Список клиентов с пагинацией и фильтрацией.
  rpc ListCustomers(ListCustomersRequest) returns (ListCustomersResponse);
  // Создать клиента.
  rpc CreateCustomer(CreateCustomerRequest) returns (CreateCustomerResponse);
  // Обновить клиента (partial update).
  rpc UpdateCustomer(UpdateCustomerRequest) returns (UpdateCustomerResponse);
  // Полнотекстовый поиск.
  rpc SearchCustomers(SearchCustomersRequest) returns (SearchCustomersResponse);
  // Проверка технической возможности по адресу.
  rpc CheckAvailability(CheckAvailabilityRequest) returns (CheckAvailabilityResponse);
}

message GetCustomerRequest {
  string id = 1 [(buf.validate.field).string.uuid = true];
}

message GetCustomerResponse {
  Customer customer = 1;
}

message ListCustomersRequest {
  common.v1.PaginationRequest pagination = 1;
  string filter = 2; // CEL-выражение или строка фильтра
}

message ListCustomersResponse {
  repeated Customer customers = 1;
  common.v1.PaginationResponse pagination = 2;
}

common/v1/money.proto

syntax = "proto3";
package common.v1;

// Денежная сумма (precise). Не используем float/double для денег!
message Money {
  string amount = 1;      // Decimal as string: "1500.50"
  string currency = 2;    // ISO 4217: "RUB", "USD"
}

common/v1/pagination.proto

syntax = "proto3";
package common.v1;

message PaginationRequest {
  int32 page_size = 1 [(buf.validate.field).int32 = {gte: 1, lte: 100}];
  string page_token = 2; // Cursor-based (opaque token)
}

message PaginationResponse {
  string next_page_token = 1;
  int32 total_count = 2;
}

common/v1/events.proto

syntax = "proto3";
package common.v1;

import "google/protobuf/timestamp.proto";

// Обёртка для ВСЕХ событий в RabbitMQ. Единый конверт.
message EventEnvelope {
  string event_id = 1;            // UUID — ключ идемпотентности (inbox)
  string event_type = 2;          // "billing.payment.received"
  string source = 3;              // "billing-service"
  string subject = 4;             // "account/acc-123"
  int32 schema_version = 5;       // Версия схемы payload (1, 2, ...)
  google.protobuf.Timestamp occurred_at = 6;
  RequestMetadata metadata = 7;
  bytes payload = 8;              // Сериализованный Protobuf конкретного события
}

// Обёртка для ВСЕХ команд в RabbitMQ.
message CommandEnvelope {
  string command_id = 1;          // UUID — ключ идемпотентности
  string command_type = 2;        // "provisioning.activate_service"
  string source = 3;              // "oms-service"
  string target = 4;              // "provisioning-service"
  google.protobuf.Timestamp issued_at = 5;
  RequestMetadata metadata = 6;
  bytes payload = 7;
}

product/v1/product_service.proto

syntax = "proto3";
package product.v1;

import "common/v1/pagination.proto";
import "common/v1/money.proto";
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";

// --- Агрегат: ProductOffering (read model, не мутируется клиентом) ---

message ProductOffering {
  string id = 1;
  string code = 2;                          // "inet-100-gpon"
  string name = 3;                          // "Интернет 100 Мбит/с (GPON)"
  ProductCategory category = 4;
  CustomerSegment segment = 5;
  common.v1.Money base_price = 6;
  repeated ProductCharacteristic characteristics = 7;
  repeated string availability_zones = 8;   // Коды зон покрытия
  bool is_active = 9;
}

enum ProductCategory {
  PRODUCT_CATEGORY_UNSPECIFIED = 0;
  PRODUCT_CATEGORY_INTERNET = 1;
  PRODUCT_CATEGORY_IPTV = 2;
  PRODUCT_CATEGORY_TELEPHONY = 3;
  PRODUCT_CATEGORY_BUNDLE = 4;
  PRODUCT_CATEGORY_EQUIPMENT_RENTAL = 5;
}

enum CustomerSegment {
  CUSTOMER_SEGMENT_UNSPECIFIED = 0;
  CUSTOMER_SEGMENT_INDIVIDUAL = 1;
  CUSTOMER_SEGMENT_LEGAL_ENTITY = 2;
  CUSTOMER_SEGMENT_ALL = 3;
}

message ProductCharacteristic {
  string name = 1;   // "download_speed_mbps"
  string value = 2;  // "100"
  string unit = 3;   // "Mbps"
}

// --- Агрегат: Subscription (жизненный цикл услуги клиента) ---

message Subscription {
  string id = 1;
  string customer_id = 2;
  string product_offering_id = 3;
  SubscriptionStatus status = 4;
  common.v1.Money effective_price = 5;       // С учётом скидок
  map<string, string> parameters = 6;        // runtime-параметры (ip, vlan, speed)
  google.protobuf.Timestamp activated_at = 7;
  google.protobuf.Timestamp suspended_at = 8;
  google.protobuf.Timestamp terminated_at = 9;
}

enum SubscriptionStatus {
  SUBSCRIPTION_STATUS_UNSPECIFIED = 0;
  SUBSCRIPTION_STATUS_PENDING = 1;
  SUBSCRIPTION_STATUS_ACTIVE = 2;
  SUBSCRIPTION_STATUS_SUSPENDED = 3;
  SUBSCRIPTION_STATUS_TERMINATED = 4;
}

// --- RPC ---

service ProductService {
  rpc GetProduct(GetProductRequest) returns (GetProductResponse);
  rpc ListProducts(ListProductsRequest) returns (ListProductsResponse);
  rpc CreateProduct(CreateProductRequest) returns (CreateProductResponse);

  rpc CreateSubscription(CreateSubscriptionRequest) returns (CreateSubscriptionResponse);
  rpc GetSubscription(GetSubscriptionRequest) returns (GetSubscriptionResponse);
  rpc ChangeSubscription(ChangeSubscriptionRequest) returns (ChangeSubscriptionResponse);
  rpc SuspendSubscription(SuspendSubscriptionRequest) returns (SuspendSubscriptionResponse);
  rpc ResumeSubscription(ResumeSubscriptionRequest) returns (ResumeSubscriptionResponse);
  rpc TerminateSubscription(TerminateSubscriptionRequest) returns (TerminateSubscriptionResponse);
}

message CreateSubscriptionRequest {
  string customer_id = 1 [(buf.validate.field).string.uuid = true];
  string product_offering_id = 2 [(buf.validate.field).string.uuid = true];
  map<string, string> parameters = 3;
  string idempotency_key = 4 [(buf.validate.field).string.uuid = true];
}

message ChangeSubscriptionRequest {
  string subscription_id = 1 [(buf.validate.field).string.uuid = true];
  string new_product_offering_id = 2 [(buf.validate.field).string.uuid = true];
  google.protobuf.Timestamp effective_date = 3; // Когда вступает в силу
  string idempotency_key = 4 [(buf.validate.field).string.uuid = true];
}

billing/v1/billing_service.proto

syntax = "proto3";
package billing.v1;

import "common/v1/pagination.proto";
import "common/v1/money.proto";
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";

// --- Агрегат: Account (финансовый аккаунт клиента) ---

message Account {
  string id = 1;
  string customer_id = 2;
  common.v1.Money balance = 3;
  AccountStatus status = 4;
  DunningStage dunning_stage = 5;
  string legacy_ls = 6;                     // Legacy лицевой счёт (для миграции)
  google.protobuf.Timestamp created_at = 7;
}

enum AccountStatus {
  ACCOUNT_STATUS_UNSPECIFIED = 0;
  ACCOUNT_STATUS_ACTIVE = 1;
  ACCOUNT_STATUS_SUSPENDED = 2;
  ACCOUNT_STATUS_CLOSED = 3;
}

enum DunningStage {
  DUNNING_STAGE_NONE = 0;
  DUNNING_STAGE_SOFT = 1;     // D+0: уведомление + captive portal
  DUNNING_STAGE_HARD = 2;     // D+3: блокировка (64kbps)
  DUNNING_STAGE_TERMINATE = 3; // D+N: расторжение
}

// --- Value Object: Transaction (double-entry ledger) ---

message Transaction {
  string id = 1;
  string account_id = 2;
  TransactionType type = 3;
  common.v1.Money amount = 4;
  common.v1.Money balance_after = 5;
  string description = 6;
  string external_id = 7;               // ID от внешней системы (банк, 1С)
  google.protobuf.Timestamp created_at = 8;
}

enum TransactionType {
  TRANSACTION_TYPE_UNSPECIFIED = 0;
  TRANSACTION_TYPE_PAYMENT = 1;          // Зачисление
  TRANSACTION_TYPE_CHARGE = 2;           // Списание (рекуррент)
  TRANSACTION_TYPE_REFUND = 3;           // Возврат
  TRANSACTION_TYPE_ADJUSTMENT = 4;       // Ручная корректировка
  TRANSACTION_TYPE_CONNECTION_FEE = 5;   // Разовый платёж за подключение
}

// --- Value Object: Invoice ---

message Invoice {
  string id = 1;
  string account_id = 2;
  string number = 3;                     // "ИСП-2025-000123"
  common.v1.Money total = 4;
  InvoiceStatus status = 5;
  google.protobuf.Timestamp period_start = 6;
  google.protobuf.Timestamp period_end = 7;
  google.protobuf.Timestamp issued_at = 8;
  google.protobuf.Timestamp paid_at = 9;
}

enum InvoiceStatus {
  INVOICE_STATUS_UNSPECIFIED = 0;
  INVOICE_STATUS_DRAFT = 1;
  INVOICE_STATUS_ISSUED = 2;
  INVOICE_STATUS_PAID = 3;
  INVOICE_STATUS_OVERDUE = 4;
  INVOICE_STATUS_CANCELLED = 5;
}

// --- RPC ---

service BillingService {
  rpc GetAccount(GetAccountRequest) returns (GetAccountResponse);
  rpc GetBalance(GetBalanceRequest) returns (GetBalanceResponse);
  rpc ListTransactions(ListTransactionsRequest) returns (ListTransactionsResponse);

  // Мутации (все идемпотентные)
  rpc RegisterPayment(RegisterPaymentRequest) returns (RegisterPaymentResponse);
  rpc ManualAdjust(ManualAdjustRequest) returns (ManualAdjustResponse);
  rpc GenerateInvoice(GenerateInvoiceRequest) returns (GenerateInvoiceResponse);
  rpc GetInvoice(GetInvoiceRequest) returns (GetInvoiceResponse);
}

message RegisterPaymentRequest {
  string account_id = 1 [(buf.validate.field).string.uuid = true];
  common.v1.Money amount = 2;
  string source = 3;                     // "sberbank", "tinkoff", "cash"
  string external_id = 4 [(buf.validate.field).string.min_len = 1];
  string idempotency_key = 5 [(buf.validate.field).string.uuid = true];
}

message ManualAdjustRequest {
  string account_id = 1 [(buf.validate.field).string.uuid = true];
  common.v1.Money amount = 2;           // Может быть отрицательным
  string reason = 3 [(buf.validate.field).string.min_len = 5];
  string idempotency_key = 4 [(buf.validate.field).string.uuid = true];
}

orchestration/v1/order_service.proto

syntax = "proto3";
package orchestration.v1;

import "common/v1/pagination.proto";
import "common/v1/metadata.proto";
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";

// --- Агрегат: Order (оркестрационный процесс) ---

message Order {
  string id = 1;
  string customer_id = 2;
  OrderType type = 3;
  OrderStatus status = 4;
  OrderPriority priority = 5;
  string title = 6;
  string description = 7;
  repeated OrderStep steps = 8;
  map<string, string> context = 9;       // Данные для шагов (subscription_id, address, etc.)
  int32 legacy_id = 10;                  // Маппинг из Legacy CRM
  google.protobuf.Timestamp created_at = 11;
  google.protobuf.Timestamp updated_at = 12;
  google.protobuf.Timestamp deadline = 13;
}

enum OrderType {
  ORDER_TYPE_UNSPECIFIED = 0;
  ORDER_TYPE_NEW_CONNECTION = 1;
  ORDER_TYPE_TARIFF_CHANGE = 2;
  ORDER_TYPE_DISCONNECTION = 3;
  ORDER_TYPE_EQUIPMENT_REPLACEMENT = 4;
  ORDER_TYPE_REPAIR = 5;
  ORDER_TYPE_SUSPENSION = 6;
  ORDER_TYPE_RESUMPTION = 7;
}

enum OrderStatus {
  ORDER_STATUS_UNSPECIFIED = 0;
  ORDER_STATUS_DRAFT = 1;
  ORDER_STATUS_SUBMITTED = 2;
  ORDER_STATUS_IN_PROGRESS = 3;
  ORDER_STATUS_WAITING_EXTERNAL = 4;
  ORDER_STATUS_COMPENSATING = 5;
  ORDER_STATUS_COMPLETED = 6;
  ORDER_STATUS_FAILED = 7;
  ORDER_STATUS_CANCELLED = 8;
}

enum OrderPriority {
  ORDER_PRIORITY_UNSPECIFIED = 0;
  ORDER_PRIORITY_LOW = 1;
  ORDER_PRIORITY_MEDIUM = 2;
  ORDER_PRIORITY_HIGH = 3;
  ORDER_PRIORITY_CRITICAL = 4;
}

// Шаг Saga-оркестрации
message OrderStep {
  string name = 1;                       // "reserve_port", "create_profile", "activate"
  StepStatus status = 2;
  string error_message = 3;
  int32 retry_count = 4;
  google.protobuf.Timestamp started_at = 5;
  google.protobuf.Timestamp completed_at = 6;
}

enum StepStatus {
  STEP_STATUS_UNSPECIFIED = 0;
  STEP_STATUS_PENDING = 1;
  STEP_STATUS_RUNNING = 2;
  STEP_STATUS_COMPLETED = 3;
  STEP_STATUS_FAILED = 4;
  STEP_STATUS_COMPENSATED = 5;
  STEP_STATUS_SKIPPED = 6;
}

// --- RPC ---

service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (CreateOrderResponse);
  rpc GetOrder(GetOrderRequest) returns (GetOrderResponse);
  rpc ListOrders(ListOrdersRequest) returns (ListOrdersResponse);
  rpc CancelOrder(CancelOrderRequest) returns (CancelOrderResponse);
  rpc RetryOrder(RetryOrderRequest) returns (RetryOrderResponse);
}

message CreateOrderRequest {
  OrderType type = 1;
  string customer_id = 2 [(buf.validate.field).string.uuid = true];
  string title = 3;
  map<string, string> context = 4;
  OrderPriority priority = 5;
  string idempotency_key = 6 [(buf.validate.field).string.uuid = true];
}

inventory/v1/inventory_service.proto

syntax = "proto3";
package inventory.v1;

import "common/v1/pagination.proto";
import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";

// --- Агрегат: Device ---

message Device {
  string id = 1;
  string hostname = 2;
  string management_ip = 3;
  DeviceType type = 4;
  string vendor = 5;                     // "huawei", "mikrotik", "eltex", "cisco"
  string model = 6;
  string firmware_version = 7;
  DeviceStatus status = 8;
  string location = 9;                   // Физическое расположение
  repeated Port ports = 10;
}

enum DeviceType {
  DEVICE_TYPE_UNSPECIFIED = 0;
  DEVICE_TYPE_OLT = 1;
  DEVICE_TYPE_SWITCH = 2;
  DEVICE_TYPE_ROUTER = 3;
  DEVICE_TYPE_BRAS = 4;
  DEVICE_TYPE_CPE = 5;
  DEVICE_TYPE_SPLITTER = 6;
}

enum DeviceStatus {
  DEVICE_STATUS_UNSPECIFIED = 0;
  DEVICE_STATUS_ACTIVE = 1;
  DEVICE_STATUS_MAINTENANCE = 2;
  DEVICE_STATUS_DECOMMISSIONED = 3;
}

// --- Value Object: Port ---

message Port {
  string id = 1;
  string device_id = 2;
  string number = 3;                     // "0/1/3"
  PortStatus status = 4;
  string subscriber_id = 5;             // Кому выделен (если occupied)
  string subscription_id = 6;
  google.protobuf.Timestamp reserved_until = 7; // TTL резерва
}

enum PortStatus {
  PORT_STATUS_UNSPECIFIED = 0;
  PORT_STATUS_FREE = 1;
  PORT_STATUS_RESERVED = 2;
  PORT_STATUS_OCCUPIED = 3;
  PORT_STATUS_FAULTY = 4;
}

// --- Value Object: LogicalResource ---

message LogicalResource {
  string id = 1;
  ResourceType type = 2;
  string value = 3;                      // "192.168.1.100", "AA:BB:CC:DD:EE:FF", "100"
  string pool_id = 4;
  string subscription_id = 5;
}

enum ResourceType {
  RESOURCE_TYPE_UNSPECIFIED = 0;
  RESOURCE_TYPE_IPV4 = 1;
  RESOURCE_TYPE_IPV6 = 2;
  RESOURCE_TYPE_VLAN = 3;
  RESOURCE_TYPE_MAC = 4;
}

// --- RPC ---

service InventoryService {
  rpc ListDevices(ListDevicesRequest) returns (ListDevicesResponse);
  rpc GetDevice(GetDeviceRequest) returns (GetDeviceResponse);
  rpc GetPortStatus(GetPortStatusRequest) returns (GetPortStatusResponse);

  // Resource management
  rpc CheckAvailability(CheckAvailabilityRequest) returns (CheckAvailabilityResponse);
  rpc ReservePort(ReservePortRequest) returns (ReservePortResponse);
  rpc ReleasePort(ReleasePortRequest) returns (ReleasePortResponse);
  rpc BindServiceInstance(BindServiceInstanceRequest) returns (BindServiceInstanceResponse);
  rpc AllocateIP(AllocateIPRequest) returns (AllocateIPResponse);
  rpc ReleaseIP(ReleaseIPRequest) returns (ReleaseIPResponse);
}

message CheckAvailabilityRequest {
  string address_id = 1;                // Ссылка на адрес из Customer Core
  string product_offering_id = 2;       // Для проверки технических требований
}

message CheckAvailabilityResponse {
  bool available = 1;
  string device_id = 2;
  string port_id = 3;
  string reason = 4;                    // Если unavailable — причина
}

message ReservePortRequest {
  string port_id = 1 [(buf.validate.field).string.uuid = true];
  string order_id = 2 [(buf.validate.field).string.uuid = true];
  int32 ttl_hours = 3;                  // Через сколько часов автоосвобождение
  string idempotency_key = 4 [(buf.validate.field).string.uuid = true];
}

provisioning/v1/provisioning_service.proto

syntax = "proto3";
package provisioning.v1;

import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";

// --- Агрегат: ProvisioningTask ---

message ProvisioningTask {
  string id = 1;
  string order_id = 2;
  ProvisioningAction action = 3;
  ProvisioningStatus status = 4;
  string device_id = 5;
  string port_id = 6;
  map<string, string> parameters = 7;   // vendor-specific params
  int32 retry_count = 8;
  string error_message = 9;
  google.protobuf.Timestamp created_at = 10;
  google.protobuf.Timestamp completed_at = 11;
}

enum ProvisioningAction {
  PROVISIONING_ACTION_UNSPECIFIED = 0;
  PROVISIONING_ACTION_ACTIVATE = 1;
  PROVISIONING_ACTION_DEACTIVATE = 2;
  PROVISIONING_ACTION_CHANGE_SPEED = 3;
  PROVISIONING_ACTION_SUSPEND = 4;
  PROVISIONING_ACTION_RESUME = 5;
  PROVISIONING_ACTION_REPLACE_DEVICE = 6;
}

enum ProvisioningStatus {
  PROVISIONING_STATUS_UNSPECIFIED = 0;
  PROVISIONING_STATUS_RECEIVED = 1;
  PROVISIONING_STATUS_VALIDATING = 2;
  PROVISIONING_STATUS_EXECUTING = 3;
  PROVISIONING_STATUS_VERIFYING = 4;
  PROVISIONING_STATUS_COMPLETED = 5;
  PROVISIONING_STATUS_FAILED = 6;
  PROVISIONING_STATUS_ROLLING_BACK = 7;
}

// --- RPC (минимальный, т.к. основной вход через RabbitMQ) ---

service ProvisioningService {
  rpc GetTaskStatus(GetTaskStatusRequest) returns (GetTaskStatusResponse);
  rpc RetryTask(RetryTaskRequest) returns (RetryTaskResponse);
  rpc ListTasks(ListTasksRequest) returns (ListTasksResponse);
}

aaa/v1/aaa_service.proto

syntax = "proto3";
package aaa.v1;

import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";

// --- Агрегат: RadiusProfile ---

message RadiusProfile {
  string id = 1;
  string subscription_id = 2;
  string username = 3;
  AuthType auth_type = 4;
  repeated RadiusAttribute check_attrs = 5;  // radcheck
  repeated RadiusAttribute reply_attrs = 6;  // radreply
  google.protobuf.Timestamp created_at = 7;
  google.protobuf.Timestamp updated_at = 8;
}

enum AuthType {
  AUTH_TYPE_UNSPECIFIED = 0;
  AUTH_TYPE_PPPOE = 1;
  AUTH_TYPE_IPOE = 2;
  AUTH_TYPE_HOTSPOT = 3;
}

message RadiusAttribute {
  string name = 1;    // "Mikrotik-Rate-Limit", "Framed-IP-Address"
  string value = 2;   // "100M/100M", "192.168.1.100"
  string op = 3;      // ":=", "==", "+="
}

// --- Value Object: Session ---

message Session {
  string id = 1;                         // acct_session_id
  string username = 2;
  string nas_ip = 3;
  string framed_ip = 4;
  string mac_address = 5;
  string nas_port_id = 6;
  google.protobuf.Timestamp started_at = 7;
  int64 bytes_in = 8;
  int64 bytes_out = 9;
}

// --- RPC ---

service AAAService {
  rpc CreateProfile(CreateProfileRequest) returns (CreateProfileResponse);
  rpc UpdateProfile(UpdateProfileRequest) returns (UpdateProfileResponse);
  rpc DeleteProfile(DeleteProfileRequest) returns (DeleteProfileResponse);
  rpc SendCoA(SendCoARequest) returns (SendCoAResponse);
  rpc DisconnectSession(DisconnectSessionRequest) returns (DisconnectSessionResponse);
  rpc ListActiveSessions(ListActiveSessionsRequest) returns (ListActiveSessionsResponse);
  rpc GetSessionHistory(GetSessionHistoryRequest) returns (GetSessionHistoryResponse);
}

message SendCoARequest {
  string username = 1;
  string nas_ip = 2;
  repeated RadiusAttribute attributes = 3; // Новые атрибуты для CoA
}

notification/v1/notification_service.proto

syntax = "proto3";
package notification.v1;

import "buf/validate/validate.proto";
import "google/protobuf/timestamp.proto";

message NotificationRequest {
  string customer_id = 1 [(buf.validate.field).string.uuid = true];
  Channel channel = 2;
  string template_id = 3;
  map<string, string> variables = 4;     // Переменные для шаблона
  Priority priority = 5;
  string idempotency_key = 6 [(buf.validate.field).string.uuid = true];
}

enum Channel {
  CHANNEL_UNSPECIFIED = 0;
  CHANNEL_SMS = 1;
  CHANNEL_EMAIL = 2;
  CHANNEL_PUSH = 3;
  CHANNEL_TELEGRAM = 4;
  CHANNEL_WEBHOOK = 5;
}

enum Priority {
  PRIORITY_UNSPECIFIED = 0;
  PRIORITY_LOW = 1;        // Batch (1 раз в час)
  PRIORITY_NORMAL = 2;     // < 5 мин
  PRIORITY_HIGH = 3;       // < 1 мин
  PRIORITY_CRITICAL = 4;   // Мгновенно
}

service NotificationService {
  rpc Send(NotificationRequest) returns (NotificationResponse);
  rpc ListTemplates(ListTemplatesRequest) returns (ListTemplatesResponse);
  rpc GetDeliveryStatus(GetDeliveryStatusRequest) returns (GetDeliveryStatusResponse);
}

Синхронные вызовы: ConnectRPC

Для service-to-service и BFF-to-service используем ConnectRPC — современный RPC-фреймворк поверх Protobuf.

Почему ConnectRPC, а не чистый gRPC

КритерийConnectRPCgRPC
HTTP-протоколHTTP/1.1 + HTTP/2 + HTTP/3Только HTTP/2
БраузерыНативная поддержка (без прокси)Нужен grpc-web прокси
JSON fallbackАвтоматический (curl-friendly)Только бинарный
Совместимость с gRPCПолная (тот же .proto, те же клиенты)
StreamingПоддержка (server/client/bidi)Поддержка
InterceptorsGo-идиоматичныеТяжелый middleware API

Архитектура вызовов

Loading diagram...

Принцип:

  • Frontend → BFF: ConnectRPC через HTTP/1.1 + JSON (curl-friendly, легко дебажить).
  • BFF → Microservice: ConnectRPC через HTTP/2 + Protobuf binary (максимальная производительность).
  • Microservice → Microservice: ConnectRPC через HTTP/2 + Protobuf binary.

Карта RPC-сервисов

Protobuf ServiceВладелецКлючевые RPC
CustomerServiceCustomer CoreGetCustomer, ListCustomers, CreateCustomer, CheckAvailability
ProductServiceProduct & SubscriptionGetProduct, ListProducts, CreateSubscription, ChangeSubscription
BillingServiceBilling & FinanceGetAccount, GetBalance, ListTransactions, RegisterPayment
InventoryServiceNetwork InventoryListDevices, GetDevicePorts, ReservePort, ReleasePort, AllocateIP
ProvisioningServiceProvisioningGetProvisioningStatus, RetryProvisioning
OrderServiceOMSCreateOrder, GetOrder, CancelOrder
NotificationServiceNotificationSendNotification, ListTemplates

Валидация (protovalidate)

Валидация полей описывается в .proto, генерируется автоматически и выполняется на сервере через interceptor:

message RegisterPaymentRequest {
  string account_id = 1 [(buf.validate.field).string.uuid = true];
  string amount = 2 [(buf.validate.field).string.min_len = 1];
  string currency = 3 [(buf.validate.field).string = {in: ["RUB", "USD", "EUR"]}];
  string external_id = 4 [(buf.validate.field).string.min_len = 1];
  string idempotency_key = 5 [(buf.validate.field).string.uuid = true];
}

Асинхронный обмен: RabbitMQ

Все межсервисные команды и события передаются через RabbitMQ (Quorum Queues, Topic Exchanges).

Почему RabbitMQ

КритерийRabbitMQ 4.xKafka
МодельSmart broker, flexible routingDumb broker, smart consumer
RoutingTopic/Direct/Fanout/Headers exchangesТолько партиции
Приоритеты сообщенийНативная поддержкаНет
Dead LetterDLX + DLQ из коробкиРучная реализация
TTL сообщенийPer-message и per-queueТолько retention
Quorum QueuesRaft-based, durable, replicated
StreamsAppend-only log (как Kafka)Основная модель
Операционная сложностьПроще (один кластер)ZooKeeper/KRaft + больше tunning
Масштаб нашей задачиИдеален для ISP (~100K абонентов)Overkill для нашего масштаба

Топология RabbitMQ

Loading diagram...

Карта Exchanges и Queues

ExchangeТипRouting Key PatternПродюсерQuorum Queue (пример)
customer.eventsTopiccustomer.created, contract.signedCustomer Corebilling.q.customer.events
product.eventsTopicsubscription.activated, subscription.suspendedProduct & Subprovisioning.q.product.events
billing.eventsTopicpayment.received, balance.updated, access.blockBillingoms.q.billing.events
provisioning.eventsTopicprovisioning.success, provisioning.failedProvisioningoms.q.provisioning.events
provisioning.commandsDirectservice.activate, access.suspend, access.resumeOMS, Billingprovisioning.q.commands
billing.commandsDirectcharge.account, refundOMSbilling.q.commands
notification.commandsDirectsend.sms, send.emailВсе доменыnotification.q.commands
mediation.eventsTopicusage.reportedMediationbilling.q.mediation.events
fsm.eventsTopicwork.completed, work.failedFSMoms.q.fsm.events

Конфигурация очередей

Все production-очереди — Quorum Queues (Raft-based, replicated, durable):

# Пример декларации через RabbitMQ definitions или Terraform
queues:
  provisioning.q.commands:
    type: quorum
    arguments:
      x-delivery-limit: 5 # Макс. ретраев перед DLQ
      x-dead-letter-exchange: dlx # Dead Letter Exchange
      x-dead-letter-routing-key: provisioning.q.commands.dlq
      x-queue-leader-locator: balanced

  provisioning.q.commands.dlq:
    type: quorum
    arguments:
      x-message-ttl: 604800000 # 7 дней хранения в DLQ

Формат сообщений (Protobuf EventEnvelope)

Все сообщения в RabbitMQ сериализуются в Protobuf через EventEnvelope:

// billing/v1/billing_events.proto
message PaymentReceivedEvent {
  string account_id = 1;
  string amount = 2;       // Decimal as string (точность!)
  string currency = 3;
  string payment_id = 4;
  string source = 5;        // "sberbank", "tinkoff", "cash"
  string external_id = 6;   // ID от платёжного шлюза
}

message AccessBlockEvent {
  string account_id = 1;
  string reason = 2;        // "finance", "admin", "fraud"
  string subscription_id = 3;
}

Публикация:

Exchange: billing.events
Routing Key: payment.received
Content-Type: application/protobuf
Headers:
  x-event-id: evt-2025-001-abc       (UUID, для дедупликации)
  x-correlation-id: order-2025-001
  x-causation-id: webhook-bank-xyz
  x-source: billing-service
  x-timestamp: 2025-01-15T12:00:00Z
Body: <serialized PaymentReceivedEvent>

Гарантии доставки

  • Publisher Confirms: Каждый продюсер использует publisher confirms для гарантии записи в RabbitMQ.
  • Quorum Queues: Raft-репликация. Сообщение считается принятым после записи на majority нод.
  • Manual Ack: Консьюмеры используют manual acknowledgment. Сообщение подтверждается после успешной обработки + записи в inbox.
  • Dead Letter (DLX): После x-delivery-limit ретраев сообщение переходит в DLQ. Алерт в мониторинг. Ручной разбор через UI.
  • Idempotency: Inbox-паттерн на стороне каждого консьюмера (по x-event-id).

Паттерны интеграции

Transactional Outbox Pattern

Гарантирует атомарную публикацию события после коммита в БД. Без outbox — при падении между коммитом и publish сообщение теряется.

Loading diagram...

Таблица outbox:

CREATE TABLE outbox (
    id           BIGSERIAL PRIMARY KEY,
    event_id     UUID NOT NULL DEFAULT gen_random_uuid(),
    event_type   TEXT NOT NULL,            -- 'payment.received'
    routing_key  TEXT NOT NULL,            -- routing key для exchange
    exchange     TEXT NOT NULL,            -- 'billing.events'
    payload      BYTEA NOT NULL,           -- Serialized Protobuf
    headers      JSONB NOT NULL DEFAULT '{}',
    status       TEXT NOT NULL DEFAULT 'pending', -- pending → published
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    published_at TIMESTAMPTZ
);

CREATE INDEX idx_outbox_pending ON outbox (status, id) WHERE status = 'pending';

Inbox Pattern (Дедупликация)

Защищает от повторной обработки при at-least-once доставке:

CREATE TABLE inbox (
    event_id     UUID PRIMARY KEY,
    event_type   TEXT NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
1. Consumer получает message
2. Извлекает event_id из header x-event-id
3. BEGIN TX
4.   INSERT INTO inbox (event_id, event_type) ON CONFLICT DO NOTHING
5.   Если inserted — обрабатывает бизнес-логику
6.   Если conflict — SKIP (уже обработано)
7. COMMIT
8. ACK message

Saga Pattern (Orchestration)

Для длительных процессов, требующих компенсации при ошибке. OMS выступает оркестратором:

Loading diagram...

Каждый шаг = RabbitMQ command → ожидание event. OMS хранит состояние саги в БД и реагирует на входящие события.


Идемпотентность

Все мутирующие RPC поддерживают поле idempotency_key в запросе:

message RegisterPaymentRequest {
  string account_id = 1;
  string amount = 2;
  string currency = 3;
  string external_id = 4;
  // Ключ идемпотентности. Повторный вызов с тем же ключом
  // возвращает тот же результат, не создавая дубликат.
  string idempotency_key = 5 [(buf.validate.field).string.uuid = true];
}

Правила:

  • Повторный вызов с тем же idempotency_key возвращает закэшированный результат.
  • Ключ хранится 24 часа (настраивается).
  • Для webhook'ов от банков: idempotency_key = external_payment_id.

Rate Limiting и Throttling

Тип клиентаЛимитBurst
Internal Service10,000 req/min500
Operator (CRM)1,000 req/min100
Customer Portal (ЛК)300 req/min50
External Partner100 req/min20

Rate limiting реализуется на уровне Kubernetes NGINX Ingress (annotations nginx.ingress.kubernetes.io/limit-rps) + ConnectRPC interceptor. При превышении возвращается gRPC status RESOURCE_EXHAUSTED.


Полный каталог Domain Events (Protobuf)

Все доменные события типизированы в Protobuf. Payload сериализуется в EventEnvelope.payload.

customer/v1/events.proto

syntax = "proto3";
package customer.v1;

import "google/protobuf/timestamp.proto";

message CustomerCreatedEvent {
  string customer_id = 1;
  string full_name = 2;
  string type = 3;             // "individual" / "legal_entity"
}

message CustomerUpdatedEvent {
  string customer_id = 1;
  repeated string changed_fields = 2; // ["full_name", "inn"]
}

message ContractSignedEvent {
  string contract_id = 1;
  string customer_id = 2;
  string contract_number = 3;
  google.protobuf.Timestamp signed_at = 4;
}

message ContractTerminatedEvent {
  string contract_id = 1;
  string customer_id = 2;
  string reason = 3;
}

product/v1/events.proto

syntax = "proto3";
package product.v1;

import "google/protobuf/timestamp.proto";

message SubscriptionActivatedEvent {
  string subscription_id = 1;
  string customer_id = 2;
  string product_offering_id = 3;
  map<string, string> parameters = 4; // speed, ip, vlan...
}

message SubscriptionSuspendedEvent {
  string subscription_id = 1;
  string reason = 2;           // "dunning", "user_request", "admin"
}

message SubscriptionResumedEvent {
  string subscription_id = 1;
  string reason = 2;           // "payment", "manual"
}

message SubscriptionTerminatedEvent {
  string subscription_id = 1;
  string reason = 2;
}

message TariffChangedEvent {
  string subscription_id = 1;
  string old_product_id = 2;
  string new_product_id = 3;
  google.protobuf.Timestamp effective_at = 4;
}

billing/v1/events.proto

syntax = "proto3";
package billing.v1;

import "common/v1/money.proto";
import "google/protobuf/timestamp.proto";

message PaymentReceivedEvent {
  string account_id = 1;
  string payment_id = 2;
  common.v1.Money amount = 3;
  string source = 4;           // "sberbank", "tinkoff", "cash"
  string external_id = 5;
}

message BalanceNegativeEvent {
  string account_id = 1;
  common.v1.Money balance = 2;
  string customer_id = 3;
}

message DunningStageChangedEvent {
  string account_id = 1;
  string customer_id = 2;
  string old_stage = 3;        // "none", "soft", "hard"
  string new_stage = 4;
}

message InvoiceIssuedEvent {
  string invoice_id = 1;
  string account_id = 2;
  common.v1.Money total = 3;
}

message ChargeCompletedEvent {
  string account_id = 1;
  string subscription_id = 2;
  common.v1.Money amount = 3;
  string period = 4;           // "2025-02"
}

provisioning/v1/events.proto

syntax = "proto3";
package provisioning.v1;

message ProvisioningSuccessEvent {
  string task_id = 1;
  string order_id = 2;
  string subscription_id = 3;
  string action = 4;           // "activate", "suspend", "change_speed"
}

message ProvisioningFailedEvent {
  string task_id = 1;
  string order_id = 2;
  string subscription_id = 3;
  string action = 4;
  string error_code = 5;
  string error_message = 6;
  bool is_retryable = 7;
}

message ProvisioningRollbackEvent {
  string task_id = 1;
  string order_id = 2;
  string action = 3;
  bool success = 4;
}

inventory/v1/events.proto

syntax = "proto3";
package inventory.v1;

import "google/protobuf/timestamp.proto";

message PortReservedEvent {
  string port_id = 1;
  string device_id = 2;
  string order_id = 3;
  google.protobuf.Timestamp reserved_until = 4;
}

message PortReleasedEvent {
  string port_id = 1;
  string reason = 2;           // "order_cancelled", "ttl_expired", "manual"
}

message ResourceExhaustedEvent {
  string device_id = 1;
  string resource_type = 2;    // "port", "ip", "vlan"
  string location = 3;
}

message DeviceStatusChangedEvent {
  string device_id = 1;
  string old_status = 2;
  string new_status = 3;
}

Полный каталог Command Messages (Protobuf)

Команды — это намерения (intent), которые один сервис отправляет другому через RabbitMQ Direct Exchange.

provisioning/v1/commands.proto

syntax = "proto3";
package provisioning.v1;

// Команда: активировать услугу на оборудовании
message ActivateServiceCommand {
  string order_id = 1;
  string subscription_id = 2;
  string device_id = 3;
  string port_id = 4;
  map<string, string> parameters = 5; // speed, vlan, ip, ont_sn...
}

// Команда: приостановить доступ (dunning / admin)
message SuspendAccessCommand {
  string subscription_id = 1;
  string reason = 2;
  string redirect_url = 3;     // URL captive portal (для soft block)
}

// Команда: возобновить доступ (после оплаты)
message ResumeAccessCommand {
  string subscription_id = 1;
  string reason = 2;
}

// Команда: изменить скорость (смена тарифа)
message ChangeSpeedCommand {
  string subscription_id = 1;
  int32 download_mbps = 2;
  int32 upload_mbps = 3;
}

// Команда: деактивировать (расторжение)
message DeactivateServiceCommand {
  string subscription_id = 1;
  string order_id = 2;
}

billing/v1/commands.proto

syntax = "proto3";
package billing.v1;

import "common/v1/money.proto";

// Команда: списать средства (рекуррент / разовый)
message ChargeAccountCommand {
  string account_id = 1;
  common.v1.Money amount = 2;
  string reason = 3;           // "monthly_fee", "connection_fee"
  string subscription_id = 4;
  string idempotency_key = 5;
}

// Команда: возврат средств (компенсация Saga)
message RefundCommand {
  string account_id = 1;
  common.v1.Money amount = 2;
  string reason = 3;
  string original_transaction_id = 4;
  string idempotency_key = 5;
}

notification/v1/commands.proto

syntax = "proto3";
package notification.v1;

// Команда: отправить уведомление клиенту
message SendNotificationCommand {
  string customer_id = 1;
  string template_id = 2;
  string channel = 3;          // "sms", "email", "push", "telegram"
  map<string, string> variables = 4;
  string priority = 5;         // "critical", "high", "normal", "low"
  string idempotency_key = 6;
}

Полная матрица Exchange → Queue → Consumer

Подробная привязка каждого exchange к каждому consumer через конкретные queues и routing keys:

Event Exchanges (Topic)

ExchangeRouting KeyQueueConsumerНазначение
customer.eventscustomer.createdbilling.q.customer-createdBillingСоздать Account для нового клиента
customer.eventscustomer.creatednotification.q.customer-createdNotificationWelcome-уведомление
customer.eventscontract.signedoms.q.contract-signedOMSИнициировать процесс подключения
customer.eventscontract.terminatedbilling.q.contract-terminatedBillingФинальный счёт + закрытие
product.eventssubscription.activatedprovisioning.q.subscription-activatedProvisioningПровижининг новой услуги
product.eventssubscription.activatedbilling.q.subscription-activatedBillingНачало тарификации
product.eventssubscription.suspendedprovisioning.q.subscription-suspendedProvisioningБлокировка на оборудовании
product.eventssubscription.terminatedprovisioning.q.subscription-terminatedProvisioningДеактивация
product.eventssubscription.terminatedbilling.q.subscription-terminatedBillingСтоп-тарификация
product.eventssubscription.tariff_changedprovisioning.q.tariff-changedProvisioningCoA (смена скорости)
product.eventssubscription.tariff_changedbilling.q.tariff-changedBillingПерерасчёт
billing.eventspayment.receivedoms.q.payment-receivedOMSПроверка: ожидает ли процесс оплату
billing.eventspayment.receivednotification.q.payment-receivedNotificationSMS "Оплата зачислена"
billing.eventsbalance.negativenotification.q.balance-negativeNotificationSMS "Пополните баланс"
billing.eventsdunning.stage_changedprovisioning.q.dunningProvisioningSuspend/Resume по dunning
billing.eventsdunning.stage_changednotification.q.dunningNotification"Доступ ограничен"
provisioning.eventsprovisioning.successoms.q.provisioning-resultOMSSaga: шаг завершён
provisioning.eventsprovisioning.failedoms.q.provisioning-resultOMSSaga: шаг провален → компенсация
inventory.eventsresource.exhaustednotification.q.resource-alertNotificationАлерт операторам
mediation.eventsusage.reportedbilling.q.usageBillingUsage-based тарификация
fsm.eventswork.completedoms.q.fsm-resultOMSМонтаж завершён → продолжить Saga
fsm.eventswork.failedoms.q.fsm-resultOMSМонтаж провален → компенсация

Command Exchanges (Direct)

ExchangeRouting KeyQueueConsumerНазначение
provisioning.commandsservice.activateprovisioning.q.commandsProvisioningАктивация на оборудовании
provisioning.commandsservice.deactivateprovisioning.q.commandsProvisioningДеактивация
provisioning.commandsaccess.suspendprovisioning.q.commandsProvisioningБлокировка (dunning/admin)
provisioning.commandsaccess.resumeprovisioning.q.commandsProvisioningРазблокировка
provisioning.commandsspeed.changeprovisioning.q.commandsProvisioningCoA (смена скорости)
billing.commandscharge.accountbilling.q.commandsBillingСписание (OMS Saga)
billing.commandsrefundbilling.q.commandsBillingВозврат (Saga compensation)
notification.commandssend.smsnotification.q.smsNotificationОтправка SMS
notification.commandssend.emailnotification.q.emailNotificationОтправка Email
notification.commandssend.pushnotification.q.pushNotificationОтправка Push
notification.commandssend.telegramnotification.q.telegramNotificationОтправка в Telegram

Background Jobs: river (Go)

river — PostgreSQL-based фоновый обработчик задач для Go. Аналог BullMQ (Node.js), но использует ту же БД что и приложение. Транзакционная постановка задач — задача гарантированно создаётся вместе с бизнес-данными (атомарно).

Почему river

Критерийriver (Go + PostgreSQL)BullMQ (Node.js + Redis)Temporal
ТранзакционностьЗадача в той же TX что и бизнес-данныеОтдельный Redis, нет TXОтдельный кластер
ИнфраструктураPostgreSQL (уже есть)Redis (доп. компонент)Temporal Server (тяжёлый)
Cron/ScheduledВстроенный PeriodicJobsВстроенный repeatableВстроенный schedule
RetryConfigurable per-jobConfigurableConfigurable
ПриоритетыВстроенные queue prioritiesВстроенные
UIRiver UI (веб-панель)Bull BoardTemporal UI
ЯзыкGo-nativeTypeScript/Node.jsЛюбой (через SDK)

Когда river, когда RabbitMQ

ЗадачаriverRabbitMQ
Генерация PDF-счёта
Рекуррентное списание (cron)
Очистка expired резервов
Reconciliation (сверка)
Отправка email/SMS✅ (notification.commands)
Provisioning команды✅ (provisioning.commands)
Domain events (межсервисные)✅ (topic exchanges)
Saga-оркестрация✅ (OMS ↔ services)

Правило: river = внутрисервисные фоновые задачи. RabbitMQ = межсервисная коммуникация.

Пример: рекуррентный биллинг через river

// internal/billing/jobs/recurring_charge.go
package jobs

import (
    "context"
    "github.com/riverqueue/river"
)

// Args — аргументы задачи (сериализуются в JSON в PostgreSQL)
type RecurringChargeArgs struct {
    AccountID      string `json:"account_id"`
    SubscriptionID string `json:"subscription_id"`
    Period         string `json:"period"` // "2025-02"
}

func (RecurringChargeArgs) Kind() string { return "billing.recurring_charge" }

// Worker — обработчик задачи
type RecurringChargeWorker struct {
    river.WorkerDefaults[RecurringChargeArgs]
    billingService *BillingService
}

func (w *RecurringChargeWorker) Work(ctx context.Context, job *river.Job[RecurringChargeArgs]) error {
    return w.billingService.ChargeRecurring(ctx, job.Args.AccountID, job.Args.SubscriptionID, job.Args.Period)
}

Пример: cron-задачи через river PeriodicJobs

// internal/billing/app.go
func setupRiver(pool *pgxpool.Pool) *river.Client[pgx.Tx] {
    workers := river.NewWorkers()
    river.AddWorker(workers, &RecurringChargeWorker{billingService: svc})
    river.AddWorker(workers, &InvoiceGeneratorWorker{billingService: svc})
    river.AddWorker(workers, &DunningCheckWorker{billingService: svc})

    client, _ := river.NewClient(riverpgxv5.New(pool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault:   {MaxWorkers: 10},
            "billing_critical":   {MaxWorkers: 5},
            "billing_background": {MaxWorkers: 3},
        },
        PeriodicJobs: []*river.PeriodicJob{
            // Каждую ночь в 03:00 — рекуррентные списания
            river.NewPeriodicJob(
                river.PeriodicInterval(24*time.Hour),
                func() (river.JobArgs, *river.InsertOpts) {
                    return &BatchRecurringChargeArgs{Date: time.Now().Format("2006-01-02")},
                        &river.InsertOpts{Queue: "billing_critical"}
                },
                &river.PeriodicJobOpts{RunOnStart: false},
            ),
            // Каждые 4 часа — проверка dunning
            river.NewPeriodicJob(
                river.PeriodicInterval(4*time.Hour),
                func() (river.JobArgs, *river.InsertOpts) {
                    return &DunningCheckArgs{}, &river.InsertOpts{Queue: "billing_background"}
                },
                nil,
            ),
            // Каждый час — очистка expired port reservations
            river.NewPeriodicJob(
                river.PeriodicInterval(1*time.Hour),
                func() (river.JobArgs, *river.InsertOpts) {
                    return &CleanExpiredReservationsArgs{}, nil
                },
                nil,
            ),
        },
    })
    return client
}

Транзакционная постановка задачи (ключевое преимущество)

// Задача создаётся В ТОЙ ЖЕ транзакции что и бизнес-данные.
// Если TX откатится — задача тоже исчезнет. Нет "потерянных" задач.
func (s *BillingService) RegisterPayment(ctx context.Context, req *RegisterPaymentRequest) error {
    tx, _ := s.pool.Begin(ctx)
    defer tx.Rollback(ctx)

    // 1. Бизнес-логика
    _ = s.repo.CreditAccount(ctx, tx, req.AccountID, req.Amount)

    // 2. Создать задачу на генерацию квитанции (в той же TX!)
    _, _ = s.riverClient.InsertTx(ctx, tx, &GenerateReceiptArgs{
        AccountID: req.AccountID,
        PaymentID: paymentID,
    }, nil)

    // 3. Создать outbox-запись для domain event (в той же TX!)
    _ = s.outbox.Insert(ctx, tx, "billing.events", "payment.received", payloadBytes)

    return tx.Commit(ctx)
}

Event Bus: Watermill (абстракция над RabbitMQ)

Watermill — Go-библиотека для event-driven приложений. Абстрагирует pub/sub от конкретного брокера (RabbitMQ, Kafka, PostgreSQL). Позволяет легко переключиться между брокерами и добавляет middleware.

Зачем Watermill поверх amqp091-go

Без WatermillС Watermill
Ручное управление connections, channels, reconnectАвтоматический reconnect + health check
Ручная сериализация/десериализацияMiddleware: marshal/unmarshal
Ручной retry, DLQ, loggingMiddleware: retry, poison queue, throttle, correlation
Код привязан к RabbitMQКод привязан к интерфейсу Publisher/Subscriber

Пример: Publisher (Billing → RabbitMQ)

// internal/billing/infra/publisher.go
package infra

import (
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
)

func NewEventPublisher(amqpURI string) (*amqp.Publisher, error) {
    config := amqp.NewDurableQueueConfig(amqpURI)

    // Кастомизация: Topic Exchange + Publisher Confirms
    config.Exchange = amqp.ExchangeConfig{
        GenerateName: func(topic string) string { return "billing.events" },
        Type:         "topic",
        Durable:      true,
    }
    config.Publish.GenerateRoutingKey = func(topic string) string { return topic }
    config.Publish.ConfirmDelivery = true

    return amqp.NewPublisher(config, watermill.NewSlogLogger(slog.Default()))
}

Пример: Subscriber (OMS ← RabbitMQ)

// internal/oms/infra/subscriber.go
func NewEventSubscriber(amqpURI string) (*amqp.Subscriber, error) {
    config := amqp.NewDurableQueueConfig(amqpURI)

    config.Queue = amqp.QueueConfig{
        GenerateName: func(topic string) string {
            return "oms.q." + strings.ReplaceAll(topic, ".", "-")
        },
        Durable:   true,
        Arguments:  amqp.Table{
            "x-queue-type":      "quorum",
            "x-delivery-limit":  int32(5),
            "x-dead-letter-exchange": "dlx",
        },
    }

    return amqp.NewSubscriber(config, watermill.NewSlogLogger(slog.Default()))
}

Пример: Router (маршрутизация событий к обработчикам)

// internal/oms/app.go
func setupRouter(pub *amqp.Publisher, sub *amqp.Subscriber) *message.Router {
    router, _ := message.NewRouter(message.RouterConfig{}, watermill.NewSlogLogger(slog.Default()))

    // Middleware
    router.AddMiddleware(
        middleware.CorrelationID,             // Прокидывание correlation_id
        middleware.Retry{MaxRetries: 3, InitialInterval: time.Second}.Middleware,
        middleware.Recoverer,                 // Panic recovery
        middleware.NewThrottle(100, time.Second).Middleware, // Rate limit
    )

    // Обработчики событий
    router.AddHandler("provisioning-result",
        "provisioning.success",              // Подписка на routing key
        sub,
        "oms.saga.next-step",                // Публикация (если нужно)
        pub,
        omsHandlers.HandleProvisioningResult,
    )

    router.AddNoPublisherHandler("payment-received",
        "payment.received",
        sub,
        omsHandlers.HandlePaymentReceived,
    )

    return router
}

ConnectRPC: Interceptor Stack

Каждый ConnectRPC-сервер использует стек interceptor'ов (middleware) для сквозных задач:

// internal/pkg/server/interceptors.go
func BuildInterceptors(keycloakKey *rsa.PublicKey, rpcRoles map[string][]string) []connect.Interceptor {
    return []connect.Interceptor{
        // 1. Recovery (паника → gRPC INTERNAL)
        otelconnect.NewInterceptor(),        // OpenTelemetry tracing

        // 2. Logging (slog)
        NewLoggingInterceptor(slog.Default()),

        // 3. Auth (JWT from Keycloak + RBAC)
        NewAuthInterceptor(keycloakKey, rpcRoles),

        // 4. Validation (protovalidate)
        NewValidationInterceptor(),

        // 5. Rate Limiting
        NewRateLimitInterceptor(),

        // 6. Idempotency (для мутирующих RPC)
        NewIdempotencyInterceptor(),

        // 7. Circuit Breaker (для исходящих вызовов)
        // Используется на ConnectRPC CLIENT, не на сервере
    }
}

Circuit Breaker (sony/gobreaker)

Для исходящих ConnectRPC-вызовов (service → service) используем circuit breaker:

// internal/pkg/client/circuit_breaker.go
import "github.com/sony/gobreaker/v2"

func NewCircuitBreaker(name string) *gobreaker.CircuitBreaker[any] {
    return gobreaker.NewCircuitBreaker[any](gobreaker.Settings{
        Name:        name,
        MaxRequests: 5,                              // Полуоткрытое состояние: 5 пробных запросов
        Interval:    30 * time.Second,               // Сброс счётчиков
        Timeout:     60 * time.Second,               // Время в Open → Half-Open
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            return counts.ConsecutiveFailures > 3    // 3 подряд → Open
        },
        OnStateChange: func(name string, from, to gobreaker.State) {
            slog.Warn("circuit breaker state change",
                slog.String("name", name),
                slog.String("from", string(from)),
                slog.String("to", string(to)),
            )
        },
    })
}

Retry с Exponential Backoff (cenkalti/backoff)

// internal/pkg/retry/retry.go
import "github.com/cenkalti/backoff/v4"

func WithRetry(ctx context.Context, operation func() error) error {
    b := backoff.NewExponentialBackOff()
    b.InitialInterval = 200 * time.Millisecond
    b.MaxInterval = 30 * time.Second
    b.MaxElapsedTime = 5 * time.Minute
    b.Multiplier = 2.0

    return backoff.Retry(operation, backoff.WithContext(b, ctx))
}

DDD: Границы агрегатов в API

Каждый Protobuf service соответствует одному Bounded Context. Правила DDD для API:

Правила агрегатов

  1. Один агрегат — один сервис. CustomerService управляет только агрегатом Customer (+ вложенные Value Objects: Contact, Address, Contract).
  2. Ссылки между контекстами — только по ID. Subscription.customer_id — это string (UUID), не вложенный Customer message. Данные клиента достаются отдельным RPC-вызовом.
  3. Мутации — только через Aggregate Root. Нельзя напрямую менять Transaction — только через BillingService.RegisterPayment (который создаёт Transaction внутри Account).
  4. События — факты, команды — намерения. PaymentReceivedEvent = факт (уже случилось). ChargeAccountCommand = намерение (просьба выполнить).
  5. Eventual Consistency. После SubscriptionActivatedEvent Billing со временем создаст запись тарификации. Немедленная консистентность — только внутри одного агрегата.

Карта зависимостей (кто кого вызывает)

Loading diagram...

Правило: Sync (ConnectRPC) — для чтений и пользовательских мутаций (через BFF). Async (RabbitMQ) — для межсервисных реакций на события и Saga-шагов.


Полный стек Go-библиотек

КатегорияБиблиотекаВерсияНазначение
RPCconnectrpc.com/connectv1.xConnectRPC сервер/клиент
Protobufgoogle.golang.org/protobufv1.xProtobuf runtime
Validationgithub.com/bufbuild/protovalidate-gov0.xВалидация полей из .proto
Messaginggithub.com/ThreeDotsLabs/watermillv1.xEvent bus абстракция
Messaginggithub.com/ThreeDotsLabs/watermill-amqp/v3v3.xWatermill → RabbitMQ adapter
Background Jobsgithub.com/riverqueue/riverv0.xPostgreSQL job queue (аналог BullMQ)
Databasegithub.com/jackc/pgx/v5v5.xPostgreSQL driver
Logginglog/slogstdlibStructured logging
Tracinggo.opentelemetry.io/otelv1.xOpenTelemetry SDK
Circuit Breakergithub.com/sony/gobreaker/v2v2.xCircuit breaker для исходящих вызовов
Retrygithub.com/cenkalti/backoff/v4v4.xExponential backoff
HTTP Routergithub.com/go-chi/chi/v5v5.xHTTP mux (для health/metrics endpoints)
Configgithub.com/caarlos0/env/v11v11.xEnv-based config
Migrationsgithub.com/pressly/goose/v3v3.xSQL миграции
Testinggithub.com/stretchr/testifyv1.xAssertions + mocks
Testinggithub.com/testcontainers/testcontainers-gov0.xИнтеграционные тесты с Docker

Ссылки по теме

On this page

API-контракты и ИнтеграцияSchema-First: Protobuf + buf.buildПочему Protobuf, а не JSON/OpenAPIСтруктура proto-репозиторияbuf.yamlbuf.gen.yaml (кодогенерация)CI Pipeline: buf в CI/CDПример: Protobuf-контрактcommon/v1/metadata.protocustomer/v1/customer_service.protocommon/v1/money.protocommon/v1/pagination.protocommon/v1/events.protoproduct/v1/product_service.protobilling/v1/billing_service.protoorchestration/v1/order_service.protoinventory/v1/inventory_service.protoprovisioning/v1/provisioning_service.protoaaa/v1/aaa_service.protonotification/v1/notification_service.protoСинхронные вызовы: ConnectRPCПочему ConnectRPC, а не чистый gRPCАрхитектура вызововКарта RPC-сервисовВалидация (protovalidate)Асинхронный обмен: RabbitMQПочему RabbitMQТопология RabbitMQКарта Exchanges и QueuesКонфигурация очередейФормат сообщений (Protobuf EventEnvelope)Гарантии доставкиПаттерны интеграцииTransactional Outbox PatternInbox Pattern (Дедупликация)Saga Pattern (Orchestration)ИдемпотентностьRate Limiting и ThrottlingПолный каталог Domain Events (Protobuf)customer/v1/events.protoproduct/v1/events.protobilling/v1/events.protoprovisioning/v1/events.protoinventory/v1/events.protoПолный каталог Command Messages (Protobuf)provisioning/v1/commands.protobilling/v1/commands.protonotification/v1/commands.protoПолная матрица Exchange → Queue → ConsumerEvent Exchanges (Topic)Command Exchanges (Direct)Background Jobs: river (Go)Почему riverКогда river, когда RabbitMQПример: рекуррентный биллинг через riverПример: cron-задачи через river PeriodicJobsТранзакционная постановка задачи (ключевое преимущество)Event Bus: Watermill (абстракция над RabbitMQ)Зачем Watermill поверх amqp091-goПример: Publisher (Billing → RabbitMQ)Пример: Subscriber (OMS ← RabbitMQ)Пример: Router (маршрутизация событий к обработчикам)ConnectRPC: Interceptor StackCircuit Breaker (sony/gobreaker)Retry с Exponential Backoff (cenkalti/backoff)DDD: Границы агрегатов в APIПравила агрегатовКарта зависимостей (кто кого вызывает)Полный стек Go-библиотекСсылки по теме