Skip to content

Интеграция REST и MQ брокеров сообщений через шлюз OpenIG

maximthomas edited this page Aug 5, 2024 · 2 revisions

Для чего это нужно

Конвертация сообщений между брокером и REST упрощает прием и отправку сообщений без использования нативных протоколов или клиентский приложений брокеров сообщений:

Возможные варианты использования:

  • Асинхронное взаимодействие между сервисами. Конвертация REST запросов в сообщения брокера способствует ослаблению связи между сервисами, способствует увеличению производительности и устойчивости к ошибкам
  • Сбор логов. Мобильные приложения могут отправлять логи своей работы через REST в брокер сообщений.
  • Согласование протоколов. Не все приложения имеют возможность взаимодействия через брокеры сообщений, для их интеграции используется конвертация REST-брокер сообщений.
  • Пересечение сегментов. Сегменты предприятия, как правило, разделены и взаимодействуют между собой, используя брокер сообщений.

В статье мы настроим шлюз с открытым исходным кодом OpenIG для конвертации сообщений брокера в REST и обратно.

Подготовка к работе

Предположим, у вас уже установлен и настроен OpenIG. Если же нет, то как быстро это сделать, описано в статье How To Protect Web Services with OpenIG.

Вы можете так же использовать демонстрационный проект https://github.com/maximthomas/openig-mb-example как стартовую точку

Варианты использования

Отправка HTTP запросов в Apache Kafka

Настройка позволяет получать сообщения по HTTP протоколу и отправлять их в Apache Kafka.

Добавьте в файл конфигурации OpenIG config.json в обработчик Kafka producer:

{
  "heap": [
    ...
    {
      "name": "kafka-producer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "kafka:9092",
        "topic.produce": "incoming-messages"
      }
    },
    ...
  ]
} 

Важные настройки обработчика:

Настройка Описание
boostrap.server Список хотсто и портов Apache Kafka, указанные через запятую
topic.produce Топик, в который OpenIG отправляет сообщения
topic.consume Топик, из которого OpenIG читает сообщения
uri Конечная точка маршрута OpenIG
method Метод HTTP, который использует OpenIG для отправки запросов по HTTP

Добавьте маршрут OpenIG, который получать HTTP запросы и отправлять сообщения в Apache Kafka:

routes/10-http2kafka.json:

{
  "name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}",
  "condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
    "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "kafka-consumer"
            }
          ]
        }
      }
    }
  }

Примеры файлов конфигурации находятся в проекте в директории openig/config

Запустите Docker контейнеры командой

docker compose -f docker-compose.yml up

Создайте топик для Apache Kafka. Пример команды для Docker контейнера:

docker exec openig-mb-example-kafka-1 kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092

Отправьте HTTP запрос в OpenIG и проверьте сообщения в созданном топике:

curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' 'http://localhost:8080/http2kafka'
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /http2kafka HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 16
> 
* upload completely sent off: 16 out of 16 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Server: Apache-Coyote/1.1
< Content-Length: 0
< Date: Wed, 13 Apr 2022 12:34:03 GMT
< 
* Connection #0 to host localhost left intact
docker exec openig-mb-example-kafka-1 kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server localhost:9092
{"data": "test"}

Отправка сообщений Kafka в HTTP

В следующей конфигурации OpenIG будет получать сообщения и топика topic2 Apache Kafka и отправлять их на конечную точку HTTP.

Потушите Docker контейнеры командой

docker compose -f docker-compose.yml down

Добавьте в файл конфигурации обработчик Kafka producer.

config.json

{
  "heap": [
    ...
    {
      "name": "kafka-consumer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "kafka:9092",
        "topic.consume": "topic2",
        "method": "POST"
      }
    },
    ...
  ]
}

Добавьте в OpenIG маршрут, который будет слушать сообщения из Apache Kafka и перенаправлять их на конечную точку HTTP.

