使用redis+lettuce实现消息队列和生产消费

准备

安装redis

使用Docker部署redis

docker run --name redis-mq -d -p 6379:6379 redis redis-server --appendonly yes

创建stream和group

安装redis-cli

apt install -y redis-cli

进入redis-cli

redis-cli -h localhost -p 6379

创建空的stream和对应的group

xgroup create testTask testGroup $ mkstream

代码实现

配置依赖

使用lettuce作为redis连接库

dependencies {
    implementation(kotlin("stdlib-jdk8"))
    implementation("io.lettuce:lettuce-core:5.2.1.RELEASE")
}

生产者

fun consume() {
    val client = RedisClient.create("redis://192.168.75.120:6379/0")
    val connection = client.connect()
    val commands = connection.sync()
    val consumer = io.lettuce.core.Consumer.from("testGroup", "testTask")
    val content = commands.xreadgroup(
        consumer, XReadArgs.StreamOffset.lastConsumed("testTask")
    )
    println(content.size)
    for (c in content) {
        for (k in c.body.keys) {
            println("$k: ${c.body[k]}")
        }
    }
}

消费者

fun main() {
    val client = RedisClient.create("redis://192.168.75.120:6379/0")
    val connection = client.connect()
    val commands = connection.sync()
    val body = HashMap<String, String>()
    for (i in 1..10) {
        body[i.toString()] = "test"
    }
    commands.xadd("testTask", body)
}
Author: SinLapis
Link: http://sinlapis.github.io/2020/01/29/使用redis-lettuce实现消息队列和生产消费/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.