Spring Integration-动态数据流

烟花,哈伯!今天,我们将分析一个相当具体的领域-使用Spring Integration框架进行数据流传输,以及如何在运行时进行这些流而无需在应用程序上下文中进行初步初始化。完整的示例应用程序位于Gita中



介绍



Spring Integration是一个企业集成框架(EIP),它基于消息通道(条件队列)在不同协议/集成系统的适配器之间使用消息传递机制。著名的类似物是骆驼,M子,尼菲。



从测试用例中,我们将-使REST服务能够读取接收到的请求参数,进入我们的数据库,例如postgres,根据从源接收到的参数更新和获取表数据,然后将结果发送回队列(请求/响应),还可以使多个实例具有不同的请求路径。



按照惯例,数据流程图如下所示:



图片



接下来,我将展示如何使用IntegrationFlowContext和REST控制组件/线程终结点,而无需像铃鼓跳舞那么简单。所有主要的项目代码都将位于存储库中,在这里我仅表示一些剪辑。好吧,请有兴趣的人在猫的陪伴下。



工具类



让我们从默认的依赖关系块开始。基本上,我们将需要spring-boot项目-用于流程/组件管理的REST意识形态,spring-integration-基于通道和适配器创建案例。



我们立即想到了重制此案还需要做些什么。除了核心依赖性之外,我们还将需要子项目-Integration-http,integration-jdbc,integration-groovy(提供基于Goovy脚本的可动态自定义的数据转换器)。另外,我将说在此示例中,我们将不需要使用groovy转换器,但我们将提供从外部对其进行自定义的功能。



依赖清单
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




内部厨房



让我们继续创建必要的系统组件(包装器/模型)。我们将需要通道,bean,httpInboundGateway,处理程序,jdbcOutboundGateway和结果模型。



  • bean-适配器,线程所需的帮助对象
  • channel-用于向/从流组件传递消息的通道
  • httpInboundGateway-我们将进一步向其中发送请求的数据进行进一步处理的http访问点
  • 处理程序-处理程序的通用类型(槽形变压器,各种适配器等)
  • jdbcOutboundGateway-jdbc适配器
  • 结果-将信息发送到特定频道的处理程序


我们将需要包装器来存储参数并正确初始化整个流的组件,因此我们立即创建一个组件存储,即add。 JSON转换器的功能->定义模型。在我的案例中,使用杰克逊和对象进行字段的直接映射不适用-我们还有一辆用于特定通信协议的自行车。



让我们 立即使用批注很好地进行操作



StreamComponent-负责将类标识为流组件的调整模型,并具有服务信息-组件的名称,组件的类型,组件是否嵌套和描述;



SettingClass-负责扫描模型的其他选项,例如扫描超类字段和在初始化值时忽略字段;



SettingValue-负责将类字段标识为可自定义的外部字段,并以JSON,描述,类型转换器,必填字段标志和内部对象标志的命名设置,以供参考;用于REST控制器模型的



组件存储管理器



Helper方法



基础模型-带有一组辅助字段/模型方法的抽象



当前流配置模型



Mapper JSON->定义模型



准备工作的主要基础。现在让我们直接了解将负责流的生命周期,存储和初始化的服务的实现,然后我们将立即提出可以将具有相同命名的1个流并行化为多个实例的想法,即我们将需要为流的所有组件创建唯一的标识符(指南),否则在应用程序上下文中可能会与其他单例组件(bean,通道等)发生冲突。但首先让我们制作两个组件的映射器-http和jdbc,即先前针对流本身的组件(HttpRequestHandlerEndpointSpec和JdbcOutboundGateway)制作的模型的增量。



HttpRegistry



JdbcRegistry



中央管理服务(StreamDeployingService)执行存储工作线程/非活动线程,注册新线程,启动,停止和从应用程序上下文中完全删除线程的功能。该服务的一个重要功能是引入了IntegrationFlowBuilderRegistry依赖关系,它有助于我们使应用程序动态化(也许记住这些配置xml文件或DSL类数以千计)。根据流规范,它必须始终以入站组件或通道开头,因此我们在registerStreamContext方法的实现中将其考虑在内。



