listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
# 外网访问192.168.0.113:19092,内网访问:192.168.0.113:19093,当kafka部署在k8s时候就很有用
listeners=EXTERNAL://192.168.0.113:19092,INTERNAL://192.168.0.113:19093
inter.broker.listener.name=INTERNAL
# 如果advertised.listeners没配置就使用listeners的配置
#advertised.listeners=EXTERNAL://192.168.0.113:19092,INTERNAL://192.168.0.113:19093# listeners=<协议名称>://<内网ip>:<端口>
listeners=EXTERNAL://192.168.0.113:19092listeners=INSIDE://0.0.0.0:9092,OUTSIDE://<公网 ip>:端口(或者 0.0.0.0:端口)
advertised.listeners=INSIDE://localhost:9092,OUTSIDE://<宿主机ip>:<宿主机暴露的端口>
listener.security.protocol.map=INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT
kafka_inter_broker_listener_name:inter.broker.listener.name=INSIDEKSQL是一个用于Apache kafka的流式SQL引擎,KSQL在内部使用Kafka的Streams API,KSQL降低了进入流处理的门槛,提供了一个简单的、完全交互式的SQL接口,用于处理Kafka的数据,可以让我们在流数据上持续执行 SQL 查询,KSQL支持广泛的强大的流处理操作,包括聚合、连接、窗口、会话等等。官方文档:https://www.rittmanmead.com/blog/2017/10/ksql-streaming-sql-for-apache-kafka/


