草庐IT

如何使用Pykafka从主题中获取最新消息?

我一直在使用pykafka向主题传达一条消息producer.produce('test')我想收到最新消息。我在PykafkaGitHub页面上找到了一个解决方案,该解决方案建议:client=KafkaClient(hosts="xxxxxxx")topic=client.topics['mytopic']consumer=topic.get_simple_consumer(auto_offset_reset=OffsetType.LATEST,reset_offset_on_start=True)LAST_N_MESSAGES=2offsets=[(p,op.next_offset-LA

python - 如何使 kafka-python 或 pykafka 与 uwsgi 和 gevent 一起作为异步生产者工作?

我的Stack是带有gevents的uwsgi。我试图用装饰器包装我的api端点,以将所有请求数据(url、方法、正文和响应)推送到kafka主题,但它不起作用。我的理论是因为我正在使用gevents,并且我试图在异步模式下运行它们,实际上推送到kafka的异步线程无法与gevents一起运行。如果我尝试使方法同步,那么它也不起作用,它在生产worker中死亡,即在生产之后调用永远不会返回。尽管这两种方法在pythonshell上以及如果我在线程上运行uwsgi时都运行良好。遵循示例代码:1.使用kafka-python(异步)try:kafka_producer=KafkaProdu