和辅助管理器(IntegrationFlowBuilderRegistry),它既可以执行模型映射器的功能来使用流组件,也可以使用IntegrationFlowBuilder来初始化流本身。我还在流管道中实现了日志处理程序,用于收集流通道指标的服务(可切换选项)以及基于Groovy实现的流消息转换器的可能实现(如果此示例突然成为销售的基础,则必须在流初始化阶段对groovy脚本进行预编译) ,因为您要在RAM中进行负载测试,而无论您拥有多少核和电源。取决于模型的日志阶段和日志级别参数的配置,在每次从组件到组件的消息传输之后,它将激活。通过application.yml中的参数启用和禁用监视:



monitoring:
  injectction:
    default: true


现在我们拥有初始化动态数据处理流的所有机制,我们还可以为各种协议和适配器(例如RabbitMQ,Kafka,Tcp,Ftp等)编写映射器。而且,在大多数情况下,您无需自己动手编写任何内容(当然,配置模型和辅助方法除外)- 存储库中已经存在相当多的组件



最后阶段将是实现用于获取有关现有系统组件的信息,管理流程和获取指标的控制器。



ComponentsController-提供有关人类可读模型中所有组件的信息,并按名称和类型提供一个组件。



流控制器 -提供全面的流程管理,即初始化新的JSON模型,按标识符启动,停止,删除和发布指标。



最终产品



我们提出结果应用程序,并以JSON格式描述测试用例。



样本数据流
:



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


: order — , .. , . ( ). — .



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}





测试:



1)我们使用



POST / stream / deploy 方法初始化一个新流,其中JSON将在请求正文中。



作为响应,如果一切正确,系统将必须发送,否则将显示错误消息:



{
    "status": "SUCCESS", -  
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -  
}


2)我们使用以下方法来启动启动:



GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start,在此我们会更早地指示已初始化流的标识符。



作为响应,如果一切正确,系统将必须发送,否则将显示错误消息:



{
    "status": "SUCCESS", -  
}


3)通过系统中的标识符调用流?方式,地点和地点-在HttpRegistry模型的映射器中,我写了条件



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


其中,考虑了http-inbound-path参数,如果未在组件的配置中明确指定,则将其忽略并设置系统调用路径。在我们的情况下,它将是:



POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / call-存在流标识符的地方,带有请求正文:



{
    "accountId": 1
}


作为响应,如果处理请求的各个阶段正常工作,我们将收到表account_data和account_info的记录的扁平结构。



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


JdbcOutboundGateway适配器的特殊性在于,如果您指定update-query参数,则会注册一个附加处理程序,该处理程序首先更新数据,然后才通过查询参数进行获取。



如果您手动指定相同的路径,则由于在系统中不允许注册相似的路径,因此取消了在多个实例中使用HttpInboundGateway作为流的访问点启动组件的可能性。



4)让我们使用GET方法/流/ 2bf65d9d-97c6-4199-86aa-0c808c25071b /指标查看指标



回应内容
, / , / / :



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




结论



因此,该示例说明了如何花费更多的时间和精力来编写一个用于与各种系统集成的应用程序,而不是每次为与其他系统集成而在应用程序中编写额外的手动处理程序(管道),而每次编写200-500行代码。



在当前示例中,您可以通过唯一标识符并行化多个实例的同一类型线程的工作,避免在应用程序的全局上下文中线程依赖项(容器,通道等)之间发生冲突。



此外,您可以开发项目:



  • 将流保存到数据库;
  • 支持spring和spring-integration社区为我们提供的所有集成组件;
  • 使工人能够按计划执行线程工作;
  • 制作一个合理的UI,以使用条件“鼠标和组件多维数据集”配置流(顺便说一下,该示例在github.com/spring-cloud/spring-cloud-dataflow-ui项目中得到了部分改进)。


再一次,我将复制到存储库的链接



All Articles