Apache Kafka and testing with Kafka Server

Introduction



There are several ways to write tests that involve Apache Kafka. For example, you can use TestContainers or EmbeddedKafka; see, for instance, "Pitfalls of testing Kafka Streams". There is, however, one more option: writing tests with KafkaServer itself.



What will be tested?



Suppose you need to develop a service that sends messages over various channels: email, telegram, and so on.



Let's call the service SenderService.



The service should listen on a given channel, pick out the messages it needs, parse them, and send them over the required channel for final delivery.



To test the service, we need to form a message that is to be sent via the email channel and make sure the message reaches that final channel.

Of course, in a real application the tests would be more elaborate, but to illustrate the chosen approach this test is enough.



The service and the test are implemented with: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.



Service



The service can be started and stopped.



void start()

void stop()


To begin with, at least the following parameters must be set:



String bootstrapServers
String senderTopic
EmailService emailService


bootstrapServers – the address of the kafka broker.

senderTopic – the topic from which the messages to be sent are read.

emailService – the service that performs the actual email delivery.
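

The constructor signature below matches the one used in the test further down; the rest of the class layout is a minimal sketch, not the original code.



public class SenderService {
    private final String bootstrapServers;
    private final String senderTopic;
    private final EmailService emailService;

    public SenderService(String bootstrapServers, String senderTopic, EmailService emailService) {
        this.bootstrapServers = bootstrapServers;
        this.senderTopic = senderTopic;
        this.emailService = emailService;
    }

    // start() and stop() are discussed below
}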



Let's look at the start method.



In it, several "consumer loop" objects are created; each one runs in its own thread and reads messages from the topic. The consumer approach itself is described in: Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.



Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
    SenderConsumerLoop senderConsumerLoop =
            new SenderConsumerLoop(
                    bootstrapServers,
                    senderTopic,
                    "sender",
                    "sender",
                    tasksExecutorService,
                    emailService
            );
    closeables.add(senderConsumerLoop);
    senderTasksExecutor.submit(senderConsumerLoop);
}


The "consumer loops" are created, registered for closing, and submitted to their executor.



To shut everything down correctly, a "shutdown hook" is registered: it closes every loop, stops the executor services, and waits briefly for termination.



Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    for (AutoCloseable autoCloseable : closeables) {
        try {
            autoCloseable.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    senderTasksExecutor.shutdown();
    tasksExecutorService.shutdown();
    stop();
    try {
        senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));


This completes the start of the service.



Consumer loop



The "consumer loop" has two methods:



void run()

void close()


Let's start with run.



@Override
public void run() {
    kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
    kafkaConsumer.subscribe(Collections.singleton(topic));
    while (true) {
        calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
    }
}
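

The close method is not shown in the article. Since the loop blocks in poll, one common way to end it (an assumption here, not taken from the original) is to call wakeup on the consumer, which makes poll throw WakeupException and terminates the while (true) loop:



@Override
public void close() {
    // wakeup() causes the blocked poll() to throw WakeupException,
    // which ends the endless loop in run()
    kafkaConsumer.wakeup();
}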


A "kafka consumer" is created. The "kafka consumer" subscribes to the topic. Then, in an endless loop, records are polled and processed.



Messages arrive in json format, so they have to be parsed and the required fields extracted.



An example message:



{
  "subject": {
    "subject_type": "send"
  },
  "body": {
    "method": "email",
    "recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
    "title": "42",
    "message": "73"
  }
}


subject_type — the type of the message; messages to be delivered carry "send".

method – the delivery channel; "email" means delivery by email.

recipients – the list of recipients.

title – the subject of the message.

message – the body of the message.
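

The names SUBJECT, SUBJECT_TYPE, BODY and so on used in the parsing code below are assumed to be string constants mirroring these json keys, for example:



static final String SUBJECT = "subject";
static final String SUBJECT_TYPE = "subject_type";
static final String BODY = "body";
static final String SEND = "send";
static final String METHOD = "method";
static final String EMAIL_METHOD = "email";
static final String RECIPIENTS = "recipients";
static final String TITLE = "title";
static final String MESSAGE = "message";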



Processing a batch of records:



void calculate(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        calculate(record);
    }
}


Processing a single record:



void calculate(ConsumerRecord<String, String> record) {
    JSONParser jsonParser = new JSONParser();
    Object parsedObject = null;
    try {
        parsedObject = jsonParser.parse(record.value());
    } catch (ParseException e) {
        e.printStackTrace();
    }
    if (parsedObject instanceof JSONObject) {
        JSONObject jsonObject = (JSONObject) parsedObject;
        JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
        String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
        if (SEND.equals(subjectType)) {
            JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);
            calculate(jsonBody);
        }
    }
}


Processing the message body:



void calculate(JSONObject jsonBody) {
    String method = jsonBody.get(METHOD).toString();
    if (EMAIL_METHOD.equals(method)) {
        String recipients = jsonBody.get(RECIPIENTS).toString();
        String title = jsonBody.get(TITLE).toString();
        String message = jsonBody.get(MESSAGE).toString();
        sendEmail(recipients, title, message);
    }
}


Sending the email:



void sendEmail(String recipients, String title, String message) {
    tasksExecutorService.submit(() -> emailService.send(recipients, title, message));
}


The send itself is handed off to a separate executor service, so the consumer loop is not blocked.



A couple of helpers remain.



Creating the "kafka consumer":



static KafkaConsumer<String, String> createKafkaConsumerStringString(
        String bootstrapServers,
        String clientId,
        String groupId
) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(properties);
}


The email service interface:



interface EmailService {
    void send(String recipients, String title, String message);
}
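

The article does not show a production implementation of this interface; for local experiments even a stub that only logs is enough. A sketch (not part of the original code; a real implementation would talk to an SMTP server):



class LoggingEmailService implements SenderService.EmailService {
    @Override
    public void send(String recipients, String title, String message) {
        // only logs the outgoing message; replace with real email delivery
        System.out.printf("email to %s: [%s] %s%n", recipients, title, message);
    }
}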




Test



For the test, a running "kafka server" is needed.

It is started by a small helper class, KafkaServerService, described at the end of the article.

The test is assembled step by step.



First the "kafka server" is started and a topic is created. The skeleton of the test:



public class SenderServiceTest {
    @Test
    void consumeEmail() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
            kafkaServerService.start();
            kafkaServerService.createTopic(senderTopic);

        }
    }
}


The "kafka server" is created in a try-with-resources block, so it is closed automatically at the end of the test. The "kafka server" is started and the topic is created. The remaining steps are added inside this block.



A "mock" of the email service is created:



SenderService.EmailService emailService = mock(SenderService.EmailService.class);


The service is created and started:



SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();


The test data:



String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";


The message is sent to the topic:



kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));


Wait a bit for the message to be processed:



Thread.sleep(6000);


Then verify that the email service received exactly this message:



verify(emailService).send(recipients, title, message);
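

As a side note, the fixed Thread.sleep can be replaced with Mockito's timeout-based verification, which returns as soon as the expected call happens. This is an alternative, not what the article uses:



// waits up to 10 seconds for the interaction instead of always sleeping
verify(emailService, timeout(10_000)).send(recipients, title, message);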


Stop the service:



senderService.stop();


The complete test:



public class SenderServiceTest {
    @Test
    void consumeEmail() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
            kafkaServerService.start();
            kafkaServerService.createTopic(senderTopic);
            SenderService.EmailService emailService = mock(SenderService.EmailService.class);
            SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
            senderService.start();
            String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
            String title = "42";
            String message = "73";
            kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
            Thread.sleep(6000);
            verify(emailService).send(recipients, title, message);
            senderService.stop();
        }
    }
}


The helper class that builds keys and messages for the test:



public class SenderFactory {
    public static final String SUBJECT = "subject";
    public static final String SUBJECT_TYPE = "subject_type";
    public static final String BODY = "body";
    public static final String METHOD = "method";
    public static final String EMAIL_METHOD = "email";
    public static final String RECIPIENTS = "recipients";
    public static final String TITLE = "title";
    public static final String MESSAGE = "message";
    public static final String SEND = "send";

    public static String key() {
        return UUID.randomUUID().toString();
    }

    public static String createMessage(String method, String recipients, String title, String message) {
        Map<String, Object> map = new HashMap<>();
        Map<String, Object> subject = new HashMap<>();
        Map<String, Object> body = new HashMap<>();
        map.put(SUBJECT, subject);
        subject.put(SUBJECT_TYPE, SEND);
        map.put(BODY, body);
        body.put(METHOD, method);
        body.put(RECIPIENTS, recipients);
        body.put(TITLE, title);
        body.put(MESSAGE, message);
        return JSONObject.toJSONString(map);
    }
}


Kafka server



Its main methods:



void start()

void close()

void createTopic(String topic)


Let's go through the "start" method piece by piece.



An embedded "zookeeper" is created and its address is saved:



zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();


A zookeeper client is created:



zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);


The broker properties are set:



Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
    brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
    throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);


The server is created:



kafkaServer = TestUtils.createServer(config, new MockTime());


All together:



public void start() {
    zkServer = new EmbeddedZookeeper();
    String zkConnect = zkHost + ":" + zkServer.port();
    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
    zkUtils = ZkUtils.apply(zkClient, false);
    Properties brokerProps = new Properties();
    brokerProps.setProperty("zookeeper.connect", zkConnect);
    brokerProps.setProperty("broker.id", "0");
    try {
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
    brokerProps.setProperty("offsets.topic.replication.factor", "1");
    KafkaConfig config = new KafkaConfig(brokerProps);
    kafkaServer = TestUtils.createServer(config, new MockTime());
}


Stopping the service:



@Override
public void close() {
    kafkaServer.shutdown();
    zkClient.close();
    zkServer.shutdown();
}


Creating a topic:



public void createTopic(String topic) {
    AdminUtils.createTopic(
            zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}
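

The test also calls kafkaServerService.send(topic, key, value). That method is not listed in the article; a minimal sketch with a KafkaProducer could look like this (the method name and arguments come from the test, everything else is an assumption):



public void send(String topic, String key, String value) {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHost + ":" + brokerPort);
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        // block until the record is acknowledged so the consumer can see it
        producer.send(new ProducerRecord<>(topic, key, value)).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}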


Conclusion



In conclusion, it should be noted that the code shown here only illustrates the chosen approach.



For building and testing services that use kafka, you can also refer to the following resource:

kafka-streams-examples



Links and resources



Sources



Code for testing with a "kafka server"



