我正在使用 Kafka 0.8.0 并尝试实现下面提到的场景。
JCA API(充当生产者并将数据发送到)-----> 消费者------> HBase
我在使用 JCA 客户端获取数据后立即将每条消息发送给消费者。例如,一旦生产者发送消息 1 ,我想从消费者那里获取相同的消息并将其“放入” HBase 中。但是我的消费者在一些随机的 n 条消息之后开始获取消息。我想让生产者和消费者同步,以便他们都开始一起工作。
我用过:
1 经纪人
1 个主题
1 个单一生产者和高级消费者
谁能建议我需要做什么才能达到同样的效果?
编辑:
添加一些相关的代码片段。
消费者.java
public class Consumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;
PrintWriter pw = null;
int t = 0;
StringDecoder kd = new StringDecoder(null);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
Map<String, List<KafkaStream<String, Signal>>> consumerMap;
KafkaStream<String, Signal> stream;
ConsumerIterator<String, Signal> it;
public Consumer(String topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
topicCountMap.put(topic, new Integer(1));
consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
new VerifiableProperties()));
stream = consumerMap.get(topic).get(0);
it = stream.iterator();
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("fetch.size", "1024");
return new ConsumerConfig(props);
}
synchronized public void run() {
while (it.hasNext()) {
t = (it.next().message()).getChannelid();
System.out.println("In Consumer received msg" + t);
}
}
}
producer.java
public class Producer {
public final kafka.javaapi.producer.Producer<String, Signal> producer;
private final String topic;
private final Properties props = new Properties();
public Producer(String topic)
{
props.put("serializer.class", "org.bigdata.kafka.Serializer");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type userdefined Object .
producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props));
this.topic = topic;
}
}
KafkaProperties.java
public interface KafkaProperties {
final static String zkConnect = "127.0.0.1:2181";
final static String groupId = "group1";
final static String topic = "test00";
final static String kafkaServerURL = "localhost";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 100000;
final static int reconnectInterval = 10000;
final static String clientId = "SimpleConsumerDemoClient";
}
这就是消费者在前 10 条消息中的行为方式,它没有系统化消费者收到的那条消息,但从第 11 条消息开始,它开始正常运行。
producer sending msg1
producer sending msg2
producer sending msg3
producer sending msg4
producer sending msg5
producer sending msg6
producer sending msg7
producer sending msg8
producer sending msg9
producer sending msg10
producer sending msg11
producer sending msg12
In Consumer received msg12
producer sending msg13
In Consumer received msg13
producer sending msg14
In Consumer received msg14
producer sending msg15
In Consumer received msg15
producer sending msg16
In Consumer received msg16
producer sending msg17
In Consumer received msg17
producer sending msg18
In Consumer received msg18
producer sending msg19
In Consumer received msg19
producer sending msg20
In Consumer received msg20
producer sending msg21
In Consumer received msg21
已编辑:添加生产者向消费者发送消息的监听器功能。而且我使用的是默认的生产者配置,没有覆盖它
public synchronized void onValueChanged(final MonitorEvent event_) {
// Get the value from the DBR
try {
final DBR dbr = event_.getDBR();
final String[] val = (String[]) dbr.getValue();
producer1.producer.send(new KeyedMessage<String, Signal>
(KafkaProperties.topic,new Signal(messageNo)));
System.out.println("producer sending msg"+messageNo);
messageNo++;
} catch (Exception ex) {
ex.printStackTrace();
}
}
最佳答案
尝试将 props.put("request.required.acks", "1") 添加到生产者配置中。默认情况下,生产者不等待确认,并且不保证消息传递。因此,如果您在测试之前启动代理,生产者可能会在代理完全初始化之前开始发送消息,并且前几条消息可能会丢失。
尝试将 props.put("auto.offset.reset", "smallest") 添加到消费者配置中。相当于kafka-console-consumer.sh的--from-beginning选项。如果您的消费者启动时间晚于生产者并且 Zookeeper 中没有保存偏移量数据,则默认情况下它将仅开始消费新消息(请参阅文档中的 Consumer configs)。
关于java - 消费者在 Apache Kafka 中消费消息的延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21727150/
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/
HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候
如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]