草庐IT

java - 连接排空后打开新连接。谷歌云消息

coder 2024-03-13 原文

我对 Google 云消息传递有些陌生。我们已经使用它几个月了,但就在最近我们收到了“Connection Draining”消息。发生这种情况时,所有通信都会停止。

谷歌说:https://developer.android.com/google/gcm/ccs.html#response

When you receive a CONNECTION_DRAINING message, you should immediately begin sending messages to another CCS connection, opening a new connection if necessary. You should, however, keep the original connection open and continue receiving messages that may come over the connection (and ACKing them)—CCS will handle initiating a connection close when it is ready.

我的问题是

  1. 如果我手动打开一个新连接,如果我不关闭现有连接,它如何知道要使用哪个连接?
  2. 如果同时发送 6 条消息,我该如何阻止该方法打开 6 个连接?还是我对此感到困惑?
  3. 为什么会发生连接耗尽?

我很惊讶这还没有在他们的示例代码中发挥作用。似乎它几乎是您需要的一切。它是否已在代码中为我完成,但我遗漏了它?

我的代码中没有 main 方法,而是使用 servlet 作为触发器。我的连接是这样初始化的

@PostConstruct
    public void init() throws Exception{
        try {
            smackCcsClient.connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key"));
        }catch (IOException e ){
            e.printStackTrace();
        }catch(SmackException e){
            e.printStackTrace();
        }catch(XMPPException e){
            e.printStackTrace();
        }
    }

但是在这之后我再也没有接触过连接。我处理这个问题了吗?连接是我应该更频繁地接触的东西还是我需要跟踪的东西?

_________________________________ 问题后添加的________________________

我在他们的示例代码中添加了一个连接以尝试重新初始化连接。它看起来像这样:

