Producer/Consumer with Kafka and Kotlin

This translation was prepared ahead of the start of the "Backend Development in Kotlin" course.








In this article we discuss how to create a simple Spring Boot application using Kafka and Kotlin.



Introduction



Start by visiting https://start.spring.io and adding the following dependencies:



Groovy



implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")


This example uses Gradle as the build tool. Maven works as well.



Generate the project and open it in IntelliJ IDEA.



Apache Kafka



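Download the latest version of Apache Kafka and unpack it. This walkthrough uses Windows 10. When starting Kafka you may run into a «too many lines encountered» error, which is typically caused by an overly long path to the Kafka folder. Renaming the Kafka folder to something shorter helps. After that, run the commands from PowerShell.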



To start Kafka, use the following commands:



Shell



.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties


Both scripts are located in the /bin/windows folder.



Before starting Kafka, you need to start Zookeeper. Zookeeper is an Apache project that provides a distributed configuration service.



Spring Boot



In your IDE, open KafkaDemoApplication.kt. This file was generated by the Spring site and contains the application's entry point.



It looks like this:



Kotlin



import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class KafkaDemoApplication 

fun main(args: Array<String>) {
   runApplication<KafkaDemoApplication>(*args)
}







First, we need a controller. Create a file named KafkaController.kt and add the following code:



Kotlin



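import org.springframework.http.ResponseEntity
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController

@RestController
class KafkaController(private val kafkaTemplate: KafkaTemplate<String, String>) {
    private val topic = "test_topic"

    @GetMapping("/send")
    fun sendMessage(@RequestParam("message") message: String): ResponseEntity<String> {
        // send() returns a future (a ListenableFuture in Spring Kafka 2.x); get() waits for the result
        val sendResult = kafkaTemplate.send(topic, message).get()
        return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
    }
}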


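Here we send the message to the test_topic topic using KafkaTemplate. The send call returns a ListenableFuture, and calling get() on it waits for the result of the send. This is the simplest way to send a message to a topic.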





Another way to send a message to Kafka is to create a KafkaProducer directly. The code looks like this:



Kotlin



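// Add these imports at the top of KafkaController.kt
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata

// Add this method inside KafkaController
@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message: String): ResponseEntity<String> {
    val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)

    // Producer settings: key/value serializers and the broker address
    val map = mutableMapOf<String, Any>()
    map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
    map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    map["bootstrap.servers"] = "localhost:9092"

    // use {} closes the producer once the send has completed
    return KafkaProducer<String, String>(map).use { producer ->
        val future: Future<RecordMetadata> = producer.send(producerRecord)
        ResponseEntity.ok(" message sent to " + future.get().topic())
    }
}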


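To initialize a KafkaProducer we need a Map with its settings. Among them are the serializers for the key and the value, which in this case are both StringSerializer.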



Serializer is a Kafka interface that converts objects to bytes. Apache Kafka ships with several implementations, such as ByteArraySerializer, BytesSerializer, FloatSerializer and others.
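
To make the role of a serializer concrete, here is a minimal sketch that calls two of the built-in serializers directly, without a producer or a broker. It assumes the kafka-clients library is on the classpath (it comes with the dependencies listed above). The roundTrip helper is hypothetical and exists only for this illustration.

```kotlin
import org.apache.kafka.common.serialization.FloatSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: serialize a string the way a producer would, then read it back.
fun roundTrip(topic: String, value: String): String {
    val bytes: ByteArray = StringSerializer().serialize(topic, value)
    return StringDeserializer().deserialize(topic, bytes)
}

fun main() {
    // StringSerializer encodes text as UTF-8 bytes by default
    val bytes = StringSerializer().serialize("test_topic", "hello")
    println("string payload size: ${bytes.size} bytes")

    // FloatSerializer writes the float as a fixed 4-byte value
    val floatBytes = FloatSerializer().serialize("test_topic", 1.5f)
    println("float payload size: ${floatBytes.size} bytes")

    // Deserializing with the matching deserializer restores the original value
    println(roundTrip("test_topic", "hello"))
}
```

The topic argument is part of the Serializer interface, but these built-in implementations do not depend on it.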



In the map we specify StringSerializer for both the key and the value.



Kotlin



map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"


The next setting is the list of bootstrap servers used to connect to Kafka.



Kotlin



map["bootstrap.servers"] = "localhost:9092"


With these settings in place, we create the KafkaProducer.
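
The string keys used above can be mistyped without any compiler warning. As a possible alternative, the sketch below builds the same three settings from the constants in ProducerConfig, which is part of kafka-clients. The producerSettings helper is a hypothetical name introduced only for this example.

```kotlin
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: the same settings as in the article, keyed by ProducerConfig constants.
fun producerSettings(bootstrapServers: String): Map<String, Any> = mapOf(
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java.name,
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java.name,
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers
)

fun main() {
    // Prints each setting, e.g. the key "bootstrap.servers" with the address passed in
    producerSettings("localhost:9092").forEach { (key, value) -> println("$key = $value") }
}
```

The resulting map can be passed to the KafkaProducer constructor in the same way as the hand-written one.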



Next we create a ProducerRecord with the topic name and the message itself:



Kotlin



val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)


Then we send the record:



Kotlin



val future: Future<RecordMetadata> = producer.send(producerRecord)


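The send call returns a future. Calling get() on it waits for the send to complete and yields the metadata of the stored record.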
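
Calling get() blocks the request thread until the broker responds. The sketch below shows one non-blocking alternative, the callback overload of send. So that it runs without a broker, it uses MockProducer from kafka-clients in place of a real KafkaProducer. The sendAsync helper and its onResult parameter are hypothetical names used only here.

```kotlin
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: sends without blocking and reports the outcome through a callback.
fun sendAsync(
    producer: Producer<String, String>,
    topic: String,
    message: String,
    onResult: (String) -> Unit
) {
    producer.send(ProducerRecord<String, String>(topic, message)) { metadata, exception ->
        if (exception != null) {
            onResult("send failed: ${exception.message}")
        } else {
            onResult("message sent to ${metadata.topic()}")
        }
    }
}

fun main() {
    // MockProducer stands in for a real broker; autoComplete = true completes each send immediately
    val producer = MockProducer(true, StringSerializer(), StringSerializer())
    sendAsync(producer, "test_topic", "hello") { println(it) }
    producer.close()
}
```

Because sendAsync accepts the Producer interface, the same function should also work with a real KafkaProducer. In that case the callback runs later, on the producer's I/O thread.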





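So far we have looked at sending messages to a topic. Now we need a consumer that listens for the messages sent to it.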

Create a file named MessageConsumer.kt and annotate the class with @Service.



Kotlin



@Service
class MessageConsumer {
    @KafkaListener(topics = ["test_topic"], groupId = "test_id")
    fun consume(message: String) {
        println(" message received from topic : $message")
    }
}


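The @KafkaListener annotation subscribes the method to the topic, so it is called for every incoming message. Here, whenever a message arrives in test_topic, it is printed to the console.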
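
A listener method can also take the whole ConsumerRecord instead of only the payload, which gives access to the partition and offset. The sketch below is one way to do that. The describe helper, the RecordLoggingConsumer class, and the test_id_records group are hypothetical names chosen for this example and are not part of the original project.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service

// Hypothetical helper: formats the record's topic, partition and offset alongside the payload.
fun describe(record: ConsumerRecord<String, String>): String =
    "${record.topic()}[${record.partition()}]@${record.offset()}: ${record.value()}"

// Hypothetical class and group names, chosen so they do not clash with MessageConsumer.
@Service
class RecordLoggingConsumer {
    @KafkaListener(topics = ["test_topic"], groupId = "test_id_records")
    fun consume(record: ConsumerRecord<String, String>) {
        println(describe(record))
    }
}
```

Since this listener uses a different group ID, it receives its own copy of every message in addition to the one delivered to the first consumer.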

The source code is available on GitHub.












