草庐IT

同步到Kafka

全部标签

java - 如何选择一个Kafka transaction.id

我想知道我能否在理解Kafka中的交易方面获得帮助,尤其是如何使用transaction.id。这里是上下文:我的Kafka应用程序遵循以下模式:使用来自输入主题的消息,进行处理,然后发布到输出主题。我不使用KafkaStreamsAPI。我在一个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。有一个带有工作线程的线程池,该线程用于执行消息处理并将其发布到输出主题。目前,每个线程都有自己的生产者实例。我正在使用已发布的事务API,以确保消耗偏移量的更新和对输出主题的发布原子地进行到目前为止,我的假设包括:如果我的进程在中间事务中崩溃,那么该事务中的任何内容都不会发布,也不会消耗

Java 并发 - 为什么同步 setter(而不是 getter)不能使类线程安全?

这个问题在这里已经有了答案:关闭12年前。PossibleDuplicate:ThreadsafetyinJavaclass我正在阅读Java并发实践,我遇到了一个令我困惑的例子。作者声明这个类不是线程安全的publicclassMutableInteger{privateintnumber;publicintgetInt(){returnnumber;}publicvoidsetInt(intval){number=val;}}而且他们还声明仅同步一种方法(例如setter)是不行的;你必须同步两者。我的问题是:为什么?同步setter不行吗?

java - 同步方法的其他方式

除了使用synchronized关键字之外,如何在java中同步方法? 最佳答案 您可以使用java.util.concurrent.locks包,尤其是Lock接口(interface):Lockl=...;l.lock();try{//accesstheresourceprotectedbythislock}finally{l.unlock();}参见here. 关于java-同步方法的其他方式,我们在StackOverflow上找到一个类似的问题: ht

Kafka 之 AdminClient 配置

目录一.前言二.AdminClient原理和功能2.1.AdminClient原理2.2. AdminClient功能三. AdminClient配置四. Kafka>=2.0.0 版本五.Kafka>= 2.1.0版本六. Kafka>=2.7 版本一.前言  一般情况下,我们都习惯使用kafka-topics.sh脚本来管理主题,但有些时候我们希望将主题管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用API的方式去实现。  Kafka社区于0.11版本正式推出了Java客户端版的AdminClient,并不断地在后续的版本中对它进行完善

java - ReentrantLock 与 CPU 级别同步?

'ReentrantLock'和'synchronized'在CPU级别上的实现方式有区别吗?还是他们使用相同的“CAS”方法? 最佳答案 如果我们谈论的是ReentrantLock与synchronized(也称为“内部锁”),那么最好看看Lockdocumentation:AllLockimplementationsmustenforcethesamememorysynchronizationsemanticsasprovidedbythebuilt-inmonitorlock:Asuccessfullockoperationa

java - Kafka - TimestampExtractor 的问题

我使用org.apache.kafka:kafka-streams:0.10.0.1我正在尝试使用一个基于时间序列的流,它似乎不会触发KStream.Process()来触发(“标点符号”)。(引用here)在KafkaStreams配置中,我传递了这个参数(以及其他参数):config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,EventTimeExtractor.class.getName());这里,EventTimeExtractor是一个自定义时间戳提取器(实现了org.apache.kafka.streams.pr

java - Spring Kafka - 如何使用组 ID 将偏移量重置为最新?

我目前正在使用SpringIntegrationKafka做实时统计。但是,组名使Kafka搜索监听器未读取的所有先前值。@Value("${kafka.consumer.group.id}")privateStringconsumerGroupId;@BeanpublicConsumerFactoryconsumerFactory(){returnnewDefaultKafkaConsumerFactory(getDefaultProperties());}publicMapgetDefaultProperties(){Mapproperties=newHashMap();prope

【AI】文本转语音 变声 音色克隆 数字人音视频口型同步AI应用

文本转语音项目地址:https://github.com/coqui-ai/TTS环境安装:下载项目;安装Python,安装项目依赖:pipinstallTTS1.下载安装AI模型:https://github.com/facebookresearch/fairseq/tree/main/examples/mms模型文件放到:C:\Users\Administrator\AppData\Local\tts2.将文本转换为语音:tts--text“要转换的文本内容”--model_name“指定语音模型”--out_path.\outFile.wav 语音模型可通过命令tts–list_mode

java - 我应该同步静态易失变量吗?

关于这个主题有几个问题,但大多数都绕过这个问题,因为这不是问题的意图。如果我的类中有静态volatile:privatestaticvolatileMyObjobj=null;在下面的方法中我这样做:publicMyObjgetMyObj(){if(obj==null){obj=newMyObj();//costlyinitialisation}returnobj;}我是否需要同步以确保只有一个线程写入该字段,或者任何写入是否立即对评估obj==null条件的其他线程可见?换句话说:volatile是否让您不必同步对静态变量的写入访问? 最佳答案

java - 如何在kafka中创建自定义序列化器?

只有少数序列化程序可用,例如,org.apache.kafka.common.serialization.StringSerializer我们如何创建自己的自定义序列化程序? 最佳答案 这里有一个使用您自己的序列化器/反序列化器来获取Kafka消息值的示例。对于Kafka消息key是一样的。我们希望将MyMessage的序列化版本作为Kafka值发送,并在消费者端再次将其反序列化为MyMessage对象。在生产者端序列化MyMessage。您应该创建一个实现org.apache.kafka.common.serialization.