routes/10-kafka2http.json

{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
      "type": "DispatchHandler",
        "config": {
          "bindings": [{
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
          }]
        }
      }
    }
  }
}

Обратите внимание на свойство baseURI . В нем указан URI конечной точки HTTP. Значение берется из системного свойства. указанного в файле docker-compose.yaml -Dendpoint.api=http://sample-service:8080 для сервиса OpenIG

Добавьте в Apache Kafka топик topic2, из которого OpenIG будет читать сообщения и перенаправлять их на конечную точку HTTP.

docker exec openig-mb-example-kafka-1 kafka-topics.sh --create --topic topic2 --bootstrap-server localhost:9092

Отправим тестовые данные в созданный топик:

docker exec -it openig-mb-example-kafka-1 kafka-console-producer.sh --topic topic2 --bootstrap-server localhost:9092
>{"data": "test"}

В логе сервиса sample-service на конечную точку которого OpenIG перенаправляет сообщения появится запись:

2024-07-05T08:46:37.540Z DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /kafka2http, headers=[correlation-id:"8dd45456-433d-42cb-b992-27047ae75ed9", kafka-offset:"0", kafka-timestamp:"1720169196044", kafka-timestamp-date:"Fri Jul 05 08:46:36 UTC 2024", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/17.0.9)"], payload={"data": "test"}]

Настройка встроенного в OpenIG Apache Kafka

Если в инфраструктуре предприятия нет брокера сообщений, но есть потребность получать и перенаправлять сообщения брокера, то OpenIG предлагает встроенный брокер сообщений. Для использования встроенного Apache Kafka, добавьте в файл конфигурации OpenIG объект EmbeddedKafka

config.json

{
  "heap": [
    ...
      {
        "name": "EmbeddedKafka",
        "type": "EmbeddedKafka",
        "config": {
          "zookeper.port": "${system['zookeper.port']}",
          "security.inter.broker.protocol": "${empty system['keystore.location'] ?'PLAINTEXT':'SSL'}",
          "listeners": "${system['kafka.bootstrap']}",
          "advertised.listeners": "${system['kafka.bootstrap']}",
          "ssl.endpoint.identification.algorithm": "",
          "ssl.enabled.protocols":"TLSv1.2",
          "ssl.keystore.location":"${system['keystore.location']}",
          "ssl.keystore.password":"${empty system['keystore.password']?'changeit':system['keystore.password']}",
          "ssl.key.password":"${empty system['key.password']?'changeit':system['key.password']}",
          "ssl.truststore.location":"${system['truststore.location']}",
          "ssl.truststore.password":"${empty system['truststore.password']?'changeit':system['truststore.password']}"			
        },
    ...
  ]
}

Важные настройки EmbeddedKafka:

Настройка Описание
zookeper.port Порт Zookeper для встроенного Apache Kafka. Если не установлен, Kafra не запустится
listeners Имена хостов и порты, которые будет слушать встроенный Apache Kafka.
advertised.listeners Имена хостов и порты клиентов встроенного Apache Kafka.

Добавьте Kafka listener в массив heap OpenIG и создайте маршрут, который будет слушать сообщения Kafka и перенаправлять их на конечную точку HTTP (вы можете так же перенаправлять сообщения на другой брокер).

config.json

{
  "heap": [
    ...
      {
      "name": "kafka-consumer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "openig:9092",
        "topic.consume": "topic1",
        "method": "POST",
        "uri": "/kafka2http"
      }
    ...
  ]
}

10-kafka2http.json

{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
      "type": "DispatchHandler",
        "config": {
          "bindings": [{
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
          }]
        }
      }
    }
  }

Запустите OpenIG. Теперь вы можете создать topic и отправлять сообщения в этот topic.

$ kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092
>{"data": "test"}

В тестовом сервисе в логе появится сообщение, перенаправленное OpenIG из брокера на конечную точку HTTP.

