草庐IT

java - 带有自定义 CacheStoreAdapter 的 Apache Ignite 可用性问题

coder 2024-03-08 原文

我正在使用 apache ignite 进行 PoC。这是我正在测试的场景:

  1. 启动一个由 3 个节点和一个客户端组成的集群。
  2. 调用获取 key 。我登录到缓存此 key 的节点。
  3. 调用获取 key 。我验证它获得了储值。
  4. 执行 loadCache()。所有节点都报告成功加载缓存。
  5. 杀死最初加载key的节点
  6. 重启我刚刚杀死的节点。
  7. 再次查询key

第 6 步和第 7 步有些问题。如果我在两者之间等待足够长的时间,一切都会正常进行。但是,如果尝试将 6 和 7 靠得太近,那么我会得到 this error on the clientthis error on the node .

我看到错误 IgniteClientDisconnectedException: Failed to wait for topology update, client disconnected. 但是有办法避免这个问题吗?设置更长的等待拓扑更新的时间并不是真正的选择,因为客户端可能随时尝试连接。这与我的集群配置有关吗?我看到了this documentation这意味着无限地尝试连接,这似乎只会不断出错。

此外,我们还需要能够动态地扩大/缩小集群。这可能吗?内存备份会修复功能吗?

请注意,如果我省略第 6 步,我没有看到它失败。

集群节点配置

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--<import resource="./cache.xml"/>-->
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>

        <property name="cacheConfiguration">
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <!-- Set a cache name. -->
                <property name="name" value="recordData"/>
                <!--<property name="rebalanceMode" value="SYNC"/>-->
                <!-- Set cache mode. -->
                <property name="cacheMode" value="PARTITIONED"/>

                <property name="cacheStoreFactory">
                    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                        <constructor-arg value="Application.RecordDataStore"/>
                    </bean>
                </property>
                <property name="readThrough" value="true"/>
                <property name="writeThrough"  value="true"/>

            </bean>
        </property>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <!-- Override local port. -->
                <property name="localPort" value="8000"/>
            </bean>
        </property>

        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <!-- Override local port. -->
                <property name="localPort" value="8100"/>
            </bean>
        </property>
    </bean>
</beans>

客户端配置

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>
        <property name="clientMode" value="true"/>

        <property name="cacheConfiguration">
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <!-- Set a cache name. -->
                <property name="name" value="recordData"/>
                <!--<property name="rebalanceMode" value="SYNC"/>-->

                <!-- Set cache mode. -->
                <property name="cacheMode" value="PARTITIONED"/>

                <property name="cacheStoreFactory">
                    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                        <constructor-arg value="com.digitaslbi.idiom.util.RecordDataStore"/>
                    </bean>
                </property>
                <property name="readThrough" value="true"/>
                <property name="writeThrough"  value="true"/>

            </bean>
        </property>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!--Task execution events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
                <!--Cache events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                        Ignite provides several options for automatic discovery that can be used
                        instead os static IP based discovery. For information on all options refer
                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>localhost:8000..8099</value>
                                <!--<value>127.0.0.1:47500..47509</value>-->
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

CacheStoreAdaptor 的实现方法

public class RecordDataStore extends CacheStoreAdapter<Long, List<Record>> {

  // This method is called whenever "get(...)" methods are called on IgniteCache.
    @Override public List<Record> load(Long key) {
        System.out.println("Load data for pel: " + key);
        try {
            CouchDbConnector db = RecordDataStore.getDb();
            ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
            List<Record> list = db.queryView(viewQuery,Record.class);
            HashMultimap<Long,Record> multimap = HashMultimap.create();

            list.forEach(r -> {
                multimap.put(r.getId(),r);
            });
            return new LinkedList<>(multimap.get(key));
        } catch (MalformedURLException e) {
            throw new CacheLoaderException("Failed to load values from cache store.", e);
        }
    }
    ....
    @Override public void loadCache(IgniteBiInClosure<Long, List<Record>> clo, Object... args) {
        if (args == null || args.length == 0 || args[0] == null) {
            throw new CacheLoaderException("Expected entry count parameter is not provided.");
        }

        System.out.println("Loading Cache...");
        final long entryCnt = (Long)args[0];

        try{
            CouchDbConnector db = RecordDataStore.getDb();
            ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
            List<Record> list = db.queryView(viewQuery,Record.class);
            HashMultimap<Long,Record> multimap = HashMultimap.create();

            long count = 0;
            for(Record r : list) {
                multimap.put(r.getPel(),r);
                count++;
                if(count == entryCnt)
                    break;
            }

            multimap.keySet().forEach(key -> {
                clo.apply(key,new LinkedList<>(multimap.get(key)));
            });
        }
        catch (MalformedURLException e) {
            throw new CacheLoaderException("Failed to load values from cache store.", e);
        }

        System.out.println("Loaded Cache");
    }