ksql支持kafka0.11之后的版本,在confluent的V3和V4版本中默认并没有加入ksql server程序,当然V3和V4是支持ksql的,在V5版本之后已经默认加入ksql了,这里选择最新版本7.1。其实Confluent 就是kafka的增加版,包含了kafka和zk。下载地址:https://packages.confluent.io/archive/
$ cd /opt/bigdata/hadoop/software
$ wget https://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz
$ tar -xf confluent-7.1.1.tar.gz -C /opt/bigdata/hadoop/server/$ vi /etc/profile
export CONFLUENT_HOME=/opt/bigdata/hadoop/server/confluent-7.1.1
export PATH=$CONFLUENT_HOME/bin:$PATH
$ source /etc/profile$ mkdir $CONFLUENT_HOME/etc/kafka/zookeeper_data $CONFLUENT_HOME/etc/kafka/zookeeper_logs $CONFLUENT_HOME/etc/kafka/logs$ cat > $CONFLUENT_HOME/etc/kafka/zookeeper.properties <<-EOF
# 配置主要修改如下:
#数据目录
dataDir=/opt/bigdata/hadoop/server/confluent-7.1.1/etc/kafka/zookeeper_data
#日志目录
# dataLogDir=/opt/bigdata/hadoop/server/confluent-7.1.1/etc/kafka/zookeeper_logs
#心跳间隔时间,zookeeper中使用的基本时间单位,毫秒值。每隔2秒发送一个心跳
tickTime=2000
#leader与客户端连接超时时间。表示5个心跳间隔
initLimit=5
#Leader与Follower之间的超时时间,表示2个心跳间隔
syncLimit=2
#客户端连接端口,默认端口2181
clientPort=12181
admin.enableServer=false
# admin.serverPort=8080
# zookeeper集群配置项,server.1,server.2,server.3是zk集群节点;hadoop-node1,hadoop-node2,hadoop-node3是主机名称;2888是主从通信端口;3888用来选举leader
server.1=hadoop-node1:2888:3888
server.2=hadoop-node2:2888:3888
server.3=hadoop-node3:2888:3888
EOF$ cat > $CONFLUENT_HOME/etc/kafka/server.properties <<-EOF
#broker的全局唯一编号,不能重复
broker.id=0
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
# broker 服务器要监听的地址及端口 . 默认是 localhost:9092 ,0.0.0.0的话 ,表示监听本机的所有ip地址.本机配置:
# localhost : 只监听本机的地址请求, 客户端也只能用 localhost 来请求
# 127.0.0.1 : 同localhost, 在请求上可能有与区分 , 看client的请求吧 . 客户端也只能用127.0.0.1来请求
# 192.168.0.1 : 你的局域网不一定是 192.168 段的.所以一般不选这个
# 0.0.0.0 : 本机的所有地址都监听 , 包含 localhost , 127.0.0.1, 及不同网卡的所有ip地址 , 都监听 .
listeners=EXTERNAL://0.0.0.0:19092,INTERNAL://0.0.0.0:19093
# 是暴露给外部的listeners,如果没有设置,会用listeners,参数的作用就是将Broker的Listener信息发布到Zookeeper中,注意其它节点得修改成本身的hostnaem或者ip,不支持0.0.0.0
advertised.listeners=EXTERNAL://hadoop-node1:19092,INTERNAL://hadoop-node1:19093
inter.broker.listener.name=INTERNAL
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka数据的存储位置
log.dirs=/opt/bigdata/hadoop/server/confluent-7.1.1/etc/kafka/logs
#topic在当前broker上的分区个数
num.partitinotallow=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.cnotallow=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=60000
EOF$ scp -r $CONFLUENT_HOME hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r $CONFLUENT_HOME hadoop-node2:/opt/bigdata/hadoop/server/【温馨提示】其它几点修改以下三点:
$ vi /etc/profile
export CONFLUENT_HOME=/opt/bigdata/hadoop/server/confluent-7.1.1
export PATH=$CONFLUENT_HOME/bin:$PATH
$ source /etc/profile# 在hadoop-node1配置如下:
$ echo 1 > $CONFLUENT_HOME/etc/kafka/zookeeper_data/myid
# 在hadoop-node2配置如下:
$ echo 2 > $CONFLUENT_HOME/etc/kafka/zookeeper_data/myid
# 在hadoop-node3配置如下:
$ echo 3 > $CONFLUENT_HOME/etc/kafka/zookeeper_data/myid$ vi $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties
#修改对应的kafka的bootstrap server
bootstrap.servers=hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092# 在kafka-server-start文件中添加export JMX_PORT="9988",端口自定义就行
$ vi $CONFLUENT_HOME/bin/kafka-server-start$ $KAFKA_HOME/bin/zookeeper-server-stop.sh
$ $KAFKA_HOME/bin/kafka-server-stop.sh$ cd $CONFLUENT_HOME
$ bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties$ cd $CONFLUENT_HOME
$ bin/kafka-server-start -daemon etc/kafka/server.properties# 默认端口8088,可以修改listeners字段来修改port
$ cd $CONFLUENT_HOME
$ ./bin/ksql-server-start ./etc/ksqldb/ksql-server.properties
# 后台启动
$ ./bin/ksql-server-start -daemon ./etc/ksqldb/ksql-server.properties
启动ksql cli端$ cd $CONFLUENT_HOME
$ ./bin/ksql http://0.0.0.0:8088
【温习提示】其实也可以使用外部的zk和kafkaconfluent自带了一个ksql-datagen工具,可以创建和产生相关的topic和数据,ksql-datagen可以指定的参数如下:
$ ksql-datagen
$ cd $CONFLUENT_HOME
$ ./bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500 bootstrap-server=hadoop-node1:19092[1653124249561L] --> ([ 1653124249561L | 'User_3' | 'Page_82' ]) ts:1653124249561
[1653124249561L] --> ([ 1653124249561L | 'User_9' | 'Page_24' ]) ts:1653124249561
[1653124249561L] --> ([ 1653124249561L | 'User_9' | 'Page_91' ]) ts:1653124249561
[1653124249561L] --> ([ 1653124249561L | 'User_2' | 'Page_61' ]) ts:16531242495611653124249561L,User_3,Page_82
1653124249561L,User_9,Page_24
1653124249561L,User_9,Page_91
1653124249561L,User_2,Page_61$ cd $CONFLUENT_HOME
$ ./bin/ksql-datagen quickstart=users format=json topic=users maxInterval=100 bootstrap-server=hadoop-node1:19092['User_6'] --> ([ 1495933043739L | 'User_6' | 'Region_8' | 'OTHER' ]) ts:1653124467578
['User_3'] --> ([ 1489611795658L | 'User_3' | 'Region_7' | 'MALE' ]) ts:1653124467578
['User_5'] --> ([ 1496009798562L | 'User_5' | 'Region_2' | 'MALE' ]) ts:1653124467578{"registertime":1495933043739L,"userid":"User_6","regionid":"Region_8","gender":"OTHER"}
{"registertime":1489611795658L,"userid":"User_3","regionid":"Region_7","gender":"MALE"}
{"registertime":1496009798562L,"userid":"User_5","regionid":"Region_2","gender":"MALE"}$ cd $CONFLUENT_HOME
$ ./bin/ksql http://0.0.0.0:8088
# 【温馨提示】value_format 有三种格式 JSON(json格式)、DELIMITED(原生格式)、AVRO(Avro 格式是 Hadoop 的一种基于行的存储格式)
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
# 查看表详情
DESCRIBE pageviews_original;
# 删表
DROP STREAM pageviews_original;
# 查看topic
SHOW topics;
# 查看STREAMS
SHOW STREAMS;根据topic users创建一个table users_original,value_format为json,必须设置一个为主键,也可以指定副本很分区数,默认都是1,, PARTITIONS=1, REPLICAS=1
CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY) WITH (kafka_topic='users', value_format='JSON')
# 上面创建的表时不能直接查询数据的
SELECT * FROM USERS_ORIGINAL LIMIT 3;
CREATE TABLE QUERYABLE_USERS_ORIGINAL AS SELECT * FROM USERS_ORIGINAL;
# 查询数据
SELECT * FROM QUERYABLE_USERS_ORIGINAL LIMIT 5;
CREATE SINK CONNECTOR es_sink WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topics' = 'clicks_transformed',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'type.name' = '',
'connection.url' = 'http://hadoop-node1:9200');# 生成数据
$ cd $CONFLUENT_HOME
$ ./bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500 bootstrap-server=hadoop-node1:19092
# 先删除
DROP STREAM pageviews_original;
# 创建查询
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
# 直接查
select * from pageviews_original limit 10;
# 持久化查询,PO会对应一个topic
CREATE STREAM PO AS SELECT userid FROM pageviews_original EMIT CHANGES;
# 查询新stream
SHOW STREAMS;
# 查询执行任务
SHOW QUERIES;
消费新数据$ kafka-console-consumer --bootstrap-server 192.168.0.113:19092 --from-beginning --topic POSHOW QUERIES;
TERMINATE CSAS_PV_15;
消费数据$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:19092 --topic test001 --from-beginning

我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po