Информация в этой статье предназначена исключительно для клиентов, действующих на территории Российской Федерации. Учитывайте это при применении материалов.

Статья содержит инструкции по настройке Apache Kafka для передачи данных модуля "Выгрузка для СОРМ" платформы BILLmanager. В текущей версии платформа тестировалась с Kafka 3.8.  Инструкции для актуальной версии Kafka см. в официальной документации .

Apache Kafka (Kafka) — распределённая платформа потоковой передачи данных, которая позволяет обрабатывать большие объёмы данных в реальном времени. Она используется для создания систем обработки данных, хранения и передачи сообщений между различными компонентами приложений. Подробнее см. в официальной документации Kafka.

Основные компоненты KafkaLink to Основные компоненты Kafka


Основными компонентами Kafka являются:

  • Топики (Topics)  — логические каналы, в которые записываются сообщения. Каждый топик может иметь несколько партиций для масштабируемости;
  • Партиции (Partitions) — подмножества топиков, которые позволяют разделять данные для более эффективного хранения и обработки. Каждая партиция хранит упорядоченные сообщения и может быть распределена по различным брокерам;
  • Брокеры (Brokers) серверы, которые хранят данные и обрабатывают запросы на запись и чтение;
  • Продюсеры (Producers) приложения, которые отправляют (публикуют) сообщения в топики;
  • Потребители (Consumers)  — приложения, которые читают (подписываются на) сообщения из топиков;
  • Группы потребителей (Consumer Groups) группы, которые позволяют нескольким потребителям совместно обрабатывать данные из одного топика.

Для запуска Kafka требуется установка сервиса ZooKeeper или KRaft. Выбор сервиса зависит от версии Kafka:

  • ниже версии 2.8 — ZooKeeper;

  • версии 2.8-3.9 — ZooKeeper или KRaft;

  • версии 4.0 и выше — KRaft.

Установка Kafka на сервер совместно с ZooKeeperLink to Установка Kafka на сервер совместно с ZooKeeper


Apache ZooKeeper  — открытый сервер, который может использоваться совместно с Kafka для хранения конфигурационных данных BILLmanager. Чтобы установить Kafka совместно с Zookeeper:

  1. Загрузите актуальную версию Kafka с официального сайта Apache Kafka:
    curl https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
    BASH
  2. Распакуйте скачанный архив:
    tar -xzf kafka_2.13-3.8.0.tgz
    BASH
  3. Перейдите в директорию Kafka:
    cd kafka_2.13-3.8.0
    BASH
  4. Запустите ZooKeeper с помощью команды:
    bin/zookeeper-server-start.sh config/zookeeper.properties
    BASH
  5. Запустите брокер Kafka:
    bin/kafka-server-start.sh config/server.properties
    BASH

Установка Kafka на сервер совместно с KRaftLink to Установка Kafka на сервер совместно с KRaft


KRaft (Kafka Raft) — режим управления метаданными в Apache Kafka, который позволяет работать без ZooKeeper, начиная с версии Kafka 2.8.0. Установка Apache Kafka совместно с KRaft на сервере с BILLmanager не тестировалась. Подробнее о работе с KRaft см. официальную документацию Kafka, раздел Quickstart.

Установка и настройка Kafka на системе с DockerLink to Установка и настройка Kafka на системе с Docker


  1. Чтобы установить Kafka с помощью Docker, введите команду:

    docker run -p 9092:9092 --name kafka -d apache/kafka:3.8.0
    BASH

    docker run — команда создаёт и запускает новый контейнер на основе указанного образа

    -p 9092:9092 — параметр связывает порт 9092 на хост-машине с портом 9092 внутри контейнера. Это позволяет приложениям на хосте подключаться к Kafka, работающему в контейнере, через этот порт

    --name kafka — параметр задаёт имя контейнера. В данном случае контейнер будет называться kafka

    -d  — параметр указывает Docker запустить контейнер в фоновом режиме (detached mode). Это значит, что терминал не будет заблокирован, и вы сможете продолжать использовать его для других команд

    apache/kafka:3.8.0  — имя образа, который будет использоваться для создания контейнера. В данном случае это образ Apache Kafka версии 3.8.0. Docker загрузит этот образ из Docker Hub, если он не был загружен ранее

  2. Создайте топики:
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic %имя топика%
    BASH

    docker exec — команда позволяет выполнять команды внутри работающего контейнера Docker. В данном случае, для выполнения скрипта Kafka внутри контейнера

    -it — параметры указывают Docker, что нужно запустить команду в интерактивном режиме (-i для интерактивного ввода и -t для создания терминала)

    kafka — имя контейнера, в котором запущен Apache Kafka

    /opt/kafka/bin/kafka-topics.sh  — путь к скрипту, который управляет топиками в Kafka

    --bootstrap-server localhost:9092 — параметр указывает адрес и порт сервера Kafka, к которому нужно подключиться. В данном случае это локальный сервер, работающий на порту 9092

    --create — параметр указывает, что необходимо создать новый топик

    --topic %имя топика % — название, которое будет использоваться для идентификации топика в Kafka