    public static CouchDbConnector getDb() throws MalformedURLException {
        HttpClient httpClient = new StdHttpClient.Builder()
            .url("server:1111/")
            .build();

        CouchDbInstance dbInstance = new StdCouchDbInstance(httpClient);
        CouchDbConnector db = new StdCouchDbConnector("ignite", dbInstance);

        return db;
    }
}

最佳答案

http://apache-ignite-users.70518.x6.nabble.com/Ignite-cluster-recovery-after-network-partition-td2775.html强调 IgniteClientDisconnectedException提供一个 IgniteFuture可以通过调用访问

IgniteFuture f = myException.reconnectFuture();

那个 future 有一个get() -方法,等待节点重新连接:

Synchronously waits for completion of the computation and returns computation result.

因此,当客户端重新连接时,以下操作应该完成:

f.get();

关于java - 带有自定义 CacheStoreAdapter 的 Apache Ignite 可用性问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38753264/

有关java - 带有自定义 CacheStoreAdapter 的 Apache Ignite 可用性问题的更多相关文章

  1. ruby - Facter::Util::Uptime:Module 的未定义方法 get_uptime (NoMethodError) - 2

    我正在尝试设置一个puppet节点,但ruby​​gems似乎不正常。如果我通过它自己的二进制文件(/usr/lib/ruby/gems/1.8/gems/facter-1.5.8/bin/facter)在cli上运行facter,它工作正常,但如果我通过由ruby​​gems(/usr/bin/facter)安装的二进制文件,它抛出:/usr/lib/ruby/1.8/facter/uptime.rb:11:undefinedmethod`get_uptime'forFacter::Util::Uptime:Module(NoMethodError)from/usr/lib/ruby

  2. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

  3. ruby-on-rails - form_for 中不在模型中的自定义字段 - 2

    我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢

  4. ruby - 主要 :Object when running build from sublime 的未定义方法 `require_relative' - 2

    我已经从我的命令行中获得了一切,所以我可以运行rubymyfile并且它可以正常工作。但是当我尝试从sublime中运行它时,我得到了undefinedmethod`require_relative'formain:Object有人知道我的sublime设置中缺少什么吗?我正在使用OSX并安装了rvm。 最佳答案 或者,您可以只使用“require”,它应该可以正常工作。我认为“require_relative”仅适用于ruby​​1.9+ 关于ruby-主要:Objectwhenrun

  5. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  6. ruby-on-rails - 如何使辅助方法在 Rails 集成测试中可用? - 2

    我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel

  7. ruby - 在 Ruby 中有条件地定义函数 - 2

    我有一些代码在几个不同的位置之一运行:作为具有调试输出的命令行工具,作为不接受任何输出的更大程序的一部分,以及在Rails环境中。有时我需要根据代码的位置对代码进行细微的更改,我意识到以下样式似乎可行:print"Testingnestedfunctionsdefined\n"CLI=trueifCLIdeftest_printprint"CommandLineVersion\n"endelsedeftest_printprint"ReleaseVersion\n"endendtest_print()这导致:TestingnestedfunctionsdefinedCommandLin

  8. ruby - 定义方法参数的条件 - 2

    我有一个只接受一个参数的方法:defmy_method(number)end如果使用number调用方法,我该如何引发错误??通常,我如何定义方法参数的条件?比如我想在调用的时候报错:my_method(1) 最佳答案 您可以添加guard在函数的开头,如果参数无效则引发异常。例如:defmy_method(number)failArgumentError,"Inputshouldbegreaterthanorequalto2"ifnumbereputse.messageend#=>Inputshouldbegreaterthano

  9. ruby - 如何在 Grape 中定义哈希数组? - 2

    我使用Ember作为我的前端和GrapeAPI来为我的API提供服务。前端发送类似:{"service"=>{"name"=>"Name","duration"=>"30","user"=>nil,"organization"=>"org","category"=>nil,"description"=>"description","disabled"=>true,"color"=>nil,"availabilities"=>[{"day"=>"Saturday","enabled"=>false,"timeSlots"=>[{"startAt"=>"09:00AM","endAt"=>

  10. ruby - 获取模块中定义的所有常量的值 - 2

    我想获取模块中定义的所有常量的值:moduleLettersA='apple'.freezeB='boy'.freezeendconstants给了我常量的名字:Letters.constants(false)#=>[:A,:B]如何获取它们的值的数组,即["apple","boy"]? 最佳答案 为了做到这一点,请使用mapLetters.constants(false).map&Letters.method(:const_get)这将返回["a","b"]第二种方式:Letters.constants(false).map{|c

随机推荐