if ("CONNECTION_DRAINING".equals(controlType)) {
            connectionDraining = true;
            //Open new connection because old connection will be closing or is already closed.
            try {
                connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key"));
            } catch (XMPPException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (SmackException e) {
                e.printStackTrace();
            }

        } else {
            logger.log(Level.INFO, "Unrecognized control type: %s. This could happen if new features are " + "added to the CCS protocol.",
                    controlType);
        }

最佳答案

我已经编写了处理此类情况的代码(基本上将新的下游消息转移到新连接)... 未经过彻底测试...

import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;

import javax.net.ssl.SSLSocketFactory;

import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.PacketInterceptor;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.DefaultPacketExtension;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.PacketExtension;
import org.jivesoftware.smack.provider.PacketExtensionProvider;
import org.jivesoftware.smack.provider.ProviderManager;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smack.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParser;

import com.fasterxml.jackson.core.type.TypeReference;


/**
 * Based on https://developer.android.com/google/gcm/ccs.html#smack
 * 
 * @author Abhinav.Dwivedi
 *
 */
public class SmackCcsClient implements CcsClient {
    private static final Logger logger = LoggerFactory.getLogger(SmackCcsClient.class);
    private static final String GCM_SERVER = "gcm.googleapis.com";
    private static final int GCM_PORT = 5235;
    private static final String GCM_ELEMENT_NAME = "gcm";
    private static final String GCM_NAMESPACE = "google:mobile:data";
    private static volatile SmackCcsClient instance;
    static {
        ProviderManager.addExtensionProvider(GCM_ELEMENT_NAME, GCM_NAMESPACE, new PacketExtensionProvider() {
            @Override
            public PacketExtension parseExtension(XmlPullParser parser) throws Exception {
                String json = parser.nextText();
                return new GcmPacketExtension(json);
            }
        });
    }
    private final Deque<Channel> channels;

    public static SmackCcsClient instance() {
        if (instance == null) {
            synchronized (SmackCcsClient.class) {
                if (instance == null) {
                    instance = new SmackCcsClient();
                }
            }
        }
        return instance;
    }

    private SmackCcsClient() {
        channels = new ConcurrentLinkedDeque<Channel>();
        channels.addFirst(connect());
    }

    private class Channel {
        private XMPPConnection connection;
        /**
         * Indicates whether the connection is in draining state, which means that it will not accept any new downstream
         * messages.
         */
        private volatile boolean connectionDraining = false;

        /**
         * Sends a packet with contents provided.
         */
        private void send(String jsonRequest) throws NotConnectedException {
            Packet request = new GcmPacketExtension(jsonRequest).toPacket();
            connection.sendPacket(request);
        }

        private void handleControlMessage(Map<String, Object> jsonObject) {
            logger.debug("handleControlMessage(): {}", jsonObject);
            String controlType = (String) jsonObject.get("control_type");
            if ("CONNECTION_DRAINING".equals(controlType)) {
                connectionDraining = true;
            } else {
                logger.info("Unrecognized control type: {}. This could happen if new features are "
                        + "added to the CCS protocol.", controlType);
            }
        }
    }

    /**
     * Sends a downstream message to GCM.
     *
     */
    @Override
    public void sendDownstreamMessage(String message) throws Exception {
        Channel channel = channels.peekFirst();
        if (channel.connectionDraining) {
            synchronized (channels) {
                channel = channels.peekFirst();
                if (channel.connectionDraining) {
                    channels.addFirst(connect());
                    channel = channels.peekFirst();
                }
            }
        }
        channel.send(message);
        logger.debug("Message Sent via CSS: ({})", message);
    }

    /**
     * Handles an upstream data message from a device application.
     *
     */
    protected void handleUpstreamMessage(Map<String, Object> jsonObject) {
        // PackageName of the application that sent this message.
        String category = (String) jsonObject.get("category");
        String from = (String) jsonObject.get("from");
        @SuppressWarnings("unchecked")
        Map<String, String> payload = (Map<String, String>) jsonObject.get("data");
        logger.info("Message received from device: category ({}), from ({}), payload: ({})", category, from,
                JsonUtil.toJson(payload));
    }

    /**
     * Handles an ACK.
     *
     * <p>
     * Logs a INFO message, but subclasses could override it to properly handle ACKs.
     */
    public void handleAckReceipt(Map<String, Object> jsonObject) {
        String messageId = (String) jsonObject.get("message_id");
        String from = (String) jsonObject.get("from");
        logger.debug("handleAckReceipt() from: {}, messageId: {}", from, messageId);
    }

    /**
     * Handles a NACK.
     *
     * <p>
     * Logs a INFO message, but subclasses could override it to properly handle NACKs.
     */
    protected void handleNackReceipt(Map<String, Object> jsonObject) {
        String messageId = (String) jsonObject.get("message_id");
        String from = (String) jsonObject.get("from");
        logger.debug("handleNackReceipt() from: {}, messageId: ", from, messageId);
    }

    /**
     * Creates a JSON encoded ACK message for an upstream message received from an application.
     *
     * @param to
     *            RegistrationId of the device who sent the upstream message.
     * @param messageId
     *            messageId of the upstream message to be acknowledged to CCS.
     * @return JSON encoded ack.
     */
    protected static String createJsonAck(String to, String messageId) {
        Map<String, Object> message = new HashMap<String, Object>();
        message.put("message_type", "ack");
        message.put("to", to);
        message.put("message_id", messageId);
        return JsonUtil.toJson(message);
    }

    /**
     * Connects to GCM Cloud Connection Server using the supplied credentials.
     * 
     * @return
     */
    @Override
    public Channel connect() {
        try {
            Channel channel = new Channel();
            ConnectionConfiguration config = new ConnectionConfiguration(GCM_SERVER, GCM_PORT);
            config.setSecurityMode(SecurityMode.enabled);
            config.setReconnectionAllowed(true);
            config.setRosterLoadedAtLogin(false);
            config.setSendPresence(false);
            config.setSocketFactory(SSLSocketFactory.getDefault());

            channel.connection = new XMPPTCPConnection(config);
            channel.connection.connect();

            channel.connection.addConnectionListener(new LoggingConnectionListener());

            // Handle incoming packets
            channel.connection.addPacketListener(new PacketListener() {
                @Override
                public void processPacket(Packet packet) {
                    logger.debug("Received: ({})", packet.toXML());
                    Message incomingMessage = (Message) packet;
                    GcmPacketExtension gcmPacket = (GcmPacketExtension) incomingMessage.getExtension(GCM_NAMESPACE);
                    String json = gcmPacket.getJson();
                    try {
                        Map<String, Object> jsonObject = JacksonUtil.DEFAULT.mapper().readValue(json,
                                new TypeReference<Map<String, Object>>() {});
                        // present for ack, nack and control, null otherwise
                        Object messageType = jsonObject.get("message_type");
                        if (messageType == null) {
                            // Normal upstream data message
                            handleUpstreamMessage(jsonObject);
                            // Send ACK to CCS
                            String messageId = (String) jsonObject.get("message_id");
                            String from = (String) jsonObject.get("from");
                            String ack = createJsonAck(from, messageId);
                            channel.send(ack);
                        } else if ("ack".equals(messageType.toString())) {
                            // Process Ack
                            handleAckReceipt(jsonObject);
                        } else if ("nack".equals(messageType.toString())) {
                            // Process Nack
                            handleNackReceipt(jsonObject);
                        } else if ("control".equals(messageType.toString())) {
                            // Process control message
                            channel.handleControlMessage(jsonObject);
                        } else {
                            logger.error("Unrecognized message type ({})", messageType.toString());
                        }
                    } catch (Exception e) {
                        logger.error("Failed to process packet ({})", packet.toXML(), e);
                    }
                }
            }, new PacketTypeFilter(Message.class));

            // Log all outgoing packets
            channel.connection.addPacketInterceptor(new PacketInterceptor() {
                @Override
                public void interceptPacket(Packet packet) {
                    logger.debug("Sent: {}", packet.toXML());
                }
            }, new PacketTypeFilter(Message.class));

            channel.connection.login(ExternalConfig.gcmSenderId() + "@gcm.googleapis.com", ExternalConfig.gcmApiKey());
            return channel;
        } catch (Exception e) {
            logger.error(Logging.FATAL, "Error in creating channel for GCM communication", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * XMPP Packet Extension for GCM Cloud Connection Server.
     */
    private static final class GcmPacketExtension extends DefaultPacketExtension {

        private final String json;

        public GcmPacketExtension(String json) {
            super(GCM_ELEMENT_NAME, GCM_NAMESPACE);
            this.json = json;
        }

        public String getJson() {
            return json;
        }

        @Override
        public String toXML() {
            return String.format("<%s xmlns=\"%s\">%s</%s>", GCM_ELEMENT_NAME, GCM_NAMESPACE,
                    StringUtils.escapeForXML(json), GCM_ELEMENT_NAME);
        }

        public Packet toPacket() {
            Message message = new Message();
            message.addExtension(this);
            return message;
        }
    }

    private static final class LoggingConnectionListener implements ConnectionListener {

        @Override
        public void connected(XMPPConnection xmppConnection) {
            logger.info("Connected.");
        }

        @Override
        public void authenticated(XMPPConnection xmppConnection) {
            logger.info("Authenticated.");
        }

        @Override
        public void reconnectionSuccessful() {
            logger.info("Reconnecting..");
        }

        @Override
        public void reconnectionFailed(Exception e) {
            logger.error("Reconnection failed.. ", e);
        }

        @Override
        public void reconnectingIn(int seconds) {
            logger.info("Reconnecting in {} secs", seconds);
        }

        @Override
        public void connectionClosedOnError(Exception e) {
            logger.info("Connection closed on error.");
        }

        @Override
        public void connectionClosed() {
            logger.info("Connection closed.");
        }
    }
}

关于java - 连接排空后打开新连接。谷歌云消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26793418/

有关java - 连接排空后打开新连接。谷歌云消息的更多相关文章

  1. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  2. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  3. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  4. ruby - 无法在 60 秒内获得稳定的 Firefox 连接 (127.0.0.1 :7055) - 2

    我使用的是Firefox版本36.0.1和Selenium-Webdrivergem版本2.45.0。我能够创建Firefox实例,但无法使用脚本继续进行进一步的操作无法在60秒内获得稳定的Firefox连接(127.0.0.1:7055)错误。有人能帮帮我吗? 最佳答案 我遇到了同样的问题。降级到firefoxv33后一切正常。您可以找到旧版本here 关于ruby-无法在60秒内获得稳定的Firefox连接(127.0.0.1:7055),我们在StackOverflow上找到一个类

  5. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  6. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  7. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  8. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  9. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  10. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

随机推荐