Пример базовой настройки Kafka с помощью Docker см. в статье документации Docker Hub https://hub.docker.com/r/apache/kafka

Выгрузка данных в KafkaLink to Выгрузка данных в Kafka


После применения этих настроек информация не будет сохраняться в базе данных billmgr_sorm.

В качестве продюсера Kafka может выступать модуль "Выгрузка для СОРМ" в BILLmanager. Эта интеграция не настроена по умолчанию. Подробнее о настройке продюсеров см. в официальной документации Kafka в статье Kafka Producer.

Чтобы настроить выгрузку данных из модуля "Выгрузка для СОРМ" в Kafka:

  1. Установите библиотеки Python:
    pip3 install kafka-python-ng importlib_metadata
    BASH

    pip3 — команда для запуска менеджера пакетов pip для установки и управления библиотеками Python версии 3

    install  — параметр указывает pip, что необходимо установить указанные пакеты

    kafka-python-ng — библиотека, которая требуется для для настройки и мониторинга потоков данных и корректной работы модуля.  В официальной документации kafka-python указаны версии Kafka 0.8-2.6 в качестве рекомендуемых. Разработчики не гарантируют полную совместимость с более новыми версиями Kafka. Подробнее см. официальную документацию kafka-python на GitHub: kafka-python Documentation

    importlib_metadata — библиотека, которая предоставляет доступ к метаданным пакетов, установленным в среде Python. Например, информацию о версиях пакетов и их зависимостях

  2. Cоздайте конфигурационный файл /usr/local/mgr5/etc/sorm/kafka.json

    Пример файла

    {
     "topic": "sorm_data",
     "topics": {
      "client": "sorm_client",
      "user": "sorm_user",
      "auth_history": "sorm_auth",
      "payment": "sorm_payment",
      "item": "sorm_item",
      "domain": "sorm_domain",
      "ip": "sorm_ip",
      "ticket_message": "sorm_message"
     },
     "bootstrap.servers": "localhost:9092",
     "security.protocol": "SASL_SSL",
     "sasl.mechanism": "PLAIN",
     "sasl.username": "your_username",
     "sasl.password": "your_password"
    }
    CODE

    topic — имя топика в Kafka. Используется, если для типа данных не указан отдельный топик

    topics — имена топиков для каждого типа данных:

    • client — клиенты;
    • user — пользователи;
    • auth_history — данные авторизации;
    • payment — платежи;
    • item — услуги;
    • domain — домены;
    • ip — IP-адреса;
    • ticket_message — сообщения запросов

    bootstrap.servers — список адресов Kafka-брокеров, к которым клиент будет подключаться для инициализации соединения. Параметр необходим для того, чтобы клиент мог получить информацию о других брокерах в кластере

    localhost:9092 — клиент будет пытаться подключиться к брокеру, работающему на том же сервере, где запущен клиент, через стандартный порт 9092

    security.protocol — протокол безопасности. Поддерживается только SASL_SSL 

    sasl.mechanism — механизм, используемый протоколом безопасности. Поддерживается только PLAIN 

    sasl.username — задайте логин Kafka

    sasl.password — задайте пароль Kafka

    • Обязательным параметром является только bootstrap.servers.
    • Вы можете не указывать параметры security.protocol, sasl.mechanism, sasl.username и sasl.password. Однако, если хотя бы один из этих параметров содержится в конфигурационном файле, то и остальные параметры должны быть включены.
    • Если для типа данных не указан отдельный топик и параметр topic не задан, то информация будет записана в топик sorm:
      1. Если для типа данных есть непустой топик в topics в конфигурационном файле, то будет выбран он.
      2. Если условие 1 не выполнено, то модуль проверяет наличие непустого topic и задействует его для выгрузки.
      3. Если условия 1 и 2 не выполнены, отправка будет осуществляться в топик "sorm".

Проверка работы KafkaLink to Проверка работы Kafka


Для проверки работы Kafka вы можете читать и отправлять сообщения в топики.

Чтобы читать сообщения из топика, запустите команду:

bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
BASH

Чтобы отправить сообщение в топик, запустите команду:

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
BASH