2022-04-21 07:26:14.645 DEBUG 1 --- [nio-8080-exec-6] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /kafka2http, headers=[kafka-offset:"29", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]

Интеграция с IBM MQ

Отправка HTTP запросов в IBM MQ

Следующая настройка позволяет получать сообщения по HTTP протоколу и отправлять их в topic IBM MQ:

Добавьте обработчик IBM MQ Consumer в heap в файл конфигурации OpenIG:

config.json

{
  "heap": [
    ...
    {
      "name": "mq-producer",
      "type": "MQ_IBM",
      "config": {
        "XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)",
        "XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN",
        "XMSC_WMQ_QUEUE_MANAGER":"QM1",
        "XMSC_USERID":"app",
        "XMSC_PASSWORD":"passw0rd",
        "topic.produce": "DEV.QUEUE.1"
      }
    },
    ...
  ]
}

Важные настройки IBM MQ:

Setting Name
XMSC_WMQ_CONNECTION_NAME_LIST Адреса брокеров IBM MQ в формате списка именов хостов и портов, указанные через запятую
XMSC_WMQ_CHANNEL Имя канала IBM MQ, используется для соединения
XMSC_USERID Имя пользователя IBM MQ
XMSC_PASSWORD Пароль пользователя IBM MQ
topic.produce Топик, в который OpenIG должен слать сообщения
topic.consume Топик, из кторого OpenIG читает сообщения
uri Конечная точка OpenIG
method Метод HTTP, который OpenIG использует для отправки запросов на конечную точку HTTP

Добавьте маршрут OpenIG в папку routes для обработки HTTP запросов.

10-http2mq.json

{
  "name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}",
  "condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "mq-producer"
            }
          ]
        }
      }
    }
  }
}

Отправьте HTTP запрос в OpenIG и проверьте полученное сообщение в топике DEV.QUEUE.1 IBM MQ:

$ curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' 'http://localhost:8080/http2mq'
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /http2mq HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 16
> 
* upload completely sent off: 16 out of 16 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Server: Apache-Coyote/1.1
< Content-Length: 0
< Date: Wed, 13 Apr 2022 12:34:03 GMT
< 
* Connection #0 to host localhost left intact

Откройте консоль IBM MQ по адресу  https://localhost:9443/ibmmq/console/. В топике DEV.QUEUE.1 вы увидите полученное сообщение:

IBM MQ DEV.QUEUE1

Отправка сообщений IBM MQ на конечную точку HTTP

Добавьте IBM MQ cosumer в heap в файл конфигурации OpenIG config.json.

{
  "heap": [
    ...
    {
      "name": "mq-consumer",
      "type": "MQ_IBM",
      "config": {
        "XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)",
        "XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN",
        "XMSC_WMQ_QUEUE_MANAGER":"QM1",
        "XMSC_USERID":"app",
        "XMSC_PASSWORD":"passw0rd",
        "topic.consume": "DEV.QUEUE.2",
        "uri": "/mq2http",
        "method": "POST"
      }
    }
    ...
  ]
}

Добавьте маршрут OpenIG в папку routes для обработки сообщений IBM MQ:

10-mq2http.json

{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
            }
          ]
        }
      }
    }
  }
}

Зайдите в консоль IBM MQ и отправьте сообщение в топик DEV.QUEUE.2

IBM MQ DEV.QUEUE1

В логе сервиса sample-servive вы увидите следующее сообщение:

2022-04-21 08:32:35.007 DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /mq2http, headers=[jms_ibm_character_set:"UTF-8", jms_ibm_encoding:"273", jms_ibm_format:"MQSTR", jms_ibm_msgtype:"8", jms_ibm_putappltype:"6", jms_ibm_putdate:"20220421", jms_ibm_puttime:"08323434", jmsxappid:"com.ibm.mq.webconsole", jmsxdeliverycount:"1", jmsxuserid:"unknown", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]