显然,我希望做的事情超出了 thrift 的范围...如果我确保端口上的客户端永远不会超过一个,那么一切都很好。当然,这种做法违背了目的,因为我希望为服务器打开多个可重用连接以缩短响应时间并降低开销。
如果有人建议用另一种方法来实现这一点,我们将不胜感激(或者如果我的结论是错误的)
我有一个多组件应用程序,主要通过 thrift 连接(主要是 java->php 连接)。
到目前为止一切似乎都很好,但是引入了 Java->Java 连接,其中客户端是一个每秒可以发起数百个请求的 servlet。
被访问的方法有如下接口(interface):
bool pvCheck(1:i32 toolId) throws(1:DPNoToolException nte),
为了确保它在服务端没有什么奇怪的,我用一个简单的实现替换了实现:
@Override
public boolean pvCheck(int toolId) throws TException {
//boolean ret = api.getViewsAndDec(toolId);
return true;
}
只要连接不多,一切都很好,但一旦连接靠得很近,连接就会开始卡在阅读器中。
如果我在调试器中提取其中一个,堆栈看起来像这样:
Daemon Thread [http-8080-197] (Suspended)
BufferedInputStream.read(byte[], int, int) line: 308
TSocket(TIOStreamTransport).read(byte[], int, int) line: 126
TSocket(TTransport).readAll(byte[], int, int) line: 84
TBinaryProtocol.readAll(byte[], int, int) line: 314
TBinaryProtocol.readI32() line: 262
TBinaryProtocol.readMessageBegin() line: 192
DumboPayment$Client.recv_pvCheck() line: 120
DumboPayment$Client.pvCheck(int) line: 105
Receiver.performTask(HttpServletRequest, HttpServletResponse) line: 157
Receiver.doGet(HttpServletRequest, HttpServletResponse) line: 109
Receiver(HttpServlet).service(HttpServletRequest, HttpServletResponse) line: 617
Receiver(HttpServlet).service(ServletRequest, ServletResponse) line: 717
ApplicationFilterChain.internalDoFilter(ServletRequest, ServletResponse) line: 290
ApplicationFilterChain.doFilter(ServletRequest, ServletResponse) line: 206
StandardWrapperValve.invoke(Request, Response) line: 233
StandardContextValve.invoke(Request, Response) line: 191
StandardHostValve.invoke(Request, Response) line: 127
ErrorReportValve.invoke(Request, Response) line: 102
StandardEngineValve.invoke(Request, Response) line: 109
CoyoteAdapter.service(Request, Response) line: 298
Http11AprProcessor.process(long) line: 859
Http11AprProtocol$Http11ConnectionHandler.process(long) line: 579
AprEndpoint$Worker.run() line: 1555
Thread.run() line: 619
这似乎是由数据损坏触发的,因为我遇到以下异常:
10/11/22 18:38:55 WARN logger.Receiver: pvCheck had an exception
org.apache.thrift.TApplicationException: pvCheck failed: unknown result
at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:135)
at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
at *.Receiver.performTask(Receiver.java:157)
at *.Receiver.doGet(Receiver.java:109)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:617)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:298)
at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:859)
at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:579)
at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:1555)
at java.lang.Thread.run(Thread.java:619)
和
10/11/22 17:59:46 ERROR [/ninja_ar].[Receiver]: サーブレット Receiver のServlet.service()が例外を投げました
java.lang.OutOfMemoryError: Java heap space
at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:296)
at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:290)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:198)
at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:120)
at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
at *.Receiver.performTask(Receiver.java:157)
at *.Receiver.doGet(Receiver.java:109)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:269)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:188)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:210)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:172)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:117)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:108)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:151)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:870)
at org.apache.coyote.http11.Http11BaseProtocol$Http11ConnectionHandler.processConnection(Http11BaseProtocol.java:665)
at org.apache.tomcat.util.net.PoolTcpEndpoint.processSocket(PoolTcpEndpoint.java:528)
at org.apache.tomcat.util.net.LeaderFollowerWorkerThread.runIt(LeaderFollowerWorkerThread.java:81)
at org.apache.tomcat.util.threads.ThreadPool$ControlRunnable.run(ThreadPool.java:685)
at java.lang.Thread.run(Thread.java:636)
也许我离题太远了,但我很确定这些与客户端在没有任何内容发送时继续尝试阅读有关。
服务端和客户端都使用java二进制协议(protocol)。
我写了一个简单的客户端池类,让我重用客户端,这些是主要功能:
public synchronized Client getClient() {
if(clientQueue.isEmpty()) {
return newClient();
} else {
return clientQueue.getLast();
}
}
private synchronized Client newClient() {
int leftToTry = serverArr.length;
Client cli = null;
while(leftToTry > 0 && cli == null) {
log.info("Creating new connection to " +
serverArr[roundRobinPos] + port);
TTransport transport = new TSocket(serverArr[roundRobinPos], port);
TProtocol protocol = new TBinaryProtocol(transport);
cli = new Client(protocol);
try {
transport.open();
} catch (TTransportException e) {
cli = null;
log.warn("Failed connection to " +
serverArr[roundRobinPos] + port);
}
roundRobinPos++;
if(roundRobinPos >= serverArr.length) {
roundRobinPos = 0;
}
leftToTry--;
}
return cli;
}
public void returnClient(Client cli) {
clientQueue.addFirst(cli);
}
客户端应用程序(即 tomcat servlets)通过以下方式访问它:
Client dpayClient = null;
if(dpay != null
&& (dpayClient = dpay.getClient()) != null) {
try {
dpayClient.pvCheck(requestParameters.getId());
} catch (DPNoToolException e) {
return;
} catch (TException e) {
log.warn("pvCheck had an exception", e);
} finally {
if(dpayClient != null) {
dpay.returnClient(dpayClient);
}
}
}
实际的 thrift 连接是通过以下方式上线的
private boolean initThrift(int port, Configuration conf) {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
DPaymentHandler handler = new DPaymentHandler(conf);
DumboPayment.Processor processor =
new DumboPayment.Processor(handler);
InetAddress listenAddress;
try {
listenAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
LOG.error("Failed in thrift init", e);
return false;
}
TServerTransport serverTransport;
try {
serverTransport = new TServerSocket(
new InetSocketAddress(listenAddress, port));
} catch (TTransportException e) {
LOG.error("Failed in thrift init", e);
return false;
}
TTransportFactory transportFactory = new TTransportFactory();
TServer server = new TThreadPoolServer(processor, serverTransport,
transportFactory, protocolFactory);
LOG.info("Starting Dumbo Payment thrift server on " +
listenAddress + ":" + Integer.toString(port));
server.serve();
return true;
}
在这上面停留了一段时间...很可能我遗漏了一些明显的东西。如果能提供任何帮助,我将不胜感激。
如果需要任何其他信息,请告诉我。那里有一大堆东西,所以我想尽量保持最(希望)相关的东西。
最佳答案
我的猜测是你有多个线程试图同时使用客户端,我不完全确定它是防弹的。您可能会尝试使用异步接口(interface)以及构建线程安全资源池来访问您的客户端。
使用 Thrift-0.5.0.0,这里是为您的 thrift 生成的代码创建 AsyncClient 的示例:
Factory fac = new AsyncClient.Factory(new TAsyncClientManager(), new TProtocolFactory() {
@Override
public TProtocol getProtocol( TTransport trans ) {
return new TBinaryProtocol(trans);
}
});
AsyncClient cl = fac.getAsyncClient( new TNonblockingSocket( "127.0.0.1", 12345 ));
但是,如果您查看源代码,您会注意到它有一个单线程消息处理程序,即使它使用 NIO 套接字,您可能会发现这是一个瓶颈。要获得更多,您必须创建更多异步客户端、检查它们并正确返回它们。
为了简化这个,我制作了一个快速的小类来管理它们。要修改它以满足您的需要,您唯一需要做的就是包含您的包,它应该适合您,即使我还没有真正测试过它(根本没有,真的):
public class Thrift {
// This is the request
private static abstract class ThriftRequest {
private void go( final Thrift thrift, final AsyncClient cli ) {
on( cli );
thrift.ret( cli );
}
public abstract void on( AsyncClient cli );
}
// Holds all of our Async Clients
private final ConcurrentLinkedQueue<AsyncClient> instances = new ConcurrentLinkedQueue<AsyncClient>();
// Holds all of our postponed requests
private final ConcurrentLinkedQueue<ThriftRequest> requests = new ConcurrentLinkedQueue<ThriftRequest>();
// Holds our executor, if any
private Executor exe = null;
/**
* This factory runs in thread bounce mode, meaning that if you call it from
* many threads, execution bounces between calling threads depending on when
* execution is needed.
*/
public Thrift(
final int clients,
final int clients_per_message_processing_thread,
final String host,
final int port ) throws IOException {
// We only need one protocol factory
TProtocolFactory proto_fac = new TProtocolFactory() {
@Override
public TProtocol getProtocol( final TTransport trans ) {
return new TBinaryProtocol( trans );
}
};
// Create our clients
Factory fac = null;
for ( int i = 0; i < clients; i++ ) {
if ( fac == null || i % clients_per_message_processing_thread == 0 ) {
fac = new AsyncClient.Factory(
new TAsyncClientManager(),
proto_fac );
}
instances.add( fac.getAsyncClient( new TNonblockingSocket(
host,
port ) ) );
}
}
/**
* This factory runs callbacks in whatever mode the executor is setup for,
* not on calling threads.
*/
public Thrift( Executor exe,
final int clients,
final int clients_per_message_processing_thread,
final String host,
final int port ) throws IOException {
this( clients, clients_per_message_processing_thread, host, port );
this.exe = exe;
}
// Call this to grab an instance
public void
req( final ThriftRequest req ) {
final AsyncClient cli;
synchronized ( instances ) {
cli = instances.poll();
}
if ( cli != null ) {
if ( exe != null ) {
// Executor mode
exe.execute( new Runnable() {
@Override
public void run() {
req.go( Thrift.this, cli );
}
} );
} else {
// Thread bounce mode
req.go( this, cli );
}
return;
}
// No clients immediately available
requests.add( req );
}
private void ret( final AsyncClient cli ) {
final ThriftRequest req;
synchronized ( requests ) {
req = requests.poll();
}
if ( req != null ) {
if ( exe != null ) {
// Executor mode
exe.execute( new Runnable() {
@Override
public void run() {
req.go( Thrift.this, cli );
}
} );
} else {
// Thread bounce mode
req.go( this, cli );
}
return;
}
// We did not need this immediately, hold onto it
instances.add( cli );
}
}
如何使用它的示例:
// Make the pool
Thrift t = new Thrift( 10, "localhost", 8000 );
// Use the pool
t.req( new ThriftRequest() {
@Override
public void on( AsyncClient cli ) {
cli.MyThriftMethod( "stringarg", 111, new AsyncMethodCallback<AsyncClient.MyThriftMethod_call>() {
@Override
public void onError( Throwable throwable ) {
}
@Override
public void onComplete( MyThriftMethod_call response ) {
}
});
}
} );
您可能想尝试不同的服务器模式,例如 THsHaServer,以了解哪种模式最适合您的环境。
关于java - thrift 的线程安全性如何?重新 : I seem to have requests disrupting one another,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4244350/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/
我有一个对象has_many应呈现为xml的子对象。这不是问题。我的问题是我创建了一个Hash包含此数据,就像解析器需要它一样。但是rails自动将整个文件包含在.........我需要摆脱type="array"和我该如何处理?我没有在文档中找到任何内容。 最佳答案 我遇到了同样的问题;这是我的XML:我在用这个:entries.to_xml将散列数据转换为XML,但这会将条目的数据包装到中所以我修改了:entries.to_xml(root:"Contacts")但这仍然将转换后的XML包装在“联系人”中,将我的XML代码修改为
我有一大串格式化数据(例如JSON),我想使用Psychinruby同时保留格式转储到YAML。基本上,我希望JSON使用literalstyle出现在YAML中:---json:|{"page":1,"results":["item","another"],"total_pages":0}但是,当我使用YAML.dump时,它不使用文字样式。我得到这样的东西:---json:!"{\n\"page\":1,\n\"results\":[\n\"item\",\"another\"\n],\n\"total_pages\":0\n}\n"我如何告诉Psych以想要的样式转储标量?解